from copy import deepcopy from math import ceil from twisted.internet.threads import deferToThread import main import modules.provision from utils.logging.debug import debug from utils.logging.log import error, log, warn def allRelaysActive(net): relayNum = len(main.network[net].relays.keys()) existNum = 0 for i in main.network[net].relays.keys(): name = net + str(i) if name in main.IRCPool.keys(): if main.IRCPool[name].authenticated: existNum += 1 if existNum == relayNum: return True return False def getChanFree(net, new): chanfree = {} chanlimits = set() for i in main.network[net].relays.keys(): if i in new: continue name = net + str(i) chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels) chanlimits.add(main.IRCPool[name].chanlimit) if not len(chanlimits) == 1: error("Network %s has servers with different CHANLIMIT values" % net) return False return (chanfree, chanlimits.pop()) def emptyChanAllocate(net, flist, relay, new): chanfree = getChanFree(net, new) if not chanfree: return for i in new: chanfree[0][i] = chanfree[1] allocated = {} toalloc = len(flist) if toalloc > sum(chanfree[0].values()): correction = round(toalloc - sum(chanfree[0].values()) / chanfree[1]) # print("correction", correction) warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net)) # newNums = modules.provision.provisionMultipleRelays(net, correction) return False for i in chanfree[0].keys(): for x in range(chanfree[0][i]): if not len(flist): break if i in allocated.keys(): allocated[i].append(flist.pop()) else: allocated[i] = [flist.pop()] return allocated def populateChans(net, clist, relay, new): # divided = array_split(clist, relay) allocated = emptyChanAllocate(net, clist, relay, new) if not allocated: return for i in allocated.keys(): if net in main.TempChan.keys(): main.TempChan[net][i] = allocated[i] else: main.TempChan[net] = {i: allocated[i]} def notifyJoin(net): for i in main.network[net].relays.keys(): name = net + str(i) if name in main.IRCPool.keys(): main.IRCPool[name].checkChannels() def minifyChans(net, listinfo): if not allRelaysActive(net): error("All relays for %s are not active, cannot minify list" % net) return False for i in main.network[net].relays.keys(): name = net + str(i) for x in main.IRCPool[name].channels: for y in listinfo: if y[0] == x: listinfo.remove(y) if not listinfo: log("We're on all the channels we want to be on, dropping LIST") return False return listinfo def keepChannels(net, listinfo, mean, sigrelay, relay): listinfo = minifyChans(net, listinfo) if not listinfo: return if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels coverAll = True elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels coverAll = False if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) return if coverAll: needed = relay - len(main.network[net].relays.keys()) newNums = modules.provision.provisionMultipleRelays(net, needed) flist = [i[0] for i in listinfo] populateChans(net, flist, relay, newNums) else: needed = sigrelay - len(main.network[net].relays.keys()) newNums = modules.provision.provisionMultipleRelays(net, needed) siglist = [i[0] for i in listinfo if int(i[1]) > mean] populateChans(net, siglist, sigrelay, newNums) notifyJoin(net) def joinSingle(net, channel): if allRelaysActive(net): chanfree = getChanFree(net, []) print("chanfree", chanfree) for i in chanfree[0]: if chanfree[0][i] < 0: print("JOIN CHAN") else: error("All relays for %s are not active" % net) return False def nukeNetwork(net): # purgeRecords(net) # p = main.g.pipeline() main.g.delete("analytics.list." + net) # p.delete("list."+net) # p.execute() # def nukeNetwork(net): # deferToThread(_nukeNetwork, net) def _initialList(net, num, listinfo, chanlimit): listlength = len(listinfo) cumul = 0 try: cumul += sum(int(i[1]) for i in listinfo) except TypeError: warn("Bad LIST data received from %s - %i" % (net, num)) return mean = round(cumul / listlength, 2) siglength = 0 insiglength = 0 sigcumul = 0 insigcumul = 0 for i in listinfo: if int(i[1]) > mean: siglength += 1 sigcumul += int(i[1]) elif int(i[1]) < mean: insiglength += 1 insigcumul += int(i[1]) sigrelay = ceil(siglength / chanlimit) relay = ceil(listlength / chanlimit) # netbase = "list.%s" % net abase = "analytics.list.%s" % net p = main.g.pipeline() p.hset(abase, "mean", mean) p.hset(abase, "total", listlength) p.hset(abase, "sigtotal", siglength) p.hset(abase, "insigtotal", insiglength) p.hset(abase, "sigperc", round(siglength / listlength * 100, 2)) p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2)) p.hset(abase, "cumul", cumul) p.hset(abase, "sigcumul", sigcumul) p.hset(abase, "insigcumul", insigcumul) p.hset(abase, "relay", relay) p.hset(abase, "sigrelay", sigrelay) p.hset(abase, "insigrelay", ceil(insiglength / chanlimit)) # Purge existing records before writing # purgeRecords(net) # for i in listinfo: # p.rpush(netbase+"."+i[0], i[1]) # p.rpush(netbase+"."+i[0], i[2]) # p.sadd(netbase, i[0]) p.execute() debug("List parsing completed on %s" % net) keepChannels(net, listinfo, mean, sigrelay, relay) def initialList(net, num, listinfo, chanlimit): deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)