Switch to SSDB for message queueing

This commit is contained in:
Mark Veidemanis 2022-10-21 07:20:30 +01:00
parent 8c596ec516
commit e32b330ef4
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
4 changed files with 8 additions and 18 deletions

6
db.py
View File

@ -44,8 +44,6 @@ KEYNAME = "queue"
async def store_kafka_batch(data): async def store_kafka_batch(data):
print("FAKE STORE KAFKA BATCH")
return
# log.debug(f"Storing Kafka batch of {len(data)} messages") # log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start() await producer.start()
@ -116,7 +114,7 @@ async def queue_message(msg):
Queue a message on the Redis buffer. Queue a message on the Redis buffer.
""" """
message = orjson.dumps(msg) message = orjson.dumps(msg)
await ar.zadd(KEYNAME, message) await ar.lpush(KEYNAME, message)
async def queue_message_bulk(data): async def queue_message_bulk(data):
@ -125,4 +123,4 @@ async def queue_message_bulk(data):
""" """
for msg in data: for msg in data:
message = orjson.dumps(msg) message = orjson.dumps(msg)
await ar.zadd(KEYNAME, message) await ar.lpush(KEYNAME, message)

View File

@ -9,17 +9,6 @@ services:
- ${PORTAINER_GIT_DIR}:/code - ${PORTAINER_GIT_DIR}:/code
env_file: env_file:
- ../stack.env - ../stack.env
# volumes_from:
# - tmp
# depends_on:
# broker:
# condition: service_started
# kafka:
# condition: service_healthy
# tmp:
# condition: service_started
# redis:
# condition: service_healthy
threshold: threshold:
image: pathogen/threshold:latest image: pathogen/threshold:latest
@ -28,7 +17,7 @@ services:
volumes: volumes:
- ${PORTAINER_GIT_DIR}:/code - ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
ports: ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}" - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"

View File

@ -67,7 +67,7 @@ def parsemeta(numName, c):
def queue_message(c): def queue_message(c):
message = json.dumps(c) message = json.dumps(c)
main.g.sadd("queue", message) main.g.lpush("queue", message)
def event( def event(

View File

@ -36,7 +36,10 @@ class Ingest(object):
items = [] items = []
# for source in SOURCES: # for source in SOURCES:
# key = f"{KEYPREFIX}{source}" # key = f"{KEYPREFIX}{source}"
chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE) length = await db.ar.llen(KEYNAME)
start_num = length - CHUNK_SIZE
chunk = await db.ar.lrange(KEYNAME, start_num, -1)
# chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
if not chunk: if not chunk:
return return
for item in chunk: for item in chunk: