from copy import deepcopy from math import ceil from twisted.internet.threads import deferToThread import main import modules.provision from utils.logging.debug import debug, trace from utils.logging.log import error, log, warn def getActiveRelays(net): """ Get a list of active relays for a network. :param net: network :rtype: list of int :return: list of active relay numbers""" activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] return activeRelays def allRelaysActive(net): """ Check if all enabled relays are active and authenticated. :param net: network :rtype: bool :return: True if all relays are active and authenticated, False otherwise """ activeRelays = getActiveRelays(net) debug(f"allRelaysActive() active relays for {net}: {activeRelays}") relayNum = len(activeRelays) existNum = 0 for i in activeRelays: name = net + str(i) if name in main.IRCPool.keys(): if main.IRCPool[name].authenticated: existNum += 1 debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}") if existNum == relayNum: return True return False def getChanFree(net, new): """ Get a dictionary with the free channel spaces for each relay, and a channel limit. Example return: ({1: 99}, 100) :param net: network :param new: list of newly provisioned relays to skip :return: ({relay: channel spaces}, channel limit) """ chanfree = {} chanlimits = set() for i in getActiveRelays(net): if i in new: continue name = net + str(i) if name not in main.IRCPool.keys(): continue if not main.IRCPool[name].isconnected: continue chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels) chanlimits.add(main.IRCPool[name].chanlimit) if not len(chanlimits) == 1: error("Network %s has servers with different CHANLIMIT values" % net) return False return (chanfree, chanlimits.pop()) def getTotalChans(net): """ Get the total number of channels on all relays for a network. :param net: network :rtype: int :return: total number of channels """ total = 0 for i in getActiveRelays(net): name = net + str(i) if name in main.IRCPool.keys(): total += len(main.IRCPool[name].channels) return total def emptyChanAllocate(net, flist, new): """ Allocate channels to relays. :param net: network :param flist: list of channels to allocate :param new: list of newly provisioned relays to account for :rtype: dict :return: dictionary of {relay: list of channels}""" # Get the free channel spaces for each relay chanfree = getChanFree(net, new) if not chanfree: return # Pretend the newly provisioned relays are already on the network for i in new: chanfree[0][i] = chanfree[1] allocated = {} # Copy the list since we're going to mutate it toalloc = len(flist) # Used to correct allocations and provision additional relays # if the math since the last LIST is a bit wrong # toalloc:2148 free:{1: 250} chanlimit:250 correction:2147 newlist = list(flist) if toalloc > sum(chanfree[0].values()): sum_free = sum(chanfree[0].values()) # 250 chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898 relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592 correction = ceil(relays_needed) if main.config["ChanKeep"]["Provision"]: debug( ( f"emptyChanAllocate() secondary allocation sum_free:{sum_free} " f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} " f"correction:{correction}" ) ) debug( ( f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} " f"free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}" ) ) warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net)) modules.provision.provisionMultipleRelays(net, correction) return False else: # We don't have enough spaces and we can't add any. # Let's do the best we can in the circumstances. debug(f"emptyChanAllocate() cannot create additional relays for {net}") debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered") newlist = newlist[:sum_free] debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}") trace(f"emptyChanAllocate() best effort allocation: {flist}") for i in chanfree[0].keys(): for x in range(chanfree[0][i]): if not len(newlist): break if i in allocated.keys(): allocated[i].append(newlist.pop()) else: allocated[i] = [newlist.pop()] return allocated def populateChans(net, clist, new): """ Populate channels on relays. Stores channels to join in a list in main.TempChan[net][num] :param net: network :param clist: list of channels to join :param new: list of newly provisioned relays to account for""" # divided = array_split(clist, relay) allocated = emptyChanAllocate(net, clist, new) if not allocated: return for i in allocated.keys(): if net in main.TempChan.keys(): main.TempChan[net][i] = allocated[i] else: main.TempChan[net] = {i: allocated[i]} def notifyJoin(net): """ Notify relays to join channels. They will pull from main.TempChan and remove channels they join. :param net: network """ for i in getActiveRelays(net): name = net + str(i) if name in main.IRCPool.keys(): main.IRCPool[name].checkChannels() def minifyChans(net, listinfo): """ Remove channels from listinfo that are already covered by a relay. :param net: network :param listinfo: list of channels to check :type listinfo: list of [channel, num_users] :return: list of channels with joined channels removed :rtype: list of [channel, num_users] """ if not allRelaysActive(net): error("All relays for %s are not active, cannot minify list" % net) return False for i in getActiveRelays(net): name = net + str(i) for x in main.IRCPool[name].channels: for y in listinfo: if y[0] == x: listinfo.remove(y) if not listinfo: log("We're on all the channels we want to be on, dropping LIST") return False return listinfo def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit): """ Minify channels, determine whether we can cover all the channels on the network, or need to use 'significant' mode. Truncate the channel list to available channel spaces. Allocate these channels to relays. Notify relays that they should pull from TempChan to join. :param net: network :param listinfo: list of [channel, num_users] lists :param mean: mean of channel population :param sigrelay: number of relays needed to cover significant channels :param relay: number of relays needed to cover all channels :param chanlimit: maximum number of channels to allocate to a relay """ listinfo = minifyChans(net, listinfo) if not listinfo: return if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels coverAll = True elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels coverAll = False if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) return num_instances = len(getActiveRelays(net)) debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}") chan_slots_used = getTotalChans(net) debug(f"keepChannels() slots_used:{chan_slots_used}") max_chans = (chanlimit * num_instances) - chan_slots_used debug(f"keepChannels() max_chans:{max_chans}") if coverAll: needed = relay - len(getActiveRelays(net)) debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") newNums = modules.provision.provisionMultipleRelays(net, needed) flist = [i[0] for i in listinfo] chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] populateChans(net, chosen, newNums) else: needed = sigrelay - len(getActiveRelays(net)) debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") newNums = modules.provision.provisionMultipleRelays(net, needed) siglist = [i[0] for i in listinfo if int(i[1]) > mean] chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans] populateChans(net, chosen, newNums) notifyJoin(net) def joinSingle(net, channel): """ Join a channel on a relay. Use ECA to determine which relay to join on. :param net: network :param channel: channel to join :return: relay number that joined the channel :rtype: int """ eca = emptyChanAllocate(net, [channel], []) if not eca: return False if not len(eca.keys()) == 1: return False num = list(eca.keys())[0] name = f"{net}{num}" if name not in main.IRCPool: return False main.IRCPool[name].join(channel) return num def partSingle(net, channel): """ Iterate over all the relays of net and part channels matching channel. :param net: network :param channel: channel to part :return: list of relays that parted the channel :rtype: list of str """ parted = [] for i in getActiveRelays(net): name = f"{net}{i}" if name in main.IRCPool.keys(): if channel in main.IRCPool[name].channels: main.IRCPool[name].part(channel) parted.append(str(i)) return parted def nukeNetwork(net): """ Remove network records. :param net: network""" # purgeRecords(net) # p = main.g.pipeline() main.g.delete("analytics.list." + net) # p.delete("list."+net) # p.execute() # def nukeNetwork(net): # deferToThread(_nukeNetwork, net) def _initialList(net, num, listinfo, chanlimit): """ Called when a relay receives a full LIST response. Run statistics to determine how many channels are significant. This is done by adding all the numbers of users on the channels together, then dividing by the number of channels. * cumul - cumulative sum of all channel membership * siglength - number of significant channels * listlength - number of channels in the list * sigrelay - number of relays needed to cover siglength * relay - number of relays needed to cover all channels :param net: network :param num: relay number :param listinfo: list of [channel, num_users] lists :param chanlimit: maximum number of channels the relay can join """ listlength = len(listinfo) cumul = 0 try: cumul += sum(int(i[1]) for i in listinfo) except TypeError: warn("Bad LIST data received from %s - %i" % (net, num)) return mean = round(cumul / listlength, 2) siglength = 0 insiglength = 0 sigcumul = 0 insigcumul = 0 for i in listinfo: if int(i[1]) > mean: siglength += 1 sigcumul += int(i[1]) elif int(i[1]) < mean: insiglength += 1 insigcumul += int(i[1]) sigrelay = ceil(siglength / chanlimit) relay = ceil(listlength / chanlimit) abase = "analytics.list.%s" % net p = main.g.pipeline() # See docstring for meanings p.hset(abase, "mean", mean) p.hset(abase, "total", listlength) p.hset(abase, "sigtotal", siglength) p.hset(abase, "insigtotal", insiglength) p.hset(abase, "sigperc", round(siglength / listlength * 100, 2)) p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2)) p.hset(abase, "cumul", cumul) p.hset(abase, "sigcumul", sigcumul) p.hset(abase, "insigcumul", insigcumul) p.hset(abase, "relay", relay) p.hset(abase, "sigrelay", sigrelay) p.hset(abase, "insigrelay", ceil(insiglength / chanlimit)) debug( ( f"_initialList() net:{net} num:{num} listlength:{listlength} " f"mean:{mean} siglength:{siglength} insiglength:{insiglength} " f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}" ) ) # Purge existing records before writing # purgeRecords(net) # for i in listinfo: # p.rpush(netbase+"."+i[0], i[1]) # p.rpush(netbase+"."+i[0], i[2]) # p.sadd(netbase, i[0]) p.execute() debug("List parsing completed on %s" % net) keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit) # return (listinfo, mean, sigrelay, relay) def initialList(net, num, listinfo, chanlimit): """ Run _initialList in a thread. See above docstring. """ deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)