Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
4 changed files with 64 additions and 94 deletions
Showing only changes of commit dd67e9cc8b - Show all commits

View File

@ -473,7 +473,7 @@ class IRCBot(IRCClient):
if len(listinfo) == 0: # probably ngircd not supporting LIST >0 if len(listinfo) == 0: # probably ngircd not supporting LIST >0
return return
if main.config["ChanKeep"]["Enabled"]: if main.config["ChanKeep"]["Enabled"]:
chankeep.initialList(self.net, self.num, listinfo, self.chanlimit) chankeep.initialList(self.net, self.num, listinfo)
def recheckList(self): def recheckList(self):
allRelays = chankeep.allRelaysActive(self.net) allRelays = chankeep.allRelaysActive(self.net)

View File

@ -4,10 +4,10 @@ from math import ceil
from twisted.internet.threads import deferToThread from twisted.internet.threads import deferToThread
import main import main
import modules.provision
from utils.logging.debug import debug, trace from utils.logging.debug import debug, trace
from utils.logging.log import error, log, warn from utils.logging.log import error, log, warn
def getEnabledRelays(net): def getEnabledRelays(net):
""" """
Get a list of enabled relays for a network. Get a list of enabled relays for a network.
@ -18,6 +18,7 @@ def getEnabledRelays(net):
enabledRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] enabledRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return enabledRelays return enabledRelays
def getActiveRelays(net): def getActiveRelays(net):
""" """
Get a list of active relays for a network. Get a list of active relays for a network.
@ -48,6 +49,7 @@ def allRelaysActive(net):
debug(f"allRelaysActive() {net}: {relaysActive} ({activeRelays}/{enabledRelays})") debug(f"allRelaysActive() {net}: {relaysActive} ({activeRelays}/{enabledRelays})")
return relaysActive return relaysActive
def getAverageChanlimit(net): def getAverageChanlimit(net):
""" """
Get the average channel limit for a network. Get the average channel limit for a network.
@ -64,6 +66,7 @@ def getAverageChanlimit(net):
debug(f"getAverageChanlimit() {net}: {avg_chanlimit}") debug(f"getAverageChanlimit() {net}: {avg_chanlimit}")
return avg_chanlimit return avg_chanlimit
def getSumChanlimit(net): def getSumChanlimit(net):
""" """
Get the sum of all channel limits for a network. Get the sum of all channel limits for a network.
@ -78,32 +81,26 @@ def getSumChanlimit(net):
total += main.IRCPool[name].chanlimit total += main.IRCPool[name].chanlimit
return total return total
def getChanFree(net, new):
def getChanFree(net):
""" """
Get a dictionary with the free channel spaces for Get a dictionary with the free channel spaces for
each relay, and a channel limit. each relay, and a channel limit.
Example return: Example return:
({1: 99}, 100) ({1: 99}, 100)
:param net: network :param net: network
:param new: list of newly provisioned relays to skip
:return: ({relay: channel spaces}, channel limit) :return: ({relay: channel spaces}, channel limit)
""" """
chanfree = {} chanfree = {}
chanlimits = set()
for i in getActiveRelays(net): for i in getActiveRelays(net):
if i in new:
continue
name = net + str(i) name = net + str(i)
if name not in main.IRCPool.keys(): if name not in main.IRCPool.keys():
continue continue
if not main.IRCPool[name].isconnected: if not main.IRCPool[name].isconnected:
continue continue
chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels) chanfree[i] = main.IRCPool[name].chanlimit - len(main.IRCPool[name].channels)
chanlimits.add(main.IRCPool[name].chanlimit)
if not len(chanlimits) == 1: return chanfree
error("Network %s has servers with different CHANLIMIT values" % net)
return False
return (chanfree, chanlimits.pop())
def getTotalChans(net): def getTotalChans(net):
@ -121,7 +118,7 @@ def getTotalChans(net):
return total return total
def emptyChanAllocate(net, flist, new): def emptyChanAllocate(net, flist):
""" """
Allocate channels to relays. Allocate channels to relays.
:param net: network :param net: network
@ -131,64 +128,32 @@ def emptyChanAllocate(net, flist, new):
:return: dictionary of {relay: list of channels}""" :return: dictionary of {relay: list of channels}"""
# Get the free channel spaces for each relay # Get the free channel spaces for each relay
chanfree = getChanFree(net, new) chanfree = getChanFree(net)
if not chanfree: if not chanfree:
return return
# Pretend the newly provisioned relays are already on the network # Pretend the newly provisioned relays are already on the network
for i in new: # for i in new:
chanfree[0][i] = chanfree[1] # chanfree[0][i] = chanfree[1]
allocated = {} allocated = {}
# Copy the list since we're going to mutate it
toalloc = len(flist)
# Used to correct allocations and provision additional relays
# if the math since the last LIST is a bit wrong
# toalloc:2148 free:{1: 250} chanlimit:250 correction:2147
newlist = list(flist) newlist = list(flist)
if toalloc > sum(chanfree[0].values()): sum_free = sum(chanfree.values()) # 250
sum_free = sum(chanfree[0].values()) # 250 trunc_list = newlist[:sum_free]
chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898 debug(f"emptyChanAllocate() {net}: newlist:{len(newlist)} trunc_list:{len(trunc_list)}")
relays_needed = chans_not_covered / chanfree[1] # 1898 / 250 = 7.592
correction = ceil(relays_needed) for i in chanfree.keys():
if main.config["ChanKeep"]["Provision"]: for x in range(chanfree[i]):
debug( if not len(trunc_list):
(
f"emptyChanAllocate() secondary allocation sum_free:{sum_free} "
f"chans_not_covered:{chans_not_covered} relays_needed:{relays_needed} "
f"correction:{correction}"
)
)
debug(
(
f"emptyChanAllocate() not enough free channels: toalloc:{toalloc} "
f"free:{chanfree[0]} chanlimit:{chanfree[1]} correction:{correction}"
)
)
warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
modules.provision.provisionMultipleRelays(net, correction)
return False
else:
# We don't have enough spaces and we can't add any.
# Let's do the best we can in the circumstances.
debug(f"emptyChanAllocate() cannot create additional relays for {net}")
debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered")
newlist = newlist[:sum_free]
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
trace(f"emptyChanAllocate() best effort allocation: {flist}")
for i in chanfree[0].keys():
for x in range(chanfree[0][i]):
if not len(newlist):
break break
if i in allocated.keys(): if i in allocated.keys():
allocated[i].append(newlist.pop()) allocated[i].append(trunc_list.pop())
else: else:
allocated[i] = [newlist.pop()] allocated[i] = [trunc_list.pop()]
return allocated return allocated
def populateChans(net, clist, new): def populateChans(net, clist):
""" """
Populate channels on relays. Populate channels on relays.
Stores channels to join in a list in main.TempChan[net][num] Stores channels to join in a list in main.TempChan[net][num]
@ -196,7 +161,7 @@ def populateChans(net, clist, new):
:param clist: list of channels to join :param clist: list of channels to join
:param new: list of newly provisioned relays to account for""" :param new: list of newly provisioned relays to account for"""
# divided = array_split(clist, relay) # divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, new) allocated = emptyChanAllocate(net, clist)
if not allocated: if not allocated:
return return
for i in allocated.keys(): for i in allocated.keys():
@ -242,7 +207,7 @@ def minifyChans(net, listinfo):
return listinfo return listinfo
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit): def keepChannels(net, listinfo, mean, sigrelay, relay):
""" """
Minify channels, determine whether we can cover all the channels Minify channels, determine whether we can cover all the channels
on the network, or need to use 'significant' mode. on the network, or need to use 'significant' mode.
@ -263,39 +228,40 @@ def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
coverAll = True coverAll = True
elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels elif relay > main.config["ChanKeep"]["SigSwitch"]: # we cannot cover all of the channels
coverAll = False coverAll = False
if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]: # if not sigrelay <= main.config["ChanKeep"]["MaxRelay"]:
error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) # error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return # return
num_instances = len(getActiveRelays(net)) num_instances = len(getActiveRelays(net))
debug(f"keepChannels() {net} instances:{num_instances} chanlimit:{chanlimit}") debug(f"keepChannels() {net} instances:{num_instances}")
chan_slots_used = getTotalChans(net) chan_slots_used = getTotalChans(net)
debug(f"keepChannels() slots_used:{chan_slots_used}") debug(f"keepChannels() slots_used:{chan_slots_used}")
max_chans = (chanlimit * num_instances) - chan_slots_used # max_chans = (chanlimit * num_instances) - chan_slots_used
max_chans = getSumChanlimit(net) - chan_slots_used
debug(f"keepChannels() max_chans:{max_chans}") debug(f"keepChannels() max_chans:{max_chans}")
if coverAll: if coverAll:
needed = relay - len(getActiveRelays(net)) # needed = relay - len(getActiveRelays(net))
if needed: # if needed:
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") # debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
newNums = modules.provision.provisionMultipleRelays(net, needed) # newNums = modules.provision.provisionMultipleRelays(net, needed)
else: # else:
newNums = [] # newNums = []
flist = [i[0] for i in listinfo] flist = [i[0] for i in listinfo]
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels") debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels")
trace(f"keepChannels() {net}: joining:{chosen}") trace(f"keepChannels() {net}: joining:{chosen}")
populateChans(net, chosen, newNums) populateChans(net, chosen)
else: else:
needed = sigrelay - len(getActiveRelays(net)) # needed = sigrelay - len(getActiveRelays(net))
if needed: # if needed:
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") # debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
newNums = modules.provision.provisionMultipleRelays(net, needed) # newNums = modules.provision.provisionMultipleRelays(net, needed)
else: # else:
newNums = [] # newNums = []
siglist = [i[0] for i in listinfo if int(i[1]) > mean] siglist = [i[0] for i in listinfo if int(i[1]) > mean]
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans]
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels") debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels")
trace(f"keepChannels() {net}: joining:{chosen}") trace(f"keepChannels() {net}: joining:{chosen}")
populateChans(net, chosen, newNums) populateChans(net, chosen)
notifyJoin(net) notifyJoin(net)
@ -308,7 +274,7 @@ def joinSingle(net, channel):
:return: relay number that joined the channel :return: relay number that joined the channel
:rtype: int :rtype: int
""" """
eca = emptyChanAllocate(net, [channel], []) eca = emptyChanAllocate(net, [channel])
if not eca: if not eca:
return False return False
if not len(eca.keys()) == 1: if not len(eca.keys()) == 1:
@ -354,7 +320,7 @@ def nukeNetwork(net):
# deferToThread(_nukeNetwork, net) # deferToThread(_nukeNetwork, net)
def _initialList(net, num, listinfo, chanlimit): def _initialList(net, num, listinfo):
""" """
Called when a relay receives a full LIST response. Called when a relay receives a full LIST response.
Run statistics to determine how many channels are significant. Run statistics to determine how many channels are significant.
@ -390,8 +356,9 @@ def _initialList(net, num, listinfo, chanlimit):
insiglength += 1 insiglength += 1
insigcumul += int(i[1]) insigcumul += int(i[1])
sigrelay = ceil(siglength / chanlimit) avg_chanlimit = getAverageChanlimit(net)
relay = ceil(listlength / chanlimit) sigrelay = ceil(siglength / avg_chanlimit)
relay = ceil(listlength / avg_chanlimit)
abase = "analytics.list.%s" % net abase = "analytics.list.%s" % net
main.g.delete(abase) main.g.delete(abase)
@ -409,12 +376,12 @@ def _initialList(net, num, listinfo, chanlimit):
p.hset(abase, "small_chan_cumul_mem", insigcumul) p.hset(abase, "small_chan_cumul_mem", insigcumul)
p.hset(abase, "relays_for_all_chans", relay) p.hset(abase, "relays_for_all_chans", relay)
p.hset(abase, "relays_for_big_chans", sigrelay) p.hset(abase, "relays_for_big_chans", sigrelay)
p.hset(abase, "relays_for_small_chans", ceil(insiglength / chanlimit)) p.hset(abase, "relays_for_small_chans", ceil(insiglength / avg_chanlimit))
debug( debug(
( (
f"_initialList() net:{net} num:{num} listlength:{listlength} " f"_initialList() net:{net} num:{num} listlength:{listlength} "
f"mean:{mean} siglength:{siglength} insiglength:{insiglength} " f"mean:{mean} siglength:{siglength} insiglength:{insiglength} "
f"sigrelay:{sigrelay} relay:{relay} chanlimit:{chanlimit}" f"sigrelay:{sigrelay} relay:{relay} avg_chanlimit:{avg_chanlimit}"
) )
) )
@ -427,7 +394,7 @@ def _initialList(net, num, listinfo, chanlimit):
p.execute() p.execute()
debug("List parsing completed on %s" % net) debug("List parsing completed on %s" % net)
keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit) keepChannels(net, listinfo, mean, sigrelay, relay)
# return (listinfo, mean, sigrelay, relay) # return (listinfo, mean, sigrelay, relay)
@ -453,9 +420,9 @@ def getListInfo(net):
return convert(info) return convert(info)
def initialList(net, num, listinfo, chanlimit): def initialList(net, num, listinfo):
""" """
Run _initialList in a thread. Run _initialList in a thread.
See above docstring. See above docstring.
""" """
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit) deferToThread(_initialList, net, num, deepcopy(listinfo))

