import main
from utils.logging.log import *
from utils.logging.debug import *
from copy import deepcopy
from twisted.internet.threads import deferToThread


def provisionInstances(net, relaysNeeded):
    """Placeholder: not implemented yet, currently does nothing."""
    # TODO: the original stub hinted at deriving "num, alias" here.
    pass


def purgeRecords(net):
    """Delete every per-channel key ``list.<net>.<channel>``.

    Channel names are read from the set stored at ``list.<net>``. The set
    itself is left in place; only the per-channel keys are deleted.
    """
    # NOTE(review): main.g is presumably a Redis client - confirm.
    base = "list.%s" % net
    p = main.g.pipeline()
    for chan in main.g.smembers(base):
        # Set members are decoded from bytes before building the key name.
        p.delete(base + "." + chan.decode("utf-8"))
    p.execute()


def _nukeNetwork(net):
    """Remove all stored list data for ``net``.

    Deletes the per-channel keys, then ``analytics.list.<net>`` and the
    ``list.<net>`` channel set.
    """
    # Must run first: purgeRecords reads the channel set deleted below.
    purgeRecords(net)
    p = main.g.pipeline()
    p.delete("analytics.list." + net)
    p.delete("list." + net)
    p.execute()


def nukeNetwork(net):
    """Run ``_nukeNetwork(net)`` in a worker thread via ``deferToThread``."""
    deferToThread(_nukeNetwork, net)


def _initialList(net, num, listinfo, chanlimit):
    """Compute statistics for a LIST result and store them with the list.

    Args:
        net: Network name; used in key names and looked up in
            ``main.network``.
        num: Number formatted with ``%i`` in warning messages.
        listinfo: Sequence of entries indexed ``[0]``, ``[1]``, ``[2]``.
            ``[1]`` must be convertible with ``int()``.
            NOTE(review): presumably (channel name, user count, topic) -
            confirm against the caller.
        chanlimit: Divisor for the "relay" figures; must be non-zero.

    Returns:
        None. Logs a warning and returns without writing anything when the
        list is empty, a count is not numeric, the network is unknown or
        ``chanlimit`` is zero.
    """
    listlength = len(listinfo)
    if listlength == 0:
        # Guard: the mean below would otherwise divide by zero.
        warn("Empty LIST data received from %s - %i" % (net, num))
        return

    # Convert each count once; ValueError covers non-numeric strings.
    try:
        counts = [int(entry[1]) for entry in listinfo]
    except (TypeError, ValueError):
        warn("Bad LIST data received from %s - %i" % (net, num))
        return

    cumul = sum(counts)
    mean = round(cumul / listlength, 2)

    # Counts strictly above the mean are "significant", strictly below are
    # "insignificant"; counts equal to the rounded mean fall in neither.
    sig = [count for count in counts if count > mean]
    insig = [count for count in counts if count < mean]
    siglength, sigcumul = len(sig), sum(sig)
    insiglength, insigcumul = len(insig), sum(insig)

    if net not in main.network.keys():
        warn("Cannot write list info - no network entry for %s" % net)
        return
    if not chanlimit:
        # Guard: the relay figures below would otherwise divide by zero.
        warn("Cannot write list info - invalid channel limit %s for %s"
             % (chanlimit, net))
        return

    netbase = "list.%s" % net
    abase = "analytics.list.%s" % net
    stats = (
        ("mean", mean),
        ("total", listlength),
        ("sigtotal", siglength),
        ("insigtotal", insiglength),
        ("sigperc", round(siglength / listlength * 100, 2)),
        ("insigperc", round(insiglength / listlength * 100, 2)),
        ("cumul", cumul),
        ("sigcumul", sigcumul),
        ("insigcumul", insigcumul),
        ("relay", round(listlength / chanlimit, 2)),
        ("sigrelay", round(siglength / chanlimit, 2)),
        ("insigrelay", round(insiglength / chanlimit, 2)),
    )

    p = main.g.pipeline()
    for field, value in stats:
        p.hset(abase, field, value)

    # Purge existing records before writing. purgeRecords executes its own
    # pipeline immediately, so it lands before the writes queued on ``p``.
    # NOTE(review): the ``list.<net>`` set is not cleared, so channel names
    # absent from this LIST stay members - confirm that is intended.
    purgeRecords(net)
    for entry in listinfo:
        chankey = netbase + "." + entry[0]
        p.rpush(chankey, entry[1])
        p.rpush(chankey, entry[2])
        p.sadd(netbase, entry[0])

    p.execute()
    debug("List parsing completed on %s" % net)


def initialList(net, num, listinfo, chanlimit):
    """Run ``_initialList`` in a worker thread via ``deferToThread``.

    The worker receives a deep copy of ``listinfo``, not the caller's object.
    """
    deferToThread(_initialList, net, num, deepcopy(listinfo), chanlimit)