monolith/modules/chankeep.py

199 lines
6.3 KiB
Python

import main
from utils.logging.log import log, warn, error
from utils.logging.debug import debug
from copy import deepcopy
from math import ceil
import modules.provision
from twisted.internet.threads import deferToThread
def allRelaysActive(net):
    """Return True when every relay configured for *net* is pooled and authenticated.

    A relay counts as active when its pool name (``net`` followed by the
    relay key) is present in ``main.IRCPool`` and that pool entry's
    ``authenticated`` attribute is truthy. A network with no relays yields
    True, since zero active relays equals zero configured relays.
    """
    relayKeys = main.network[net].relays.keys()
    # Count rather than short-circuit so every relay is inspected.
    activeCount = sum(
        1
        for relayKey in relayKeys
        if (net + str(relayKey)) in main.IRCPool.keys()
        and main.IRCPool[net + str(relayKey)].authenticated
    )
    return activeCount == len(relayKeys)
def getChanFree(net, new):
    """Work out how many channel slots each existing relay of *net* has left.

    Relays whose key appears in *new* are skipped. For every other relay the
    free slot count is its pool entry's ``chanlimit`` minus the number of
    channels it currently holds.

    Returns a ``(free_by_relay, chanlimit)`` tuple, or False (after logging
    an error) unless exactly one distinct ``chanlimit`` value was seen.
    """
    freeByRelay = {}
    limitsSeen = set()
    for relayKey in main.network[net].relays.keys():
        if relayKey in new:
            continue
        entry = main.IRCPool[net + str(relayKey)]
        freeByRelay[relayKey] = entry.chanlimit - len(entry.channels)
        limitsSeen.add(entry.chanlimit)
    if len(limitsSeen) != 1:
        error("Network %s has servers with different CHANLIMIT values" % net)
        return False
    return (freeByRelay, limitsSeen.pop())
def emptyChanAllocate(net, flist, relay, new):
    """Distribute the channels in *flist* over the free slots of *net*'s relays.

    Parameters:
        net: network name.
        flist: list of channel names; it is consumed in place (channels are
            popped from its end as they are assigned).
        relay: unused here; kept for interface compatibility.
        new: keys of relays to treat as completely empty, i.e. each gets a
            full CHANLIMIT worth of free slots.

    Returns:
        A dict mapping relay key -> list of channels assigned to it; only
        relays that received at least one channel appear.
        None when getChanFree() could not determine the free slots.
        False when there are more channels than free slots.
    """
    chanfree = getChanFree(net, new)
    if not chanfree:
        return
    freeByRelay, chanlimit = chanfree
    for relayKey in new:
        freeByRelay[relayKey] = chanlimit
    totalFree = sum(freeByRelay.values())
    toalloc = len(flist)
    if toalloc > totalFree:
        # Relays needed for the overflow: divide the *shortfall* by the
        # per-relay limit and round up, since a partly filled relay is still
        # a whole relay.
        correction = ceil((toalloc - totalFree) / chanlimit)
        # Nothing is provisioned here; the shortfall is only reported.
        warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
        return False
    allocated = {}
    for relayKey, free in freeByRelay.items():
        # A relay at or over its limit has free <= 0 and receives nothing.
        for _ in range(free):
            if not flist:
                break
            allocated.setdefault(relayKey, []).append(flist.pop())
    return allocated
def populateChans(net, clist, relay, new):
    """Allocate *clist* across *net*'s relays and record the result in main.TempChan.

    The allocation comes from emptyChanAllocate(); when that yields nothing
    (None, False or an empty dict) main.TempChan is left untouched. Otherwise
    each relay's channel list is stored at ``main.TempChan[net][relay_key]``,
    replacing any previous entry for that relay.
    """
    allocation = emptyChanAllocate(net, clist, relay, new)
    if not allocation:
        return
    for relayKey, channels in allocation.items():
        # Create the per-network mapping on first use.
        if net not in main.TempChan.keys():
            main.TempChan[net] = {}
        main.TempChan[net][relayKey] = channels
def notifyJoin(net):
    """Call checkChannels() on every relay of *net* that is present in main.IRCPool.

    Relays that are configured for the network but have no pool entry are
    silently skipped.
    """
    for relayKey in main.network[net].relays.keys():
        poolName = net + str(relayKey)
        if poolName not in main.IRCPool.keys():
            continue
        main.IRCPool[poolName].checkChannels()
def minifyChans(net, listinfo):
    """Drop from *listinfo* every channel that one of *net*'s relays is already on.

    Parameters:
        net: network name.
        listinfo: list of LIST entries whose first element is the channel
            name. The list is pruned in place.

    Returns:
        The pruned *listinfo* (the same list object), or False when either
        not all relays are active or nothing is left to join.
    """
    if not allRelaysActive(net):
        error("All relays for %s are not active, cannot minify list" % net)
        return False
    # Gather every joined channel once; set membership replaces the nested
    # scan of the whole list for each channel of each relay.
    # NOTE(review): assumes channel names are hashable (strings) - confirm.
    joined = set()
    for relayKey in main.network[net].relays.keys():
        joined.update(main.IRCPool[net + str(relayKey)].channels)
    # Build the filtered list first, then slice-assign it. Removing entries
    # while iterating the same list skips the element after each removal.
    # Slice assignment keeps the caller's list object updated, as before.
    listinfo[:] = [entry for entry in listinfo if entry[0] not in joined]
    if not listinfo:
        log("We're on all the channels we want to be on, dropping LIST")
        return False
    return listinfo
