From ab5e85c5c6a38845037079bdb61cb53b94fa22ae Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 21 Oct 2022 11:14:51 +0100 Subject: [PATCH] Begin switching away from Redis --- db.py | 10 +++--- docker/docker-compose.prod.yml | 58 ++++++++++------------------------ sources/ingest.py | 2 +- 3 files changed, 23 insertions(+), 47 deletions(-) diff --git a/db.py b/db.py index 715751c..ee6d471 100644 --- a/db.py +++ b/db.py @@ -5,7 +5,7 @@ import orjson # Kafka from aiokafka import AIOKafkaProducer -from redis import StrictRedis +import redis import util @@ -14,10 +14,10 @@ import util log = util.get_logger("db") # Redis (legacy) -r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) +r = redis.from_url("redis://ssdb:1289", db=0) # AIORedis -ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0) +ar = aioredis.from_url("redis://ssdb:1289", db=0) TYPES_MAIN = [ "msg", @@ -116,7 +116,7 @@ async def queue_message(msg): Queue a message on the Redis buffer. """ message = orjson.dumps(msg) - await ar.sadd(KEYNAME, message) + await ar.zadd(KEYNAME, message) async def queue_message_bulk(data): @@ -125,4 +125,4 @@ async def queue_message_bulk(data): """ for msg in data: message = orjson.dumps(msg) - await ar.sadd(KEYNAME, message) + await ar.zadd(KEYNAME, message) diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml index f189125..9edd3fe 100644 --- a/docker/docker-compose.prod.yml +++ b/docker/docker-compose.prod.yml @@ -4,22 +4,22 @@ services: app: image: pathogen/monolith:latest container_name: monolith - build: ./docker + build: ${PORTAINER_GIT_DIR}/docker volumes: - ${PORTAINER_GIT_DIR}:/code env_file: - ../stack.env - volumes_from: - - tmp - depends_on: + # volumes_from: + # - tmp + # depends_on: # broker: # condition: service_started # kafka: # condition: service_healthy - tmp: - condition: service_started - redis: - condition: service_healthy + # tmp: + # condition: service_started + # redis: + # condition: service_healthy threshold: image: pathogen/threshold:latest @@ -39,40 +39,16 @@ services: # for development extra_hosts: - "host.docker.internal:host-gateway" - volumes_from: - - tmp - depends_on: - tmp: - condition: service_started - redis: - condition: service_healthy - tmp: - image: busybox - container_name: tmp_monolith - command: chmod -R 777 /var/run/redis + ssdb: + image: tsl0922/ssdb + container_name: ssdb_monolith volumes: - - /var/run/redis - - redis: - image: redis - container_name: redis_monolith - command: redis-server /etc/redis.conf - ulimits: - nproc: 65535 - nofile: - soft: 65535 - hard: 65535 - volumes: - - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf - - redis_data:/data - volumes_from: - - tmp - healthcheck: - test: "redis-cli -s /var/run/redis/redis.sock ping" - interval: 2s - timeout: 2s - retries: 15 + - ssdb_data:/ssdb/var + ports: + - "1289:1289" + environment: + - SSDB_PORT=1289 networks: default: @@ -80,4 +56,4 @@ networks: name: pathogen volumes: - redis_data: {} + ssdb_data: {} diff --git a/sources/ingest.py b/sources/ingest.py index f88d23d..39bf2a8 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -36,7 +36,7 @@ class Ingest(object): items = [] # for source in SOURCES: # key = f"{KEYPREFIX}{source}" - chunk = await db.ar.spop(KEYNAME, CHUNK_SIZE) + chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE) if not chunk: return for item in chunk: