Don't muddle up the topics when sending Kafka batches
This commit is contained in:
parent
e0803d4934
commit
027c43b60a
34
db.py
34
db.py
|
@ -47,7 +47,7 @@ async def store_kafka_batch(data):
|
||||||
log.debug(f"Storing Kafka batch of {len(data)} messages")
|
log.debug(f"Storing Kafka batch of {len(data)} messages")
|
||||||
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
||||||
await producer.start()
|
await producer.start()
|
||||||
batch = producer.create_batch()
|
topicmap = {}
|
||||||
for msg in data:
|
for msg in data:
|
||||||
if msg["type"] in TYPES_MAIN:
|
if msg["type"] in TYPES_MAIN:
|
||||||
# index = "main"
|
# index = "main"
|
||||||
|
@ -72,23 +72,39 @@ async def store_kafka_batch(data):
|
||||||
# ):
|
# ):
|
||||||
# msg[key] = str(value)
|
# msg[key] = str(value)
|
||||||
body = orjson.dumps(msg)
|
body = orjson.dumps(msg)
|
||||||
# orjson returns bytes
|
|
||||||
# body = str.encode(message)
|
|
||||||
if "ts" not in msg:
|
if "ts" not in msg:
|
||||||
raise Exception("No TS in msg")
|
raise Exception("No TS in msg")
|
||||||
|
if KAFKA_TOPIC not in topicmap:
|
||||||
|
topicmap[KAFKA_TOPIC] = [body]
|
||||||
|
else:
|
||||||
|
topicmap[KAFKA_TOPIC].append(body)
|
||||||
|
|
||||||
|
for topic, messages in topicmap.items():
|
||||||
|
batch = producer.create_batch()
|
||||||
|
for body in messages:
|
||||||
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
|
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
|
||||||
if metadata is None:
|
if metadata is None:
|
||||||
partitions = await producer.partitions_for(KAFKA_TOPIC)
|
partitions = await producer.partitions_for(topic)
|
||||||
partition = random.choice(tuple(partitions))
|
partition = random.choice(tuple(partitions))
|
||||||
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
|
await producer.send_batch(batch, topic, partition=partition)
|
||||||
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
|
log.debug(
|
||||||
|
(
|
||||||
|
f"{batch.record_count()} messages sent to topic "
|
||||||
|
f"{topic} partition {partition}"
|
||||||
|
)
|
||||||
|
)
|
||||||
batch = producer.create_batch()
|
batch = producer.create_batch()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
partitions = await producer.partitions_for(KAFKA_TOPIC)
|
partitions = await producer.partitions_for(topic)
|
||||||
partition = random.choice(tuple(partitions))
|
partition = random.choice(tuple(partitions))
|
||||||
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
|
await producer.send_batch(batch, topic, partition=partition)
|
||||||
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
|
log.debug(
|
||||||
|
(
|
||||||
|
f"{batch.record_count()} messages sent to topic "
|
||||||
|
f"{topic} partition {partition}"
|
||||||
|
)
|
||||||
|
)
|
||||||
await producer.stop()
|
await producer.stop()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ import asyncio
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
from math import ceil
|
from math import ceil
|
||||||
|
from os import getenv
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from numpy import array_split
|
from numpy import array_split
|
||||||
|
@ -10,8 +11,6 @@ from numpy import array_split
|
||||||
import db
|
import db
|
||||||
import util
|
import util
|
||||||
|
|
||||||
from os import getenv
|
|
||||||
|
|
||||||
# CONFIGURATION #
|
# CONFIGURATION #
|
||||||
|
|
||||||
# Number of 4chan threads to request at once
|
# Number of 4chan threads to request at once
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from os import getenv
|
||||||
|
|
||||||
import orjson
|
import orjson
|
||||||
|
|
||||||
import db
|
import db
|
||||||
import util
|
import util
|
||||||
from processing import process
|
from processing import process
|
||||||
from os import getenv
|
|
||||||
|
|
||||||
SOURCES = ["4ch", "irc", "dis"]
|
SOURCES = ["4ch", "irc", "dis"]
|
||||||
KEYPREFIX = "queue."
|
KEYPREFIX = "queue."
|
||||||
|
|
Loading…
Reference in New Issue