import random import aioredis import orjson # Kafka from aiokafka import AIOKafkaProducer from redis import StrictRedis import util # KAFKA_TOPIC = "msg" log = util.get_logger("db") # Redis (legacy) r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) # AIORedis ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0) TYPES_MAIN = [ "msg", "notice", "action", "part", "join", "kick", "quit", "nick", "mode", "topic", "update", ] MAIN_SRC_MAP = { "dis": "main", "irc": "restricted", "4ch": "main", } TYPES_META = ["who"] TYPES_INT = ["conn", "highlight", "znc", "query", "self"] KEYPREFIX = "queue." async def store_kafka_batch(data): log.debug(f"Storing Kafka batch of {len(data)} messages") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() topicmap = {} for msg in data: if msg["type"] in TYPES_MAIN: # index = "main" index = MAIN_SRC_MAP[msg["src"]] # schema = mc_s.schema_main elif msg["type"] in TYPES_META: index = "meta" # schema = mc_s.schema_meta elif msg["type"] in TYPES_INT: index = "internal" # schema = mc_s.schema_int KAFKA_TOPIC = index # normalise fields for key, value in list(msg.items()): if value is None: del msg[key] # if key in schema: # if isinstance(value, int): # if schema[key].startswith("string") or schema[key].startswith( # "text" # ): # msg[key] = str(value) body = orjson.dumps(msg) if "ts" not in msg: raise Exception("No TS in msg") if KAFKA_TOPIC not in topicmap: topicmap[KAFKA_TOPIC] = [body] else: topicmap[KAFKA_TOPIC].append(body) for topic, messages in topicmap.items(): batch = producer.create_batch() for body in messages: metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) if metadata is None: partitions = await producer.partitions_for(topic) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, topic, partition=partition) log.debug( ( f"{batch.record_count()} messages sent to topic " f"{topic} partition {partition}" ) ) batch = producer.create_batch() continue partitions = await producer.partitions_for(topic) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, topic, partition=partition) log.debug( ( f"{batch.record_count()} messages sent to topic " f"{topic} partition {partition}" ) ) await producer.stop() async def queue_message(msg): """ Queue a message on the Redis buffer. """ src = msg["src"] message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" # log.debug(f"Queueing single message of string length {len(message)}") await ar.sadd(key, message) async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. """ # log.debug(f"Queueing message batch of length {len(data)}") for msg in data: src = msg["src"] message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" await ar.sadd(key, message)