diff --git a/db.py b/db.py index 303a18e..6b20baf 100644 --- a/db.py +++ b/db.py @@ -113,11 +113,7 @@ async def queue_message(msg): """ Queue a message on the Redis buffer. """ - # src = msg["src"] message = orjson.dumps(msg) - - # key = f"{KEYPREFIX}{src}" - # log.debug(f"Queueing single message of string length {len(message)}") await ar.sadd(KEYNAME, message) @@ -125,10 +121,6 @@ async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. """ - # log.debug(f"Queueing message batch of length {len(data)}") for msg in data: - # src = msg["src"] message = orjson.dumps(msg) - - # key = f"{KEYPREFIX}{src}" await ar.sadd(KEYNAME, message) diff --git a/sources/ingest.py b/sources/ingest.py index 266abba..f88d23d 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -7,15 +7,9 @@ import db import util from processing import process -# SOURCES = ["4ch", "irc", "dis"] -# DEBUG CODE REMOVE ME -# SOURCES.remove("4ch") -# SOURCES.remove("dis") -# DEBUG CODE REMOVE ME -# KEYPREFIX = "queue." KEYNAME = "queue" -# Chunk size per source (divide by len(SOURCES) for total) +# Chunk size CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900")) ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5"))