From 496a3d03745b6c64bd1e1103f1ec6390583f9fdf Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sat, 13 Aug 2022 19:20:29 +0100 Subject: [PATCH] Implement ChanKeep without requiring persistent chanlimits on all networks --- core/bot.py | 2 +- modules/chankeep.py | 143 ++++++++++++++++------------------------- modules/provision.py | 1 + tests/test_chankeep.py | 12 ++-- 4 files changed, 64 insertions(+), 94 deletions(-) diff --git a/core/bot.py b/core/bot.py index 0808ee9..e6b1009 100644 --- a/core/bot.py +++ b/core/bot.py @@ -473,7 +473,7 @@ class IRCBot(IRCClient): if len(listinfo) == 0: # probably ngircd not supporting LIST >0 return if main.config["ChanKeep"]["Enabled"]: - chankeep.initialList(self.net, self.num, listinfo, self.chanlimit) + chankeep.initialList(self.net, self.num, listinfo) def recheckList(self): allRelays = chankeep.allRelaysActive(self.net) diff --git a/modules/chankeep.py b/modules/chankeep.py index ea09274..402de69 100644 --- a/modules/chankeep.py +++ b/modules/chankeep.py @@ -4,10 +4,10 @@ from math import ceil from twisted.internet.threads import deferToThread import main -import modules.provision from utils.logging.debug import debug, trace from utils.logging.log import error, log, warn + def getEnabledRelays(net): """ Get a list of enabled relays for a network. @@ -18,6 +18,7 @@ def getEnabledRelays(net): enabledRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] return enabledRelays + def getActiveRelays(net): """ Get a list of active relays for a network. @@ -48,6 +49,7 @@ def allRelaysActive(net): debug(f"allRelaysActive() {net}: {relaysActive} ({activeRelays}/{enabledRelays})") return relaysActive + def getAverageChanlimit(net): """ Get the average channel limit for a network. @@ -64,6 +66,7 @@ def getAverageChanlimit(net): debug(f"getAverageChanlimit() {net}: {avg_chanlimit}") return avg_chanlimit + def getSumChanlimit(net): """ Get the sum of all channel limits for a network. @@ -78,32 +81,26 @@ def getSumChanlimit(net): total += main.IRCPool[name].chanlimit return total -def getChanFree(net, new): + +def getChanFree(net): """ Get a dictionary with the free channel spaces for each relay, and a channel limit. Example return: ({1: 99}, 100) :param net: network - :param new: list of newly provisioned relays to skip :return: ({relay: channel spaces}, channel limit) """ chanfree = {} - chanlimits = set() for i in getActiveRelays(net): - if i in new: - continue name = net + str(i) if name not in main.IRCPool.keys(): continue if not main.IRCPool[name].isconnected: continue chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels) - chanlimits.add(main.IRCPool[name].chanlimit) - if not len(chanlimits) == 1: - error("Network %s has servers with different CHANLIMIT values" % net) - return False - return (chanfree, chanlimits.pop()) + + return chanfree def getTotalChans(net): @@ -121,7 +118,7 @@ def getTotalChans(net): return total -def emptyChanAllocate(net, flist, new): +def emptyChanAllocate(net, flist): """ Allocate channels to relays. :param net: network @@ -131,64 +128,32 @@ def emptyChanAllocate(net, flist, new): :return: dictionary of {relay: list of channels}""" # Get the free channel spaces for each relay - chanfree = getChanFree(net, new) + chanfree = getChanFree(net) if not chanfree: return # Pretend the newly provisioned relays are already on the network - for i in new: - chanfree[0][i] = chanfree[1] + # for i in new: + # chanfree[0][i] = chanfree[1] allocated = {} - # Copy the list since we're going to mutate it - toalloc = len(flist) - - # Used to correct allocations and provision additional relays - # if the math since the last LIST is a bit wrong - # toalloc:2148 free:{1: 250} chanlimit:250 correction:2147 newlist = list(flist) - if toalloc > sum(chanfree[0].values()): - sum_free = sum(chanfree[0].values()) # 250 - chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898 - relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592 - correction = ceil(relays_needed) - if main.config["ChanKeep"]["Provision"]: - debug( - ( - f"emptyChanAllocate() secondary allocation sum_free:{sum_free} " - f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} " - f"correction:{correction}" - ) - ) - debug( - ( - f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} " - f"free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}" - ) - ) - warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net)) - modules.provision.provisionMultipleRelays(net, correction) - return False - else: - # We don't have enough spaces and we can't add any. - # Let's do the best we can in the circumstances. - debug(f"emptyChanAllocate() cannot create additional relays for {net}") - debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered") - newlist = newlist[:sum_free] - debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}") - trace(f"emptyChanAllocate() best effort allocation: {flist}") - for i in chanfree[0].keys(): - for x in range(chanfree[0][i]): - if not len(newlist): + sum_free = sum(chanfree.values()) # 250 + trunc_list = newlist[:sum_free] + debug(f"emptyChanAllocate() {net}: newlist:{len(newlist)} trunc_list:{len(trunc_list)}") + + for i in chanfree.keys(): + for x in range(chanfree[i]): + if not len(trunc_list): break if i in allocated.keys(): - allocated[i].append(newlist.pop()) + allocated[i].append(trunc_list.pop()) else: - allocated[i] = [newlist.pop()] + allocated[i] = [trunc_list.pop()] return allocated -def populateChans(net, clist, new): +def populateChans(net, clist): """ Populate channels on relays. Stores channels to join in a list in main.TempChan[net][num] @@ -196,7 +161,7 @@ def populateChans(net, clist, new): :param clist: list of channels to join :param new: list of newly provisioned relays to account for""" # divided = array_split(clist, relay) - allocated = emptyChanAllocate(net, clist, new) + allocated = emptyChanAllocate(net, clist) if not allocated: return for i in allocated.keys(): @@ -242,7 +207,7 @@ def minifyChans(net, listinfo): return listinfo -def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit): +def keepChannels(net, listinfo, mean, sigrelay, relay): """ Minify channels, determine whether we can cover all the channels on the network, or need to use 'significant' mode. @@ -263,39 +228,40 @@ def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit): coverAll = True elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels coverAll = False - if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: - error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) - return + # if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: + # error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) + # return num_instances = len(getActiveRelays(net)) - debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}") + debug(f"keepChannels() {net} instances:{num_instances}") chan_slots_used = getTotalChans(net) debug(f"keepChannels() slots_used:{chan_slots_used}") - max_chans = (chanlimit * num_instances) - chan_slots_used + # max_chans = (chanlimit * num_instances) - chan_slots_used + max_chans = getSumChanlimit(net) - chan_slots_used debug(f"keepChannels() max_chans:{max_chans}") if coverAll: - needed = relay - len(getActiveRelays(net)) - if needed: - debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") - newNums = modules.provision.provisionMultipleRelays(net, needed) - else: - newNums = [] + # needed = relay - len(getActiveRelays(net)) + # if needed: + # debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") + # newNums = modules.provision.provisionMultipleRelays(net, needed) + # else: + # newNums = [] flist = [i[0] for i in listinfo] chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels") trace(f"keepChannels() {net}: joining:{chosen}") - populateChans(net, chosen, newNums) + populateChans(net, chosen) else: - needed = sigrelay - len(getActiveRelays(net)) - if needed: - debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") - newNums = modules.provision.provisionMultipleRelays(net, needed) - else: - newNums = [] + # needed = sigrelay - len(getActiveRelays(net)) + # if needed: + # debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") + # newNums = modules.provision.provisionMultipleRelays(net, needed) + # else: + # newNums = [] siglist = [i[0] for i in listinfo if int(i[1]) > mean] - chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] + chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans] debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels") trace(f"keepChannels() {net}: joining:{chosen}") - populateChans(net, chosen, newNums) + populateChans(net, chosen) notifyJoin(net) @@ -308,7 +274,7 @@ def joinSingle(net, channel): :return: relay number that joined the channel :rtype: int """ - eca = emptyChanAllocate(net, [channel], []) + eca = emptyChanAllocate(net, [channel]) if not eca: return False if not len(eca.keys()) == 1: @@ -354,7 +320,7 @@ def nukeNetwork(net): # deferToThread(_nukeNetwork, net) -def _initialList(net, num, listinfo, chanlimit): +def _initialList(net, num, listinfo): """ Called when a relay receives a full LIST response. Run statistics to determine how many channels are significant. @@ -390,8 +356,9 @@ def _initialList(net, num, listinfo, chanlimit): insiglength += 1 insigcumul += int(i[1]) - sigrelay = ceil(siglength / chanlimit) - relay = ceil(listlength / chanlimit) + avg_chanlimit = getAverageChanlimit(net) + sigrelay = ceil(siglength / avg_chanlimit) + relay = ceil(listlength / avg_chanlimit) abase = "analytics.list.%s" % net main.g.delete(abase) @@ -409,12 +376,12 @@ def _initialList(net, num, listinfo, chanlimit): p.hset(abase, "small_chan_cumul_mem", insigcumul) p.hset(abase, "relays_for_all_chans", relay) p.hset(abase, "relays_for_big_chans", sigrelay) - p.hset(abase, "relays_for_small_chans", ceil(insiglength / chanlimit)) + p.hset(abase, "relays_for_small_chans", ceil(insiglength / avg_chanlimit)) debug( ( f"_initialList() net:{net} num:{num} listlength:{listlength} " f"mean:{mean} siglength:{siglength} insiglength:{insiglength} " - f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}" + f"sigrelay:{sigrelay} relay:{relay} avg_chanlimit:{avg_chanlimit}" ) ) @@ -427,7 +394,7 @@ def _initialList(net, num, listinfo, chanlimit): p.execute() debug("List parsing completed on %s" % net) - keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit) + keepChannels(net, listinfo, mean, sigrelay, relay) # return (listinfo, mean, sigrelay, relay) @@ -453,9 +420,9 @@ def getListInfo(net): return convert(info) -def initialList(net, num, listinfo, chanlimit): +def initialList(net, num, listinfo): """ Run _initialList in a thread. See above docstring. """ - deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit) + deferToThread(_initialList, net, num, deepcopy(listinfo)) diff --git a/modules/provision.py b/modules/provision.py index ca62585..f517236 100644 --- a/modules/provision.py +++ b/modules/provision.py @@ -52,6 +52,7 @@ def provisionAuthenticationData(num, nick, network, security, auth, password): commands["nickserv"].append("Set %s" % password) inst = modules.regproc.selectInst(network) if "setmode" in inst.keys(): + # perform is loaded above # commands["status"].append("LoadMod perform") commands["perform"] = ["add mode %nick% +" + inst["setmode"]] deliverRelayCommands(num, commands, user=user + "/" + network) diff --git a/tests/test_chankeep.py b/tests/test_chankeep.py index 13c8918..38a0d04 100644 --- a/tests/test_chankeep.py +++ b/tests/test_chankeep.py @@ -16,6 +16,8 @@ class TestChanKeep(TestCase): chankeep.main.g = MagicMock() chankeep.main.g.pipeline = MagicMock() chankeep.main.config["ChanKeep"]["Provision"] = False + chankeep.getAverageChanlimit = MagicMock() + chankeep.getAverageChanlimit.return_value = self.chanlimit self.listinfo = self.generate_listinfo() self.chan_name_list = [x[0] for x in self.listinfo] @@ -89,17 +91,17 @@ class TestChanKeep(TestCase): @patch("modules.chankeep.keepChannels") def test__initialList(self, keepchannels): - chankeep._initialList(self.net, self.num, self.listinfo, self.chanlimit) - net, passed_list, mean, sigrelay, relay, chanlimit = keepchannels.call_args_list[0][0] + chankeep._initialList(self.net, self.num, self.listinfo) + net, passed_list, mean, sigrelay, relay = keepchannels.call_args_list[0][0] self.assertEqual(net, self.net) self.assertEqual(passed_list, self.listinfo) - self.assertEqual(chanlimit, self.chanlimit) + # self.assertEqual(chanlimit, self.chanlimit) # print(net, mean, sigrelay, relay) @patch("modules.chankeep.getChanFree") def test_empty_chan_allocate(self, getchanfree): getchanfree.return_value = ({1: 600}, 600) # pretend we have 600 channels free - eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, []) + eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list) self.assertEqual(len(eca), 1) num = list(eca.keys())[0] chans = eca[list(eca.keys())[0]] @@ -107,7 +109,7 @@ class TestChanKeep(TestCase): self.assertCountEqual(chans, self.chan_name_list) getchanfree.return_value = ({1: 100}, 10) - eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, []) + eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list) self.assertEqual(len(eca), 1) num = list(eca.keys())[0] chans = eca[list(eca.keys())[0]]