diff --git a/processing/process.py b/processing/process.py index 2d04015..da6a218 100644 --- a/processing/process.py +++ b/processing/process.py @@ -123,15 +123,15 @@ async def spawn_processing_threads(data): tasks.append(task) results = [await task for task in tasks] + + # Join the results back from the split list + flat_list = [item for sublist in results for item in sublist] log.debug( ( f"Results from processing of {len_data} messages in " - f"{len(split_data)} threads: {len(results)}" + f"{len(split_data)} threads: {len(flat_list)}" ) ) - - # Join the results back from the split list - flat_list = [item for sublist in results for item in sublist] await db.store_kafka_batch(flat_list) # log.debug(f"Finished processing {len_data} messages")