135 lines
3.6 KiB
Python
135 lines
3.6 KiB
Python
import random
|
|
from os import getenv
|
|
|
|
import aioredis
|
|
import orjson
|
|
import redis
|
|
|
|
# Kafka
|
|
from aiokafka import AIOKafkaProducer
|
|
|
|
import util
|
|
|
|
# Values accepted as "truthy" when parsing environment variables.
trues = ("true", "1", "t", True)

# Master switch: set MONOLITH_KAFKA_ENABLED=true/1/t to enable Kafka output.
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues

# KAFKA_TOPIC = "msg"

log = util.get_logger("db")

# Redis (legacy)
# NOTE(review): synchronous client — not used anywhere in this chunk; presumably
# kept for callers elsewhere in the project. Confirm before removing.
r = redis.from_url("redis://ssdb:1289", db=0)

# AIORedis
# Async client used by queue_message / queue_message_bulk below.
ar = aioredis.from_url("redis://ssdb:1289", db=0)

# Message types routed via MAIN_SRC_MAP (per-source topic) in store_kafka_batch.
TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
# Maps a message's "src" field to its destination Kafka topic.
MAIN_SRC_MAP = {
    "dis": "main",
    "irc": "restricted",
    "4ch": "main",
}

# Message types routed to the "meta" topic.
TYPES_META = ["who"]
# Message types routed to the "internal" topic.
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
# Redis list key used as the ingest queue buffer.
KEYNAME = "queue"
|
|
|
|
|
|
async def _send_kafka_batch(producer, topic, batch):
    """Send one record batch to *topic* on a randomly chosen partition."""
    partitions = await producer.partitions_for(topic)
    partition = random.choice(tuple(partitions))
    await producer.send_batch(batch, topic, partition=partition)
    # log.debug(
    #     (
    #         f"{batch.record_count()} messages sent to topic "
    #         f"{topic} partition {partition}"
    #     )
    # )


async def store_kafka_batch(data):
    """
    Store a batch of messages in Kafka, grouped by destination topic.

    Each message dict is routed by its "type" field:
      * TYPES_MAIN -> topic from MAIN_SRC_MAP keyed on msg["src"]
      * TYPES_META -> "meta"
      * TYPES_INT  -> "internal"

    Messages are serialised with orjson and appended to per-topic record
    batches; full batches are flushed as they fill up.

    Raises:
        Exception: if a message has no "ts" (timestamp) field.
    """
    if not MONOLITH_KAFKA_ENABLED:
        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
        return
    # log.debug(f"Storing Kafka batch of {len(data)} messages")
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
    await producer.start()
    try:
        # topic -> list of (serialised body, timestamp) pairs.
        topicmap = {}
        for msg in data:
            if msg["type"] in TYPES_MAIN:
                topic = MAIN_SRC_MAP[msg["src"]]
            elif msg["type"] in TYPES_META:
                topic = "meta"
            elif msg["type"] in TYPES_INT:
                topic = "internal"
            else:
                # Previously an unknown type either raised NameError (first
                # iteration) or silently reused the previous message's topic.
                log.error(f"Unknown message type, not storing: {msg['type']}")
                continue

            if "ts" not in msg:
                raise Exception("No TS in msg")
            # Keep each message's own timestamp with its body: the old code
            # stamped every record with the timestamp of the *last* message.
            topicmap.setdefault(topic, []).append((orjson.dumps(msg), msg["ts"]))

        for topic, messages in topicmap.items():
            batch = producer.create_batch()
            for body, ts in messages:
                metadata = batch.append(key=None, value=body, timestamp=ts)
                if metadata is None:
                    # Batch is full: flush it and retry this message in a
                    # fresh batch (the old code dropped the message here).
                    await _send_kafka_batch(producer, topic, batch)
                    batch = producer.create_batch()
                    batch.append(key=None, value=body, timestamp=ts)
            # Flush the final (possibly partial) batch for this topic.
            await _send_kafka_batch(producer, topic, batch)

        log.debug(
            "Kafka batches sent: "
            + ", ".join([topic + ": " + str(len(topicmap[topic])) for topic in topicmap])
        )
    finally:
        # Always release the producer connection, even if routing/serialising
        # raised — the old code leaked it on any exception.
        await producer.stop()
|
|
|
|
|
|
async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    await ar.lpush(KEYNAME, orjson.dumps(msg))
|
|
|
|
|
|
async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.

    Serialises every message first and pushes them with a single variadic
    LPUSH, which yields the same final list order as pushing one at a time
    while avoiding a network round-trip per message. A no-op for empty
    input (LPUSH requires at least one value).
    """
    if not data:
        return
    messages = [orjson.dumps(msg) for msg in data]
    await ar.lpush(KEYNAME, *messages)
|