monolith/modules/chankeep.py

415 lines
14 KiB
Python
Raw Normal View History

2019-10-08 20:10:42 +00:00
from copy import deepcopy
from math import ceil
2022-07-21 12:40:09 +00:00
from twisted.internet.threads import deferToThread
2022-07-21 12:39:59 +00:00
2022-07-21 12:40:09 +00:00
import main
import modules.provision
2022-08-11 20:44:19 +00:00
from utils.logging.debug import debug, trace
2022-07-21 12:40:09 +00:00
from utils.logging.log import error, log, warn
2022-07-21 12:39:41 +00:00
2022-08-11 18:22:09 +00:00
def getActiveRelays(net):
    """
    List the relay numbers of a network whose "enabled" flag is truthy.

    :param net: network
    :rtype: list
    :return: keys of main.network[net].relays whose entry has a truthy "enabled" value
    """
    relays = main.network[net].relays
    return [num for num in relays.keys() if relays[num]["enabled"]]
def allRelaysActive(net):
    """
    Tell whether every enabled relay of a network is present in the pool
    and authenticated.

    :param net: network
    :rtype: bool
    :return: True if each enabled relay is in main.IRCPool and authenticated, False otherwise
    """
    enabled = getActiveRelays(net)
    debug(f"allRelaysActive() active relays for {net}: {enabled}")
    wanted = len(enabled)
    ready = 0
    for num in enabled:
        name = net + str(num)
        # A relay only counts once it is both pooled and authenticated
        if name not in main.IRCPool.keys():
            continue
        if main.IRCPool[name].authenticated:
            ready += 1
    debug(f"allRelaysActive() finished, {ready}/{wanted} relays active for {net}")
    return ready == wanted
2022-07-21 12:39:41 +00:00
def getChanFree(net, new):
    """
    Get a dictionary with the free channel spaces for
    each relay, and a channel limit.
    Example return:
    ({1: 99}, 100)

    Relays listed in ``new``, relays missing from main.IRCPool and relays
    that are not connected are left out of the result.

    :param net: network
    :param new: list of newly provisioned relays to skip
    :return: ({relay: channel spaces}, channel limit), or False when no
             connected relay was found or the relays disagree on the
             channel limit
    """
    chanfree = {}
    chanlimits = set()
    for i in getActiveRelays(net):
        if i in new:
            continue
        name = net + str(i)
        if name not in main.IRCPool:
            continue
        relay = main.IRCPool[name]
        if not relay.isconnected:
            continue
        chanfree[i] = relay.chanlimit - len(relay.channels)
        chanlimits.add(relay.chanlimit)

    # Without any connected relay there is no limit to report; say so
    # instead of blaming mismatched CHANLIMIT values.
    if not chanlimits:
        error("Network %s has no connected relays to read CHANLIMIT from" % net)
        return False
    if len(chanlimits) > 1:
        error("Network %s has servers with different CHANLIMIT values" % net)
        return False
    return (chanfree, chanlimits.pop())
2022-08-12 22:32:00 +00:00
def getTotalChans(net):
    """
    Count the channels held by all enabled relays of a network.
    Relays that are not in main.IRCPool contribute nothing.

    :param net: network
    :rtype: int
    :return: total number of channels
    """
    names = (net + str(num) for num in getActiveRelays(net))
    return sum(len(main.IRCPool[name].channels) for name in names if name in main.IRCPool.keys())
