Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
8 changed files with 86 additions and 40 deletions
Showing only changes of commit c983a8e3b6 - Show all commits

View File

@ -6,7 +6,7 @@ from klein import Klein
from twisted.web.server import Request from twisted.web.server import Request
import main import main
from modules import chankeep, provision, userinfo from modules import chankeep, helpers, provision, userinfo
from modules.network import Network from modules.network import Network
from utils.logging.log import warn from utils.logging.log import warn
@ -473,10 +473,8 @@ class API(object):
def irc_list_network(self, request, net): def irc_list_network(self, request, net):
if net not in main.network.keys(): if net not in main.network.keys():
return dumps({"success": False, "reason": "no such net."}) return dumps({"success": False, "reason": "no such net."})
if 1 not in main.network[net].relays.keys(): first_relay = helpers.get_first_relay(net)
return dumps({"success": False, "reason": f"no first relay on {net}"}) if not first_relay:
name = f"{net}1" return dumps({"success": False, "reason": f"could not get first relay for {net}"})
if name not in main.IRCPool.keys(): first_relay.list()
return dumps({"success": False, "reason": f"first relay not active for {net}"}) return dumps({"success": True, "message": f"requested list with instance {first_relay.num} of {net}"})
main.IRCPool[name].list()
return dumps({"success": True, "message": f"requested list with first instance of {net}"})

View File

@ -1,4 +1,5 @@
import main import main
from modules import helpers
class ListCommand: class ListCommand:
@ -9,27 +10,24 @@ class ListCommand:
if authed: if authed:
if length == 1: if length == 1:
for i in main.network.keys(): for i in main.network.keys():
if 1 not in main.network[i].relays.keys(): first_relay = helpers.get_first_relay(i)
####
if not first_relay:
info("Network has no first instance: %s" % i) info("Network has no first instance: %s" % i)
continue continue
if not i + "1" in main.IRCPool.keys(): first_relay.list()
info("No IRC instance: %s - 1" % i) success(f"Requested list with instance {first_relay.num} of {i}")
continue
main.IRCPool[i + "1"].list()
success("Requested list with first instance of %s" % i)
return return
elif length == 2: elif length == 2:
if not spl[1] in main.network.keys(): if not spl[1] in main.network.keys():
failure("No such network: %s" % spl[1]) failure("No such network: %s" % spl[1])
return return
if 1 not in main.network[spl[1]].relays.keys(): first_relay = helpers.get_first_relay(spl[1])
failure("Network has no first instance") if not first_relay:
failure("Could not get first instance")
return return
if spl[1] + "1" not in main.IRCPool.keys(): first_relay.list()
failure("No IRC instance: %s - 1" % spl[1]) success(f"Requested list with instance {first_relay.num} of {spl[1]}")
return
main.IRCPool[spl[1] + "1"].list()
success("Requested list with first instance of %s" % spl[1])
return return
else: else:
incUsage("list") incUsage("list")

View File