View File

@ -52,6 +52,7 @@ def provisionAuthenticationData(num, nick, network, security, auth, password):
commands["nickserv"].append("Set %s" % password) commands["nickserv"].append("Set %s" % password)
inst = modules.regproc.selectInst(network) inst = modules.regproc.selectInst(network)
if "setmode" in inst.keys(): if "setmode" in inst.keys():
# perform is loaded above
# commands["status"].append("LoadMod perform") # commands["status"].append("LoadMod perform")
commands["perform"] = ["add mode %nick% +" + inst["setmode"]] commands["perform"] = ["add mode %nick% +" + inst["setmode"]]
deliverRelayCommands(num, commands, user=user + "/" + network) deliverRelayCommands(num, commands, user=user + "/" + network)

View File

@ -16,6 +16,8 @@ class TestChanKeep(TestCase):
chankeep.main.g = MagicMock() chankeep.main.g = MagicMock()
chankeep.main.g.pipeline = MagicMock() chankeep.main.g.pipeline = MagicMock()
chankeep.main.config["ChanKeep"]["Provision"] = False chankeep.main.config["ChanKeep"]["Provision"] = False
chankeep.getAverageChanlimit = MagicMock()
chankeep.getAverageChanlimit.return_value = self.chanlimit
self.listinfo = self.generate_listinfo() self.listinfo = self.generate_listinfo()
self.chan_name_list = [x[0] for x in self.listinfo] self.chan_name_list = [x[0] for x in self.listinfo]
@ -89,17 +91,17 @@ class TestChanKeep(TestCase):
@patch("modules.chankeep.keepChannels") @patch("modules.chankeep.keepChannels")
def test__initialList(self, keepchannels): def test__initialList(self, keepchannels):
chankeep._initialList(self.net, self.num, self.listinfo, self.chanlimit) chankeep._initialList(self.net, self.num, self.listinfo)
net, passed_list, mean, sigrelay, relay, chanlimit = keepchannels.call_args_list[0][0] net, passed_list, mean, sigrelay, relay = keepchannels.call_args_list[0][0]
self.assertEqual(net, self.net) self.assertEqual(net, self.net)
self.assertEqual(passed_list, self.listinfo) self.assertEqual(passed_list, self.listinfo)
self.assertEqual(chanlimit, self.chanlimit) # self.assertEqual(chanlimit, self.chanlimit)
# print(net, mean, sigrelay, relay) # print(net, mean, sigrelay, relay)
@patch("modules.chankeep.getChanFree") @patch("modules.chankeep.getChanFree")
def test_empty_chan_allocate(self, getchanfree): def test_empty_chan_allocate(self, getchanfree):
getchanfree.return_value = ({1: 600}, 600) # pretend we have 600 channels free getchanfree.return_value = ({1: 600}, 600) # pretend we have 600 channels free
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, []) eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list)
self.assertEqual(len(eca), 1) self.assertEqual(len(eca), 1)
num = list(eca.keys())[0] num = list(eca.keys())[0]
chans = eca[list(eca.keys())[0]] chans = eca[list(eca.keys())[0]]
@ -107,7 +109,7 @@ class TestChanKeep(TestCase):
self.assertCountEqual(chans, self.chan_name_list) self.assertCountEqual(chans, self.chan_name_list)
getchanfree.return_value = ({1: 100}, 10) getchanfree.return_value = ({1: 100}, 10)
eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list, []) eca = chankeep.emptyChanAllocate(self.net, self.chan_name_list)
self.assertEqual(len(eca), 1) self.assertEqual(len(eca), 1)
num = list(eca.keys())[0] num = list(eca.keys())[0]
chans = eca[list(eca.keys())[0]] chans = eca[list(eca.keys())[0]]