monolith/modules/chankeep.py
Mark Veidemanis 6ad6d6dc50 Fix channel allocation when relays are provisioned
Fix a bug where the channel allocation algorithm failed due to
necessary relays not having been provisioned yet.
Passed the newly created relay numbers to the allocation function
and assumed their CHANMAX would be the same as all other relays for
the same network.
2019-10-12 21:40:50 +01:00

192 lines
6.1 KiB
Python

import main
from utils.logging.log import *
from utils.logging.debug import *
from copy import deepcopy
from math import ceil
from modules.provision import provisionMultipleRelays
from twisted.internet.threads import deferToThread
def allRelaysActive(net):
relayNum = len(main.network[net].relays.keys())
existNum = 0
for i in main.network[net].relays.keys():
name = net+str(i)
if name in main.IRCPool.keys():
if main.IRCPool[name].connected:
existNum += 1
if existNum == relayNum:
return True
return False
def getChanFree(net, new):
chanfree = {}
chanlimits = set()
for i in main.network[net].relays.keys():
if i in new:
continue
name = net+str(i)
chanfree[i] = main.IRCPool[name].chanlimit-len(main.IRCPool[name].channels)
chanlimits.add(main.IRCPool[name].chanlimit)
if not len(chanlimits) == 1:
error("Network %s has servers with different CHANMAX values" % net)
return False
return (chanfree, chanlimits.pop())
def emptyChanAllocate(net, flist, relay, new):
chanfree = getChanFree(net, new)
if not chanfree:
return
for i in new:
chanfree[0][i] = chanfree[1]
print("chanfree", chanfree)
allocated = {}
toalloc = len(flist)
if toalloc > sum(chanfree[0].values()):
error("Too many channels to allocate for %s - this is probably a bug" % net)
return False
for i in chanfree[0].keys():
for x in range(chanfree[0][i]):
if not len(flist):
break
if i in allocated.keys():
allocated[i].append(flist.pop())
else:
allocated[i] = [flist.pop()]
return allocated
def populateChans(net, clist, relay, new):
#divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, relay, new)
if not allocated:
return
for i in allocated.keys():
if net in main.TempChan.keys():
main.TempChan[net][i] = allocated[i]
else:
main.TempChan[net] = {i: allocated[i]}
def notifyJoin(net):
for i in main.network[net].relays.keys():
name = net+str(i)
if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels()
def minifyChans(net, listinfo):
if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list")
return False
for i in main.network[net].relays.keys():
name = net+str(i)
for x in main.IRCPool[name].channels:
for y in listinfo:
if y[0] == x:
listinfo.remove(y)
if not listinfo:
log("We're on all the channels we want to be on, dropping LIST")
return False
return listinfo
def keepChannels(net, listinfo, mean, sigrelay, relay):
#print("list", listinfo)
#print("sigrelay", sigrelay)
#print("cur", len(main.network[net].relays.keys()))
listinfo = minifyChans(net, listinfo)
if not listinfo:
return
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return
if coverAll:
needed = relay-len(main.network[net].relays.keys())
newNums = provisionMultipleRelays(net, needed)
flist = [i[0] for i in listinfo]
populateChans(net, flist, relay, newNums)
else:
needed = sigrelay-len(main.network[net].relays.keys())
newNums = provisionMultipleRelays(net, needed)
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
populateChans(net, siglist, sigrelay, newNums)
notifyJoin(net)
#print("coverall", coverAll)
#print("needed", needed)
#def purgeRecords(net):
# base = "list.%s" % net
# p = main.g.pipeline()
# existingChans = main.g.smembers("list."+net)
# for i in existingChans:
# p.delete(base+"."+i.decode("utf-8"))
# p.execute()
def nukeNetwork(net):
#purgeRecords(net)
#p = main.g.pipeline()
main.g.delete("analytics.list."+net)
#p.delete("list."+net)
#p.execute()
#def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
def _initialList(net, num, listinfo, chanlimit):
#listinfo = sorted(listinfo, key=lambda x: xdd[0])
listlength = len(listinfo)
cumul = 0
try:
cumul += sum(int(i[1]) for i in listinfo)
except TypeError:
warn("Bad LIST data received from %s - %i" % (net, num))
return
mean = round(cumul/listlength, 2)
siglength = 0
insiglength = 0
sigcumul = 0
insigcumul = 0
for i in listinfo:
if int(i[1]) > mean:
siglength += 1
sigcumul += int(i[1])
elif int(i[1]) < mean:
insiglength += 1
insigcumul += int(i[1])
sigrelay = ceil(siglength/chanlimit)
relay = ceil(listlength/chanlimit)
netbase = "list.%s" % net
abase = "analytics.list.%s" % net
p = main.g.pipeline()
p.hset(abase, "mean", mean)
p.hset(abase, "total", listlength)
p.hset(abase, "sigtotal", siglength)
p.hset(abase, "insigtotal", insiglength)
p.hset(abase, "sigperc", round(siglength/listlength*100, 2))
p.hset(abase, "insigperc", round(insiglength/listlength*100, 2))
p.hset(abase, "cumul", cumul)
p.hset(abase, "sigcumul", sigcumul)
p.hset(abase, "insigcumul", insigcumul)
p.hset(abase, "relay", relay)
p.hset(abase, "sigrelay", sigrelay)
p.hset(abase, "insigrelay", ceil(insiglength/chanlimit))
# Purge existing records before writing
#purgeRecords(net)
#for i in listinfo:
# p.rpush(netbase+"."+i[0], i[1])
# p.rpush(netbase+"."+i[0], i[2])
# p.sadd(netbase, i[0])
p.execute()
debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay)
def initialList(net, num, listinfo, chanlimit):
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)