Implement fair channel allocation in ChanKeep

* Allocate channels to relays only if they have free space based on
their chanlimit value
* Minify channels by removing ones that are already covered before
passing them off to be joined
This commit is contained in:
Mark Veidemanis 2019-10-12 21:05:55 +01:00
parent 7a6e3338c0
commit 0321651c20
4 changed files with 103 additions and 31 deletions

26
commands/list.py Normal file
View File

@ -0,0 +1,26 @@
import main
class ListCommand:
def __init__(self, *args):
self.list(*args)
def list(self, addr, authed, data, obj, spl, success, failure, info, incUsage, length):
if authed:
if length == 2:
if not spl[1] in main.network.keys():
failure("No such network: %s" % spl[1])
return
if not 1 in main.network[spl[1]].relays.keys():
failure("Network has no first instance")
return
if not spl[1]+"1" in main.IRCPool.keys():
failure("No IRC instance: %s - 1" % spl[1])
return
main.IRCPool[spl[1]+"1"].list()
success("Requested list with first instance of %s" % spl[1])
return
else:
incUsage("list")
return
else:
incUsage(None)

View File

@ -164,12 +164,15 @@ class IRCBot(IRCClient):
increment = 0.7 increment = 0.7
increment += 0.1 increment += 0.1
else: else:
print("Already on %s, skipping." % i) error("%s - %i - Cannot join channel we are already on - %s" % (self.net, self.num, i))
def checkChannels(self): def checkChannels(self):
if self.net in main.TempChan.keys(): if self.net in main.TempChan.keys():
if self.num in main.TempChan[self.net].keys(): if self.num in main.TempChan[self.net].keys():
self.joinChannels(main.TempChan[self.net][self.num]) self.joinChannels(main.TempChan[self.net][self.num])
del main.TempChan[self.net][self.num]
if not main.TempChan[self.net]:
del main.TempChan[self.net]
def event(self, **cast): def event(self, **cast):
for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over
@ -392,7 +395,6 @@ class IRCBot(IRCClient):
def got_list(self, listinfo): def got_list(self, listinfo):
if len(listinfo) == 0: # probably ngircd not supporting LIST >0 if len(listinfo) == 0: # probably ngircd not supporting LIST >0
return return
chankeep.initialList(self.net, self.num, listinfo, self.chanlimit) chankeep.initialList(self.net, self.num, listinfo, self.chanlimit)
def isupport(self, options): def isupport(self, options):

View File

