Add some environment variables to control debug output
This commit is contained in:
parent
e32b330ef4
commit
f774f4c2d2
10
db.py
10
db.py
|
@ -1,14 +1,19 @@
|
||||||
import random
|
import random
|
||||||
|
from os import getenv
|
||||||
|
|
||||||
import aioredis
|
import aioredis
|
||||||
import orjson
|
import orjson
|
||||||
|
import redis
|
||||||
|
|
||||||
# Kafka
|
# Kafka
|
||||||
from aiokafka import AIOKafkaProducer
|
from aiokafka import AIOKafkaProducer
|
||||||
import redis
|
|
||||||
|
|
||||||
import util
|
import util
|
||||||
|
|
||||||
|
trues = ("true", "1", "t", True)
|
||||||
|
|
||||||
|
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
|
||||||
|
|
||||||
# KAFKA_TOPIC = "msg"
|
# KAFKA_TOPIC = "msg"
|
||||||
|
|
||||||
log = util.get_logger("db")
|
log = util.get_logger("db")
|
||||||
|
@ -44,6 +49,9 @@ KEYNAME = "queue"
|
||||||
|
|
||||||
|
|
||||||
async def store_kafka_batch(data):
|
async def store_kafka_batch(data):
|
||||||
|
if not MONOLITH_KAFKA_ENABLED:
|
||||||
|
log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
|
||||||
|
return
|
||||||
# log.debug(f"Storing Kafka batch of {len(data)} messages")
|
# log.debug(f"Storing Kafka batch of {len(data)} messages")
|
||||||
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
|
||||||
await producer.start()
|
await producer.start()
|
||||||
|
|
|
@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
|
||||||
# For timestamp processing
|
# For timestamp processing
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from math import ceil
|
from math import ceil
|
||||||
|
from os import getenv
|
||||||
|
|
||||||
import orjson
|
import orjson
|
||||||
import regex
|
import regex
|
||||||
|
@ -51,6 +52,12 @@ import util
|
||||||
# 4chan schema
|
# 4chan schema
|
||||||
from schemas.ch4_s import ATTRMAP
|
from schemas.ch4_s import ATTRMAP
|
||||||
|
|
||||||
|
trues = ("true", "1", "t", True)
|
||||||
|
|
||||||
|
MONOLITH_PROCESS_PERFSTATS = (
|
||||||
|
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
|
||||||
|
)
|
||||||
|
|
||||||
CUSTOM_FILTERS = [
|
CUSTOM_FILTERS = [
|
||||||
lambda x: x.lower(),
|
lambda x: x.lower(),
|
||||||
strip_tags, #
|
strip_tags, #
|
||||||
|
@ -267,6 +274,8 @@ def process_data(data):
|
||||||
# Add the mutated message to the return buffer
|
# Add the mutated message to the return buffer
|
||||||
to_store.append(msg)
|
to_store.append(msg)
|
||||||
total_time += (time.process_time() - total_start) * 1000
|
total_time += (time.process_time() - total_start) * 1000
|
||||||
|
|
||||||
|
if MONOLITH_PROCESS_PERFSTATS:
|
||||||
log.debug("=====================================")
|
log.debug("=====================================")
|
||||||
log.debug(f"Sentiment: {sentiment_time}")
|
log.debug(f"Sentiment: {sentiment_time}")
|
||||||
log.debug(f"Regex: {regex_time}")
|
log.debug(f"Regex: {regex_time}")
|
||||||
|
|
Loading…
Reference in New Issue