Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
1 changed file with 99 additions and 17 deletions
Showing only changes of commit ddc9af0ddf - Show all commits

View File

@ -10,6 +10,11 @@ from utils.logging.log import error, log, warn
def getActiveRelays(net): def getActiveRelays(net):
"""
Get a list of active relays for a network.
:param net: network
:rtype: list of int
:return: list of active relay numbers"""
activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]] activeRelays = [x for x in main.network[net].relays.keys() if main.network[net].relays[x]["enabled"]]
return activeRelays return activeRelays
@ -17,6 +22,9 @@ def getActiveRelays(net):
def allRelaysActive(net): def allRelaysActive(net):
""" """
Check if all enabled relays are active and authenticated. Check if all enabled relays are active and authenticated.
:param net: network
:rtype: bool
:return: True if all relays are active and authenticated, False otherwise
""" """
activeRelays = getActiveRelays(net) activeRelays = getActiveRelays(net)
debug(f"allRelaysActive() active relays for {net}: {activeRelays}") debug(f"allRelaysActive() active relays for {net}: {activeRelays}")
@ -62,6 +70,12 @@ def getChanFree(net, new):
def getTotalChans(net): def getTotalChans(net):
"""
Get the total number of channels on all relays for a network.
:param net: network
:rtype: int
:return: total number of channels
"""
total = 0 total = 0
for i in getActiveRelays(net): for i in getActiveRelays(net):
name = net + str(i) name = net + str(i)
@ -71,16 +85,31 @@ def getTotalChans(net):
def emptyChanAllocate(net, flist, new): def emptyChanAllocate(net, flist, new):
"""
Allocate channels to relays.
:param net: network
:param flist: list of channels to allocate
:param new: list of newly provisioned relays to account for
:rtype: dict
:return: dictionary of {relay: list of channels}"""
# Get the free channel spaces for each relay
chanfree = getChanFree(net, new) chanfree = getChanFree(net, new)
if not chanfree: if not chanfree:
return return
# Pretend the newly provisioned relays are already on the network
for i in new: for i in new:
chanfree[0][i] = chanfree[1] chanfree[0][i] = chanfree[1]
allocated = {} allocated = {}
# Copy the list since we're going to mutate it
toalloc = len(flist) toalloc = len(flist)
# Used to correct allocations and provision additional relays # Used to correct allocations and provision additional relays
# if the math since the last LIST is a bit wrong # if the math since the last LIST is a bit wrong
# toalloc:2148 free:{1: 250} chanlimit:250 correction:2147 # toalloc:2148 free:{1: 250} chanlimit:250 correction:2147
newlist = list(flist)
if toalloc > sum(chanfree[0].values()): if toalloc > sum(chanfree[0].values()):
sum_free = sum(chanfree[0].values()) # 250 sum_free = sum(chanfree[0].values()) # 250
chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898 chans_not_covered = toalloc - sum_free # 2148 - 250 = 1898
@ -108,10 +137,9 @@ def emptyChanAllocate(net, flist, new):
# Let's do the best we can in the circumstances. # Let's do the best we can in the circumstances.
debug(f"emptyChanAllocate() cannot create additional relays for {net}") debug(f"emptyChanAllocate() cannot create additional relays for {net}")
debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered") debug(f"emptyChanAllocate() {chans_not_covered} channels cannot be covered")
flist = flist[:sum_free] newlist = newlist[:sum_free]
debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}") debug(f"emptyChanAllocate() flist truncated to {sum_free}, length nis now {len(flist)}")
trace(f"emptyChanAllocate() best effort allocation: {flist}") trace(f"emptyChanAllocate() best effort allocation: {flist}")
newlist = list(flist)
for i in chanfree[0].keys(): for i in chanfree[0].keys():
for x in range(chanfree[0][i]): for x in range(chanfree[0][i]):
if not len(newlist): if not len(newlist):
@ -124,6 +152,12 @@ def emptyChanAllocate(net, flist, new):
def populateChans(net, clist, new): def populateChans(net, clist, new):
"""
Populate channels on relays.
Stores channels to join in a list in main.TempChan[net][num]
:param net: network
:param clist: list of channels to join
:param new: list of newly provisioned relays to account for"""
# divided = array_split(clist, relay) # divided = array_split(clist, relay)
allocated = emptyChanAllocate(net, clist, new) allocated = emptyChanAllocate(net, clist, new)
if not allocated: if not allocated:
@ -136,6 +170,11 @@ def populateChans(net, clist, new):
def notifyJoin(net): def notifyJoin(net):
"""
Notify relays to join channels.
They will pull from main.TempChan and remove channels they join.
:param net: network
"""
for i in getActiveRelays(net): for i in getActiveRelays(net):
name = net + str(i) name = net + str(i)
if name in main.IRCPool.keys(): if name in main.IRCPool.keys():
@ -143,6 +182,14 @@ def notifyJoin(net):
def minifyChans(net, listinfo): def minifyChans(net, listinfo):
"""
Remove channels from listinfo that are already covered by a relay.
:param net: network
:param listinfo: list of channels to check
:type listinfo: list of [channel, num_users]
:return: list of channels with joined channels removed
:rtype: list of [channel, num_users]
"""
if not allRelaysActive(net): if not allRelaysActive(net):
error("All relays for %s are not active, cannot minify list" % net) error("All relays for %s are not active, cannot minify list" % net)
return False return False
@ -159,6 +206,19 @@ def minifyChans(net, listinfo):
def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit): def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
"""
Minify channels, determine whether we can cover all the channels
on the network, or need to use 'significant' mode.
Truncate the channel list to available channel spaces.
Allocate these channels to relays.
Notify relays that they should pull from TempChan to join.
:param net: network
:param listinfo: list of [channel, num_users] lists
:param mean: mean of channel population
:param sigrelay: number of relays needed to cover significant channels
:param relay: number of relays needed to cover all channels
:param chanlimit: maximum number of channels to allocate to a relay
"""
listinfo = minifyChans(net, listinfo) listinfo = minifyChans(net, listinfo)
if not listinfo: if not listinfo:
return return
@ -193,6 +253,14 @@ def keepChannels(net, listinfo, mean, sigrelay, relay, chanlimit):
def joinSingle(net, channel): def joinSingle(net, channel):
"""
Join a channel on a relay.
Use ECA to determine which relay to join on.
:param net: network
:param channel: channel to join
:return: relay number that joined the channel
:rtype: int
"""
eca = emptyChanAllocate(net, [channel], []) eca = emptyChanAllocate(net, [channel], [])
if not eca: if not eca:
return False return False
@ -209,9 +277,10 @@ def joinSingle(net, channel):
def partSingle(net, channel): def partSingle(net, channel):
""" """
Iterate over all the relays of net and part channels matching channel. Iterate over all the relays of net and part channels matching channel.
:param net: :param net: network
:param channel: :param channel: channel to part
:return: :return: list of relays that parted the channel
:rtype: list of str
""" """
parted = [] parted = []
for i in getActiveRelays(net): for i in getActiveRelays(net):
@ -224,6 +293,9 @@ def partSingle(net, channel):
def nukeNetwork(net): def nukeNetwork(net):
"""
Remove network records.
:param net: network"""
# purgeRecords(net) # purgeRecords(net)
# p = main.g.pipeline() # p = main.g.pipeline()
main.g.delete("analytics.list." + net) main.g.delete("analytics.list." + net)
@ -236,6 +308,21 @@ def nukeNetwork(net):
def _initialList(net, num, listinfo, chanlimit): def _initialList(net, num, listinfo, chanlimit):
"""
Called when a relay receives a full LIST response.
Run statistics to determine how many channels are significant.
This is done by adding all the numbers of users on the channels together,
then dividing by the number of channels.
* cumul - cumulative sum of all channel membership
* siglength - number of significant channels
* listlength - number of channels in the list
* sigrelay - number of relays needed to cover siglength
* relay - number of relays needed to cover all channels
:param net: network
:param num: relay number
:param listinfo: list of [channel, num_users] lists
:param chanlimit: maximum number of channels the relay can join
"""
listlength = len(listinfo) listlength = len(listinfo)
cumul = 0 cumul = 0
try: try:
@ -258,9 +345,11 @@ def _initialList(net, num, listinfo, chanlimit):
sigrelay = ceil(siglength / chanlimit) sigrelay = ceil(siglength / chanlimit)
relay = ceil(listlength / chanlimit) relay = ceil(listlength / chanlimit)
# netbase = "list.%s" % net
abase = "analytics.list.%s" % net abase = "analytics.list.%s" % net
p = main.g.pipeline() p = main.g.pipeline()
# See docstring for meanings
p.hset(abase, "mean", mean) p.hset(abase, "mean", mean)
p.hset(abase, "total", listlength) p.hset(abase, "total", listlength)
p.hset(abase, "sigtotal", siglength) p.hset(abase, "sigtotal", siglength)
@ -296,16 +385,9 @@ def _initialList(net, num, listinfo, chanlimit):
def initialList(net, num, listinfo, chanlimit): def initialList(net, num, listinfo, chanlimit):
"""
Run _initialList in a thread.
See above docstring.
"""
deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit) deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)
def chankeep_handler(net, num, listinfo, chanlimit):
"""
Handle a channel keep request.
:param net:
:param num:
:param listinfo:
:param chanlimit:
:return:
"""
listinfo, mean, sigrelay, relay = _initialList(net, num, listinfo, chanlimit)