monolith/legacy/tests/test_chankeep.py

120 lines
4.3 KiB
Python

from math import ceil
from random import randint
from unittest import TestCase
from unittest.mock import MagicMock, patch
from modules import chankeep
class TestChanKeep(TestCase):
def setUp(self):
self.net = "testnet"
self.num = 1
self.chanlimit = 100
chankeep.main.initConf()
chankeep.main.r = MagicMock()
chankeep.main.g = MagicMock()
chankeep.main.g.pipeline = MagicMock()
chankeep.main.config["ChanKeep"]["Provision"] = False
chankeep.getAverageChanlimit = MagicMock()
chankeep.getAverageChanlimit.return_value = self.chanlimit
self.listinfo = self.generate_listinfo()
self.chan_name_list = [x[0] for x in self.listinfo]
self.chan_member_list = [x[1] for x in self.listinfo]
def generate_listinfo(self, ranges=None):
"""
Create a fake listinfo.
Where #channel has 192 users, and #channel2 has 188 users.
listinfo = [["#channel", 192], ["#channel2", 188]]
"""
if not ranges:
ranges = [[100, 5, 10], [400, 100, 200], [2, 500, 1000]]
listinfo = []
for channum, min, max in ranges:
for i in range(channum):
chan_name = f"#num-{channum}-{i}"
chan_users = randint(min, max)
listinfo.append([chan_name, chan_users])
return listinfo
def percent_diff(self, a, b):
return (abs(b - a) / a) * 100.0
def test_alt_listinfo(self):
# We're looking for a perc of 1000-1100
# And a sigrelay of 2
# We only want those 10 big channels
instances = 1
chanlimit = 5
max_chans = instances * chanlimit
listinfo = self.generate_listinfo(
ranges=[[1000, 1, 2], [200, 400, 800], [10, 1000, 2000]]
)
# listinfo_num = [x[1] for x in listinfo]
listlength = len(listinfo)
cumul = 0
try:
cumul += sum(int(i[1]) for i in listinfo)
except TypeError:
return
mean = round(cumul / listlength, 2)
siglength = 0
insiglength = 0
sigcumul = 0
insigcumul = 0
for i in listinfo:
if int(i[1]) > mean:
siglength += 1
sigcumul += int(i[1])
elif int(i[1]) < mean:
insiglength += 1
insigcumul += int(i[1])
sigrelay = ceil(siglength / chanlimit)
relay = ceil(listlength / chanlimit)
print(
(
f"len:{listlength} cumul:{cumul} mean:{mean} "
f"siglength:{siglength} insiglength:{insiglength} "
f"sigrelay:{sigrelay} relay:{relay} sigcumul:{sigcumul} "
f"insigcumul:{insigcumul}"
)
)
# We want a return between 1000 and 1100
# list_insig = [x for x in listinfo_num if x < mean]
list_sig = [x for x in listinfo if x[1] > mean]
chosen = sorted(list_sig, reverse=True, key=lambda x: x[1])[:max_chans]
self.assertEqual(len(chosen), 5)
@patch("modules.chankeep.keepChannels")
def test__initialList(self, keepchannels):
chankeep._initialList(self.net, self.num, self.listinfo)
net, passed_list, mean, sigrelay, relay = keepchannels.call_args_list[0][0]
self.assertEqual(net, self.net)
self.assertEqual(passed_list, self.listinfo)
# self.assertEqual(chanlimit, self.chanlimit)
# print(net, mean, sigrelay, relay)
@patch("modules.chankeep.getChanFree")
def test_empty_chan_allocate(self, getchanfree):
getchanfree.return_value = ({1: 600}, 600) # pretend we have 600 channels free
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list)
self.assertEqual(len(eca), 1)
num = list(eca.keys())[0]
chans = eca[list(eca.keys())[0]]
self.assertEqual(num, self.num)
self.assertCountEqual(chans, self.chan_name_list)
getchanfree.return_value = ({1: 100}, 10)
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list)
self.assertEqual(len(eca), 1)
num = list(eca.keys())[0]
chans = eca[list(eca.keys())[0]]
self.assertEqual(num, self.num)
self.assertEqual(len(chans), 100)
# self.assertCountEqual(chans, self.chan_name_list)