diff --git a/db.py b/db.py index 19d3ea4..99d60a5 100644 --- a/db.py +++ b/db.py @@ -1,3 +1,4 @@ +import asyncio from os import getenv import aioredis @@ -25,7 +26,6 @@ ar = aioredis.from_url("redis://ssdb:1289", db=0) # Neptune redis for PubSub pr = aioredis.from_url("redis://redis_neptune:6379", db=10) - TYPES_MAIN = [ "msg", "notice", @@ -135,7 +135,17 @@ async def store_batch(data): # Pack the indexmap with msgpack and publish it to Neptune packed_index = msgpack.packb(indexmap, use_bin_type=True) - await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = False + for i in range(10): + if completed_publish: + break + try: + await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = True + except aioredis.exceptions.ConnectionError: + await asyncio.sleep(0.1) + if not completed_publish: + log.error("Failed to publish to Neptune") for index, index_messages in indexmap.items(): for message in index_messages: