import random
from os import getenv

import aioredis
import orjson
import redis
from aiokafka import AIOKafkaProducer

import util

trues = ("true", "1", "t")
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues

log = util.get_logger("db")

# Redis (legacy, synchronous)
r = redis.from_url("redis://ssdb:1289", db=0)

# AIORedis (asynchronous)
ar = aioredis.from_url("redis://ssdb:1289", db=0)

TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
MAIN_SRC_MAP = {
    "dis": "main",
    "irc": "restricted",
    "4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"


async def store_kafka_batch(data):
    """
    Group a list of messages by destination topic and send them to Kafka
    in batches.
    """
    if not MONOLITH_KAFKA_ENABLED:
        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
        return
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
    await producer.start()
    try:
        # Group serialised messages, together with their timestamps, by topic.
        topicmap = {}
        for msg in data:
            if msg["type"] in TYPES_MAIN:
                topic = MAIN_SRC_MAP[msg["src"]]
            elif msg["type"] in TYPES_META:
                topic = "meta"
            elif msg["type"] in TYPES_INT:
                topic = "internal"
            else:
                log.error(f"Unknown message type, not storing: {msg['type']}")
                continue
            if "ts" not in msg:
                raise Exception("No ts in msg")
            topicmap.setdefault(topic, []).append((orjson.dumps(msg), msg["ts"]))

        for topic, messages in topicmap.items():
            batch = producer.create_batch()
            for body, ts in messages:
                metadata = batch.append(key=None, value=body, timestamp=ts)
                if metadata is None:
                    # The batch is full: send it to a random partition, start
                    # a new one, and re-append the message that did not fit.
                    partitions = await producer.partitions_for(topic)
                    partition = random.choice(tuple(partitions))
                    await producer.send_batch(batch, topic, partition=partition)
                    batch = producer.create_batch()
                    batch.append(key=None, value=body, timestamp=ts)
            # Send the final, possibly partial, batch for this topic.
            if batch.record_count():
                partitions = await producer.partitions_for(topic)
                partition = random.choice(tuple(partitions))
                await producer.send_batch(batch, topic, partition=partition)

        log.debug(
            "Kafka batches sent: "
            + ", ".join(f"{tpc}: {len(msgs)}" for tpc, msgs in topicmap.items())
        )
    finally:
        await producer.stop()


async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)


async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)
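

# A minimal usage sketch, assuming a reachable Redis at ssdb:1289 (and a
# Kafka broker at kafka:9092 when MONOLITH_KAFKA_ENABLED is truthy). The
# field names read by this module are "type", "src", and "ts"; the rest of
# the payload below is illustrative.
if __name__ == "__main__":
    import asyncio
    import time

    async def _demo():
        msg = {
            "type": "msg",  # routed via MAIN_SRC_MAP by store_kafka_batch
            "src": "irc",  # maps to the "restricted" topic
            "msg": "hello",
            # Millisecond epoch, matching what aiokafka batch timestamps expect
            "ts": int(time.time() * 1000),
        }
        await queue_message(msg)  # push one message onto the Redis queue
        await queue_message_bulk([msg])  # push several messages
        await store_kafka_batch([msg])  # send a batch straight to Kafka

    asyncio.run(_demo())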