diff --git a/db.py b/db.py
index 5cf7a30..15cb27d 100644
--- a/db.py
+++ b/db.py
@@ -1,5 +1,6 @@
 from math import ceil
 
+import aioredis
 import manticoresearch
 import ujson
 from manticoresearch.rest import ApiException
@@ -7,21 +8,51 @@ from numpy import array_split
 from redis import StrictRedis
 
 import util
-from schemas.mc_s import schema
-import aioredis
+from schemas import mc_s
+
 configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
 api_client = manticoresearch.ApiClient(configuration)
 api_instance = manticoresearch.IndexApi(api_client)
 
 log = util.get_logger("db")
+
+# Redis (legacy)
 r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
-ar = aioredis.from_url("unix:///var/run/redis/redis.sock")
+
+# AIORedis
+ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
+
+TYPES_MAIN = [
+    "msg",
+    "notice",
+    "action",
+    "part",
+    "join",
+    "kick",
+    "quit",
+    "nick",
+    "mode",
+    "topic",
+]
+TYPES_META = ["who"]
+TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
+
 
 def store_message(msg):
     """
     Store a message into Manticore
     :param msg: dict
     """
+    # Duplicated to avoid extra function call
+    if msg["type"] in TYPES_MAIN:
+        index = "main"
+        schema = mc_s.schema_main
+    elif msg["type"] in TYPES_META:
+        index = "meta"
+        schema = mc_s.schema_meta
+    elif msg["type"] in TYPES_INT:
+        index = "internal"
+        schema = mc_s.schema_int
     # normalise fields
     for key, value in list(msg.items()):
         if value is None:
@@ -31,7 +62,7 @@ def store_message(msg):
             if schema[key].startswith("string"):
                 msg[key] = str(value)
 
-    body = [{"insert": {"index": "main", "doc": msg}}]
+    body = [{"insert": {"index": index, "doc": msg}}]
     body_post = ""
     for item in body:
         body_post += ujson.dumps(item)
@@ -44,6 +75,7 @@ def store_message(msg):
         # print(api_response)
     except ApiException as e:
         print("Exception when calling IndexApi->bulk: %s\n" % e)
+        print("ATTEMPT", body_post)
 
 
 def store_message_bulk(data):
@@ -59,6 +91,16 @@ def store_message_bulk(data):
     for messages in split_posts:
         total = []
         for msg in messages:
+            # Duplicated to avoid extra function call (see above)
+            if msg["type"] in TYPES_MAIN:
+                index = "main"
+                schema = mc_s.schema_main
+            elif msg["type"] in TYPES_META:
+                index = "meta"
+                schema = mc_s.schema_meta
+            elif msg["type"] in TYPES_INT:
+                index = "internal"
+                schema = mc_s.schema_int
             # normalise fields
             for key, value in list(msg.items()):
                 if value is None:
@@ -68,7 +110,7 @@ def store_message_bulk(data):
                     if schema[key].startswith("string"):
                         msg[key] = str(value)
 
-            body = {"insert": {"index": "main", "doc": msg}}
+            body = {"insert": {"index": index, "doc": msg}}
             total.append(body)
 
         body_post = ""
@@ -80,9 +122,10 @@ def store_message_bulk(data):
         try:
             # Bulk index operations
            api_response = api_instance.bulk(body_post)  # , async_req=True
-            # print(api_response)
+            print(api_response)
         except ApiException as e:
             print("Exception when calling IndexApi->bulk: %s\n" % e)
+            print("ATTEMPT", body_post)
 
 
 def update_schema():
@@ -91,11 +134,19 @@ def update_schema():
 
 def create_index(api_client):
     util_instance = manticoresearch.UtilsApi(api_client)
-    schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
-
-    create_query = f"create table if not exists main({schema_types}) engine='columnar'"
-    print("Schema types", create_query)
-    util_instance.sql(create_query)
+    schemas = {
+        "main": mc_s.schema_main,
+        "meta": mc_s.schema_meta,
+        "internal": mc_s.schema_int,
+    }
+    for name, schema in schemas.items():
+        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
+
+        create_query = (
+            f"create table if not exists {name}({schema_types}) engine='columnar'"
+        )
+        print("Schema types", create_query)
+        util_instance.sql(create_query)
 
 
 create_index(api_client)
diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
index 238b61d..7ff8920 100644
--- a/docker/docker-compose.prod.yml
+++ b/docker/docker-compose.prod.yml
@@ -13,6 +13,23 @@ services:
     depends_on:
       - db
 
+  threshold:
+    image: pathogen/threshold:latest
+    build: ./legacy/docker
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+      - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
+      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+    ports:
+      - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
+      - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
+      - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
+    env_file:
+      - ../stack.env
+    volumes_from:
+      - tmp
+
   db:
     image: manticoresearch/manticore
     restart: always
diff --git a/legacy/core/bot.py b/legacy/core/bot.py
index d62decb..05a3ecc 100644
--- a/legacy/core/bot.py
+++ b/legacy/core/bot.py
@@ -154,7 +154,7 @@ class IRCBot(IRCClient):
 
     def event(self, **cast):
         if "ts" not in cast.keys():
-            cast["ts"] = str(datetime.now().isoformat())
+            cast["ts"] = int(datetime.now().timestamp())
 
         # remove odd stuff
         for i in list(
@@ -832,7 +832,7 @@ class IRCBot(IRCClient):
         self.event(type="kick", muser=kicker, channel=channel, msg=message, user=kickee)
 
     def chanlessEvent(self, cast):
-        cast["ts"] = str(datetime.now().isoformat())
+        cast["ts"] = int(datetime.now().timestamp())
         cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"])
         if dedup(self.name, cast):  # Needs to be kept self.name until the dedup
             # function is converted to the new net, num
diff --git a/monolith.py b/monolith.py
index 6deb723..fbead0a 100644
--- a/monolith.py
+++ b/monolith.py
@@ -4,6 +4,7 @@ from os import getenv
 import util
 from sources.ch4 import Chan4
 from sources.dis import DiscordClient
+from sources.ingest import Ingest
 
 # For development
 # if not getenv("DISCORD_TOKEN", None):
@@ -29,7 +30,11 @@ async def main(loop):
 
     log.info("Starting 4chan handler.")
     chan = Chan4()
-    await chan.run()
+    loop.create_task(chan.run())
+
+    log.info("Starting ingest handler.")
+    ingest = Ingest()
+    loop.create_task(ingest.run())
 
 
 loop = asyncio.get_event_loop()
diff --git a/schemas/__init__.py b/schemas/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/schemas/mc_s.py b/schemas/mc_s.py
index 0b18db7..3784140 100644
--- a/schemas/mc_s.py
+++ b/schemas/mc_s.py
@@ -1,4 +1,4 @@
-schema = {
+schema_main = {
     "id": "bigint",
     # 1
     "archived": "int",
@@ -130,3 +130,67 @@ schema = {
     # 1, 2
     "version_tokens": "int",
 }
+
+schema_meta = {
+    "id": "bigint",
+    # 393598265, #main, Rust Programmer's Club
+    "channel": "text",
+    # 9f7b2e6a0e9b
+    "host": "text",
+    # "522, trans rights shill", myname
+    "ident": "text",
+    # The quick brown fox jumped over the lazy dog
+    "msg": "text",
+    # pol
+    "net": "text",
+    # André de Santa Cruz, santa
+    "nick": "text",
+    # 1, 2, 3, 4, 5, 6, ...
+    "num": "int",
+    # Greens
+    "realname": "text",
+    # irc.freenode.net
+    "server": "text",
+    # 4ch, irc, dis
+    "src": "string indexed attribute",
+    # true, false
+    "status": "bool",
+    # 2022-09-02T16:10:36
+    "ts": "timestamp",
+    # msg, notice, update, who
+    "type": "string indexed attribute",
+}
+
+schema_int = {
+    "id": "bigint",
+    # 393598265, #main, Rust Programmer's Club
+    "channel": "text",
+    # 9f7b2e6a0e9b
+    "host": "text",
+    # "522, trans rights shill", myname
+    "ident": "text",
+    # 0
+    "mode": "string indexed attribute",
+    # b0n3
+    "modearg": "string indexed attribute",
+    # The quick brown fox jumped over the lazy dog
+    "msg": "text",
+    # pol
+    "net": "text",
+    # André de Santa Cruz, santa
+    "nick": "text",
+    # 1, 2, 3, 4, 5, 6, ...
+    "num": "int",
+    # 4ch, irc, dis
+    "src": "string indexed attribute",
+    # true, false
+    "status": "bool",
+    # 2022-09-02T16:10:36
+    "ts": "timestamp",
+    # Anonymous
+    "user": "text",
+    # msg, notice, update, who
+    "type": "string indexed attribute",
+    # msg, notice, update, who
+    "mtype": "string indexed attribute",
+}
diff --git a/sources/ingest.py b/sources/ingest.py
new file mode 100644
index 0000000..afacd53
--- /dev/null
+++ b/sources/ingest.py
@@ -0,0 +1,35 @@
+import db
+import util
+import ujson
+import asyncio
+
+SOURCES = ["irc"]
+KEYPREFIX = "queue."
+CHUNK_SIZE = 1000
+ITER_DELAY = 0.5
+
+class Ingest(object):
+    def __init__(self):
+        name = self.__class__.__name__
+        self.log = util.get_logger(name)
+
+    async def run(self):
+        while True:
+            await self.process_chunk()
+            await asyncio.sleep(ITER_DELAY)
+
+    async def process_chunk(self):
+        items = []
+        for source in SOURCES:
+            key = f"{KEYPREFIX}{source}"
+            chunk = await db.ar.spop(key, CHUNK_SIZE)
+            if not chunk:
+                continue
+            self.log.info(f"Got chunk: {chunk}")
+            for item in chunk:
+                item = ujson.loads(item)
+                self.log.info(f"Got item: {item}")
+                items.append(item)
+        # store_message_bulk() crashes on an empty list (array_split with 0 sections)
+        if items:
+            db.store_message_bulk(items)
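Note (not part of the patch): sources/ingest.py drains Redis sets named "queue.<source>" ("queue.irc" for the only entry in SOURCES) and hands the decoded events to db.store_message_bulk(). The producer side is not shown in this diff; below is a minimal sketch of what a compatible producer could look like, assuming events are JSON-encoded and SADD-ed onto the same set the ingest task SPOPs from. The queue_event() helper and the example payload are hypothetical, not code from this repo.

    import ujson
    from redis import StrictRedis

    # Same Unix socket and DB number that db.py uses for its
    # synchronous (legacy) client.
    r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)

    def queue_event(event):  # hypothetical helper
        # "queue.irc" is KEYPREFIX + SOURCES[0] from sources/ingest.py;
        # Ingest.process_chunk() pops up to CHUNK_SIZE members per pass.
        r.sadd("queue.irc", ujson.dumps(event))

    queue_event({"type": "msg", "net": "freenode", "ts": 1662135036, "msg": "hello"})

Because the queue is a Redis set rather than a list, byte-identical payloads collapse into one member; with the second-resolution "ts" values introduced in the legacy/core/bot.py change above, two identical events generated in the same second would be deduplicated before ingest sees them.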