Send messages to Neptune Redis via PubSub

This commit is contained in:
Mark Veidemanis 2023-01-12 07:20:43 +00:00
parent 0ab67becff
commit 02071758b5
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
4 changed files with 15 additions and 0 deletions

12
db.py
View File

@@ -1,6 +1,7 @@
from os import getenv
import aioredis
import msgpack
import orjson
import redis
@@ -21,6 +22,10 @@ r = redis.from_url("redis://ssdb:1289", db=0)
# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)
# Neptune redis for PubSub
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
TYPES_MAIN = [
    "msg",
    "notice",
@@ -43,6 +48,7 @@ MAIN_SRC_MAP = {
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
MESSAGE_KEY = "messages"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
@@ -127,6 +133,10 @@ async def store_batch(data):
        else:
            indexmap[INDEX].append(msg)
    # Pack the indexmap with msgpack and publish it to Neptune
    packed_index = msgpack.packb(indexmap, use_bin_type=True)
    await pr.publish(MESSAGE_KEY, packed_index)
    for index, index_messages in indexmap.items():
        for message in index_messages:
            result = await client.index(index=index, body=message)
@@ -139,6 +149,7 @@ async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    # TODO: msgpack
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)
@@ -148,5 +159,6 @@ async def queue_message_bulk(data):
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        # TODO: msgpack
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)

View File

@@ -21,3 +21,4 @@ python-Levenshtein
orjson
uvloop
elasticsearch[async]
msgpack

View File

@@ -162,6 +162,7 @@ def process_data(chunk, index, chunk_size):
    msg = db.r.rpop(KEYNAME)
    if not msg:
        return
    # TODO: msgpack
    msg = orjson.loads(msg)
    total_start = time.process_time()
    # normalise fields

View File

@@ -22,3 +22,4 @@ python-Levenshtein
orjson
uvloop
elasticsearch[async]
msgpack