Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
4 changed files with 71 additions and 49 deletions
Showing only changes of commit 11f15ac960 - Show all commits

View File

@ -9,6 +9,7 @@ import sys
from string import digits from string import digits
from random import randint from random import randint
from copy import deepcopy from copy import deepcopy
from datetime import datetime
from modules import userinfo from modules import userinfo
from modules import counters from modules import counters
@ -200,73 +201,95 @@ class IRCBot(IRCClient):
del main.TempChan[self.net] del main.TempChan[self.net]
def event(self, **cast): def event(self, **cast):
if not "time" in cast.keys():
cast["time"] = str(datetime.now().isoformat())
# remove odd stuff
for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over for i in list(cast.keys()): # Make a copy of the .keys() as Python 3 cannot handle iterating over
if cast[i] == "": # a dictionary that changes length with each iteration if cast[i] == "": # a dictionary that changes length with each iteration
del cast[i] del cast[i]
# remove server stuff
if "muser" in cast.keys(): if "muser" in cast.keys():
if cast["muser"] == self.servername: if cast["muser"] == self.servername:
return return
if "channel" in cast.keys(): if "channel" in cast.keys():
if cast["channel"] == "*": if cast["channel"] == "*":
return return
##
# expand out the hostmask
if not {"nick", "ident", "host"}.issubset(set(cast.keys())): if not {"nick", "ident", "host"}.issubset(set(cast.keys())):
cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"]) cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"])
# handle ZNC stuff
if {"nick", "ident", "host", "msg"}.issubset(set(cast)): if {"nick", "ident", "host", "msg"}.issubset(set(cast)):
if "msg" in cast.keys(): if cast["ident"] == "znc" and cast["host"] == "znc.in":
if cast["ident"] == "znc" and cast["host"] == "znc.in": cast["type"] = "znc"
cast["type"] = "znc" cast["num"] = self.num
cast["num"] = self.num del cast["nick"]
del cast["nick"] del cast["ident"]
del cast["ident"] del cast["host"]
del cast["host"] del cast["channel"]
del cast["channel"] if "Disconnected from IRC" in cast["msg"]:
if "Disconnected from IRC" in cast["msg"]: log("ZNC disconnected on %s - %i" % (self.net, self.num))
log("ZNC disconnected on %s - %i" % (self.net, self.num)) self.isconnected = False
self.isconnected = False if "Connected!" in cast["msg"]:
if "Connected!" in cast["msg"]: log("ZNC connected on %s - %i" % (self.net, self.num))
log("ZNC connected on %s - %i" % (self.net, self.num)) self.isconnected = True
self.isconnected = True #
if not cast["type"] in ["query", "self", "highlight", "znc", "who"]:
if "channel" in cast.keys() and not cast["type"] == "mode": # don't handle modes here # don't reprocess the same message twice
if cast["channel"].lower() == self.nickname.lower(): # as they are channel == nickname # if the type is in that list, it's already been here, don't run it again
#castDup = deepcopy(cast) # however modes are not queries! if not cast["type"] in {"query", "self", "highlight", "znc", "who"}:
cast["mtype"] = cast["type"] cast["num"] = self.num
cast["type"] = "query" if "channel" in cast.keys():
cast["num"] = self.num if cast["type"] == "mode":
#self.event(**castDup) if self.nickname.lower() == cast["channel"].lower():
# Don't call self.event for this one because queries are not events on a castDup = deepcopy(cast)
# channel, but we still want to see them castDup["mtype"] = cast["type"]
castDup["type"] = "self"
self.event(**castDup)
if cast["modearg"]:
if self.nickname.lower() == cast["modearg"].lower():
castDup = deepcopy(cast)
castDup["mtype"] = cast["type"]
castDup["type"] = "self"
self.event(**castDup)
else:
if cast["channel"].lower() == self.nickname.lower():
cast["mtype"] = cast["type"]
cast["type"] = "query"
#self.event(**castDup)
# Don't call self.event for this one because queries are not events on a
# channel, but we still want to see them
# we have been kicked
if "user" in cast.keys(): if "user" in cast.keys():
if cast["user"].lower() == self.nickname.lower(): if cast["user"].lower() == self.nickname.lower():
cast["num"] = self.num
castDup = deepcopy(cast) castDup = deepcopy(cast)
castDup["mtype"] = cast["type"] castDup["mtype"] = cast["type"]
castDup["type"] = "self" castDup["type"] = "self"
cast["num"] = self.num
self.event(**castDup) self.event(**castDup)
# we sent a message/left/joined/kick someone/quit
if "nick" in cast.keys(): if "nick" in cast.keys():
if cast["nick"].lower() == self.nickname.lower(): if cast["nick"].lower() == self.nickname.lower():
cast["num"] = self.num
castDup = deepcopy(cast) castDup = deepcopy(cast)
castDup["mtype"] = cast["type"] castDup["mtype"] = cast["type"]
castDup["type"] = "self" castDup["type"] = "self"
cast["num"] = self.num
if not cast["channel"].lower() == self.nickname.lower(): # modes has been set on us directly # we have been mentioned in a msg/notice/action/part/quit/topic message
self.event(**castDup) # don't tell anyone else if "msg" in cast.keys(): # Don't highlight queries
if "msg" in cast.keys() and not cast["type"] == "query": # Don't highlight queries
if not cast["msg"] == None: if not cast["msg"] == None:
if self.nickname.lower() in cast["msg"].lower(): if self.nickname.lower() in cast["msg"].lower():
cast["num"] = self.num
castDup = deepcopy(cast) castDup = deepcopy(cast)
castDup["mtype"] = cast["type"] castDup["mtype"] = cast["type"]
castDup["type"] = "highlight" castDup["type"] = "highlight"
cast["num"] = self.num
self.event(**castDup) self.event(**castDup)
if not "net" in cast.keys(): if not "net" in cast.keys():
cast["net"] = self.net cast["net"] = self.net
if not "num" in cast.keys(): if not "num" in cast.keys():
print("no num", cast)
cast["num"] = self.num cast["num"] = self.num
counters.event(self.net, cast["type"]) counters.event(self.net, cast["type"])
monitor.event(self.net, cast) monitor.event(self.net, cast)
@ -596,22 +619,22 @@ class IRCBot(IRCClient):
#log("Can no longer cover %s, removing records" % channel)# as it will only be matched once -- #log("Can no longer cover %s, removing records" % channel)# as it will only be matched once --
# other bots have different nicknames so # other bots have different nicknames so
def left(self, user, channel, message): # even if they saw it, they wouldn't react def left(self, user, channel, message): # even if they saw it, they wouldn't react
self.event(type="part", muser=user, channel=channel, message=message) self.event(type="part", muser=user, channel=channel, msg=message)
self.botLeft(channel) self.botLeft(channel)
def userJoined(self, user, channel): def userJoined(self, user, channel):
self.event(type="join", muser=user, channel=channel) self.event(type="join", muser=user, channel=channel)
def userLeft(self, user, channel, message): def userLeft(self, user, channel, message):
self.event(type="part", muser=user, channel=channel, message=message) self.event(type="part", muser=user, channel=channel, msg=message)
def userQuit(self, user, quitMessage): def userQuit(self, user, quitMessage):
self.chanlessEvent({"type": "quit", "muser": user, "message": quitMessage}) self.chanlessEvent({"type": "quit", "muser": user, "msg": quitMessage})
def userKicked(self, kickee, channel, kicker, message): def userKicked(self, kickee, channel, kicker, message):
if kickee.lower() == self.nickname.lower(): if kickee.lower() == self.nickname.lower():
self.botLeft(channel) self.botLeft(channel)
self.event(type="kick", muser=kicker, channel=channel, message=message, user=kickee) self.event(type="kick", muser=kicker, channel=channel, msg=message, user=kickee)
def chanlessEvent(self, cast): def chanlessEvent(self, cast):
cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"]) cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"])
@ -635,13 +658,13 @@ class IRCBot(IRCClient):
self.chanlessEvent({"type": "nick", "muser": oldname, "user": newname}) self.chanlessEvent({"type": "nick", "muser": oldname, "user": newname})
def topicUpdated(self, user, channel, newTopic): def topicUpdated(self, user, channel, newTopic):
self.event(type="topic", muser=user, channel=channel, message= newTopic) self.event(type="topic", muser=user, channel=channel, msg=newTopic)
def modeChanged(self, user, channel, toset, modes, args): def modeChanged(self, user, channel, toset, modes, args):
argList = list(args) argList = list(args)
modeList = [i for i in modes] modeList = [i for i in modes]
for a, m in zip(argList, modeList): for a, m in zip(argList, modeList):
self.event(type="mode", muser=user, channel=channel, modes=m, status=toset, modeargs=a) self.event(type="mode", muser=user, channel=channel, mode=m, status=toset, modearg=a)
class IRCBotFactory(ReconnectingClientFactory): class IRCBotFactory(ReconnectingClientFactory):
def __init__(self, net, num=None, relayCommands=None, user=None, stage2=None): def __init__(self, net, num=None, relayCommands=None, user=None, stage2=None):

