Make debug output cleaner

Mark Veidemanis 2022-09-22 17:39:29 +01:00
parent 0e9a016e2a
commit fc7450c33a
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
2 changed files with 26 additions and 17 deletions

db.py (30 lines changed)

@@ -44,7 +44,7 @@ KEYPREFIX = "queue."
 async def store_kafka_batch(data):
-    log.debug(f"Storing Kafka batch of {len(data)} messages")
+    # log.debug(f"Storing Kafka batch of {len(data)} messages")
     producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
     await producer.start()
     topicmap = {}
@@ -84,24 +84,28 @@ async def store_kafka_batch(data):
                 partitions = await producer.partitions_for(topic)
                 partition = random.choice(tuple(partitions))
                 await producer.send_batch(batch, topic, partition=partition)
-                log.debug(
-                    (
-                        f"{batch.record_count()} messages sent to topic "
-                        f"{topic} partition {partition}"
-                    )
-                )
+                # log.debug(
+                #     (
+                #         f"{batch.record_count()} messages sent to topic "
+                #         f"{topic} partition {partition}"
+                #     )
+                # )
                 batch = producer.create_batch()
                 continue
         partitions = await producer.partitions_for(topic)
         partition = random.choice(tuple(partitions))
         await producer.send_batch(batch, topic, partition=partition)
-        log.debug(
-            (
-                f"{batch.record_count()} messages sent to topic "
-                f"{topic} partition {partition}"
-            )
-        )
+        # log.debug(
+        #     (
+        #         f"{batch.record_count()} messages sent to topic "
+        #         f"{topic} partition {partition}"
+        #     )
+        # )
+    log.debug(
+        "Kafka batches sent: "
+        + ", ".join([topic + ": " + str(len(topicmap[topic])) for topic in topicmap])
+    )
     await producer.stop()
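
The per-batch debug lines are commented out and replaced with a single summary line after the send loop. As a rough standalone sketch of what that summary evaluates to (topic names and message counts below are made-up example data, not from the repository):

    # Made-up example data; in store_kafka_batch, topicmap maps each Kafka
    # topic to the list of messages queued for it.
    topicmap = {"topic1": ["a", "b", "c"], "topic2": ["d"]}

    summary = "Kafka batches sent: " + ", ".join(
        [topic + ": " + str(len(topicmap[topic])) for topic in topicmap]
    )
    print(summary)  # -> Kafka batches sent: topic1: 3, topic2: 1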


@@ -101,7 +101,7 @@ hash_key = get_hash_key()
 @asyncio.coroutine
 async def spawn_processing_threads(data):
     len_data = len(data)
-    log.debug(f"Spawning processing threads for batch of {len_data} messages")
+    # log.debug(f"Spawning processing threads for batch of {len_data} messages")
     loop = asyncio.get_event_loop()
     tasks = []
@@ -112,18 +112,23 @@ async def spawn_processing_threads(data):
         msg_per_core = int(len(data) / CPU_THREADS)
         split_data = array_split(data, ceil(len(data) / msg_per_core))
     for index, split in enumerate(split_data):
-        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
+        # log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
         task = loop.run_in_executor(p, process_data, split)
         tasks.append(task)
     results = [await task for task in tasks]
-    log.debug(f"Results from processing of {len_data} messages: {len(results)}")
+    log.debug(
+        (
+            f"Results from processing of {len_data} messages in "
+            f"{len(split_data)} threads: {len(results)}"
+        )
+    )
     # Join the results back from the split list
     flat_list = [item for sublist in results for item in sublist]
     await db.store_kafka_batch(flat_list)
-    log.debug(f"Finished processing {len_data} messages")
+    # log.debug(f"Finished processing {len_data} messages")
 def process_data(data):
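
For reference, a minimal sketch of how the batch is split across worker threads and what the reworked results line reports. It assumes array_split is numpy's, and CPU_THREADS and the message list are made-up example values rather than the repository's configuration:

    from math import ceil
    from numpy import array_split

    CPU_THREADS = 4                    # example value, not the repo's config
    data = list(range(10))             # stand-in for a batch of 10 messages

    len_data = len(data)
    msg_per_core = int(len_data / CPU_THREADS)                      # 2
    split_data = array_split(data, ceil(len_data / msg_per_core))   # 5 chunks of 2
    results = [list(chunk) for chunk in split_data]                 # fake per-thread results

    print(
        f"Results from processing of {len_data} messages in "
        f"{len(split_data)} threads: {len(results)}"
    )
    # -> Results from processing of 10 messages in 5 threads: 5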