monolith/modules/chankeep.py

310 lines
10 KiB
Python

from copy import deepcopy
from math import ceil
from twisted.internet.threads import deferToThread
import main
import modules.provision
from utils.logging.debug import debug, trace
from utils.logging.log import error, log, warn
def getActiveRelays(net):
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return activeRelays
def allRelaysActive(net):
"""
Check if all enabled relays are active and authenticated.
"""
activeRelays = getActiveRelays(net)
debug(f"allRelaysActive() active relays for {net}: {activeRelays}")
relayNum = len(activeRelays)
existNum = 0
for i in activeRelays:
name = net + str(i)
if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated:
existNum += 1
debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}")
if existNum == relayNum:
return True
return False
def getChanFree(net, new):
"""
Get a dictionary with the free channel spaces for
each relay, and a channel limit.
Example return:
({1: 99}, 100)
:param net: network
:param new: list of newly provisioned relays to skip
:return: ({relay: channel spaces}, channel limit)
"""
chanfree = {}
chanlimits = set()
for i in getActiveRelays(net):
if i in new:
continue
name = net + str(i)
if name not in main.IRCPool.keys():
continue
if not main.IRCPool[name].isconnected:
continue
chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels)
chanlimits.add(main.IRCPool[name].chanlimit)
if not len(chanlimits) == 1:
error("Network %s has servers with different CHANLIMIT values" % net)
return False
return (chanfree, chanlimits.pop())
def getTotalChans(net):
total = 0
for i in getActiveRelays(net):
name = net + str(i)
if name in main.IRCPool.keys():
total += len(main.IRCPool[name].channels)
return total
def emptyChanAllocate(net, flist, new):
chanfree = getChanFree(net, new)
if not chanfree:
return
for i in new:
chanfree[0][i] = chanfree[1]
allocated = {}
toalloc = len(flist)
# Used to correct allocations and provision additional relays
# if the math since the last LIST is a bit wrong
# toalloc:2148 free:{1: 250} chanlimit:250 correction:2147
if toalloc > sum(chanfree[0].values()):
sum_free = sum(chanfree[0].values()) # 250
chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898
relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592
correction = ceil(relays_needed)
if main.config["ChanKeep"]["Provision"]:
debug(
(
f"emptyChanAllocate() secondary allocation sum_free:{sum_free} "
f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} "
f"correction:{correction}"
)
)
debug(
(
f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} "
f"free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}"
)
)
warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
modules.provision.provisionMultipleRelays(net, correction)
return False
else:
# We don't have enough spaces and we can't add any.
# Let's do the best we can in the circumstances.
debug(f"emptyChanAllocate() cannot create additional relays for {net}")
debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered")
flist = flist[:sum_free]
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
trace(f"emptyChanAllocate() best effort allocation: {flist}")
newlist = list(flist)
for i in chanfree[0].keys():
for x in range(chanfree[0][i]):
if not len(newlist):
break
if i in allocated.keys():
allocated[i].append(newlist.pop())
else:
allocated[i] = [newlist.pop()]
return allocated
def populateChans(net, clist, new):
# divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, new)
if not allocated:
return
for i in allocated.keys():
if net in main.TempChan.keys():
main.TempChan[net][i] = allocated[i]
else:
main.TempChan[net] = {i: allocated[i]}
def notifyJoin(net):
for i in getActiveRelays(net):
name = net + str(i)
if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels()
def minifyChans(net, listinfo):
if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list" % net)
return False
for i in getActiveRelays(net):
name = net + str(i)
for x in main.IRCPool[name].channels:
for y in listinfo:
if y[0] == x:
listinfo.remove(y)
if not listinfo:
log("We're on all the channels we want to be on, dropping LIST")
return False
return listinfo
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
listinfo = minifyChans(net, listinfo)
if not listinfo:
return
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return
num_instances = len(getActiveRelays(net))
debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}")
chan_slots_used = getTotalChans(net)
debug(f"keepChannels() slots_used:{chan_slots_used}")
max_chans = (chanlimit * num_instances) - chan_slots_used
debug(f"keepChannels() max_chans:{max_chans}")
if coverAll:
needed = relay - len(getActiveRelays(net))
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
newNums = modules.provision.provisionMultipleRelays(net, needed)
flist = [i[0] for i in listinfo]
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
populateChans(net, chosen, newNums)
else:
needed = sigrelay - len(getActiveRelays(net))
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
newNums = modules.provision.provisionMultipleRelays(net, needed)
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans]
populateChans(net, chosen, newNums)
notifyJoin(net)
def joinSingle(net, channel):
eca = emptyChanAllocate(net, [channel], [])
if not eca:
return False
if not len(eca.keys()) == 1:
return False
num = list(eca.keys())[0]
name = f"{net}{num}"
if name not in main.IRCPool:
return False
main.IRCPool[name].join(channel)
return num
def partSingle(net, channel):
"""
Iterate over all the relays of net and part channels matching channel.
:param net:
:param channel:
:return:
"""
parted = []
for i in getActiveRelays(net):
name = f"{net}{i}"
if name in main.IRCPool.keys():
if channel in main.IRCPool[name].channels:
main.IRCPool[name].part(channel)
parted.append(str(i))
return parted
def nukeNetwork(net):
# purgeRecords(net)
# p = main.g.pipeline()
main.g.delete("analytics.list." + net)
# p.delete("list."+net)
# p.execute()
# def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
def _initialList(net, num, listinfo, chanlimit):
listlength = len(listinfo)
cumul = 0
try:
cumul += sum(int(i[1]) for i in listinfo)
except TypeError:
warn("Bad LIST data received from %s - %i" % (net, num))
return
mean = round(cumul / listlength, 2)
siglength = 0
insiglength = 0
sigcumul = 0
insigcumul = 0
for i in listinfo:
if int(i[1]) > mean:
siglength += 1
sigcumul += int(i[1])
elif int(i[1]) < mean:
insiglength += 1
insigcumul += int(i[1])
sigrelay = ceil(siglength / chanlimit)
relay = ceil(listlength / chanlimit)
# netbase = "list.%s" % net
abase = "analytics.list.%s" % net
p = main.g.pipeline()
p.hset(abase, "mean", mean)
p.hset(abase, "total", listlength)
p.hset(abase, "sigtotal", siglength)
p.hset(abase, "insigtotal", insiglength)
p.hset(abase, "sigperc", round(siglength / listlength * 100, 2))
p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2))
p.hset(abase, "cumul", cumul)
p.hset(abase, "sigcumul", sigcumul)
p.hset(abase, "insigcumul", insigcumul)
p.hset(abase, "relay", relay)
p.hset(abase, "sigrelay", sigrelay)
p.hset(abase, "insigrelay", ceil(insiglength / chanlimit))
debug(
(
f"_initialList() net:{net} num:{num} listlength:{listlength} "
f"mean:{mean} siglength:{siglength} insiglength:{insiglength} "
f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}"
)
)
# Purge existing records before writing
# purgeRecords(net)
# for i in listinfo:
# p.rpush(netbase+"."+i[0], i[1])
# p.rpush(netbase+"."+i[0], i[2])
# p.sadd(netbase, i[0])
p.execute()
debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit)
# return (listinfo, mean, sigrelay, relay)
def initialList(net, num, listinfo, chanlimit):
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
def chankeep_handler(net, num, listinfo, chanlimit):
"""
Handle a channel keep request.
:param net:
:param num:
:param listinfo:
:param chanlimit:
:return:
"""
listinfo, mean, sigrelay, relay = _initialList(net, num, listinfo, chanlimit)