@ -13,18 +13,46 @@ def allRelaysActive(net):
for i in main.network[net].relays.keys(): for i in main.network[net].relays.keys():
name = net+str(i) name = net+str(i)
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
existNum += 1 if main.IRCPool[name].connected:
existNum += 1
if existNum == relayNum: if existNum == relayNum:
return True return True
return False return False
def getChanFree(net):
chanfree = {}
for i in main.network[net].relays.keys():
name = net+str(i)
chanfree[i] = main.IRCPool[name].chanlimit-len(main.IRCPool[name].channels)
return chanfree
def emptyChanAllocate(net, flist, relay):
chanfree = getChanFree(net)
allocated = {}
toalloc = len(flist)
if toalloc > sum(chanfree.values()):
error("Too many channels to allocate for %s - this is probably a bug" % net)
return False
for i in chanfree.keys():
for x in range(chanfree[i]):
if not len(flist):
break
if i in allocated.keys():
allocated[i].append(flist.pop())
else:
allocated[i] = [flist.pop()]
return allocated
def populateChans(net, clist, relay): def populateChans(net, clist, relay):
divided = array_split(clist, relay) #divided = array_split(clist, relay)
for i in range(0, len(divided)): allocated = emptyChanAllocate(net, clist, relay)
if not allocated:
return
for i in allocated.keys():
if net in main.TempChan.keys(): if net in main.TempChan.keys():
main.TempChan[net][i] = divided[i] main.TempChan[net][i] = allocated[i]
else: else:
main.TempChan[net] = {i: divided[i]} main.TempChan[net] = {i: allocated[i]}
def notifyJoin(net): def notifyJoin(net):
for i in main.network[net].relays.keys(): for i in main.network[net].relays.keys():
@ -32,10 +60,28 @@ def notifyJoin(net):
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels() main.IRCPool[name].checkChannels()
def minifyChans(net, listinfo):
if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list")
return False
for i in main.network[net].relays.keys():
name = net+str(i)
for x in main.IRCPool[name].channels:
for y in listinfo:
if y[0] == x:
listinfo.remove(y)
if not listinfo:
log("We're on all the channels we want to be on, dropping LIST")
return False
return listinfo
def keepChannels(net, listinfo, mean, sigrelay, relay): def keepChannels(net, listinfo, mean, sigrelay, relay):
#print("list", listinfo) #print("list", listinfo)
#print("sigrelay", sigrelay) #print("sigrelay", sigrelay)
#print("cur", len(main.network[net].relays.keys())) #print("cur", len(main.network[net].relays.keys()))
listinfo = minifyChans(net, listinfo)
if not listinfo:
return
if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels if relay <= main.config["ChanKeep"]["SigSwitch"]: # we can cover all of the channels
coverAll = True coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
@ -58,23 +104,24 @@ def keepChannels(net, listinfo, mean, sigrelay, relay):
#print("coverall", coverAll) #print("coverall", coverAll)
#print("needed", needed) #print("needed", needed)
def purgeRecords(net): #def purgeRecords(net):
base = "list.%s" % net # base = "list.%s" % net
p = main.g.pipeline() # p = main.g.pipeline()
existingChans = main.g.smembers("list."+net) # existingChans = main.g.smembers("list."+net)
for i in existingChans: # for i in existingChans:
p.delete(base+"."+i.decode("utf-8")) # p.delete(base+"."+i.decode("utf-8"))
p.execute() # p.execute()
def _nukeNetwork(net):
purgeRecords(net)
p = main.g.pipeline()
p.delete("analytics.list."+net)
p.delete("list."+net)
p.execute()
def nukeNetwork(net): def nukeNetwork(net):
deferToThread(_nukeNetwork, net) #purgeRecords(net)
#p = main.g.pipeline()
main.g.delete("analytics.list."+net)
#p.delete("list."+net)
#p.execute()
#def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
def _initialList(net, num, listinfo, chanlimit): def _initialList(net, num, listinfo, chanlimit):
#listinfo = sorted(listinfo, key=lambda x: xdd[0]) #listinfo = sorted(listinfo, key=lambda x: xdd[0])
@ -98,9 +145,6 @@ def _initialList(net, num, listinfo, chanlimit):
insiglength += 1 insiglength += 1
insigcumul += int(i[1]) insigcumul += int(i[1])
if not net in main.network.keys():
warn("Cannot write list info - no network entry for %s" % net)
return
sigrelay = ceil(siglength/chanlimit) sigrelay = ceil(siglength/chanlimit)
relay = ceil(listlength/chanlimit) relay = ceil(listlength/chanlimit)
netbase = "list.%s" % net netbase = "list.%s" % net
@ -118,12 +162,13 @@ def _initialList(net, num, listinfo, chanlimit):
p.hset(abase, "relay", relay) p.hset(abase, "relay", relay)
p.hset(abase, "sigrelay", sigrelay) p.hset(abase, "sigrelay", sigrelay)
p.hset(abase, "insigrelay", ceil(insiglength/chanlimit)) p.hset(abase, "insigrelay", ceil(insiglength/chanlimit))
# Purge existing records before writing # Purge existing records before writing
purgeRecords(net) #purgeRecords(net)
for i in listinfo: #for i in listinfo:
p.rpush(netbase+"."+i[0], i[1]) # p.rpush(netbase+"."+i[0], i[1])
p.rpush(netbase+"."+i[0], i[2]) # p.rpush(netbase+"."+i[0], i[2])
p.sadd(netbase, i[0]) # p.sadd(netbase, i[0])
p.execute() p.execute()
debug("List parsing completed on %s" % net) debug("List parsing completed on %s" % net)

View File

@ -65,7 +65,6 @@ def provisionRelay(num, network):
def provisionMultipleRelays(net, relaysNeeded): def provisionMultipleRelays(net, relaysNeeded):
for i in range(relaysNeeded): for i in range(relaysNeeded):
num, alias = main.network[net].add_relay() num, alias = main.network[net].add_relay()
print(relaysNeeded, "for", net, ":", num, alias)
provisionRelay(num, net) provisionRelay(num, net)
main.saveConf("network") main.saveConf("network")