Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
3 changed files with 16 additions and 2 deletions
Showing only changes of commit 43c5625b3b - Show all commits

View File

@ -61,6 +61,7 @@ class IRCBot(IRCClient):
self.name = net + str(num)
alias = main.alias[num]
relay = main.network[self.net].relays[num]
self.netinst = main.network[self.net]
self.nickname = alias["nick"]
self.realname = alias["realname"]
self.username = alias["nick"].lower() + "/" + relay["net"]
@ -518,6 +519,12 @@ class IRCBot(IRCClient):
warn("Invalid chanlimit: %s" % chanlimit)
if self.chanlimit == 0:
self.chanlimit = 200 # don't take the piss if it's not limited
net_inst_chanlimit = self.netinst.chanlimit
if net_inst_chanlimit:
if self.chanlimit > net_inst_chanlimit:
self.chanlimit = net_inst_chanlimit
warn(f"Chanlimit on {self.net} too high, setting to {self.chanlimit}")
if not regproc.needToRegister(self.net): # if we need to register, only recheck on auth confirmation
self.recheckList()

View File

@ -237,6 +237,8 @@ def keepChannels(net, listinfo, mean, sigrelay, relay):
debug(f"keepChannels() slots_used:{chan_slots_used}")
# max_chans = (chanlimit * num_instances) - chan_slots_used
max_chans = getSumChanlimit(net) - chan_slots_used
if max_chans < 0:
max_chans = 0
debug(f"keepChannels() max_chans:{max_chans}")
if coverAll:
# needed = relay - len(getActiveRelays(net))
@ -259,7 +261,7 @@ def keepChannels(net, listinfo, mean, sigrelay, relay):
# newNums = []
siglist = [i[0] for i in listinfo if int(i[1]) > mean]
chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans]
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels")
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(siglist)} channels")
trace(f"keepChannels() {net}: joining:{chosen}")
populateChans(net, chosen)
notifyJoin(net)
@ -360,6 +362,10 @@ def _initialList(net, num, listinfo):
sigrelay = ceil(siglength / avg_chanlimit)
relay = ceil(listlength / avg_chanlimit)
cur_relays = len(getActiveRelays(net))
sig_relays_missing = sigrelay - cur_relays
all_relays_missing = relay - cur_relays
abase = "analytics.list.%s" % net
main.g.delete(abase)
p = main.g.pipeline()
@ -377,6 +383,8 @@ def _initialList(net, num, listinfo):
p.hset(abase, "relays_for_all_chans", relay)
p.hset(abase, "relays_for_big_chans", sigrelay)
p.hset(abase, "relays_for_small_chans", ceil(insiglength / avg_chanlimit))
p.hset(abase, "sig_relays_missing", sig_relays_missing)
p.hset(abase, "all_relays_missing", all_relays_missing)
debug(
(
f"_initialList() net:{net} num:{num} listlength:{listlength} "

View File

@ -86,7 +86,6 @@ class TestChanKeep(TestCase):
# list_insig = [x for x in listinfo_num if x < mean]
list_sig = [x for x in listinfo if x[1] > mean]
chosen = sorted(list_sig, reverse=True, key=lambda x: x[1])[:max_chans]
print("CHOSEN", chosen)
self.assertEqual(len(chosen), 5)
@patch("modules.chankeep.keepChannels")