From 7a6e3338c0b89c6a8973539e87fc9a741ace7996 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 11 Oct 2019 13:07:57 +0100 Subject: [PATCH] Implement ChanKeep joining functions * Low-key channel joining with incrementally increasing delay * Spin up needed instances to be able to cover a certain channel space * Fix provisioning functions to prevent race conditions with lots of relays being created at once * Tweakable switchover from covering all channels to only covering channels with more users than the mean of the cumulative user count --- conf/example/config.json | 4 +++ conf/help.json | 1 + core/bot.py | 63 +++++++++++++++++++++++++++++---------- main.py | 1 + modules/chankeep.py | 64 ++++++++++++++++++++++++++++++++++++---- modules/network.py | 6 ++-- modules/provision.py | 24 +++++++-------- 7 files changed, 126 insertions(+), 37 deletions(-) diff --git a/conf/example/config.json b/conf/example/config.json index 0e6117d..7026833 100644 --- a/conf/example/config.json +++ b/conf/example/config.json @@ -22,6 +22,10 @@ "User": "sir", "Password": "sir" }, + "ChanKeep": { + "MaxRelay": 30, + "SigSwitch": 20 + }, "Dist": { "Enabled": true, "SendOutput": false, diff --git a/conf/help.json b/conf/help.json index 3c41b22..0f561c4 100644 --- a/conf/help.json +++ b/conf/help.json @@ -27,5 +27,6 @@ "allc": "allc <(network)|(alias)> ", "admall": "admall ", "swho": "swho []", + "list": "list ", "exec": "exec " } diff --git a/core/bot.py b/core/bot.py index 8089d5e..c8c35fd 100644 --- a/core/bot.py +++ b/core/bot.py @@ -1,8 +1,8 @@ from twisted.internet.protocol import ReconnectingClientFactory from twisted.words.protocols.irc import IRCClient from twisted.internet.defer import Deferred -from twisted.internet.task import LoopingCall, deferLater -from twisted.internet import reactor +from twisted.internet.task import LoopingCall +from twisted.internet import reactor, task from string import digits from random import randint @@ -25,7 +25,6 @@ from utils.logging.send import * from twisted.internet.ssl import DefaultOpenSSLContextFactory def deliverRelayCommands(num, relayCommands, user=None, stage2=None): - # where relay is a dictionary extracted from the Network object keyFN = main.certPath+main.config["Key"] certFN = main.certPath+main.config["Certificate"] contextFactory = DefaultOpenSSLContextFactory(keyFN.encode("utf-8", "replace"), @@ -53,6 +52,7 @@ class IRCRelay(IRCClient): self.relayCommands = relayCommands self.num = num self.stage2 = stage2 + self.loop = None def parsen(self, user): step = user.split("!") @@ -68,6 +68,10 @@ class IRCRelay(IRCClient): def privmsg(self, user, channel, msg): nick, ident, host = self.parsen(user) + if "does not exist" in msg or "doesn't exist" in msg: + error("ZNC issue:", msg) + if "Unable to load" in msg: + error("ZNC issue:", msg) if nick[0] == main.config["Tweaks"]["ZNC"]["Prefix"]: nick = nick[1:] if nick in self.relayCommands.keys(): @@ -77,20 +81,27 @@ class IRCRelay(IRCClient): log("%s: relay password mismatch" % self.num) sendAll("%s: relay password mismatch" % self.num) - def signedOn(self): - self.connected = True - log("signed on as a relay: %s" % self.num) - #sendRelayNotification("Relay", {"type": "conn", "status": "connected"}) nobody actually cares - for i in self.relayCommands.keys(): - for x in self.relayCommands[i]: - self.msg(main.config["Tweaks"]["ZNC"]["Prefix"]+i, x) + def sendStage2(self): if not self.stage2 == None: # [["user", {"sasl": ["message1", "message2"]}], []] if not len(self.stage2) == 0: user = self.stage2[0].pop(0) commands = self.stage2[0].pop(0) del self.stage2[0] deliverRelayCommands(self.num, commands, user, self.stage2) - deferLater(reactor, 1, self.transport.loseConnection) + + def signedOn(self): + self.connected = True + log("signed on as a relay: %s" % self.num) + #sendRelayNotification("Relay", {"type": "conn", "status": "connected"}) nobody actually cares + sleeptime = 0 + increment = 0.8 + for i in self.relayCommands.keys(): + for x in self.relayCommands[i]: + reactor.callLater(sleeptime, self.msg, main.config["Tweaks"]["ZNC"]["Prefix"]+i, x) + sleeptime += increment + increment += 0.8 + reactor.callLater(sleeptime, self.sendStage2) + reactor.callLater(sleeptime+5, self.transport.loseConnection) return class IRCBot(IRCClient): @@ -140,6 +151,26 @@ class IRCBot(IRCClient): return [nick, ident, host] + def joinChannels(self, channels): + sleeptime = 0.0 + increment = 0.8 + for i in channels: + if not i in self.channels: + debug(self.net, "-", self.num, ": joining", i, "in", sleeptime, "seconds") + reactor.callLater(sleeptime, self.join, i) + sleeptime += increment + if sleeptime == 10: + sleeptime = 0.0 + increment = 0.7 + increment += 0.1 + else: + print("Already on %s, skipping." % i) + + def checkChannels(self): + if self.net in main.TempChan.keys(): + if self.num in main.TempChan[self.net].keys(): + self.joinChannels(main.TempChan[self.net][self.num]) + def event(self, **cast): for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over if cast[i] == "": # a dictionary that changes length with each iteration @@ -157,6 +188,8 @@ class IRCBot(IRCClient): del cast["ident"] del cast["host"] del cast["channel"] + if "Disconnected from IRC" in cast["msg"]: + self.connected = False if not cast["type"] in ["query", "self", "highlight", "znc", "who"]: if "channel" in cast.keys() and not cast["type"] == "mode": # don't handle modes here if cast["channel"].lower() == self.nickname.lower(): # as they are channel == nickname @@ -230,8 +263,6 @@ class IRCBot(IRCClient): self.setNick(self._attemptedNick) def irc_ERR_PASSWDMISMATCH(self, prefix, params): - print(locals()) - print(globals()) log("%s - %i: password mismatch" % (self.net, self.num)) sendAll("%s - %i: password mismatch" % (self.net, self.num)) @@ -364,9 +395,6 @@ class IRCBot(IRCClient): chankeep.initialList(self.net, self.num, listinfo, self.chanlimit) - def irc_unknown(self, prefix, command, params): - debug("Unknown message: %s - %s - %s" % (prefix, command, params)) - def isupport(self, options): for i in options: if i.startswith("CHANLIMIT"): @@ -379,6 +407,9 @@ class IRCBot(IRCClient): return except TypeError: warn("Invalid CHANLIMIT: %s" % i) + if self.num == 1: # Only one instance should do a list, so + self.list() # why not this one? :P + self.checkChannels() #twisted sucks so i have to do this to actually get the user info def irc_JOIN(self, prefix, params): diff --git a/main.py b/main.py index 6bb26ad..1e2a767 100644 --- a/main.py +++ b/main.py @@ -28,6 +28,7 @@ relayConnections = {} IRCPool = {} ReactorPool = {} FactoryPool = {} +TempChan = {} MonitorPool = [] diff --git a/modules/chankeep.py b/modules/chankeep.py index 0ed89ac..0248bb0 100644 --- a/modules/chankeep.py +++ b/modules/chankeep.py @@ -2,11 +2,61 @@ import main from utils.logging.log import * from utils.logging.debug import * from copy import deepcopy +from math import ceil +from modules.provision import provisionMultipleRelays from twisted.internet.threads import deferToThread +from numpy import array_split -def provisionInstances(net, relaysNeeded): - #num, alias = - pass +def allRelaysActive(net): + relayNum = len(main.network[net].relays.keys()) + existNum = 0 + for i in main.network[net].relays.keys(): + name = net+str(i) + if name in main.IRCPool.keys(): + existNum += 1 + if existNum == relayNum: + return True + return False + +def populateChans(net, clist, relay): + divided = array_split(clist, relay) + for i in range(0, len(divided)): + if net in main.TempChan.keys(): + main.TempChan[net][i] = divided[i] + else: + main.TempChan[net] = {i: divided[i]} + +def notifyJoin(net): + for i in main.network[net].relays.keys(): + name = net+str(i) + if name in main.IRCPool.keys(): + main.IRCPool[name].checkChannels() + +def keepChannels(net, listinfo, mean, sigrelay, relay): + #print("list", listinfo) + #print("sigrelay", sigrelay) + #print("cur", len(main.network[net].relays.keys())) + if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels + coverAll = True + elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels + coverAll = False + if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: + error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) + return + if coverAll: + needed = relay-len(main.network[net].relays.keys()) + flist = [i[0] for i in listinfo] + populateChans(net, flist, relay) + else: + needed = sigrelay-len(main.network[net].relays.keys()) + siglist = [i[0] for i in listinfo if int(i[1]) > mean] + populateChans(net, siglist, sigrelay) + notifyJoin(net) + if needed > 0: + provisionMultipleRelays(net, needed) + + #print("coverall", coverAll) + #print("needed", needed) def purgeRecords(net): base = "list.%s" % net @@ -51,7 +101,8 @@ def _initialList(net, num, listinfo, chanlimit): if not net in main.network.keys(): warn("Cannot write list info - no network entry for %s" % net) return - sigrelay = round(siglength/chanlimit, 2) + sigrelay = ceil(siglength/chanlimit) + relay = ceil(listlength/chanlimit) netbase = "list.%s" % net abase = "analytics.list.%s" % net p = main.g.pipeline() @@ -64,9 +115,9 @@ def _initialList(net, num, listinfo, chanlimit): p.hset(abase, "cumul", cumul) p.hset(abase, "sigcumul", sigcumul) p.hset(abase, "insigcumul", insigcumul) - p.hset(abase, "relay", round(listlength/chanlimit, 2)) + p.hset(abase, "relay", relay) p.hset(abase, "sigrelay", sigrelay) - p.hset(abase, "insigrelay", round(insiglength/chanlimit, 2)) + p.hset(abase, "insigrelay", ceil(insiglength/chanlimit)) # Purge existing records before writing purgeRecords(net) for i in listinfo: @@ -76,6 +127,7 @@ def _initialList(net, num, listinfo, chanlimit): p.execute() debug("List parsing completed on %s" % net) + keepChannels(net, listinfo, mean, sigrelay, relay) def initialList(net, num, listinfo, chanlimit): deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit) diff --git a/modules/network.py b/modules/network.py index 3758460..9e235a1 100644 --- a/modules/network.py +++ b/modules/network.py @@ -17,14 +17,16 @@ class Network: self.security = security self.auth = auth - self.last = 0 + self.last = 1 self.relays = {} self.aliases = {} def add_relay(self, num=None): if not num: - self.last += 1 num = self.last + self.last += 1 + elif num == self.last: + self.last += 1 self.relays[num] = { "enabled": main.config["ConnectOnCreate"], "net": self.net, diff --git a/modules/provision.py b/modules/provision.py index 41d669c..d55635e 100644 --- a/modules/provision.py +++ b/modules/provision.py @@ -1,8 +1,10 @@ import main from core.bot import deliverRelayCommands from utils.logging.log import * +from twisted.internet import reactor -def provisionUserData(num, nick, altnick, ident, realname, unused): # last field is password, which we don't want to inherit here, but still want to use * expansion, so this is a bit of a hack +def provisionUserData(num, nick, altnick, ident, realname, unused): # last field is password, + # which we don't want to inherit here, but still want to use * expansion, so this is a bit of a hack commands = {} commands["controlpanel"] = [] commands["controlpanel"].append("AddUser %s %s" % (nick, main.config["Relay"]["Password"])) @@ -16,7 +18,7 @@ def provisionUserData(num, nick, altnick, ident, realname, unused): # last field def provisionNetworkData(num, nick, network, host, port, security, auth, password): commands = {} stage2commands = {} - stage3commands = {} + stage2commands["status"] = [] commands["controlpanel"] = [] commands["controlpanel"].append("AddNetwork %s %s" % (nick, network)) if security == "ssl": @@ -25,26 +27,21 @@ def provisionNetworkData(num, nick, network, host, port, security, auth, passwor elif security == "plain": commands["controlpanel"].append("AddServer %s %s %s %s" % (nick, network, host, port)) if auth == "sasl": - stage2commands["status"] = [] stage2commands["sasl"] = [] stage2commands["status"].append("LoadMod sasl") stage2commands["sasl"].append("Mechanism plain") stage2commands["sasl"].append("Set %s %s" % (nick, password)) elif auth == "ns": - stage2commands["status"] = [] stage2commands["nickserv"] = [] stage2commands["status"].append("LoadMod nickserv") stage2commands["nickserv"].append("Set %s" % password) if not main.config["ConnectOnCreate"]: - stage3commands["status"] = [] - stage3commands["status"].append("Disconnect") + stage2commands["status"].append("Disconnect") if main.config["Toggles"]["CycleChans"]: - stage2commands["status"] = [] stage2commands["status"].append("LoadMod disconkick") stage2commands["status"].append("LoadMod chansaver") deliverRelayCommands(num, commands, - stage2=[[nick+"/"+network, stage2commands], - [nick+"/"+network, stage3commands]]) + stage2=[[nick+"/"+network, stage2commands]]) return def provisionRelayForNetwork(num, alias, network): @@ -60,14 +57,15 @@ def provisionRelay(num, network): aliasObj = main.alias[num] alias = aliasObj["nick"] provisionUserData(num, *aliasObj.values()) - provisionRelayForNetwork(num, alias, network) + reactor.callLater(5, provisionRelayForNetwork, num, alias, network) if main.config["ConnectOnCreate"]: - main.network[network].start_bot(num) - return alias + reactor.callLater(10, main.network[network].start_bot, num) + return def provisionMultipleRelays(net, relaysNeeded): - for i in range(1, relaysNeeded): + for i in range(relaysNeeded): num, alias = main.network[net].add_relay() + print(relaysNeeded, "for", net, ":", num, alias) provisionRelay(num, net) main.saveConf("network")