From 63081f68b7e13dfb42381ec025d6baad265e7949 Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Fri, 30 Sep 2022 07:22:22 +0100
Subject: [PATCH] Use only one Redis key for the queue to make chunk size more
 precise for thread allocation

---
 db.py                          | 14 +++++++-------
 docker-compose.yml             |  4 ++--
 docker/docker-compose.prod.yml |  4 ++--
 legacy/modules/monitor.py      |  2 +-
 monolith.py                    |  1 -
 processing/process.py          |  5 ++---
 sources/ingest.py              | 21 +++++++++++----------
 7 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/db.py b/db.py
index cfdb429..303a18e 100644
--- a/db.py
+++ b/db.py
@@ -40,7 +40,7 @@ MAIN_SRC_MAP = {
 
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
-KEYPREFIX = "queue."
+KEYNAME = "queue"
 
 
 async def store_kafka_batch(data):
@@ -113,12 +113,12 @@ async def queue_message(msg):
     """
     Queue a message on the Redis buffer.
     """
-    src = msg["src"]
+    # src = msg["src"]
     message = orjson.dumps(msg)
 
-    key = f"{KEYPREFIX}{src}"
+    # key = f"{KEYPREFIX}{src}"
     # log.debug(f"Queueing single message of string length {len(message)}")
-    await ar.sadd(key, message)
+    await ar.sadd(KEYNAME, message)
 
 
 async def queue_message_bulk(data):
@@ -127,8 +127,8 @@ async def queue_message_bulk(data):
     """
     # log.debug(f"Queueing message batch of length {len(data)}")
     for msg in data:
-        src = msg["src"]
+        # src = msg["src"]
         message = orjson.dumps(msg)
 
-        key = f"{KEYPREFIX}{src}"
-        await ar.sadd(key, message)
+        # key = f"{KEYPREFIX}{src}"
+        await ar.sadd(KEYNAME, message)
diff --git a/docker-compose.yml b/docker-compose.yml
index 29255aa..90e9e2e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -127,11 +127,11 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_MESSAGE_MAX_BYTES: 2000000
-      KAFKA_HEAP_OPTS: -Xmx2g
+      #KAFKA_HEAP_OPTS: -Xmx2g
     healthcheck:
       test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
       start_period: 15s
-      interval: 2s
+      interval: 30s
       timeout: 30s
       retries: 45
 
diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
index 674608b..6241bb7 100644
--- a/docker/docker-compose.prod.yml
+++ b/docker/docker-compose.prod.yml
@@ -121,11 +121,11 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_MESSAGE_MAX_BYTES: 2000000
-      KAFKA_HEAP_OPTS: -Xmx2g
+      #KAFKA_HEAP_OPTS: -Xmx2g
     healthcheck:
       test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
       start_period: 15s
-      interval: 2s
+      interval: 30s
       timeout: 30s
       retries: 45
 
diff --git a/legacy/modules/monitor.py b/legacy/modules/monitor.py
index c8f8e7e..e92c303 100644
--- a/legacy/modules/monitor.py
+++ b/legacy/modules/monitor.py
@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 
 def queue_message(c):
     message = json.dumps(c)
-    main.g.sadd(main.config["Ingest"]["Key"], message)
+    main.g.sadd("queue", message)
 
 
 def event(
diff --git a/monolith.py b/monolith.py
index e391b50..3585e8f 100644
--- a/monolith.py
+++ b/monolith.py
@@ -1,5 +1,4 @@
 import asyncio
-import sys
 from os import getenv
 
 import uvloop
diff --git a/processing/process.py b/processing/process.py
index e0ba858..6110b92 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -101,18 +101,17 @@ hash_key = get_hash_key()
 @asyncio.coroutine
 async def spawn_processing_threads(data):
     len_data = len(data)
-    # log.debug(f"Spawning processing threads for batch of {len_data} messages")
 
     loop = asyncio.get_event_loop()
     tasks = []
 
-    if len(data) < CPU_THREADS:
+    if len(data) < CPU_THREADS * 100:
         split_data = [data]
     else:
         msg_per_core = int(len(data) / CPU_THREADS)
         split_data = array_split(data, ceil(len(data) / msg_per_core))
     for index, split in enumerate(split_data):
-        # log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
+        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
         task = loop.run_in_executor(p, process_data, split)
         tasks.append(task)
 
diff --git a/sources/ingest.py b/sources/ingest.py
index f481a98..266abba 100644
--- a/sources/ingest.py
+++ b/sources/ingest.py
@@ -7,12 +7,13 @@ import db
 import util
 from processing import process
 
-SOURCES = ["4ch", "irc", "dis"]
+# SOURCES = ["4ch", "irc", "dis"]
 # DEBUG CODE REMOVE ME
 # SOURCES.remove("4ch")
 # SOURCES.remove("dis")
 # DEBUG CODE REMOVE ME
-KEYPREFIX = "queue."
+# KEYPREFIX = "queue."
+KEYNAME = "queue"
 
 # Chunk size per source (divide by len(SOURCES) for total)
 CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))
@@ -39,13 +40,13 @@ class Ingest(object):
 
     async def get_chunk(self):
         items = []
-        for source in SOURCES:
-            key = f"{KEYPREFIX}{source}"
-            chunk = await db.ar.spop(key, CHUNK_SIZE)
-            if not chunk:
-                continue
-            for item in chunk:
-                item = orjson.loads(item)
-                items.append(item)
+        # for source in SOURCES:
+        #     key = f"{KEYPREFIX}{source}"
+        chunk = await db.ar.spop(KEYNAME, CHUNK_SIZE)
+        if not chunk:
+            return
+        for item in chunk:
+            item = orjson.loads(item)
+            items.append(item)
         if items:
             await process.spawn_processing_threads(items)
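
Below is a minimal, self-contained sketch of the single-key queue pattern this patch switches to: every source SADDs into one Redis set, so a single SPOP with a count returns a chunk of exactly the requested size instead of one partial chunk per source. It assumes redis-py >= 4.2 (for redis.asyncio) and a local Redis instance; the key name and chunk size mirror the patch, while the host, port and message fields are illustrative only.

# Sketch only: single-key queue as in this patch (KEYNAME = "queue").
# Assumes redis-py >= 4.2 for redis.asyncio; host/port and message
# contents are placeholders, not taken from the repository.
import asyncio

import orjson
from redis import asyncio as aioredis

KEYNAME = "queue"
CHUNK_SIZE = 900  # matches the MONOLITH_INGEST_CHUNK_SIZE default


async def main():
    r = aioredis.Redis(host="localhost", port=6379)

    # Producers for every source write to the same set...
    await r.sadd(KEYNAME, orjson.dumps({"src": "irc", "msg": "hello"}))
    await r.sadd(KEYNAME, orjson.dumps({"src": "4ch", "msg": "world"}))

    # ...so one SPOP with a count yields an exact-size chunk,
    # no matter how the messages are spread across sources.
    chunk = await r.spop(KEYNAME, CHUNK_SIZE)
    items = [orjson.loads(item) for item in chunk]
    print(f"popped {len(items)} items")

    await r.close()


asyncio.run(main())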