def keepChannels(net, listinfo, mean, sigrelay, relay):
    """Provision relays for *net* and queue the channels they should join.

    Parameters:
        net: network name.
        listinfo: LIST entries; element 0 is the channel name and element 1
            is a count convertible with int().
        mean: mean of the element-1 counts; entries strictly above it are
            the "significant" ones.
        sigrelay: number of relays needed for the significant channels only.
        relay: number of relays needed for every listed channel.

    Channels the relays are already on are removed first. If ``relay`` is
    within ``ChanKeep.SigSwitch`` all remaining channels are covered;
    otherwise only those above the mean. Nothing is done when ``sigrelay``
    exceeds ``ChanKeep.MaxRelay``. Returns None in every case.
    """
    listinfo = minifyChans(net, listinfo)
    if not listinfo:
        return
    # One comparison decides the mode, so the flag is always bound:
    # True  -> we can cover all of the channels
    # False -> we cannot cover all of the channels
    coverAll = relay <= main.config["ChanKeep"]["SigSwitch"]
    if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
        error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
        return
    target = relay if coverAll else sigrelay
    needed = target - len(main.network[net].relays.keys())
    newNums = modules.provision.provisionMultipleRelays(net, needed)
    # The channel list is built after provisioning, as in the original order.
    if coverAll:
        chans = [i[0] for i in listinfo]
    else:
        chans = [i[0] for i in listinfo if int(i[1]) > mean]
    populateChans(net, chans, target, newNums)
    notifyJoin(net)
def joinSingle(net, channel):
    """Inspect the free channel slots of *net*'s relays for a single join.

    Parameters:
        net: network name.
        channel: currently unused by the body.

    Returns:
        False when not all relays are active or the free-slot information is
        unavailable; None otherwise.

    NOTE(review): the body only prints diagnostics and never joins
    *channel*; the ``< 0`` test (relays over their limit) looks like an
    unfinished placeholder - confirm the intended condition.
    """
    if not allRelaysActive(net):
        error("All relays for %s are not active" % net)
        return False
    chanfree = getChanFree(net, [])
    print("chanfree", chanfree)
    if not chanfree:
        # getChanFree() returns False (after logging) when the relays do not
        # share one CHANLIMIT; indexing that result would raise TypeError.
        return False
    for relayKey in chanfree[0]:
        if chanfree[0][relayKey] < 0:
            print("JOIN CHAN")
def nukeNetwork(net):
    """Delete the stored LIST analytics for *net*.

    Only the ``analytics.list.<net>`` key is removed through ``main.g``; the
    pipeline-based purge of the ``list.<net>`` records is not performed.
    """
    analyticsKey = "analytics.list." + net
    main.g.delete(analyticsKey)
# def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
def _initialList(net, num, listinfo, chanlimit):
    """Analyse a LIST snapshot for *net*, store the statistics and start channel keeping.

    Parameters:
        net: network name.
        num: relay number the LIST came from (only used in log messages).
        listinfo: list of entries; element 0 is the channel name and
            element 1 a count convertible with int() (presumably the user
            count - confirm against the caller).
        chanlimit: channels a single relay may hold; used as the divisor
            when estimating how many relays are required.

    Entries strictly above the mean count are "significant", entries strictly
    below are "insignificant"; entries equal to the mean fall in neither
    group. The figures are written as fields of ``analytics.list.<net>``
    through a ``main.g`` pipeline, then keepChannels() is called.

    Returns None. Empty or malformed LIST data is logged with warn() and
    nothing is stored.
    """
    listlength = len(listinfo)
    if not listlength:
        # The mean and the percentages below divide by the entry count.
        warn("Empty LIST received from %s - %i" % (net, num))
        return
    try:
        # Convert each count exactly once. int() raises ValueError for
        # non-numeric strings and TypeError for unsupported types; a short
        # entry raises IndexError.
        counts = [int(entry[1]) for entry in listinfo]
    except (TypeError, ValueError, IndexError):
        warn("Bad LIST data received from %s - %i" % (net, num))
        return
    cumul = sum(counts)
    mean = round(cumul / listlength, 2)
    sigcounts = [count for count in counts if count > mean]
    insigcounts = [count for count in counts if count < mean]
    siglength = len(sigcounts)
    insiglength = len(insigcounts)
    sigrelay = ceil(siglength / chanlimit)
    relay = ceil(listlength / chanlimit)
    abase = "analytics.list.%s" % net
    # Insertion order is the order in which the fields are written.
    stats = {
        "mean": mean,
        "total": listlength,
        "sigtotal": siglength,
        "insigtotal": insiglength,
        "sigperc": round(siglength / listlength * 100, 2),
        "insigperc": round(insiglength / listlength * 100, 2),
        "cumul": cumul,
        "sigcumul": sum(sigcounts),
        "insigcumul": sum(insigcounts),
        "relay": relay,
        "sigrelay": sigrelay,
        "insigrelay": ceil(insiglength / chanlimit),
    }
    p = main.g.pipeline()
    for field, value in stats.items():
        p.hset(abase, field, value)
    p.execute()
    debug("List parsing completed on %s" % net)
    keepChannels(net, listinfo, mean, sigrelay, relay)
def initialList(net, num, listinfo, chanlimit):
    """Run _initialList() for this LIST result in a worker thread.

    The worker is handed a deep copy of *listinfo*, taken before the hand-off,
    so the in-place pruning done further down the chain never touches the
    caller's list. The Deferred from deferToThread() is not returned.
    """
    snapshot = deepcopy(listinfo)
    deferToThread(_initialList, net, num, snapshot, chanlimit)