From 0321651c200f1d9ecb5e63caa52317defa63f874 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sat, 12 Oct 2019 21:05:55 +0100 Subject: [PATCH] Implement fair channel allocation in ChanKeep * Allocate channels to relays only if they have free space based on their chanlimit value * Minify channels by removing ones that are already covered before passing them off to be joined --- commands/list.py | 26 +++++++++++ core/bot.py | 6 ++- modules/chankeep.py | 101 +++++++++++++++++++++++++++++++------------ modules/provision.py | 1 - 4 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 commands/list.py diff --git a/commands/list.py b/commands/list.py new file mode 100644 index 0000000..44cd366 --- /dev/null +++ b/commands/list.py @@ -0,0 +1,26 @@ +import main + +class ListCommand: + def __init__(self, *args): + self.list(*args) + + def list(self, addr, authed, data, obj, spl, success, failure, info, incUsage, length): + if authed: + if length == 2: + if not spl[1] in main.network.keys(): + failure("No such network: %s" % spl[1]) + return + if not 1 in main.network[spl[1]].relays.keys(): + failure("Network has no first instance") + return + if not spl[1]+"1" in main.IRCPool.keys(): + failure("No IRC instance: %s - 1" % spl[1]) + return + main.IRCPool[spl[1]+"1"].list() + success("Requested list with first instance of %s" % spl[1]) + return + else: + incUsage("list") + return + else: + incUsage(None) diff --git a/core/bot.py b/core/bot.py index c8c35fd..fe46749 100644 --- a/core/bot.py +++ b/core/bot.py @@ -164,12 +164,15 @@ class IRCBot(IRCClient): increment = 0.7 increment += 0.1 else: - print("Already on %s, skipping." % i) + error("%s - %i - Cannot join channel we are already on - %s" % (self.net, self.num, i)) def checkChannels(self): if self.net in main.TempChan.keys(): if self.num in main.TempChan[self.net].keys(): self.joinChannels(main.TempChan[self.net][self.num]) + del main.TempChan[self.net][self.num] + if not main.TempChan[self.net]: + del main.TempChan[self.net] def event(self, **cast): for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over @@ -392,7 +395,6 @@ class IRCBot(IRCClient): def got_list(self, listinfo): if len(listinfo) == 0: # probably ngircd not supporting LIST >0 return - chankeep.initialList(self.net, self.num, listinfo, self.chanlimit) def isupport(self, options): diff --git a/modules/chankeep.py b/modules/chankeep.py index 0248bb0..2db206d 100644 --- a/modules/chankeep.py +++ b/modules/chankeep.py @@ -13,18 +13,46 @@ def allRelaysActive(net): for i in main.network[net].relays.keys(): name = net+str(i) if name in main.IRCPool.keys(): - existNum += 1 + if main.IRCPool[name].connected: + existNum += 1 if existNum == relayNum: return True return False +def getChanFree(net): + chanfree = {} + for i in main.network[net].relays.keys(): + name = net+str(i) + chanfree[i] = main.IRCPool[name].chanlimit-len(main.IRCPool[name].channels) + return chanfree + +def emptyChanAllocate(net, flist, relay): + chanfree = getChanFree(net) + allocated = {} + toalloc = len(flist) + if toalloc > sum(chanfree.values()): + error("Too many channels to allocate for %s - this is probably a bug" % net) + return False + for i in chanfree.keys(): + for x in range(chanfree[i]): + if not len(flist): + break + if i in allocated.keys(): + allocated[i].append(flist.pop()) + else: + allocated[i] = [flist.pop()] + return allocated + def populateChans(net, clist, relay): - divided = array_split(clist, relay) - for i in range(0, len(divided)): + #divided = array_split(clist, relay) + allocated = emptyChanAllocate(net, clist, relay) + if not allocated: + return + for i in allocated.keys(): if net in main.TempChan.keys(): - main.TempChan[net][i] = divided[i] + main.TempChan[net][i] = allocated[i] else: - main.TempChan[net] = {i: divided[i]} + main.TempChan[net] = {i: allocated[i]} def notifyJoin(net): for i in main.network[net].relays.keys(): @@ -32,10 +60,28 @@ def notifyJoin(net): if name in main.IRCPool.keys(): main.IRCPool[name].checkChannels() +def minifyChans(net, listinfo): + if not allRelaysActive(net): + error("All relays for %s are not active, cannot minify list") + return False + for i in main.network[net].relays.keys(): + name = net+str(i) + for x in main.IRCPool[name].channels: + for y in listinfo: + if y[0] == x: + listinfo.remove(y) + if not listinfo: + log("We're on all the channels we want to be on, dropping LIST") + return False + return listinfo + def keepChannels(net, listinfo, mean, sigrelay, relay): #print("list", listinfo) #print("sigrelay", sigrelay) #print("cur", len(main.network[net].relays.keys())) + listinfo = minifyChans(net, listinfo) + if not listinfo: + return if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels coverAll = True elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels @@ -58,23 +104,24 @@ def keepChannels(net, listinfo, mean, sigrelay, relay): #print("coverall", coverAll) #print("needed", needed) -def purgeRecords(net): - base = "list.%s" % net - p = main.g.pipeline() - existingChans = main.g.smembers("list."+net) - for i in existingChans: - p.delete(base+"."+i.decode("utf-8")) - p.execute() - -def _nukeNetwork(net): - purgeRecords(net) - p = main.g.pipeline() - p.delete("analytics.list."+net) - p.delete("list."+net) - p.execute() +#def purgeRecords(net): +# base = "list.%s" % net +# p = main.g.pipeline() +# existingChans = main.g.smembers("list."+net) +# for i in existingChans: +# p.delete(base+"."+i.decode("utf-8")) +# p.execute() def nukeNetwork(net): - deferToThread(_nukeNetwork, net) + #purgeRecords(net) + #p = main.g.pipeline() + main.g.delete("analytics.list."+net) + #p.delete("list."+net) + #p.execute() + +#def nukeNetwork(net): +# deferToThread(_nukeNetwork, net) + def _initialList(net, num, listinfo, chanlimit): #listinfo = sorted(listinfo, key=lambda x: xdd[0]) @@ -98,9 +145,6 @@ def _initialList(net, num, listinfo, chanlimit): insiglength += 1 insigcumul += int(i[1]) - if not net in main.network.keys(): - warn("Cannot write list info - no network entry for %s" % net) - return sigrelay = ceil(siglength/chanlimit) relay = ceil(listlength/chanlimit) netbase = "list.%s" % net @@ -118,12 +162,13 @@ def _initialList(net, num, listinfo, chanlimit): p.hset(abase, "relay", relay) p.hset(abase, "sigrelay", sigrelay) p.hset(abase, "insigrelay", ceil(insiglength/chanlimit)) + # Purge existing records before writing - purgeRecords(net) - for i in listinfo: - p.rpush(netbase+"."+i[0], i[1]) - p.rpush(netbase+"."+i[0], i[2]) - p.sadd(netbase, i[0]) + #purgeRecords(net) + #for i in listinfo: + # p.rpush(netbase+"."+i[0], i[1]) + # p.rpush(netbase+"."+i[0], i[2]) + # p.sadd(netbase, i[0]) p.execute() debug("List parsing completed on %s" % net) diff --git a/modules/provision.py b/modules/provision.py index d55635e..3b99cfe 100644 --- a/modules/provision.py +++ b/modules/provision.py @@ -65,7 +65,6 @@ def provisionRelay(num, network): def provisionMultipleRelays(net, relaysNeeded): for i in range(relaysNeeded): num, alias = main.network[net].add_relay() - print(relaysNeeded, "for", net, ":", num, alias) provisionRelay(num, net) main.saveConf("network")