Retry Redis ingest if it failed
This commit is contained in:
parent
02071758b5
commit
87b81ac236
12
db.py
12
db.py
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
from os import getenv
|
from os import getenv
|
||||||
|
|
||||||
import aioredis
|
import aioredis
|
||||||
|
@ -25,7 +26,6 @@ ar = aioredis.from_url("redis://ssdb:1289", db=0)
|
||||||
# Neptune redis for PubSub
|
# Neptune redis for PubSub
|
||||||
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
|
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
|
||||||
|
|
||||||
|
|
||||||
TYPES_MAIN = [
|
TYPES_MAIN = [
|
||||||
"msg",
|
"msg",
|
||||||
"notice",
|
"notice",
|
||||||
|
@ -135,7 +135,17 @@ async def store_batch(data):
|
||||||
|
|
||||||
# Pack the indexmap with msgpack and publish it to Neptune
|
# Pack the indexmap with msgpack and publish it to Neptune
|
||||||
packed_index = msgpack.packb(indexmap, use_bin_type=True)
|
packed_index = msgpack.packb(indexmap, use_bin_type=True)
|
||||||
|
completed_publish = False
|
||||||
|
for i in range(10):
|
||||||
|
if completed_publish:
|
||||||
|
break
|
||||||
|
try:
|
||||||
await pr.publish(MESSAGE_KEY, packed_index)
|
await pr.publish(MESSAGE_KEY, packed_index)
|
||||||
|
completed_publish = True
|
||||||
|
except aioredis.exceptions.ConnectionError:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
if not completed_publish:
|
||||||
|
log.error("Failed to publish to Neptune")
|
||||||
|
|
||||||
for index, index_messages in indexmap.items():
|
for index, index_messages in indexmap.items():
|
||||||
for message in index_messages:
|
for message in index_messages:
|
||||||
|
|
Loading…
Reference in New Issue