From e32b330ef49b038e99822a6aba45582ce80a7b4a Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 21 Oct 2022 07:20:30 +0100 Subject: [PATCH] Switch to SSDB for message queueing --- db.py | 6 ++---- docker/docker-compose.prod.yml | 13 +------------ legacy/modules/monitor.py | 2 +- sources/ingest.py | 5 ++++- 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/db.py b/db.py index ee6d471..996c586 100644 --- a/db.py +++ b/db.py @@ -44,8 +44,6 @@ KEYNAME = "queue" async def store_kafka_batch(data): - print("FAKE STORE KAFKA BATCH") - return # log.debug(f"Storing Kafka batch of {len(data)} messages") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() @@ -116,7 +114,7 @@ async def queue_message(msg): Queue a message on the Redis buffer. """ message = orjson.dumps(msg) - await ar.zadd(KEYNAME, message) + await ar.lpush(KEYNAME, message) async def queue_message_bulk(data): @@ -125,4 +123,4 @@ async def queue_message_bulk(data): """ for msg in data: message = orjson.dumps(msg) - await ar.zadd(KEYNAME, message) + await ar.lpush(KEYNAME, message) diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml index 9edd3fe..1398cac 100644 --- a/docker/docker-compose.prod.yml +++ b/docker/docker-compose.prod.yml @@ -9,17 +9,6 @@ services: - ${PORTAINER_GIT_DIR}:/code env_file: - ../stack.env - # volumes_from: - # - tmp - # depends_on: - # broker: - # condition: service_started - # kafka: - # condition: service_healthy - # tmp: - # condition: service_started - # redis: - # condition: service_healthy threshold: image: pathogen/threshold:latest @@ -28,7 +17,7 @@ services: volumes: - ${PORTAINER_GIT_DIR}:/code - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live - #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates + - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert ports: - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}" diff --git a/legacy/modules/monitor.py b/legacy/modules/monitor.py index e92c303..c9189b3 100644 --- a/legacy/modules/monitor.py +++ b/legacy/modules/monitor.py @@ -67,7 +67,7 @@ def parsemeta(numName, c): def queue_message(c): message = json.dumps(c) - main.g.sadd("queue", message) + main.g.lpush("queue", message) def event( diff --git a/sources/ingest.py b/sources/ingest.py index 39bf2a8..aae58df 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -36,7 +36,10 @@ class Ingest(object): items = [] # for source in SOURCES: # key = f"{KEYPREFIX}{source}" - chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE) + length = await db.ar.llen(KEYNAME) + start_num = length - CHUNK_SIZE + chunk = await db.ar.lrange(KEYNAME, start_num, -1) + # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE) if not chunk: return for item in chunk: