diff --git a/db.py b/db.py index 45ae7c1..19d3ea4 100644 --- a/db.py +++ b/db.py @@ -1,6 +1,7 @@ from os import getenv import aioredis +import msgpack import orjson import redis @@ -21,6 +22,10 @@ r = redis.from_url("redis://ssdb:1289", db=0) # AIORedis ar = aioredis.from_url("redis://ssdb:1289", db=0) +# Neptune redis for PubSub +pr = aioredis.from_url("redis://redis_neptune:6379", db=10) + + TYPES_MAIN = [ "msg", "notice", @@ -43,6 +48,7 @@ MAIN_SRC_MAP = { TYPES_META = ["who"] TYPES_INT = ["conn", "highlight", "znc", "query", "self"] KEYNAME = "queue" +MESSAGE_KEY = "messages" ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic") ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme") @@ -127,6 +133,10 @@ async def store_batch(data): else: indexmap[INDEX].append(msg) + # Pack the indexmap with msgpack and publish it to Neptune + packed_index = msgpack.packb(indexmap, use_bin_type=True) + await pr.publish(MESSAGE_KEY, packed_index) + for index, index_messages in indexmap.items(): for message in index_messages: result = await client.index(index=index, body=message) @@ -139,6 +149,7 @@ async def queue_message(msg): """ Queue a message on the Redis buffer. """ + # TODO: msgpack message = orjson.dumps(msg) await ar.lpush(KEYNAME, message) @@ -148,5 +159,6 @@ async def queue_message_bulk(data): Queue multiple messages on the Redis buffer. """ for msg in data: + # TODO: msgpack message = orjson.dumps(msg) await ar.lpush(KEYNAME, message) diff --git a/docker/requirements.txt b/docker/requirements.txt index 2cc570f..6c8a352 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -21,3 +21,4 @@ python-Levenshtein orjson uvloop elasticsearch[async] +msgpack diff --git a/processing/process.py b/processing/process.py index 20f2eed..846f882 100644 --- a/processing/process.py +++ b/processing/process.py @@ -162,6 +162,7 @@ def process_data(chunk, index, chunk_size): msg = db.r.rpop(KEYNAME) if not msg: return + # TODO: msgpack msg = orjson.loads(msg) total_start = time.process_time() # normalise fields diff --git a/requirements.txt b/requirements.txt index 866bafa..22ecb0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,3 +22,4 @@ python-Levenshtein orjson uvloop elasticsearch[async] +msgpack