2022-07-21 12:39:41 +00:00
def emptyChanAllocate(net, flist, new):
    """
    Allocate channels to relays.

    If there are more channels than free spaces, either ask for more relays
    to be provisioned (when ChanKeep.Provision is enabled) and give up for
    now, or truncate the list to the available spaces and allocate what fits.

    :param net: network
    :param flist: list of channels to allocate (not mutated)
    :param new: list of newly provisioned relays to account for
    :rtype: dict, bool or None
    :return: dictionary of {relay: list of channels}; None if the free
             spaces could not be determined; False if additional relays
             were requested instead of allocating
    """
    # Get the free channel spaces for each relay
    chanfree = getChanFree(net, new)
    if not chanfree:
        return
    free, chanlimit = chanfree

    # Pretend the newly provisioned relays are already on the network
    for i in new:
        free[i] = chanlimit

    toalloc = len(flist)
    # Copy the list since we're going to mutate it
    newlist = list(flist)

    # Used to correct allocations and provision additional relays
    # if the math since the last LIST is a bit wrong
    # toalloc:2148 free:{1: 250} chanlimit:250 correction:2147
    sum_free = sum(free.values())  # 250
    if toalloc > sum_free:
        chans_not_covered = toalloc - sum_free  # 2148 - 250 = 1898
        relays_needed = chans_not_covered / chanlimit  # 1898 / 250 = 7.592
        correction = ceil(relays_needed)
        if main.config["ChanKeep"]["Provision"]:
            debug(
                (
                    f"emptyChanAllocate() secondary allocation sum_free:{sum_free} "
                    f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} "
                    f"correction:{correction}"
                )
            )
            debug(
                (
                    f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} "
                    f"free:{free} chanlimit:{chanlimit} correction:{correction}"
                )
            )
            warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
            modules.provision.provisionMultipleRelays(net, correction)
            return False
        else:
            # We don't have enough spaces and we can't add any.
            # Let's do the best we can in the circumstances.
            debug(f"emptyChanAllocate() cannot create additional relays for {net}")
            debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered")
            newlist = newlist[:sum_free]
            # Report the truncated working copy, not the caller's untouched list
            debug(f"emptyChanAllocate() flist truncated to {sum_free}, length is now {len(newlist)}")
            trace(f"emptyChanAllocate() best effort allocation: {newlist}")

    allocated = {}
    for relay, spaces in free.items():
        # range() of a non-positive number is empty, so full relays get nothing
        for _ in range(spaces):
            if not newlist:
                break
            # Channels are taken from the end of the working copy
            allocated.setdefault(relay, []).append(newlist.pop())
    return allocated
2022-07-21 12:39:41 +00:00
def populateChans(net, clist, new):
    """
    Work out which relay should join which channels and stash the result.
    The per-relay channel lists are written to main.TempChan[net][num].
    Nothing is stored when the allocation comes back empty or falsy.

    :param net: network
    :param clist: list of channels to join
    :param new: list of newly provisioned relays to account for
    """
    allocated = emptyChanAllocate(net, clist, new)
    if not allocated:
        return
    for num, chans in allocated.items():
        # Create the per-network mapping on first use, then record this relay
        main.TempChan.setdefault(net, {})[num] = chans
2022-07-21 12:39:41 +00:00
def notifyJoin(net):
    """
    Ask every enabled relay that is in main.IRCPool to run checkChannels().
    Presumably the relay then pulls its channels from main.TempChan;
    that happens outside this function.

    :param net: network
    """
    for num in getActiveRelays(net):
        name = net + str(num)
        if name not in main.IRCPool.keys():
            continue
        main.IRCPool[name].checkChannels()
2022-07-21 12:39:41 +00:00
def minifyChans(net, listinfo):
    """
    Remove channels from listinfo that are already covered by a relay.
    listinfo is filtered in place and the same list object is returned.

    :param net: network
    :param listinfo: list of channels to check
    :type listinfo: list of [channel, num_users]
    :return: list of channels with joined channels removed, or False if
             not all relays are active or nothing is left to join
    :rtype: list of [channel, num_users]
    """
    if not allRelaysActive(net):
        error("All relays for %s are not active, cannot minify list" % net)
        return False

    # Collect every channel any relay is already on.
    # NOTE(review): assumes channel names are hashable (strings) - confirm.
    joined = set()
    for i in getActiveRelays(net):
        name = net + str(i)
        joined.update(main.IRCPool[name].channels)

    # Filter with a slice assignment instead of list.remove() inside a loop
    # over the same list: removing while iterating skips the entry that
    # follows each removed one.
    listinfo[:] = [entry for entry in listinfo if entry[0] not in joined]

    if not listinfo:
        log("We're on all the channels we want to be on, dropping LIST")
        return False
    return listinfo
