191 lines
6.1 KiB
Python
191 lines
6.1 KiB
Python
import main
|
|
from utils.logging.log import *
|
|
from utils.logging.debug import *
|
|
from copy import deepcopy
|
|
from math import ceil
|
|
from modules.provision import provisionMultipleRelays
|
|
from twisted.internet.threads import deferToThread
|
|
|
|
def allRelaysActive(net):
    """Report whether every relay configured for *net* is connected.

    A relay counts as active when its pool name (the network name followed
    by the relay number) is a key of ``main.IRCPool`` and that entry's
    ``isconnected`` attribute is truthy.

    Returns True only when the number of active relays equals the number of
    relays registered in ``main.network[net].relays``; otherwise False.
    """
    relays = main.network[net].relays
    pool_names = [net + str(num) for num in relays]
    connected = sum(
        1
        for pool_name in pool_names
        if pool_name in main.IRCPool and main.IRCPool[pool_name].isconnected
    )
    return connected == len(relays)
|
|
|
|
def getChanFree(net, new):
    """Work out how many more channels each existing relay of *net* can join.

    Relays whose number appears in *new* are skipped. For every other relay
    the free capacity is its ``chanlimit`` minus the number of entries in
    its ``channels``.

    Returns a ``(free, limit)`` tuple, where ``free`` maps relay number to
    remaining capacity and ``limit`` is the single CHANLIMIT value shared by
    all inspected relays. Returns False (after logging an error and printing
    the collected limits) unless exactly one distinct CHANLIMIT was seen.
    """
    free = {}
    limits = set()
    for num in main.network[net].relays:
        if num in new:
            continue
        pool_name = net + str(num)
        used = len(main.IRCPool[pool_name].channels)
        free[num] = main.IRCPool[pool_name].chanlimit - used
        limits.add(main.IRCPool[pool_name].chanlimit)

    if len(limits) != 1:
        error("Network %s has servers with different CHANLIMIT values" % net)
        print(limits)
        return False

    (limit,) = limits
    return (free, limit)
|
|
|
|
def emptyChanAllocate(net, flist, relay, new):
    """Distribute the channels in *flist* across relays with spare capacity.

    Capacity for existing relays comes from ``getChanFree``; every relay
    number in *new* is given the full CHANLIMIT. Channels are taken from the
    END of *flist* with ``pop()``, so the caller's list is consumed.

    The *relay* argument is accepted but not used here.

    Returns a dict mapping relay number to the list of channels assigned to
    it. Returns None when ``getChanFree`` fails, and False (after logging an
    error) when *flist* holds more channels than the total free capacity.
    """
    capacity_info = getChanFree(net, new)
    if not capacity_info:
        return
    slots, limit = capacity_info

    # Freshly provisioned relays have joined nothing yet.
    for num in new:
        slots[num] = limit

    if len(flist) > sum(slots.values()):
        error("Too many channels to allocate for %s - this is probably a bug" % net)
        return False

    allocated = {}
    for num, capacity in slots.items():
        for _ in range(capacity):
            if not flist:
                break
            allocated.setdefault(num, []).append(flist.pop())
    return allocated
|
|
|
|
def populateChans(net, clist, relay, new):
    """Allocate *clist* to relays and record the result in ``main.TempChan``.

    The allocation is delegated to ``emptyChanAllocate``. When it yields a
    non-empty mapping, each relay number's channel list is stored under
    ``main.TempChan[net][relay_number]``, creating the per-network dict on
    first use. Nothing is written when the allocation is empty or failed.
    """
    assignment = emptyChanAllocate(net, clist, relay, new)
    if not assignment:
        return
    for num, channels in assignment.items():
        main.TempChan.setdefault(net, {})[num] = channels
|
|
|
|
def notifyJoin(net):
    """Call ``checkChannels()`` on every pooled relay connection of *net*.

    Relays registered for the network but absent from ``main.IRCPool`` are
    silently skipped.
    """
    for num in main.network[net].relays:
        pool_name = net + str(num)
        if pool_name not in main.IRCPool:
            continue
        main.IRCPool[pool_name].checkChannels()
|
|
|
|
def minifyChans(net, listinfo):
    """Strip from *listinfo* every channel that a relay of *net* is already on.

    *listinfo* is a list of LIST rows whose first element is the channel
    name. The list is filtered IN PLACE (the same object is returned), so
    callers holding a reference see the reduced list.

    Returns the filtered list, or False when either not all relays are
    active (an error is logged) or no channels remain after filtering.

    NOTE(review): channel names are compared exactly and are assumed to be
    hashable (strings) - confirm against how ``channels`` is populated.
    """
    if not allRelaysActive(net):
        error("All relays for %s are not active, cannot minify list" % net)
        return False

    # Collect every joined channel once so each row is checked with a single
    # O(1) lookup. The previous implementation removed items from listinfo
    # while iterating over it, which skips the element following each
    # removal and could leave duplicate rows behind.
    joined = set()
    for num in main.network[net].relays:
        joined.update(main.IRCPool[net + str(num)].channels)

    # Slice assignment keeps the caller's list object, as before.
    listinfo[:] = [row for row in listinfo if row[0] not in joined]

    if not listinfo:
        log("We're on all the channels we want to be on, dropping LIST")
        return False
    return listinfo
