monolith/modules/chankeep.py

312 lines
10 KiB
Python
Raw Normal View History

2019-10-08 20:10:42 +00:00
from copy import deepcopy
from math import ceil
2022-07-21 12:40:09 +00:00
from twisted.internet.threads import deferToThread
2022-07-21 12:39:59 +00:00
2022-07-21 12:40:09 +00:00
import main
import modules.provision
2022-08-11 20:44:19 +00:00
from utils.logging.debug import debug, trace
2022-07-21 12:40:09 +00:00
from utils.logging.log import error, log, warn
2022-07-21 12:39:41 +00:00
2022-08-11 18:22:09 +00:00
def getActiveRelays(net):
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return activeRelays
def allRelaysActive(net):
2022-08-11 18:22:09 +00:00
"""
Check if all enabled relays are active and authenticated.
"""
activeRelays = getActiveRelays(net)
debug(f"allRelaysActive() active relays for {net}: {activeRelays}")
relayNum = len(activeRelays)
existNum = 0
2022-08-11 18:22:09 +00:00
for i in activeRelays:
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated:
existNum += 1
debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}")
if existNum == relayNum:
return True
return False
2022-07-21 12:39:41 +00:00
def getChanFree(net, new):
"""
Get a dictionary with the free channel spaces for
each relay, and a channel limit.
Example return:
({1: 99}, 100)
:param net: network
:param new: list of newly provisioned relays to skip
:return: ({relay: channel spaces}, channel limit)
"""
chanfree = {}
chanlimits = set()
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
if i in new:
continue
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name not in main.IRCPool.keys():
continue
if not main.IRCPool[name].isconnected:
continue
2022-07-21 12:39:41 +00:00
chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels)
chanlimits.add(main.IRCPool[name].chanlimit)
if not len(chanlimits) == 1:
error("Network %s has servers with different CHANLIMIT values" % net)
return False
return (chanfree, chanlimits.pop())
2022-08-12 22:32:00 +00:00
def getTotalChans(net):
total = 0
for i in getActiveRelays(net):
name = net + str(i)
if name in main.IRCPool.keys():
total += len(main.IRCPool[name].channels)
return total
2022-07-21 12:39:41 +00:00
def emptyChanAllocate(net, flist, new):
chanfree = getChanFree(net, new)
if not chanfree:
return
for i in new:
chanfree[0][i] = chanfree[1]
allocated = {}
toalloc = len(flist)
2022-08-11 19:51:41 +00:00
# Used to correct allocations and provision additional relays
# if the math since the last LIST is a bit wrong
# toalloc:2148 free:{1: 250} chanlimit:250 correction:2147
if toalloc > sum(chanfree[0].values()):
2022-08-11 19:51:41 +00:00
sum_free = sum(chanfree[0].values()) # 250
chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898
relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592
correction = ceil(relays_needed)
2022-08-11 20:44:19 +00:00
if main.config["ChanKeep"]["Provision"]:
debug(
(
f"emptyChanAllocate() secondary allocation sum_free:{sum_free} "
f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} "
f"correction:{correction}"
)
2022-08-11 19:51:41 +00:00
)
2022-08-11 20:44:19 +00:00
debug(
(
f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} "
f"free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}"
)
2022-08-11 19:51:41 +00:00
)
2022-08-11 20:44:19 +00:00
warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
modules.provision.provisionMultipleRelays(net, correction)
return False
else:
# We don't have enough spaces and we can't add any.
# Let's do the best we can in the circumstances.
debug(f"emptyChanAllocate() cannot create additional relays for {net}")
debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered")
flist = flist[:sum_free]
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
trace(f"emptyChanAllocate() best effort allocation: {flist}")
newlist = list(flist)
for i in chanfree[0].keys():
for x in range(chanfree[0][i]):
if not len(newlist):
break
if i in allocated.keys():
allocated[i].append(newlist.pop())
else:
allocated[i] = [newlist.pop()]
return allocated
2022-07-21 12:39:41 +00:00
def populateChans(net, clist, new):
2022-07-21 12:39:41 +00:00
# divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, new)
if not allocated:
return
for i in allocated.keys():
if net in main.TempChan.keys():
main.TempChan[net][i] = allocated[i]
else:
main.TempChan[net] = {i: allocated[i]}
2022-07-21 12:39:41 +00:00
def notifyJoin(net):
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels()
2022-07-21 12:39:41 +00:00
def minifyChans(net, listinfo):
if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list" % net)
return False
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
2022-07-21 12:39:41 +00:00
name = net + str(i)
for x in main.IRCPool[name].channels:
for y in listinfo:
if y[0] == x:
listinfo.remove(y)
if not listinfo:
log("We're on all the channels we want to be on, dropping LIST")
return False
return listinfo
2022-07-21 12:39:41 +00:00
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
listinfo = minifyChans(net, listinfo)
if not listinfo:
return
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return
num_instances = len(getActiveRelays(net))
debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}")
chan_slots_used = getTotalChans(net)
debug(f"keepChannels() slots_used:{chan_slots_used}")
max_chans = (chanlimit * num_instances) - chan_slots_used
debug(f"keepChannels() max_chans:{max_chans}")
if coverAll:
2022-08-11 18:22:09 +00:00
needed = relay - len(getActiveRelays(net))
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
newNums = modules.provision.provisionMultipleRelays(net, needed)
flist = [i[0] for i in listinfo]
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
populateChans(net, chosen, newNums)
else:
2022-08-11 18:22:09 +00:00
needed = sigrelay - len(getActiveRelays(net))
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
newNums = modules.provision.provisionMultipleRelays(net, needed)
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans]
populateChans(net, chosen, newNums)
notifyJoin(net)
2022-07-21 12:39:41 +00:00
2020-11-02 20:14:02 +00:00
def joinSingle(net, channel):
eca = emptyChanAllocate(net, [channel], [])
if not eca:
return False
if not len(eca.keys()) == 1:
return False
num = list(eca.keys())[0]
name = f"{net}{num}"
if name not in main.IRCPool:
2020-11-02 20:14:02 +00:00
return False
main.IRCPool[name].join(channel)
return num
2020-11-02 20:14:02 +00:00
2022-07-21 12:39:41 +00:00
def partSingle(net, channel):
"""
Iterate over all the relays of net and part channels matching channel.
:param net:
:param channel:
:return:
"""
parted = []
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
name = f"{net}{i}"
if name in main.IRCPool.keys():
if channel in main.IRCPool[name].channels:
main.IRCPool[name].part(channel)
parted.append(str(i))
return parted
2019-10-08 20:10:42 +00:00
def nukeNetwork(net):
2022-07-21 12:39:41 +00:00
# purgeRecords(net)
# p = main.g.pipeline()
main.g.delete("analytics.list." + net)
# p.delete("list."+net)
# p.execute()
2022-07-21 12:39:41 +00:00
# def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
2019-10-08 20:10:42 +00:00
def _initialList(net, num, listinfo, chanlimit):
listlength = len(listinfo)
cumul = 0
try:
cumul += sum(int(i[1]) for i in listinfo)
except TypeError:
warn("Bad LIST data received from %s - %i" % (net, num))
return
2022-07-21 12:39:41 +00:00
mean = round(cumul / listlength, 2)
2019-10-08 20:10:42 +00:00
siglength = 0
insiglength = 0
sigcumul = 0
insigcumul = 0
for i in listinfo:
if int(i[1]) > mean:
2019-10-08 20:10:42 +00:00
siglength += 1
sigcumul += int(i[1])
elif int(i[1]) < mean:
2019-10-08 20:10:42 +00:00
insiglength += 1
insigcumul += int(i[1])
2022-07-21 12:39:41 +00:00
sigrelay = ceil(siglength / chanlimit)
relay = ceil(listlength / chanlimit)
2022-07-21 12:40:05 +00:00
# netbase = "list.%s" % net
2019-10-08 20:10:42 +00:00
abase = "analytics.list.%s" % net
p = main.g.pipeline()
p.hset(abase, "mean", mean)
p.hset(abase, "total", listlength)
p.hset(abase, "sigtotal", siglength)
p.hset(abase, "insigtotal", insiglength)
2022-07-21 12:39:41 +00:00
p.hset(abase, "sigperc", round(siglength / listlength * 100, 2))
p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2))
2019-10-08 20:10:42 +00:00
p.hset(abase, "cumul", cumul)
p.hset(abase, "sigcumul", sigcumul)
p.hset(abase, "insigcumul", insigcumul)
p.hset(abase, "relay", relay)
2019-10-08 20:10:42 +00:00
p.hset(abase, "sigrelay", sigrelay)
2022-07-21 12:39:41 +00:00
p.hset(abase, "insigrelay", ceil(insiglength / chanlimit))
2022-08-11 19:32:49 +00:00
debug(
(
f"_initialList() net:{net} num:{num} listlength:{listlength} "
f"mean:{mean} siglength:{siglength} insiglength:{insiglength} "
f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}"
)
)
2019-10-08 20:10:42 +00:00
# Purge existing records before writing
2022-07-21 12:39:41 +00:00
# purgeRecords(net)
# for i in listinfo:
# p.rpush(netbase+"."+i[0], i[1])
# p.rpush(netbase+"."+i[0], i[2])
# p.sadd(netbase, i[0])
2019-10-08 20:10:42 +00:00
p.execute()
debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit)
# return (listinfo, mean, sigrelay, relay)
2019-10-08 20:10:42 +00:00
2022-07-21 12:39:41 +00:00
2019-10-08 20:10:42 +00:00
def initialList(net, num, listinfo, chanlimit):
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
def chankeep_handler(net, num, listinfo, chanlimit):
"""
Handle a channel keep request.
:param net:
:param num:
:param listinfo:
:param chanlimit:
:return:
"""
2022-08-12 22:32:00 +00:00
listinfo, mean, sigrelay, relay = _initialList(net, num, listinfo, chanlimit)