Begin switching away from Redis

This commit is contained in:
Mark Veidemanis 2022-10-21 11:14:51 +01:00
parent 7482064aee
commit ab5e85c5c6
3 changed files with 23 additions and 47 deletions

10
db.py
View File

@ -5,7 +5,7 @@ import orjson
# Kafka # Kafka
from aiokafka import AIOKafkaProducer from aiokafka import AIOKafkaProducer
from redis import StrictRedis import redis
import util import util
@ -14,10 +14,10 @@ import util
log = util.get_logger("db") log = util.get_logger("db")
# Redis (legacy) # Redis (legacy)
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) r = redis.from_url("redis://ssdb:1289", db=0)
# AIORedis # AIORedis
ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0) ar = aioredis.from_url("redis://ssdb:1289", db=0)
TYPES_MAIN = [ TYPES_MAIN = [
"msg", "msg",
@ -116,7 +116,7 @@ async def queue_message(msg):
Queue a message on the Redis buffer. Queue a message on the Redis buffer.
""" """
message = orjson.dumps(msg) message = orjson.dumps(msg)
await ar.sadd(KEYNAME, message) await ar.zadd(KEYNAME, message)
async def queue_message_bulk(data): async def queue_message_bulk(data):
@ -125,4 +125,4 @@ async def queue_message_bulk(data):
""" """
for msg in data: for msg in data:
message = orjson.dumps(msg) message = orjson.dumps(msg)
await ar.sadd(KEYNAME, message) await ar.zadd(KEYNAME, message)

View File

@ -4,22 +4,22 @@ services:
app: app:
image: pathogen/monolith:latest image: pathogen/monolith:latest
container_name: monolith container_name: monolith
build: ./docker build: ${PORTAINER_GIT_DIR}/docker
volumes: volumes:
- ${PORTAINER_GIT_DIR}:/code - ${PORTAINER_GIT_DIR}:/code
env_file: env_file:
- ../stack.env - ../stack.env
volumes_from: # volumes_from:
- tmp # - tmp
depends_on: # depends_on:
# broker: # broker:
# condition: service_started # condition: service_started
# kafka: # kafka:
# condition: service_healthy # condition: service_healthy
tmp: # tmp:
condition: service_started # condition: service_started
redis: # redis:
condition: service_healthy # condition: service_healthy
threshold: threshold:
image: pathogen/threshold:latest image: pathogen/threshold:latest
@ -39,40 +39,16 @@ services:
# for development # for development
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
volumes_from:
- tmp
depends_on:
tmp:
condition: service_started
redis:
condition: service_healthy
tmp: ssdb:
image: busybox image: tsl0922/ssdb
container_name: tmp_monolith container_name: ssdb_monolith
command: chmod -R 777 /var/run/redis
volumes: volumes:
- /var/run/redis - ssdb_data:/ssdb/var
ports:
redis: - "1289:1289"
image: redis environment:
container_name: redis_monolith - SSDB_PORT=1289
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- redis_data:/data
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/redis/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks: networks:
default: default:
@ -80,4 +56,4 @@ networks:
name: pathogen name: pathogen
volumes: volumes:
redis_data: {} ssdb_data: {}

View File

@ -36,7 +36,7 @@ class Ingest(object):
items = [] items = []
# for source in SOURCES: # for source in SOURCES:
# key = f"{KEYPREFIX}{source}" # key = f"{KEYPREFIX}{source}"
chunk = await db.ar.spop(KEYNAME, CHUNK_SIZE) chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE)
if not chunk: if not chunk:
return return
for item in chunk: for item in chunk: