From fc7450c33ae5204199c379a1033bef112065b562 Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Thu, 22 Sep 2022 17:39:29 +0100
Subject: [PATCH] Make debug output cleaner

---
 db.py                 | 30 +++++++++++++++++-------------
 processing/process.py | 13 +++++++++----
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/db.py b/db.py
index 37aff5c..cfdb429 100644
--- a/db.py
+++ b/db.py
@@ -44,7 +44,7 @@ KEYPREFIX = "queue."
 
 
 async def store_kafka_batch(data):
-    log.debug(f"Storing Kafka batch of {len(data)} messages")
+    # log.debug(f"Storing Kafka batch of {len(data)} messages")
     producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
     await producer.start()
     topicmap = {}
@@ -84,24 +84,28 @@ async def store_kafka_batch(data):
                 partitions = await producer.partitions_for(topic)
                 partition = random.choice(tuple(partitions))
                 await producer.send_batch(batch, topic, partition=partition)
-                log.debug(
-                    (
-                        f"{batch.record_count()} messages sent to topic "
-                        f"{topic} partition {partition}"
-                    )
-                )
+                # log.debug(
+                #     (
+                #         f"{batch.record_count()} messages sent to topic "
+                #         f"{topic} partition {partition}"
+                #     )
+                # )
                 batch = producer.create_batch()
                 continue
 
         partitions = await producer.partitions_for(topic)
         partition = random.choice(tuple(partitions))
         await producer.send_batch(batch, topic, partition=partition)
-        log.debug(
-            (
-                f"{batch.record_count()} messages sent to topic "
-                f"{topic} partition {partition}"
-            )
-        )
+        # log.debug(
+        #     (
+        #         f"{batch.record_count()} messages sent to topic "
+        #         f"{topic} partition {partition}"
+        #     )
+        # )
+    log.debug(
+        "Kafka batches sent: "
+        + ", ".join([topic + ": " + str(len(topicmap[topic])) for topic in topicmap])
+    )
     await producer.stop()
diff --git a/processing/process.py b/processing/process.py
index af29917..e0ba858 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -101,7 +101,7 @@ hash_key = get_hash_key()
 
 @asyncio.coroutine
 async def spawn_processing_threads(data):
     len_data = len(data)
-    log.debug(f"Spawning processing threads for batch of {len_data} messages")
+    # log.debug(f"Spawning processing threads for batch of {len_data} messages")
     loop = asyncio.get_event_loop()
     tasks = []
@@ -112,18 +112,23 @@ async def spawn_processing_threads(data):
         msg_per_core = int(len(data) / CPU_THREADS)
         split_data = array_split(data, ceil(len(data) / msg_per_core))
     for index, split in enumerate(split_data):
-        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
+        # log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
         task = loop.run_in_executor(p, process_data, split)
         tasks.append(task)
 
     results = [await task for task in tasks]
-    log.debug(f"Results from processing of {len_data} messages: {len(results)}")
+    log.debug(
+        (
+            f"Results from processing of {len_data} messages in "
+            f"{len(split_data)} threads: {len(results)}"
+        )
+    )
 
     # Join the results back from the split list
     flat_list = [item for sublist in results for item in sublist]
     await db.store_kafka_batch(flat_list)
-    log.debug(f"Finished processing {len_data} messages")
+    # log.debug(f"Finished processing {len_data} messages")
 
 
 def process_data(data):
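
Note: a minimal sketch of what the new summary line in store_kafka_batch
prints, mirroring the join expression added in the patch. The topicmap here is
a dict of message lists keyed by topic name; the sample topics and counts are
illustrative, not taken from the repo.

    # Standalone sketch of the added summary line; sample data is made up.
    topicmap = {
        "msg": [{"type": "msg"}] * 3,
        "notice": [{"type": "notice"}] * 2,
    }
    summary = "Kafka batches sent: " + ", ".join(
        [topic + ": " + str(len(topicmap[topic])) for topic in topicmap]
    )
    print(summary)  # Kafka batches sent: msg: 3, notice: 2

One line per flush instead of one line per Kafka batch, which is what makes
the debug output cleaner.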
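Note: the reworked line in spawn_processing_threads logs len(split_data) as
the thread count. A minimal sketch of that splitting arithmetic, assuming
array_split comes from numpy and that CPU_THREADS is a small integer (the
values below are illustrative):

    from math import ceil
    from numpy import array_split

    CPU_THREADS = 4
    data = list(range(10))  # stand-in for 10 messages

    msg_per_core = int(len(data) / CPU_THREADS)                     # 2
    split_data = array_split(data, ceil(len(data) / msg_per_core))  # 5 chunks
    print([len(chunk) for chunk in split_data])                     # [2, 2, 2, 2, 2]

So for 10 messages on 4 cores the summary reads "... in 5 threads ...": the
logged figure is the number of chunks handed to the executor, not the number
of cores.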