|
|
|
@ -44,7 +44,7 @@ KEYPREFIX = "queue."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def store_kafka_batch(data):
|
|
|
|
|
log.debug(f"Storing Kafka batch of {len(data)} messages")
|
|
|
|
|
# log.debug(f"Storing Kafka batch of {len(data)} messages")
|
|
|
|
|
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
|
|
|
|
await producer.start()
|
|
|
|
|
topicmap = {}
|
|
|
|
@ -84,24 +84,28 @@ async def store_kafka_batch(data):
|
|
|
|
|
partitions = await producer.partitions_for(topic)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, topic, partition=partition)
|
|
|
|
|
log.debug(
|
|
|
|
|
(
|
|
|
|
|
f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
f"{topic} partition {partition}"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
# log.debug(
|
|
|
|
|
# (
|
|
|
|
|
# f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
# f"{topic} partition {partition}"
|
|
|
|
|
# )
|
|
|
|
|
# )
|
|
|
|
|
batch = producer.create_batch()
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
partitions = await producer.partitions_for(topic)
|
|
|
|
|
partition = random.choice(tuple(partitions))
|
|
|
|
|
await producer.send_batch(batch, topic, partition=partition)
|
|
|
|
|
log.debug(
|
|
|
|
|
(
|
|
|
|
|
f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
f"{topic} partition {partition}"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
# log.debug(
|
|
|
|
|
# (
|
|
|
|
|
# f"{batch.record_count()} messages sent to topic "
|
|
|
|
|
# f"{topic} partition {partition}"
|
|
|
|
|
# )
|
|
|
|
|
# )
|
|
|
|
|
log.debug(
|
|
|
|
|
"Kafka batches sent: "
|
|
|
|
|
+ ", ".join([topic + ": " + str(len(topicmap[topic])) for topic in topicmap])
|
|
|
|
|
)
|
|
|
|
|
await producer.stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|