Implement ChanKeep joining functions

* Low-key channel joining with incrementally increasing delay
* Spin up needed instances to be able to cover a certain channel space
* Fix provisioning functions to prevent race conditions with lots of
relays being created at once
* Tweakable switchover from covering all channels to only covering
channels with more users than the mean of the cumulative user count
This commit is contained in:
Mark Veidemanis 2019-10-11 13:07:57 +01:00
parent c3d0cb04b6
commit 7a6e3338c0
7 changed files with 126 additions and 37 deletions

View File

@ -22,6 +22,10 @@
"User": "sir",
"Password": "sir"
},
"ChanKeep": {
"MaxRelay": 30,
"SigSwitch": 20
},
"Dist": {
"Enabled": true,
"SendOutput": false,

View File

@ -27,5 +27,6 @@
"allc": "allc <network|alias> <(network)|(alias)> <entity> <text ...>",
"admall": "admall <entity> <text ...>",
"swho": "swho <network> [<channel>]",
"list": "list <network>",
"exec": "exec <expr ...>"
}

View File

@ -1,8 +1,8 @@
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.words.protocols.irc import IRCClient
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall, deferLater
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet import reactor, task
from string import digits
from random import randint
@ -25,7 +25,6 @@ from utils.logging.send import *
from twisted.internet.ssl import DefaultOpenSSLContextFactory
def deliverRelayCommands(num, relayCommands, user=None, stage2=None):
# where relay is a dictionary extracted from the Network object
keyFN = main.certPath+main.config["Key"]
certFN = main.certPath+main.config["Certificate"]
contextFactory = DefaultOpenSSLContextFactory(keyFN.encode("utf-8", "replace"),
@ -53,6 +52,7 @@ class IRCRelay(IRCClient):
self.relayCommands = relayCommands
self.num = num
self.stage2 = stage2
self.loop = None
def parsen(self, user):
step = user.split("!")
@ -68,6 +68,10 @@ class IRCRelay(IRCClient):
def privmsg(self, user, channel, msg):
nick, ident, host = self.parsen(user)
if "does not exist" in msg or "doesn't exist" in msg:
error("ZNC issue:", msg)
if "Unable to load" in msg:
error("ZNC issue:", msg)
if nick[0] == main.config["Tweaks"]["ZNC"]["Prefix"]:
nick = nick[1:]
if nick in self.relayCommands.keys():
@ -77,20 +81,27 @@ class IRCRelay(IRCClient):
log("%s: relay password mismatch" % self.num)
sendAll("%s: relay password mismatch" % self.num)
def signedOn(self):
self.connected = True
log("signed on as a relay: %s" % self.num)
#sendRelayNotification("Relay", {"type": "conn", "status": "connected"}) nobody actually cares
for i in self.relayCommands.keys():
for x in self.relayCommands[i]:
self.msg(main.config["Tweaks"]["ZNC"]["Prefix"]+i, x)
def sendStage2(self):
if not self.stage2 == None: # [["user", {"sasl": ["message1", "message2"]}], []]
if not len(self.stage2) == 0:
user = self.stage2[0].pop(0)
commands = self.stage2[0].pop(0)
del self.stage2[0]
deliverRelayCommands(self.num, commands, user, self.stage2)
deferLater(reactor, 1, self.transport.loseConnection)
def signedOn(self):
self.connected = True
log("signed on as a relay: %s" % self.num)
#sendRelayNotification("Relay", {"type": "conn", "status": "connected"}) nobody actually cares
sleeptime = 0
increment = 0.8
for i in self.relayCommands.keys():
for x in self.relayCommands[i]:
reactor.callLater(sleeptime, self.msg, main.config["Tweaks"]["ZNC"]["Prefix"]+i, x)
sleeptime += increment
increment += 0.8
reactor.callLater(sleeptime, self.sendStage2)
reactor.callLater(sleeptime+5, self.transport.loseConnection)
return
class IRCBot(IRCClient):
@ -140,6 +151,26 @@ class IRCBot(IRCClient):
return [nick, ident, host]
def joinChannels(self, channels):
sleeptime = 0.0
increment = 0.8
for i in channels:
if not i in self.channels:
debug(self.net, "-", self.num, ": joining", i, "in", sleeptime, "seconds")
reactor.callLater(sleeptime, self.join, i)
sleeptime += increment
if sleeptime == 10:
sleeptime = 0.0
increment = 0.7
increment += 0.1
else:
print("Already on %s, skipping." % i)
def checkChannels(self):
if self.net in main.TempChan.keys():
if self.num in main.TempChan[self.net].keys():
self.joinChannels(main.TempChan[self.net][self.num])
def event(self, **cast):
for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over
if cast[i] == "": # a dictionary that changes length with each iteration
@ -157,6 +188,8 @@ class IRCBot(IRCClient):
del cast["ident"]
del cast["host"]
del cast["channel"]
if "Disconnected from IRC" in cast["msg"]:
self.connected = False
if not cast["type"] in ["query", "self", "highlight", "znc", "who"]:
if "channel" in cast.keys() and not cast["type"] == "mode": # don't handle modes here
if cast["channel"].lower() == self.nickname.lower(): # as they are channel == nickname
@ -230,8 +263,6 @@ class IRCBot(IRCClient):
self.setNick(self._attemptedNick)
def irc_ERR_PASSWDMISMATCH(self, prefix, params):
print(locals())
print(globals())
log("%s - %i: password mismatch" % (self.net, self.num))
sendAll("%s - %i: password mismatch" % (self.net, self.num))
@ -364,9 +395,6 @@ class IRCBot(IRCClient):
chankeep.initialList(self.net, self.num, listinfo, self.chanlimit)
def irc_unknown(self, prefix, command, params):
debug("Unknown message: %s - %s - %s" % (prefix, command, params))
def isupport(self, options):
for i in options:
if i.startswith("CHANLIMIT"):
@ -379,6 +407,9 @@ class IRCBot(IRCClient):
return
except TypeError:
warn("Invalid CHANLIMIT: %s" % i)
if self.num == 1: # Only one instance should do a list, so
self.list() # why not this one? :P
self.checkChannels()
#twisted sucks so i have to do this to actually get the user info
def irc_JOIN(self, prefix, params):

View File

@ -28,6 +28,7 @@ relayConnections = {}
IRCPool = {}
ReactorPool = {}
FactoryPool = {}
TempChan = {}
MonitorPool = []

View File

@ -2,11 +2,61 @@ import main
from utils.logging.log import *
from utils.logging.debug import *
from copy import deepcopy
from math import ceil
from modules.provision import provisionMultipleRelays
from twisted.internet.threads import deferToThread
from numpy import array_split
def provisionInstances(net, relaysNeeded):
#num, alias =
pass
def allRelaysActive(net):
relayNum = len(main.network[net].relays.keys())
existNum = 0
for i in main.network[net].relays.keys():
name = net+str(i)
if name in main.IRCPool.keys():
existNum += 1
if existNum == relayNum:
return True
return False
def populateChans(net, clist, relay):
divided = array_split(clist, relay)
for i in range(0, len(divided)):
if net in main.TempChan.keys():
main.TempChan[net][i] = divided[i]
else:
main.TempChan[net] = {i: divided[i]}
def notifyJoin(net):
for i in main.network[net].relays.keys():
name = net+str(i)
if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels()
def keepChannels(net, listinfo, mean, sigrelay, relay):
#print("list", listinfo)
#print("sigrelay", sigrelay)
#print("cur", len(main.network[net].relays.keys()))
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return
if coverAll:
needed = relay-len(main.network[net].relays.keys())
flist = [i[0] for i in listinfo]
populateChans(net, flist, relay)
else:
needed = sigrelay-len(main.network[net].relays.keys())
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
populateChans(net, siglist, sigrelay)
notifyJoin(net)
if needed > 0:
provisionMultipleRelays(net, needed)
#print("coverall", coverAll)
#print("needed", needed)
def purgeRecords(net):
base = "list.%s" % net
@ -51,7 +101,8 @@ def _initialList(net, num, listinfo, chanlimit):
if not net in main.network.keys():
warn("Cannot write list info - no network entry for %s" % net)
return
sigrelay = round(siglength/chanlimit, 2)
sigrelay = ceil(siglength/chanlimit)
relay = ceil(listlength/chanlimit)
netbase = "list.%s" % net
abase = "analytics.list.%s" % net
p = main.g.pipeline()
@ -64,9 +115,9 @@ def _initialList(net, num, listinfo, chanlimit):
p.hset(abase, "cumul", cumul)
p.hset(abase, "sigcumul", sigcumul)
p.hset(abase, "insigcumul", insigcumul)
p.hset(abase, "relay", round(listlength/chanlimit, 2))
p.hset(abase, "relay", relay)
p.hset(abase, "sigrelay", sigrelay)
p.hset(abase, "insigrelay", round(insiglength/chanlimit, 2))
p.hset(abase, "insigrelay", ceil(insiglength/chanlimit))
# Purge existing records before writing
purgeRecords(net)
for i in listinfo:
@ -76,6 +127,7 @@ def _initialList(net, num, listinfo, chanlimit):
p.execute()
debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay)
def initialList(net, num, listinfo, chanlimit):
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)