|
|
|
|
def keepChannels(net, listinfo, mean, sigrelay, relay):
    """Provision relays for *net* and queue the channels they should join.

    *listinfo* is first reduced by ``minifyChans``; nothing happens if that
    leaves no channels. Then:

    * when *relay* is at most ``ChanKeep.SigSwitch``, every remaining channel
      is queued and *relay* relays are targeted;
    * otherwise only channels whose second field (as an int) exceeds *mean*
      are queued and *sigrelay* relays are targeted.

    The function logs an error and stops if *sigrelay* exceeds
    ``ChanKeep.MaxRelay``. Finally all pooled relays are asked to check
    their channels via ``notifyJoin``.
    """
    listinfo = minifyChans(net, listinfo)
    if not listinfo:
        return

    # True: we can cover all of the channels; False: we cannot.
    coverAll = relay <= main.config["ChanKeep"]["SigSwitch"]

    if sigrelay > main.config["ChanKeep"]["MaxRelay"]:
        error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
        return

    target = relay if coverAll else sigrelay
    needed = target - len(main.network[net].relays)
    newNums = provisionMultipleRelays(net, needed)

    if coverAll:
        wanted = [row[0] for row in listinfo]
    else:
        wanted = [row[0] for row in listinfo if int(row[1]) > mean]

    populateChans(net, wanted, target, newNums)
    notifyJoin(net)
|
|
|
|
#print("coverall", coverAll)
|
|
#print("needed", needed)
|
|
|
|
#def purgeRecords(net):
|
|
# base = "list.%s" % net
|
|
# p = main.g.pipeline()
|
|
# existingChans = main.g.smembers("list."+net)
|
|
# for i in existingChans:
|
|
# p.delete(base+"."+i.decode("utf-8"))
|
|
# p.execute()
|
|
|
|
def nukeNetwork(net):
    """Delete the ``analytics.list.<net>`` key from the ``main.g`` store.

    Only the analytics key is removed; purging of per-channel LIST records
    is currently disabled in this module.
    """
    analytics_key = "analytics.list." + net
    main.g.delete(analytics_key)
|
|
|
|
#def nukeNetwork(net):
|
|
# deferToThread(_nukeNetwork, net)
|
|
|
|
|
|
def _initialList(net, num, listinfo, chanlimit):
    """Compute LIST statistics for *net*, store them, and start channel keeping.

    *listinfo* is a list of LIST rows; the second element of each row is
    converted with ``int()`` and used as the row's weight. Rows strictly
    above the rounded mean are "significant", rows strictly below it are
    "insignificant"; rows exactly equal to the mean are counted in neither.

    The statistics are written as fields of the ``analytics.list.<net>``
    hash through a pipeline on ``main.g``, after which ``keepChannels`` is
    called with the mean and the relay counts ``ceil(count / chanlimit)``.

    *num* is only used in warning messages.

    Returns None. A warning is logged and nothing is stored when *listinfo*
    is empty or contains a row whose weight cannot be read as an integer.
    """
    listlength = len(listinfo)
    if not listlength:
        # Guard the mean computation below against division by zero.
        warn("Empty LIST data received from %s - %i" % (net, num))
        return

    # Convert every weight exactly once. int() raises ValueError for
    # non-numeric strings and TypeError for None and other unsupported types;
    # a row that is too short raises IndexError.
    try:
        weights = [int(row[1]) for row in listinfo]
    except (TypeError, ValueError, IndexError):
        warn("Bad LIST data received from %s - %i" % (net, num))
        return

    cumul = sum(weights)
    mean = round(cumul/listlength, 2)

    significant = [w for w in weights if w > mean]
    insignificant = [w for w in weights if w < mean]
    siglength = len(significant)
    insiglength = len(insignificant)

    sigrelay = ceil(siglength/chanlimit)
    relay = ceil(listlength/chanlimit)

    abase = "analytics.list.%s" % net
    # Insertion order matches the order the fields were written originally.
    stats = {
        "mean": mean,
        "total": listlength,
        "sigtotal": siglength,
        "insigtotal": insiglength,
        "sigperc": round(siglength/listlength*100, 2),
        "insigperc": round(insiglength/listlength*100, 2),
        "cumul": cumul,
        "sigcumul": sum(significant),
        "insigcumul": sum(insignificant),
        "relay": relay,
        "sigrelay": sigrelay,
        "insigrelay": ceil(insiglength/chanlimit),
    }

    p = main.g.pipeline()
    for field, value in stats.items():
        p.hset(abase, field, value)
    p.execute()

    debug("List parsing completed on %s" % net)
    keepChannels(net, listinfo, mean, sigrelay, relay)
|
|
|
|
def initialList(net, num, listinfo, chanlimit):
    """Run ``_initialList`` in a worker thread on a private copy of *listinfo*.

    The list is deep-copied before being handed to ``deferToThread``, so the
    worker operates on its own copy rather than the caller's object. The
    resulting Deferred is not returned.
    """
    snapshot = deepcopy(listinfo)
    deferToThread(_initialList, net, num, snapshot, chanlimit)
|
|
|