2022-07-21 12:39:41 +00:00
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
    """
    Minify channels, determine whether we can cover all the channels
    on the network, or need to use 'significant' mode.
    Truncate the channel list to available channel spaces, keeping the
    most populated channels first.
    Allocate these channels to relays.
    Notify relays that they should pull from TempChan to join.

    :param net: network
    :param listinfo: list of [channel, num_users] lists
    :param mean: mean of channel population
    :param sigrelay: number of relays needed to cover significant channels
    :param relay: number of relays needed to cover all channels
    :param chanlimit: maximum number of channels to allocate to a relay
    """
    listinfo = minifyChans(net, listinfo)
    if not listinfo:
        return

    # We can cover all of the channels only while the relay count needed
    # stays within the SigSwitch threshold.
    coverAll = relay <= main.config["ChanKeep"]["SigSwitch"]
    if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
        error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
        return

    num_instances = len(getActiveRelays(net))
    debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}")
    chan_slots_used = getTotalChans(net)
    debug(f"keepChannels() slots_used:{chan_slots_used}")
    # NOTE(review): computed before provisioning, so spaces on relays
    # provisioned below are not counted here - confirm this is intended.
    max_chans = (chanlimit * num_instances) - chan_slots_used
    debug(f"keepChannels() max_chans:{max_chans}")

    if coverAll:
        needed = relay - num_instances
        debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
        newNums = modules.provision.provisionMultipleRelays(net, needed)
        candidates = listinfo
    else:
        needed = sigrelay - num_instances
        debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
        newNums = modules.provision.provisionMultipleRelays(net, needed)
        candidates = [entry for entry in listinfo if int(entry[1]) > mean]

    # Rank the [channel, num_users] entries by user count BEFORE stripping
    # them down to names; sorting the bare names with key x[1] would order
    # them by the second character of the channel name.
    ranked = sorted(candidates, key=lambda entry: int(entry[1]), reverse=True)
    chosen = [entry[0] for entry in ranked][:max_chans]
    populateChans(net, chosen, newNums)
    notifyJoin(net)
2022-07-21 12:39:41 +00:00
2020-11-02 20:14:02 +00:00
def joinSingle(net, channel):
    """
    Join one channel on whichever relay the allocator picks.
    emptyChanAllocate (ECA) decides which relay has room for it.

    :param net: network
    :param channel: channel to join
    :return: relay number that joined the channel, or False if no single
             relay could be chosen or the chosen relay is not in main.IRCPool
    :rtype: int
    """
    allocation = emptyChanAllocate(net, [channel], [])
    # Exactly one relay must have been selected for the single channel
    if not allocation or len(allocation.keys()) != 1:
        return False
    num = next(iter(allocation.keys()))
    name = f"{net}{num}"
    if name in main.IRCPool:
        main.IRCPool[name].join(channel)
        return num
    return False
2020-11-02 20:14:02 +00:00
2022-07-21 12:39:41 +00:00
def partSingle(net, channel):
    """
    Part a channel on every enabled relay of net that is currently on it.

    :param net: network
    :param channel: channel to part
    :return: list of relays that parted the channel
    :rtype: list of str
    """
    parted = []
    for num in getActiveRelays(net):
        name = f"{net}{num}"
        if name not in main.IRCPool.keys():
            continue
        if channel not in main.IRCPool[name].channels:
            continue
        main.IRCPool[name].part(channel)
        # Relay numbers are reported as strings
        parted.append(str(num))
    return parted
2019-10-08 20:10:42 +00:00
def nukeNetwork(net):
    """
    Drop the stored LIST analytics of a network.
    Deletes the "analytics.list.<net>" key through main.g.

    :param net: network
    """
    key = "analytics.list." + net
    main.g.delete(key)
