diff --git a/db.py b/db.py
index 996c586..7d265c5 100644
--- a/db.py
+++ b/db.py
@@ -1,14 +1,19 @@
 import random
+from os import getenv
 
 import aioredis
 import orjson
+import redis
 
 # Kafka
 from aiokafka import AIOKafkaProducer
 
-import redis
 import util
 
+trues = ("true", "1", "t", True)
+
+MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
+
 # KAFKA_TOPIC = "msg"
 
 log = util.get_logger("db")
@@ -44,6 +49,9 @@ KEYNAME = "queue"
 
 
 async def store_kafka_batch(data):
+    if not MONOLITH_KAFKA_ENABLED:
+        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
+        return
     # log.debug(f"Storing Kafka batch of {len(data)} messages")
     producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
     await producer.start()
diff --git a/processing/process.py b/processing/process.py
index 4082046..2d04015 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
 # For timestamp processing
 from datetime import datetime
 from math import ceil
+from os import getenv
 
 import orjson
 import regex
@@ -51,6 +52,12 @@ import util
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
 
+trues = ("true", "1", "t", True)
+
+MONOLITH_PROCESS_PERFSTATS = (
+    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
+)
+
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
     strip_tags,  #
@@ -267,17 +274,19 @@ def process_data(data):
         # Add the mutated message to the return buffer
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
-    log.debug("=====================================")
-    log.debug(f"Sentiment: {sentiment_time}")
-    log.debug(f"Regex: {regex_time}")
-    log.debug(f"Polyglot: {polyglot_time}")
-    log.debug(f"Date: {date_time}")
-    log.debug(f"NLP: {nlp_time}")
-    log.debug(f"Normalise: {normalise_time}")
-    log.debug(f"Hash: {hash_time}")
-    log.debug(f"Normal2: {normal2_time}")
-    log.debug(f"Soup: {soup_time}")
-    log.debug(f"Total: {total_time}")
-    log.debug("=====================================")
+
+    if MONOLITH_PROCESS_PERFSTATS:
+        log.debug("=====================================")
+        log.debug(f"Sentiment: {sentiment_time}")
+        log.debug(f"Regex: {regex_time}")
+        log.debug(f"Polyglot: {polyglot_time}")
+        log.debug(f"Date: {date_time}")
+        log.debug(f"NLP: {nlp_time}")
+        log.debug(f"Normalise: {normalise_time}")
+        log.debug(f"Hash: {hash_time}")
+        log.debug(f"Normal2: {normal2_time}")
+        log.debug(f"Soup: {soup_time}")
+        log.debug(f"Total: {total_time}")
+        log.debug("=====================================")
 
     return to_store