Implement indexing into Apache Druid #1
|
@ -61,7 +61,7 @@ def getChanFree(net, new):
|
||||||
return (chanfree, chanlimits.pop())
|
return (chanfree, chanlimits.pop())
|
||||||
|
|
||||||
|
|
||||||
def emptyChanAllocate(net, flist, relay, new):
|
def emptyChanAllocate(net, flist, new):
|
||||||
chanfree = getChanFree(net, new)
|
chanfree = getChanFree(net, new)
|
||||||
if not chanfree:
|
if not chanfree:
|
||||||
return
|
return
|
||||||
|
@ -102,20 +102,21 @@ def emptyChanAllocate(net, flist, relay, new):
|
||||||
flist = flist[:sum_free]
|
flist = flist[:sum_free]
|
||||||
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
|
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
|
||||||
trace(f"emptyChanAllocate() best effort allocation: {flist}")
|
trace(f"emptyChanAllocate() best effort allocation: {flist}")
|
||||||
|
newlist = list(flist)
|
||||||
for i in chanfree[0].keys():
|
for i in chanfree[0].keys():
|
||||||
for x in range(chanfree[0][i]):
|
for x in range(chanfree[0][i]):
|
||||||
if not len(flist):
|
if not len(newlist):
|
||||||
break
|
break
|
||||||
if i in allocated.keys():
|
if i in allocated.keys():
|
||||||
allocated[i].append(flist.pop())
|
allocated[i].append(newlist.pop())
|
||||||
else:
|
else:
|
||||||
allocated[i] = [flist.pop()]
|
allocated[i] = [newlist.pop()]
|
||||||
return allocated
|
return allocated
|
||||||
|
|
||||||
|
|
||||||
def populateChans(net, clist, relay, new):
|
def populateChans(net, clist, new):
|
||||||
# divided = array_split(clist, relay)
|
# divided = array_split(clist, relay)
|
||||||
allocated = emptyChanAllocate(net, clist, relay, new)
|
allocated = emptyChanAllocate(net, clist, new)
|
||||||
if not allocated:
|
if not allocated:
|
||||||
return
|
return
|
||||||
for i in allocated.keys():
|
for i in allocated.keys():
|
||||||
|
@ -148,7 +149,7 @@ def minifyChans(net, listinfo):
|
||||||
return listinfo
|
return listinfo
|
||||||
|
|
||||||
|
|
||||||
def keepChannels(net, listinfo, mean, sigrelay, relay):
|
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
|
||||||
listinfo = minifyChans(net, listinfo)
|
listinfo = minifyChans(net, listinfo)
|
||||||
if not listinfo:
|
if not listinfo:
|
||||||
return
|
return
|
||||||
|
@ -159,23 +160,27 @@ def keepChannels(net, listinfo, mean, sigrelay, relay):
|
||||||
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
|
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
|
||||||
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
|
error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
|
||||||
return
|
return
|
||||||
|
num_instances = len(getActiveRelays(net))
|
||||||
|
max_chans = chanlimit * num_instances
|
||||||
if coverAll:
|
if coverAll:
|
||||||
needed = relay - len(getActiveRelays(net))
|
needed = relay - len(getActiveRelays(net))
|
||||||
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
|
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
|
||||||
newNums = modules.provision.provisionMultipleRelays(net, needed)
|
newNums = modules.provision.provisionMultipleRelays(net, needed)
|
||||||
flist = [i[0] for i in listinfo]
|
flist = [i[0] for i in listinfo]
|
||||||
populateChans(net, flist, relay, newNums)
|
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
|
||||||
|
populateChans(net, chosen, newNums)
|
||||||
else:
|
else:
|
||||||
needed = sigrelay - len(getActiveRelays(net))
|
needed = sigrelay - len(getActiveRelays(net))
|
||||||
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
|
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
|
||||||
newNums = modules.provision.provisionMultipleRelays(net, needed)
|
newNums = modules.provision.provisionMultipleRelays(net, needed)
|
||||||
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
|
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
|
||||||
populateChans(net, siglist, sigrelay, newNums)
|
chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans]
|
||||||
|
populateChans(net, chosen, newNums)
|
||||||
notifyJoin(net)
|
notifyJoin(net)
|
||||||
|
|
||||||
|
|
||||||
def joinSingle(net, channel):
|
def joinSingle(net, channel):
|
||||||
eca = emptyChanAllocate(net, [channel], None, [])
|
eca = emptyChanAllocate(net, [channel], [])
|
||||||
if not eca:
|
if not eca:
|
||||||
return False
|
return False
|
||||||
if not len(eca.keys()) == 1:
|
if not len(eca.keys()) == 1:
|
||||||
|
@ -272,8 +277,22 @@ def _initialList(net, num, listinfo, chanlimit):
|
||||||
|
|
||||||
p.execute()
|
p.execute()
|
||||||
debug("List parsing completed on %s" % net)
|
debug("List parsing completed on %s" % net)
|
||||||
keepChannels(net, listinfo, mean, sigrelay, relay)
|
keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit)
|
||||||
|
|
||||||
|
# return (listinfo, mean, sigrelay, relay)
|
||||||
|
|
||||||
|
|
||||||
def initialList(net, num, listinfo, chanlimit):
|
def initialList(net, num, listinfo, chanlimit):
|
||||||
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
|
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
|
||||||
|
|
||||||
|
|
||||||
|
def chankeep_handler(net, num, listinfo, chanlimit):
|
||||||
|
"""
|
||||||
|
Handle a channel keep request.
|
||||||
|
:param net:
|
||||||
|
:param num:
|
||||||
|
:param listinfo:
|
||||||
|
:param chanlimit:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
listinfo, mean, sigrelay, relay = _initialList(net, num, listinfo, chanlimit)
|
|
@ -0,0 +1,3 @@
|
||||||
|
#!/bin/sh
|
||||||
|
#pre-commit run -a
|
||||||
|
python -m unittest discover -s tests -p 'test_*.py'
|
|
@ -0,0 +1,109 @@
|
||||||
|
import logging
|
||||||
|
from unittest import TestCase
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
from random import randint
|
||||||
|
from modules import chankeep
|
||||||
|
from math import ceil
|
||||||
|
import heapq
|
||||||
|
|
||||||
|
class TestChanKeep(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.net = "testnet"
|
||||||
|
self.num = 1
|
||||||
|
self.chanlimit = 100
|
||||||
|
chankeep.main.initConf()
|
||||||
|
chankeep.main.r = MagicMock()
|
||||||
|
chankeep.main.g = MagicMock()
|
||||||
|
chankeep.main.g.pipeline = MagicMock()
|
||||||
|
chankeep.main.config["ChanKeep"]["Provision"] = False
|
||||||
|
|
||||||
|
self.listinfo = self.generate_listinfo()
|
||||||
|
self.chan_name_list = [x[0] for x in self.listinfo]
|
||||||
|
self.chan_member_list = [x[1] for x in self.listinfo]
|
||||||
|
|
||||||
|
def generate_listinfo(self, ranges=None):
|
||||||
|
"""
|
||||||
|
Create a fake listinfo.
|
||||||
|
Where #channel has 192 users, and #channel2 has 188 users.
|
||||||
|
listinfo = [["#channel", 192], ["#channel2", 188]]
|
||||||
|
"""
|
||||||
|
if not ranges:
|
||||||
|
ranges = [[100, 5, 10], [400, 100, 200], [2, 500, 1000]]
|
||||||
|
listinfo = []
|
||||||
|
for channum, min, max in ranges:
|
||||||
|
for i in range(channum):
|
||||||
|
chan_name = f"#num-{channum}-{i}"
|
||||||
|
chan_users = randint(min, max)
|
||||||
|
listinfo.append([chan_name, chan_users])
|
||||||
|
return listinfo
|
||||||
|
|
||||||
|
def percent_diff(self, a, b):
|
||||||
|
return (abs(b - a) / a) * 100.0
|
||||||
|
|
||||||
|
def test_alt_listinfo(self):
|
||||||
|
# We're looking for a perc of 1000-1100
|
||||||
|
# And a sigrelay of 2
|
||||||
|
# We only want those 10 big channels
|
||||||
|
instances = 1
|
||||||
|
chanlimit = 5
|
||||||
|
max_chans = instances * chanlimit
|
||||||
|
listinfo = self.generate_listinfo(ranges=[[1000, 1, 2], [200, 400, 800], [10, 1000, 2000]])
|
||||||
|
listinfo_num = [x[1] for x in listinfo]
|
||||||
|
|
||||||
|
listlength = len(listinfo)
|
||||||
|
cumul = 0
|
||||||
|
try:
|
||||||
|
cumul += sum(int(i[1]) for i in listinfo)
|
||||||
|
except TypeError:
|
||||||
|
return
|
||||||
|
mean = round(cumul / listlength, 2)
|
||||||
|
siglength = 0
|
||||||
|
insiglength = 0
|
||||||
|
sigcumul = 0
|
||||||
|
insigcumul = 0
|
||||||
|
for i in listinfo:
|
||||||
|
if int(i[1]) > mean:
|
||||||
|
siglength += 1
|
||||||
|
sigcumul += int(i[1])
|
||||||
|
elif int(i[1]) < mean:
|
||||||
|
insiglength += 1
|
||||||
|
insigcumul += int(i[1])
|
||||||
|
|
||||||
|
sigrelay = ceil(siglength / chanlimit)
|
||||||
|
relay = ceil(listlength / chanlimit)
|
||||||
|
print(f"len:{listlength} cumul:{cumul} mean:{mean} siglength:{siglength} insiglength:{insiglength} sigrelay:{sigrelay} relay:{relay} sigcumul:{sigcumul} insigcumul:{insigcumul}")
|
||||||
|
# We want a return between 1000 and 1100
|
||||||
|
|
||||||
|
list_insig = [x for x in listinfo_num if x < mean]
|
||||||
|
list_sig = [x for x in listinfo if x[1] > mean]
|
||||||
|
chosen = sorted(list_sig, reverse=True, key=lambda x: x[1])[:max_chans]
|
||||||
|
print("CHOSEN", chosen)
|
||||||
|
self.assertEqual(len(chosen), 5)
|
||||||
|
|
||||||
|
@patch("modules.chankeep.keepChannels")
|
||||||
|
def test__initialList(self, keepchannels):
|
||||||
|
chankeep._initialList(self.net, self.num, self.listinfo, self.chanlimit)
|
||||||
|
net, passed_list, mean, sigrelay, relay, chanlimit = keepchannels.call_args_list[0][0]
|
||||||
|
self.assertEqual(net, self.net)
|
||||||
|
self.assertEqual(passed_list, self.listinfo)
|
||||||
|
self.assertEqual(chanlimit, self.chanlimit)
|
||||||
|
# print(net, mean, sigrelay, relay)
|
||||||
|
|
||||||
|
@patch("modules.chankeep.getChanFree")
|
||||||
|
def test_empty_chan_allocate(self, getchanfree):
|
||||||
|
getchanfree.return_value = ({1: 600}, 600) # pretend we have 600 channels free
|
||||||
|
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, [])
|
||||||
|
self.assertEqual(len(eca), 1)
|
||||||
|
num = list(eca.keys())[0]
|
||||||
|
chans = eca[list(eca.keys())[0]]
|
||||||
|
self.assertEqual(num, self.num)
|
||||||
|
self.assertCountEqual(chans, self.chan_name_list)
|
||||||
|
|
||||||
|
getchanfree.return_value = ({1: 100}, 10)
|
||||||
|
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, [])
|
||||||
|
self.assertEqual(len(eca), 1)
|
||||||
|
num = list(eca.keys())[0]
|
||||||
|
chans = eca[list(eca.keys())[0]]
|
||||||
|
self.assertEqual(num, self.num)
|
||||||
|
self.assertEqual(len(chans), 100)
|
||||||
|
#self.assertCountEqual(chans, self.chan_name_list)
|
Loading…
Reference in New Issue