2022-07-21 12:39:41 +00:00
# def nukeNetwork(net):
# deferToThread(_nukeNetwork, net)
2019-10-08 20:10:42 +00:00
def _initialList(net, num, listinfo, chanlimit):
    """
    Called when a relay receives a full LIST response.
    Run statistics to determine how many channels are significant.
    This is done by adding all the numbers of users on the channels together,
    then dividing by the number of channels.
    * cumul - cumulative sum of all channel membership
    * siglength - number of significant channels
    * listlength - number of channels in the list
    * sigrelay - number of relays needed to cover siglength
    * relay - number of relays needed to cover all channels

    The statistics are written to the "analytics.list.<net>" hash through
    main.g, then keepChannels() is called with the results. An empty or
    malformed list is logged and ignored.

    :param net: network
    :param num: relay number
    :param listinfo: list of [channel, num_users] lists
    :param chanlimit: maximum number of channels the relay can join
    """
    listlength = len(listinfo)
    if not listlength:
        # Nothing to average; bail out instead of dividing by zero below
        warn("Empty LIST data received from %s - %i" % (net, num))
        return
    try:
        # Convert once; int() raises ValueError for non-numeric strings and
        # TypeError for None, and a short entry raises IndexError.
        counts = [int(i[1]) for i in listinfo]
    except (TypeError, ValueError, IndexError):
        warn("Bad LIST data received from %s - %i" % (net, num))
        return
    cumul = sum(counts)
    mean = round(cumul / listlength, 2)

    siglength = 0
    insiglength = 0
    sigcumul = 0
    insigcumul = 0
    for users in counts:
        # Channels exactly at the mean count as neither big nor small
        if users > mean:
            siglength += 1
            sigcumul += users
        elif users < mean:
            insiglength += 1
            insigcumul += users

    sigrelay = ceil(siglength / chanlimit)
    relay = ceil(listlength / chanlimit)

    abase = "analytics.list.%s" % net
    main.g.delete(abase)
    p = main.g.pipeline()
    # See docstring for meanings
    p.hset(abase, "mean", mean)
    p.hset(abase, "total_chans", listlength)
    p.hset(abase, "big_chans", siglength)
    p.hset(abase, "small_chans", insiglength)
    p.hset(abase, "big_chan_perc", round(siglength / listlength * 100, 2))
    p.hset(abase, "small_chan_perc", round(insiglength / listlength * 100, 2))
    p.hset(abase, "total_cumul_mem", cumul)
    p.hset(abase, "big_chan_cumul_mem", sigcumul)
    p.hset(abase, "small_chan_cumul_mem", insigcumul)
    p.hset(abase, "relays_for_all_chans", relay)
    p.hset(abase, "relays_for_big_chans", sigrelay)
    p.hset(abase, "relays_for_small_chans", ceil(insiglength / chanlimit))
    debug(
        (
            f"_initialList() net:{net} num:{num} listlength:{listlength} "
            f"mean:{mean} siglength:{siglength} insiglength:{insiglength} "
            f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}"
        )
    )
    p.execute()
    debug("List parsing completed on %s" % net)
    keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit)
2019-10-08 20:10:42 +00:00
2022-08-13 12:32:22 +00:00
def convert(data):
    """
    Recursively decode bytes found in nested containers.

    bytes are decoded as ASCII; dicts (keys and values), tuples and lists
    are rebuilt with their contents converted; anything else is returned
    unchanged.

    :param data: value to convert
    :return: the converted value, with the same container types
    :raises UnicodeDecodeError: if a bytes value is not valid ASCII
    """
    if isinstance(data, bytes):
        return data.decode("ascii")
    if isinstance(data, dict):
        return {convert(key): convert(value) for key, value in data.items()}
    if isinstance(data, tuple):
        # Build a real tuple; a bare map() is a lazy, one-shot iterator
        return tuple(convert(item) for item in data)
    if isinstance(data, list):
        return [convert(item) for item in data]
    return data
def getListInfo(net):
    """
    Fetch the stored LIST analytics of a network.
    Reads the "analytics.list.<net>" hash through main.g and passes it
    through convert().

    :param net: network
    :return: result of convert() on the stored hash
    """
    raw = main.g.hgetall(f"analytics.list.{net}")
    return convert(raw)
2022-07-21 12:39:41 +00:00
2019-10-08 20:10:42 +00:00
def initialList(net, num, listinfo, chanlimit):
    """
    Hand a LIST response to _initialList via deferToThread.
    The list is deep-copied first, so the worker gets its own copy.

    :param net: network
    :param num: relay number
    :param listinfo: list of [channel, num_users] lists
    :param chanlimit: maximum number of channels the relay can join
    """
    snapshot = deepcopy(listinfo)
    deferToThread(_initialList, net, num, snapshot, chanlimit)