From 87b81ac236655223c17352d3d1ca8e7fb158051f Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 13 Jan 2023 07:20:27 +0000 Subject: [PATCH] Retry Redis ingest if it failed --- db.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/db.py b/db.py index 19d3ea4..99d60a5 100644 --- a/db.py +++ b/db.py @@ -1,3 +1,4 @@ +import asyncio from os import getenv import aioredis @@ -25,7 +26,6 @@ ar = aioredis.from_url("redis://ssdb:1289", db=0) # Neptune redis for PubSub pr = aioredis.from_url("redis://redis_neptune:6379", db=10) - TYPES_MAIN = [ "msg", "notice", @@ -135,7 +135,17 @@ async def store_batch(data): # Pack the indexmap with msgpack and publish it to Neptune packed_index = msgpack.packb(indexmap, use_bin_type=True) - await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = False + for i in range(10): + if completed_publish: + break + try: + await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = True + except aioredis.exceptions.ConnectionError: + await asyncio.sleep(0.1) + if not completed_publish: + log.error("Failed to publish to Neptune") for index, index_messages in indexmap.items(): for message in index_messages: