diff --git a/db.py b/db.py index 0574fc5..48ba6cb 100644 --- a/db.py +++ b/db.py @@ -47,7 +47,7 @@ async def store_kafka_batch(data): log.debug(f"Storing Kafka batch of {len(data)} messages") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() - batch = producer.create_batch() + topicmap = {} for msg in data: if msg["type"] in TYPES_MAIN: # index = "main" @@ -72,23 +72,39 @@ async def store_kafka_batch(data): # ): # msg[key] = str(value) body = orjson.dumps(msg) - # orjson returns bytes - # body = str.encode(message) if "ts" not in msg: raise Exception("No TS in msg") - metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) - if metadata is None: - partitions = await producer.partitions_for(KAFKA_TOPIC) - partition = random.choice(tuple(partitions)) - await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - log.debug(f"{batch.record_count()} messages sent to partition {partition}") - batch = producer.create_batch() - continue + if KAFKA_TOPIC not in topicmap: + topicmap[KAFKA_TOPIC] = [body] + else: + topicmap[KAFKA_TOPIC].append(body) - partitions = await producer.partitions_for(KAFKA_TOPIC) - partition = random.choice(tuple(partitions)) - await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - log.debug(f"{batch.record_count()} messages sent to partition {partition}") + for topic, messages in topicmap.items(): + batch = producer.create_batch() + for body in messages: + metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) + if metadata is None: + partitions = await producer.partitions_for(topic) + partition = random.choice(tuple(partitions)) + await producer.send_batch(batch, topic, partition=partition) + log.debug( + ( + f"{batch.record_count()} messages sent to topic " + f"{topic} partition {partition}" + ) + ) + batch = producer.create_batch() + continue + + partitions = await producer.partitions_for(topic) + partition = random.choice(tuple(partitions)) + await producer.send_batch(batch, topic, partition=partition) + log.debug( + ( + f"{batch.record_count()} messages sent to topic " + f"{topic} partition {partition}" + ) + ) await producer.stop() diff --git a/sources/ch4.py b/sources/ch4.py index 5c3d6be..88b65fa 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -3,6 +3,7 @@ import asyncio import random import string from math import ceil +from os import getenv import aiohttp from numpy import array_split @@ -10,8 +11,6 @@ from numpy import array_split import db import util -from os import getenv - # CONFIGURATION # # Number of 4chan threads to request at once diff --git a/sources/ingest.py b/sources/ingest.py index 6b9f0a1..5f6b9e0 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -1,11 +1,11 @@ import asyncio +from os import getenv import orjson import db import util from processing import process -from os import getenv SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue."