Fix a bug where the channel allocation algorithm failed due to necessary relays not having been provisioned yet. Passed the newly created relay numbers to the allocation function and assumed their CHANMAX would be the same as all other relays for the same network.
192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
import main
|
|
from utils.logging.log import *
|
|
from utils.logging.debug import *
|
|
from copy import deepcopy
|
|
from math import ceil
|
|
from modules.provision import provisionMultipleRelays
|
|
from twisted.internet.threads import deferToThread
|
|
|
|
def allRelaysActive(net):
|
|
relayNum = len(main.network[net].relays.keys())
|
|
existNum = 0
|
|
for i in main.network[net].relays.keys():
|
|
name = net+str(i)
|
|
if name in main.IRCPool.keys():
|
|
if main.IRCPool[name].connected:
|
|
existNum += 1
|
|
if existNum == relayNum:
|
|
return True
|
|
return False
|
|
|
|
def getChanFree(net, new):
|
|
chanfree = {}
|
|
chanlimits = set()
|
|
for i in main.network[net].relays.keys():
|
|
if i in new:
|
|
continue
|
|
name = net+str(i)
|
|
chanfree[i] = main.IRCPool[name].chanlimit-len(main.IRCPool[name].channels)
|
|
chanlimits.add(main.IRCPool[name].chanlimit)
|
|
|
|
if not len(chanlimits) == 1:
|
|
error("Network %s has servers with different CHANMAX values" % net)
|
|
return False
|
|
return (chanfree, chanlimits.pop())
|
|
|
|
def emptyChanAllocate(net, flist, relay, new):
|
|
chanfree = getChanFree(net, new)
|
|
if not chanfree:
|
|
return
|
|
for i in new:
|
|
chanfree[0][i] = chanfree[1]
|
|
print("chanfree", chanfree)
|
|
allocated = {}
|
|
toalloc = len(flist)
|
|
if toalloc > sum(chanfree[0].values()):
|
|
error("Too many channels to allocate for %s - this is probably a bug" % net)
|
|
return False
|
|
for i in chanfree[0].keys():
|
|
for x in range(chanfree[0][i]):
|
|
if not len(flist):
|
|
break
|
|
if i in allocated.keys():
|
|
allocated[i].append(flist.pop())
|
|
else:
|
|
allocated[i] = [flist.pop()]
|
|
return allocated
|
|
|
|
def populateChans(net, clist, relay, new):
|
|
#divided = array_split(clist, relay)
|
|
allocated = emptyChanAllocate(net, clist, relay, new)
|
|
if not allocated:
|
|
return
|
|
for i in allocated.keys():
|
|
if net in main.TempChan.keys():
|
|
main.TempChan[net][i] = allocated[i]
|
|
else:
|
|
main.TempChan[net] = {i: allocated[i]}
|
|
|
|
def notifyJoin(net):
|
|
for i in main.network[net].relays.keys():
|
|
name = net+str(i)
|
|
if name in main.IRCPool.keys():
|
|
main.IRCPool[name].checkChannels()
|
|
|
|
def minifyChans(net, listinfo):
|
|
if not allRelaysActive(net):
|
|
error("All relays for %s are not active, cannot minify list")
|
|
return False
|
|
for i in main.network[net].relays.keys():
|
|
name = net+str(i)
|
|
for x in main.IRCPool[name].channels:
|
|
for y in listinfo:
|
|
if y[0] == x:
|
|
listinfo.remove(y)
|
|
if not listinfo:
|
|
log("We're on all the channels we want to be on, dropping LIST")
|
|
return False
|
|
return listinfo
|
|
|
|
def keepChannels(net, listinfo, mean, sigrelay, relay):
|
|
#print("list", listinfo)
|
|
#print("sigrelay", sigrelay)
|
|
#print("cur", len(main.network[net].relays.keys()))
|
|
listinfo = minifyChans(net, listinfo)
|
|
if not listinfo:
|
|
return
|
|
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
|
|
coverAll = True
|
|
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
|
|
coverAll = False
|
|
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
|
|
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
|
|
return
|
|
if coverAll:
|
|
needed = relay-len(main.network[net].relays.keys())
|
|
newNums = provisionMultipleRelays(net, needed)
|
|
flist = [i[0] for i in listinfo]
|
|
populateChans(net, flist, relay, newNums)
|
|
else:
|
|
needed = sigrelay-len(main.network[net].relays.keys())
|
|
newNums = provisionMultipleRelays(net, needed)
|
|
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
|
|
populateChans(net, siglist, sigrelay, newNums)
|
|
notifyJoin(net)
|
|
|
|
#print("coverall", coverAll)
|
|
#print("needed", needed)
|
|
|
|
#def purgeRecords(net):
|
|
# base = "list.%s" % net
|
|
# p = main.g.pipeline()
|
|
# existingChans = main.g.smembers("list."+net)
|
|
# for i in existingChans:
|
|
# p.delete(base+"."+i.decode("utf-8"))
|
|
# p.execute()
|
|
|
|
def nukeNetwork(net):
|
|
#purgeRecords(net)
|
|
#p = main.g.pipeline()
|
|
main.g.delete("analytics.list."+net)
|
|
#p.delete("list."+net)
|
|
#p.execute()
|
|
|
|
#def nukeNetwork(net):
|
|
# deferToThread(_nukeNetwork, net)
|
|
|
|
|
|
def _initialList(net, num, listinfo, chanlimit):
|
|
#listinfo = sorted(listinfo, key=lambda x: xdd[0])
|
|
listlength = len(listinfo)
|
|
cumul = 0
|
|
try:
|
|
cumul += sum(int(i[1]) for i in listinfo)
|
|
except TypeError:
|
|
warn("Bad LIST data received from %s - %i" % (net, num))
|
|
return
|
|
mean = round(cumul/listlength, 2)
|
|
siglength = 0
|
|
insiglength = 0
|
|
sigcumul = 0
|
|
insigcumul = 0
|
|
for i in listinfo:
|
|
if int(i[1]) > mean:
|
|
siglength += 1
|
|
sigcumul += int(i[1])
|
|
elif int(i[1]) < mean:
|
|
insiglength += 1
|
|
insigcumul += int(i[1])
|
|
|
|
sigrelay = ceil(siglength/chanlimit)
|
|
relay = ceil(listlength/chanlimit)
|
|
netbase = "list.%s" % net
|
|
abase = "analytics.list.%s" % net
|
|
p = main.g.pipeline()
|
|
p.hset(abase, "mean", mean)
|
|
p.hset(abase, "total", listlength)
|
|
p.hset(abase, "sigtotal", siglength)
|
|
p.hset(abase, "insigtotal", insiglength)
|
|
p.hset(abase, "sigperc", round(siglength/listlength*100, 2))
|
|
p.hset(abase, "insigperc", round(insiglength/listlength*100, 2))
|
|
p.hset(abase, "cumul", cumul)
|
|
p.hset(abase, "sigcumul", sigcumul)
|
|
p.hset(abase, "insigcumul", insigcumul)
|
|
p.hset(abase, "relay", relay)
|
|
p.hset(abase, "sigrelay", sigrelay)
|
|
p.hset(abase, "insigrelay", ceil(insiglength/chanlimit))
|
|
|
|
# Purge existing records before writing
|
|
#purgeRecords(net)
|
|
#for i in listinfo:
|
|
# p.rpush(netbase+"."+i[0], i[1])
|
|
# p.rpush(netbase+"."+i[0], i[2])
|
|
# p.sadd(netbase, i[0])
|
|
|
|
p.execute()
|
|
debug("List parsing completed on %s" % net)
|
|
keepChannels(net, listinfo, mean, sigrelay, relay)
|
|
|
|
def initialList(net, num, listinfo, chanlimit):
|
|
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
|
|
|