from copy import deepcopy from math import ceil from twisted.internet.threads import deferToThread import main import modules.provision from utils.logging.debug import debug from utils.logging.log import error, log, warn def getActiveRelays(net): activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] return activeRelays def allRelaysActive(net): """ Check if all enabled relays are active and authenticated. """ activeRelays = getActiveRelays(net) debug(f"allRelaysActive() active relays for {net}: {activeRelays}") relayNum = len(activeRelays) existNum = 0 for i in activeRelays: name = net + str(i) if name in main.IRCPool.keys(): if main.IRCPool[name].authenticated: existNum += 1 debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}") if existNum == relayNum: return True return False def getChanFree(net, new): """ Get a dictionary with the free channel spaces for each relay, and a channel limit. Example return: ({1: 99}, 100) :param net: network :param new: list of newly provisioned relays to skip :return: ({relay: channel spaces}, channel limit) """ chanfree = {} chanlimits = set() for i in getActiveRelays(net): if i in new: continue name = net + str(i) if name not in main.IRCPool.keys(): continue if not main.IRCPool[name].isconnected: continue chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels) chanlimits.add(main.IRCPool[name].chanlimit) if not len(chanlimits) == 1: error("Network %s has servers with different CHANLIMIT values" % net) return False return (chanfree, chanlimits.pop()) def emptyChanAllocate(net, flist, relay, new): chanfree = getChanFree(net, new) if not chanfree: return for i in new: chanfree[0][i] = chanfree[1] allocated = {} toalloc = len(flist) # toalloc:2148 free:{1: 250} chanlimit:250 correction:2147 if toalloc > sum(chanfree[0].values()): sum_free = sum(chanfree[0].values()) # 250 chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898 relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592 relays_needed_rounded = ceil(relays_needed) debug(f"emptyChanAllocate() secondary allocation sum_free:{sum_free} chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} relays_needed_rounded:{relays_needed_rounded}") #correction = round(toalloc - sum(chanfree[0].values()) / chanfree[1]) correction = relays_needed_rounded debug(f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}") warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net)) modules.provision.provisionMultipleRelays(net, correction) return False for i in chanfree[0].keys(): for x in range(chanfree[0][i]): if not len(flist): break if i in allocated.keys(): allocated[i].append(flist.pop()) else: allocated[i] = [flist.pop()] return allocated def populateChans(net, clist, relay, new): # divided = array_split(clist, relay) allocated = emptyChanAllocate(net, clist, relay, new) if not allocated: return for i in allocated.keys(): if net in main.TempChan.keys(): main.TempChan[net][i] = allocated[i] else: main.TempChan[net] = {i: allocated[i]} def notifyJoin(net): for i in getActiveRelays(net): name = net + str(i) if name in main.IRCPool.keys(): main.IRCPool[name].checkChannels() def minifyChans(net, listinfo): if not allRelaysActive(net): error("All relays for %s are not active, cannot minify list" % net) return False for i in getActiveRelays(net): name = net + str(i) for x in main.IRCPool[name].channels: for y in listinfo: if y[0] == x: listinfo.remove(y) if not listinfo: log("We're on all the channels we want to be on, dropping LIST") return False return listinfo def keepChannels(net, listinfo, mean, sigrelay, relay): listinfo = minifyChans(net, listinfo) if not listinfo: return if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels coverAll = True elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels coverAll = False if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) return if coverAll: needed = relay - len(getActiveRelays(net)) debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") newNums = modules.provision.provisionMultipleRelays(net, needed) flist = [i[0] for i in listinfo] populateChans(net, flist, relay, newNums) else: needed = sigrelay - len(getActiveRelays(net)) debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") newNums = modules.provision.provisionMultipleRelays(net, needed) siglist = [i[0] for i in listinfo if int(i[1]) > mean] populateChans(net, siglist, sigrelay, newNums) notifyJoin(net) def joinSingle(net, channel): eca = emptyChanAllocate(net, [channel], None, []) if not eca: return False if not len(eca.keys()) == 1: return False num = list(eca.keys())[0] name = f"{net}{num}" if name not in main.IRCPool: return False main.IRCPool[name].join(channel) return num def partSingle(net, channel): """ Iterate over all the relays of net and part channels matching channel. :param net: :param channel: :return: """ parted = [] for i in getActiveRelays(net): name = f"{net}{i}" if name in main.IRCPool.keys(): if channel in main.IRCPool[name].channels: main.IRCPool[name].part(channel) parted.append(str(i)) return parted def nukeNetwork(net): # purgeRecords(net) # p = main.g.pipeline() main.g.delete("analytics.list." + net) # p.delete("list."+net) # p.execute() # def nukeNetwork(net): # deferToThread(_nukeNetwork, net) def _initialList(net, num, listinfo, chanlimit): listlength = len(listinfo) cumul = 0 try: cumul += sum(int(i[1]) for i in listinfo) except TypeError: warn("Bad LIST data received from %s - %i" % (net, num)) return mean = round(cumul / listlength, 2) siglength = 0 insiglength = 0 sigcumul = 0 insigcumul = 0 for i in listinfo: if int(i[1]) > mean: siglength += 1 sigcumul += int(i[1]) elif int(i[1]) < mean: insiglength += 1 insigcumul += int(i[1]) sigrelay = ceil(siglength / chanlimit) relay = ceil(listlength / chanlimit) # netbase = "list.%s" % net abase = "analytics.list.%s" % net p = main.g.pipeline() p.hset(abase, "mean", mean) p.hset(abase, "total", listlength) p.hset(abase, "sigtotal", siglength) p.hset(abase, "insigtotal", insiglength) p.hset(abase, "sigperc", round(siglength / listlength * 100, 2)) p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2)) p.hset(abase, "cumul", cumul) p.hset(abase, "sigcumul", sigcumul) p.hset(abase, "insigcumul", insigcumul) p.hset(abase, "relay", relay) p.hset(abase, "sigrelay", sigrelay) p.hset(abase, "insigrelay", ceil(insiglength / chanlimit)) debug( ( f"_initialList() net:{net} num:{num} listlength:{listlength} " f"mean:{mean} siglength:{siglength} insiglength:{insiglength} " f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}" ) ) # Purge existing records before writing # purgeRecords(net) # for i in listinfo: # p.rpush(netbase+"."+i[0], i[1]) # p.rpush(netbase+"."+i[0], i[2]) # p.sadd(netbase, i[0]) p.execute() debug("List parsing completed on %s" % net) keepChannels(net, listinfo, mean, sigrelay, relay) def initialList(net, num, listinfo, chanlimit): deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)