View File

@ -17,14 +17,16 @@ class Network:
self.security = security
self.auth = auth
self.last = 0
self.last = 1
self.relays = {}
self.aliases = {}
def add_relay(self, num=None):
if not num:
self.last += 1
num = self.last
self.last += 1
elif num == self.last:
self.last += 1
self.relays[num] = {
"enabled": main.config["ConnectOnCreate"],
"net": self.net,

View File

@ -1,8 +1,10 @@
import main
from core.bot import deliverRelayCommands
from utils.logging.log import *
from twisted.internet import reactor
def provisionUserData(num, nick, altnick, ident, realname, unused): # last field is password, which we don't want to inherit here, but still want to use * expansion, so this is a bit of a hack
def provisionUserData(num, nick, altnick, ident, realname, unused): # last field is password,
# which we don't want to inherit here, but still want to use * expansion, so this is a bit of a hack
commands = {}
commands["controlpanel"] = []
commands["controlpanel"].append("AddUser %s %s" % (nick, main.config["Relay"]["Password"]))
@ -16,7 +18,7 @@ def provisionUserData(num, nick, altnick, ident, realname, unused): # last field
def provisionNetworkData(num, nick, network, host, port, security, auth, password):
commands = {}
stage2commands = {}
stage3commands = {}
stage2commands["status"] = []
commands["controlpanel"] = []
commands["controlpanel"].append("AddNetwork %s %s" % (nick, network))
if security == "ssl":
@ -25,26 +27,21 @@ def provisionNetworkData(num, nick, network, host, port, security, auth, passwor
elif security == "plain":
commands["controlpanel"].append("AddServer %s %s %s %s" % (nick, network, host, port))
if auth == "sasl":
stage2commands["status"] = []
stage2commands["sasl"] = []
stage2commands["status"].append("LoadMod sasl")
stage2commands["sasl"].append("Mechanism plain")
stage2commands["sasl"].append("Set %s %s" % (nick, password))
elif auth == "ns":
stage2commands["status"] = []
stage2commands["nickserv"] = []
stage2commands["status"].append("LoadMod nickserv")
stage2commands["nickserv"].append("Set %s" % password)
if not main.config["ConnectOnCreate"]:
stage3commands["status"] = []
stage3commands["status"].append("Disconnect")
stage2commands["status"].append("Disconnect")
if main.config["Toggles"]["CycleChans"]:
stage2commands["status"] = []
stage2commands["status"].append("LoadMod disconkick")
stage2commands["status"].append("LoadMod chansaver")
deliverRelayCommands(num, commands,
stage2=[[nick+"/"+network, stage2commands],
[nick+"/"+network, stage3commands]])
stage2=[[nick+"/"+network, stage2commands]])
return
def provisionRelayForNetwork(num, alias, network):
@ -60,14 +57,15 @@ def provisionRelay(num, network):
aliasObj = main.alias[num]
alias = aliasObj["nick"]
provisionUserData(num, *aliasObj.values())
provisionRelayForNetwork(num, alias, network)
reactor.callLater(5, provisionRelayForNetwork, num, alias, network)
if main.config["ConnectOnCreate"]:
main.network[network].start_bot(num)
return alias
reactor.callLater(10, main.network[network].start_bot, num)
return
def provisionMultipleRelays(net, relaysNeeded):
for i in range(1, relaysNeeded):
for i in range(relaysNeeded):
num, alias = main.network[net].add_relay()
print(relaysNeeded, "for", net, ":", num, alias)
provisionRelay(num, net)
main.saveConf("network")