View File

@ -1,7 +1,6 @@
from twisted.internet.protocol import Protocol, Factory, ClientFactory from twisted.internet.protocol import Protocol, Factory, ClientFactory
from json import dumps, loads from json import dumps, loads
from copy import deepcopy from copy import deepcopy
from datetime import datetime
import main import main
from utils.logging.log import * from utils.logging.log import *
@ -136,6 +135,4 @@ def sendRelayNotification(cast):
for i in main.relayConnections.keys(): for i in main.relayConnections.keys():
if main.relayConnections[i].authed: if main.relayConnections[i].authed:
if cast["type"] in main.relayConnections[i].subscriptions: if cast["type"] in main.relayConnections[i].subscriptions:
newCast = deepcopy(cast) main.relayConnections[i].send(dumps(cast))
newCast["time"] = str(datetime.now().isoformat())
main.relayConnections[i].send(dumps(newCast))

View File

@ -1,6 +1,5 @@
from copy import deepcopy from copy import deepcopy
from json import dumps from json import dumps
from datetime import datetime
import main import main
from core.relay import sendRelayNotification from core.relay import sendRelayNotification
@ -9,8 +8,8 @@ from modules import regproc
from utils.dedup import dedup from utils.dedup import dedup
order = ["type", "net", "num", "channel", "msg", "nick", order = ["type", "net", "num", "channel", "msg", "nick",
"ident", "host", "mtype", "user", "modes", "modeargs" "ident", "host", "mtype", "user", "mode", "modearg",
"realname", "server", "status"] "realname", "server", "status", "time"]
def testNetTarget(name, target): def testNetTarget(name, target):
called = False called = False
@ -88,6 +87,7 @@ def event(numName, c): # yes I'm using a short variable because otherwise it goe
if "muser" in c.keys(): if "muser" in c.keys():
del c["muser"] del c["muser"]
sendRelayNotification({k: c[k] for k in order if k in c}) # Sort dict keys according to order sendRelayNotification({k: c[k] for k in order if k in c}) # Sort dict keys according to order
# only monitors below # only monitors below

View File

@ -1,11 +1,13 @@
from datetime import datetime from datetime import datetime
from csiphash import siphash24 from csiphash import siphash24
from copy import deepcopy
from json import dumps from json import dumps
import main import main
from utils.logging.debug import debug from utils.logging.debug import debug
def dedup(numName, c): def dedup(numName, b):
# deduplication c = deepcopy(b)
del c["time"]
c["approxtime"] = str(datetime.utcnow().timestamp())[:main.config["Tweaks"]["DedupPrecision"]] c["approxtime"] = str(datetime.utcnow().timestamp())[:main.config["Tweaks"]["DedupPrecision"]]
castHash = siphash24(main.hashKey, dumps(c, sort_keys=True).encode("utf-8")) castHash = siphash24(main.hashKey, dumps(c, sort_keys=True).encode("utf-8"))
del c["approxtime"] del c["approxtime"]