@ -16,7 +16,7 @@ from twisted.words.protocols.irc import (
import main import main
from core.relay import sendRelayNotification from core.relay import sendRelayNotification
from modules import chankeep, counters, monitor, regproc, userinfo from modules import chankeep, counters, helpers, monitor, regproc, userinfo
from utils.dedup import dedup from utils.dedup import dedup
from utils.logging.debug import debug from utils.logging.debug import debug
from utils.logging.log import error, log, warn from utils.logging.log import error, log, warn
@ -458,11 +458,16 @@ class IRCBot(IRCClient):
def recheckList(self): def recheckList(self):
allRelays = chankeep.allRelaysActive(self.net) allRelays = chankeep.allRelaysActive(self.net)
if allRelays: if allRelays:
name = self.net + "1" debug(f"All relays active for {self.net}")
if main.IRCPool[name].wantList is True: first_relay = helpers.get_first_relay(self.net)
main.IRCPool[name].list(nocheck=True) if first_relay:
debug("Asking for a list for %s after final relay %i connected" % (self.net, self.num)) if first_relay.wantList is True:
if self.num == 1: # Only one instance should do a list first_relay.list(nocheck=True)
debug("Asking for a list for %s after final relay %i connected" % (self.net, self.num))
# name = self.net + "1"
# if self.num == 1: # Only one instance should do a list
if helpers.is_first_relay(self.net, self.num):
if self.chanlimit: if self.chanlimit:
if allRelays: if allRelays:
self.list() self.list()

0
modules/__init__.py Normal file
View File

View File

@ -9,10 +9,19 @@ from utils.logging.debug import debug
from utils.logging.log import error, log, warn from utils.logging.log import error, log, warn
def getActiveRelays(net):
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return activeRelays
def allRelaysActive(net): def allRelaysActive(net):
relayNum = len(main.network[net].relays.keys()) """
Check if all enabled relays are active and authenticated.
"""
activeRelays = getActiveRelays(net)
relayNum = len(activeRelays) + 1
existNum = 0 existNum = 0
for i in main.network[net].relays.keys(): for i in activeRelays:
name = net + str(i) name = net + str(i)
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated: if main.IRCPool[name].authenticated:
@ -34,7 +43,7 @@ def getChanFree(net, new):
""" """
chanfree = {} chanfree = {}
chanlimits = set() chanlimits = set()
for i in main.network[net].relays.keys(): for i in getActiveRelays(net):
if i in new: if i in new:
continue continue
name = net + str(i) name = net + str(i)
@ -60,9 +69,8 @@ def emptyChanAllocate(net, flist, relay, new):
toalloc = len(flist) toalloc = len(flist)
if toalloc > sum(chanfree[0].values()): if toalloc > sum(chanfree[0].values()):
correction = round(toalloc - sum(chanfree[0].values()) / chanfree[1]) correction = round(toalloc - sum(chanfree[0].values()) / chanfree[1])
# print("correction", correction)
warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net)) warn("Ran out of channel spaces, provisioning additional %i relays for %s" % (correction, net))
# newNums = modules.provision.provisionMultipleRelays(net, correction) modules.provision.provisionMultipleRelays(net, correction)
return False return False
for i in chanfree[0].keys(): for i in chanfree[0].keys():
for x in range(chanfree[0][i]): for x in range(chanfree[0][i]):
@ -88,7 +96,7 @@ def populateChans(net, clist, relay, new):
def notifyJoin(net): def notifyJoin(net):
for i in main.network[net].relays.keys(): for i in getActiveRelays(net):
name = net + str(i) name = net + str(i)
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
main.IRCPool[name].checkChannels() main.IRCPool[name].checkChannels()
@ -98,7 +106,7 @@ def minifyChans(net, listinfo):
if not allRelaysActive(net): if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list" % net) error("All relays for %s are not active, cannot minify list" % net)
return False return False
for i in main.network[net].relays.keys(): for i in getActiveRelays(net):
name = net + str(i) name = net + str(i)
for x in main.IRCPool[name].channels: for x in main.IRCPool[name].channels:
for y in listinfo: for y in listinfo:
@ -122,12 +130,12 @@ def keepChannels(net, listinfo, mean, sigrelay, relay):
error("Network %s is too big to cover: %i relays required" % (net, sigrelay)) error("Network %s is too big to cover: %i relays required" % (net, sigrelay))
return return
if coverAll: if coverAll:
needed = relay - len(main.network[net].relays.keys()) needed = relay - len(getActiveRelays(net))
newNums = modules.provision.provisionMultipleRelays(net, needed) newNums = modules.provision.provisionMultipleRelays(net, needed)
flist = [i[0] for i in listinfo] flist = [i[0] for i in listinfo]
populateChans(net, flist, relay, newNums) populateChans(net, flist, relay, newNums)
else: else:
needed = sigrelay - len(main.network[net].relays.keys()) needed = sigrelay - len(getActiveRelays(net))
newNums = modules.provision.provisionMultipleRelays(net, needed) newNums = modules.provision.provisionMultipleRelays(net, needed)
siglist = [i[0] for i in listinfo if int(i[1]) > mean] siglist = [i[0] for i in listinfo if int(i[1]) > mean]
populateChans(net, siglist, sigrelay, newNums) populateChans(net, siglist, sigrelay, newNums)
@ -156,7 +164,7 @@ def partSingle(net, channel):
:return: :return:
""" """
parted = [] parted = []
for i in main.network[net].relays.keys(): for i in getActiveRelays(net):
name = f"{net}{i}" name = f"{net}{i}"
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
if channel in main.IRCPool[name].channels: if channel in main.IRCPool[name].channels:

