monolith/modules/chankeep.py

243 lines
7.4 KiB
Python
Raw Normal View History

2019-10-08 20:10:42 +00:00
from copy import deepcopy
from math import ceil
2022-07-21 12:40:09 +00:00
from twisted.internet.threads import deferToThread
2022-07-21 12:39:59 +00:00
2022-07-21 12:40:09 +00:00
import main
import modules.provision
from utils.logging.debug import debug
from utils.logging.log import error, log, warn
2022-07-21 12:39:41 +00:00
2022-08-11 18:22:09 +00:00
def getActiveRelays(net):
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return activeRelays
def allRelaysActive(net):
2022-08-11 18:22:09 +00:00
"""
Check if all enabled relays are active and authenticated.
"""
activeRelays = getActiveRelays(net)
debug(f"allRelaysActive() active relays for {net}: {activeRelays}")
relayNum = len(activeRelays)
existNum = 0
2022-08-11 18:22:09 +00:00
for i in activeRelays:
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated:
existNum += 1
debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}")
if existNum == relayNum:
return True
return False
2022-07-21 12:39:41 +00:00
def getChanFree(net, new):
"""
Get a dictionary with the free channel spaces for
each relay, and a channel limit.
Example return:
({1: 99}, 100)
:param net: network
:param new: list of newly provisioned relays to skip
:return: ({relay: channel spaces}, channel limit)
"""
chanfree = {}
chanlimits = set()
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
if i in new:
continue
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name not in main.IRCPool.keys():
continue
if not main.IRCPool[name].isconnected:
continue
2022-07-21 12:39:41 +00:00
chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels)
chanlimits.add(main.IRCPool[name].chanlimit)
if not len(chanlimits) == 1:
error("Network %s has servers with different CHANLIMIT values" % net)
return False
return (chanfree, chanlimits.pop())
2022-07-21 12:39:41 +00:00
def emptyChanAllocate(net, flist, relay, new):
chanfree = getChanFree(net, new)
if not chanfree:
return
for i in new:
chanfree[0][i] = chanfree[1]
allocated = {}
toalloc = len(flist)
if toalloc > sum(chanfree[0].values()):
2022-07-21 12:39:41 +00:00
correction = round(toalloc - sum(chanfree[0].values()) / chanfree[1])
warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
2022-08-11 18:22:09 +00:00
modules.provision.provisionMultipleRelays(net, correction)
return False
for i in chanfree[0].keys():
for x in range(chanfree[0][i]):
if not len(flist):
break
if i in allocated.keys():
allocated[i].append(flist.pop())
else:
allocated[i] = [flist.pop()]
return allocated
2022-07-21 12:39:41 +00:00
def populateChans(net, clist, relay, new):
2022-07-21 12:39:41 +00:00
# divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, relay, new)
if not allocated:
return
for i in allocated.keys():
if net in main.TempChan.keys():
main.TempChan[net][i] = allocated[i]
else:
main.TempChan[net] = {i: allocated[i]}
2022-07-21 12:39:41 +00:00
def notifyJoin(net):
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
2022-07-21 12:39:41 +00:00
name = net + str(i)
if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels()
2022-07-21 12:39:41 +00:00
def minifyChans(net, listinfo):
if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list" % net)
return False
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
2022-07-21 12:39:41 +00:00
name = net + str(i)
for x in main.IRCPool[name].channels:
for y in listinfo:
if y[0] == x:
listinfo.remove(y)
if not listinfo:
log("We're on all the channels we want to be on, dropping LIST")
return False
return listinfo
2022-07-21 12:39:41 +00:00
def keepChannels(net, listinfo, mean, sigrelay, relay):
listinfo = minifyChans(net, listinfo)
if not listinfo:
return
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return
if coverAll:
2022-08-11 18:22:09 +00:00
needed = relay - len(getActiveRelays(net))
newNums = modules.provision.provisionMultipleRelays(net, needed)
flist = [i[0] for i in listinfo]
populateChans(net, flist, relay, newNums)
else:
2022-08-11 18:22:09 +00:00
needed = sigrelay - len(getActiveRelays(net))
newNums = modules.provision.provisionMultipleRelays(net, needed)
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
populateChans(net, siglist, sigrelay, newNums)
notifyJoin(net)
2022-07-21 12:39:41 +00:00
2020-11-02 20:14:02 +00:00
def joinSingle(net, channel):
eca = emptyChanAllocate(net, [channel], None, [])
if not eca:
return False
if not len(eca.keys()) == 1:
return False
num = list(eca.keys())[0]
name = f"{net}{num}"
if name not in main.IRCPool:
2020-11-02 20:14:02 +00:00
return False
main.IRCPool[name].join(channel)
return num
2020-11-02 20:14:02 +00:00
2022-07-21 12:39:41 +00:00
def partSingle(net, channel):
"""
Iterate over all the relays of net and part channels matching channel.
:param net:
:param channel:
:return:
"""
parted = []
2022-08-11 18:22:09 +00:00
for i in getActiveRelays(net):
name = f"{net}{i}"
if name in main.IRCPool.keys():
if channel in main.IRCPool[name].channels:
main.IRCPool[name].part(channel)
parted.append(str(i))
return parted
2019-10-08 20:10:42 +00:00
def nukeNetwork(net):
2022-07-21 12:39:41 +00:00
# purgeRecords(net)
# p = main.g.pipeline()
main.g.delete("analytics.list." + net)
# p.delete("list."+net)
# p.execute()
2022-07-21 12:39:41 +00:00
# def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
2019-10-08 20:10:42 +00:00
def _initialList(net, num, listinfo, chanlimit):
listlength = len(listinfo)
cumul = 0
try:
cumul += sum(int(i[1]) for i in listinfo)
except TypeError:
warn("Bad LIST data received from %s - %i" % (net, num))
return
2022-07-21 12:39:41 +00:00
mean = round(cumul / listlength, 2)
2019-10-08 20:10:42 +00:00
siglength = 0
insiglength = 0
sigcumul = 0
insigcumul = 0
for i in listinfo:
if int(i[1]) > mean:
2019-10-08 20:10:42 +00:00
siglength += 1
sigcumul += int(i[1])
elif int(i[1]) < mean:
2019-10-08 20:10:42 +00:00
insiglength += 1
insigcumul += int(i[1])
2022-07-21 12:39:41 +00:00
sigrelay = ceil(siglength / chanlimit)
relay = ceil(listlength / chanlimit)
2022-07-21 12:40:05 +00:00
# netbase = "list.%s" % net
2019-10-08 20:10:42 +00:00
abase = "analytics.list.%s" % net
p = main.g.pipeline()
p.hset(abase, "mean", mean)
p.hset(abase, "total", listlength)
p.hset(abase, "sigtotal", siglength)
p.hset(abase, "insigtotal", insiglength)
2022-07-21 12:39:41 +00:00
p.hset(abase, "sigperc", round(siglength / listlength * 100, 2))
p.hset(abase, "insigperc", round(insiglength / listlength * 100, 2))
2019-10-08 20:10:42 +00:00
p.hset(abase, "cumul", cumul)
p.hset(abase, "sigcumul", sigcumul)
p.hset(abase, "insigcumul", insigcumul)
p.hset(abase, "relay", relay)
2019-10-08 20:10:42 +00:00
p.hset(abase, "sigrelay", sigrelay)
2022-07-21 12:39:41 +00:00
p.hset(abase, "insigrelay", ceil(insiglength / chanlimit))
2019-10-08 20:10:42 +00:00
# Purge existing records before writing
2022-07-21 12:39:41 +00:00
# purgeRecords(net)
# for i in listinfo:
# p.rpush(netbase+"."+i[0], i[1])
# p.rpush(netbase+"."+i[0], i[2])
# p.sadd(netbase, i[0])
2019-10-08 20:10:42 +00:00
p.execute()
debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay)
2019-10-08 20:10:42 +00:00
2022-07-21 12:39:41 +00:00
2019-10-08 20:10:42 +00:00
def initialList(net, num, listinfo, chanlimit):
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)