Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
2 changed files with 66 additions and 25 deletions
Showing only changes of commit c145e5cf18 - Show all commits

View File

@ -8,14 +8,30 @@ import modules.provision
from utils.logging.debug import debug, trace from utils.logging.debug import debug, trace
from utils.logging.log import error, log, warn from utils.logging.log import error, log, warn
def getEnabledRelays(net):
"""
Get a list of enabled relays for a network.
:param net: network
:rtype: list of int
:return: list of enabled relay numbers
"""
enabledRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return enabledRelays
def getActiveRelays(net): def getActiveRelays(net):
""" """
Get a list of active relays for a network. Get a list of active relays for a network.
:param net: network :param net: network
:rtype: list of int :rtype: list of int
:return: list of active relay numbers""" :return: list of getEnabledRelays relay numbers
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] """
enabledRelays = getEnabledRelays(net)
activeRelays = []
for i in enabledRelays:
name = net + str(i)
if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated and main.IRCPool[name].isconnected:
activeRelays.append(i)
return activeRelays return activeRelays
@ -27,27 +43,40 @@ def allRelaysActive(net):
:return: True if all relays are active and authenticated, False otherwise :return: True if all relays are active and authenticated, False otherwise
""" """
activeRelays = getActiveRelays(net) activeRelays = getActiveRelays(net)
debug(f"allRelaysActive() active relays for {net}: {activeRelays}") enabledRelays = getEnabledRelays(net)
relayNum = len(activeRelays) relaysActive = len(activeRelays) == len(enabledRelays)
existNum = 0 debug(f"allRelaysActive() {net}: {relaysActive} ({activeRelays}/{enabledRelays})")
for i in activeRelays: return relaysActive
def getAverageChanlimit(net):
"""
Get the average channel limit for a network.
:param net: network
:rtype: int
:return: average channel limit
"""
total = 0
for i in getActiveRelays(net):
name = net + str(i) name = net + str(i)
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
if main.IRCPool[name].authenticated and main.IRCPool[name].isconnected: total += main.IRCPool[name].chanlimit
existNum += 1 avg_chanlimit = total / len(getActiveRelays(net))
else: debug(f"getAverageChanlimit() {net}: {avg_chanlimit}")
debug(f"allRelaysActive() {name} is not authenticated or connected") return avg_chanlimit
debug(
(
f"allRelaysActive() {name} auth:{main.IRCPool[name].authenticated} "
f"connected:{main.IRCPool[name].isconnected}"
)
)
debug(f"allRelaysActive() finished, {existNum}/{relayNum} relays active for {net}")
if existNum == relayNum:
return True
return False
def getSumChanlimit(net):
"""
Get the sum of all channel limits for a network.
:param net: network
:rtype: int
:return: sum of channel limits
"""
total = 0
for i in getActiveRelays(net):
name = net + str(i)
if name in main.IRCPool.keys():
total += main.IRCPool[name].chanlimit
return total
def getChanFree(net, new): def getChanFree(net, new):
""" """
@ -245,17 +274,27 @@ def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
debug(f"keepChannels() max_chans:{max_chans}") debug(f"keepChannels() max_chans:{max_chans}")
if coverAll: if coverAll:
needed = relay - len(getActiveRelays(net)) needed = relay - len(getActiveRelays(net))
if needed:
debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}") debug(f"keepChannels() coverAll asking to provision {needed} relays for {net} relay:{relay}")
newNums = modules.provision.provisionMultipleRelays(net, needed) newNums = modules.provision.provisionMultipleRelays(net, needed)
else:
newNums = []
flist = [i[0] for i in listinfo] flist = [i[0] for i in listinfo]
chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans] chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels")
trace(f"keepChannels() {net}: joining:{chosen}")
populateChans(net, chosen, newNums) populateChans(net, chosen, newNums)
else: else:
needed = sigrelay - len(getActiveRelays(net)) needed = sigrelay - len(getActiveRelays(net))
if needed:
debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}") debug(f"keepChannels() NOT coverAll asking to provision {needed} relays for {net} sigrelay:{sigrelay}")
newNums = modules.provision.provisionMultipleRelays(net, needed) newNums = modules.provision.provisionMultipleRelays(net, needed)
else:
newNums = []
siglist = [i[0] for i in listinfo if int(i[1]) > mean] siglist = [i[0] for i in listinfo if int(i[1]) > mean]
chosen = sorted(siglist, reverse=True, key=lambda x: x[1])[:max_chans] chosen = sorted(flist, reverse=True, key=lambda x: x[1])[:max_chans]
debug(f"keepChannels() {net}: joining {len(chosen)}/{len(flist)} channels")
trace(f"keepChannels() {net}: joining:{chosen}")
populateChans(net, chosen, newNums) populateChans(net, chosen, newNums)
notifyJoin(net) notifyJoin(net)

View File

@ -75,6 +75,8 @@ def provisionRelay(num, network): # provision user and network data
def provisionMultipleRelays(net, relaysNeeded): def provisionMultipleRelays(net, relaysNeeded):
if not relaysNeeded:
return []
if not main.config["ChanKeep"]["Provision"]: if not main.config["ChanKeep"]["Provision"]:
warn(f"Asked to create {relaysNeeded} relays for {net}, but provisioning is disabled") warn(f"Asked to create {relaysNeeded} relays for {net}, but provisioning is disabled")
return [] return []