35
modules/helpers.py Normal file
View File

@ -0,0 +1,35 @@
import main
from modules import chankeep
def get_first_relay(net):
"""
Get the first relay in the network.
:param net: the network
:param num: number or relay
:return: IRCPool instance for the IRC bot
"""
cur_relay = 0
max_relay = len(main.network[net].relays.keys())+1
activeRelays = chankeep.getActiveRelays(net)
while cur_relay != max_relay:
cur_relay += 1
if cur_relay not in activeRelays:
continue
name = net + str(cur_relay)
if name in main.IRCPool.keys():
return main.IRCPool[name]
return None
def is_first_relay(net, num):
"""
Determine if we are the first relay for the network.
:param net: the network
:param num: number or relay
:return: True if we are the first relay, False otherwise
"""
cur_relay = 0
max_relay = len(main.network[net].relays.keys())
while cur_relay > max_relay:
name = net + str(cur_relay)
if name in main.IRCPool.keys():
return cur_relay == num

View File

@ -3,6 +3,7 @@ from twisted.internet import reactor
import main import main
import modules.regproc import modules.regproc
from utils.deliver_relay_commands import deliverRelayCommands from utils.deliver_relay_commands import deliverRelayCommands
from utils.logging.log import warn
def provisionUserNetworkData( def provisionUserNetworkData(
@ -72,13 +73,16 @@ def provisionRelay(num, network): # provision user and network data
main.network[network].port, main.network[network].port,
main.network[network].security, main.network[network].security,
main.network[network].auth, main.network[network].auth,
main.network[network].aliases[num]["password"] main.network[network].aliases[num]["password"],
) )
if main.config["ConnectOnCreate"]: if main.config["ConnectOnCreate"]:
reactor.callLater(10, main.network[network].start_bot, num) reactor.callLater(10, main.network[network].start_bot, num)
def provisionMultipleRelays(net, relaysNeeded): def provisionMultipleRelays(net, relaysNeeded):
if not main.config["ChanKeep"]["Provision"]:
warn(f"Asked to create {relaysNeeded} relays for {net}, but provisioning is disabled")
return 0
numsProvisioned = [] numsProvisioned = []
for i in range(relaysNeeded): for i in range(relaysNeeded):
num, alias = main.network[net].add_relay() num, alias = main.network[net].add_relay()

View File

@ -57,8 +57,6 @@ api_enabled = getenv("THRESHOLD_API_ENABLED", main.config["API"]["Enabled"]) in
api_address = getenv("THRESHOLD_API_HOST", main.config["API"]["Address"]) api_address = getenv("THRESHOLD_API_HOST", main.config["API"]["Address"])
api_port = int(getenv("THRESHOLD_API_PORT", main.config["API"]["Port"])) api_port = int(getenv("THRESHOLD_API_PORT", main.config["API"]["Port"]))
print("KEY", main.certPath + main.config["Key"])
print("CERT", main.certPath + main.config["Certificate"])
if __name__ == "__main__": if __name__ == "__main__":
listener = ServerFactory() listener = ServerFactory()
if listener_ssl is True: if listener_ssl is True: