|
|
|
@ -47,7 +47,7 @@ async def store_kafka_batch(data):
|
|
|
|
|
log.debug(f"Storing Kafka batch of {len(data)} messages")
|
|
|
|
|
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
|
|
|
|
await producer.start()
|
|
|
|
|
batch = producer.create_batch()
|
|
|
|
|
topicmap = {}
|
|
|
|
|
for msg in data:
|
|
|
|
|
if msg["type"] in TYPES_MAIN:
|
|
|
|
|
# index = "main"
|
|
|
|
@ -72,23 +72,39 @@ async def store_kafka_batch(data):
|
|
|
|
|
# ):
|
|
|
|
|
# msg[key] = str(value)
|
|
|
|
|
body = orjson.dumps(msg)
|
|
|
|
|
# orjson returns bytes
|
|
|
|
|
# body = str.encode(message)
|
|
|
|
|
if "ts" not in msg:
|
|
|
|
|
raise Exception("No TS in msg")
|
|
|
|
|
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
|
|
|
|
|
if metadata is None:
|
|
|
|
|
partitions = await producer.partitions_for(KAFKA_TOPIC)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
|
|
|
|
|
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
|
|
|
|
|
batch = producer.create_batch()
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
partitions = await producer.partitions_for(KAFKA_TOPIC)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
|
|
|
|
|
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
|
|
|
|
|
if KAFKA_TOPIC not in topicmap:
|
|
|
|
|
topicmap[KAFKA_TOPIC] = [body]
|
|
|
|
|
else:
|
|
|
|
|
topicmap[KAFKA_TOPIC].append(body)
|
|
|
|
|
|
|
|
|
|
for topic, messages in topicmap.items():
|
|
|
|
|
batch = producer.create_batch()
|
|
|
|
|
for body in messages:
|
|
|
|
|
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
|
|
|
|
|
if metadata is None:
|
|
|
|
|
partitions = await producer.partitions_for(topic)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, topic, partition=partition)
|
|
|
|
|
log.debug(
|
|
|
|
|
(
|
|
|
|
|
f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
f"{topic} partition {partition}"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
batch = producer.create_batch()
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
partitions = await producer.partitions_for(topic)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, topic, partition=partition)
|
|
|
|
|
log.debug(
|
|
|
|
|
(
|
|
|
|
|
f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
f"{topic} partition {partition}"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
await producer.stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|