monolith/db.py


from copy import deepcopy
from os import getenv
import aioredis
import orjson
import redis
# Elasticsearch
from elasticsearch import AsyncElasticsearch
import util
trues = ("true", "1", "t", True)
# INDEX = "msg"
log = util.get_logger("db")
# Redis (legacy)
r = redis.from_url("redis://ssdb:1289", db=0)
# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)
TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
MAIN_SRC_MAP = {
    "dis": "main",
    "irc": "restricted",
    "4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
client = None
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping_int = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"file_tim": {"type": "date", "format": "epoch_millis"},
}
}
}
# Deep copy so that the keyword fields added below, and the deletion of
# file_tim from the internal mapping, do not leak between the two mappings.
mapping = deepcopy(mapping_int)
for field in keyword_fields:
    mapping["mappings"]["properties"][field] = {"type": "text"}
del mapping_int["mappings"]["properties"]["file_tim"]
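# Net effect of the mapping setup above:
#   mapping      (used for main/meta/restricted): ts as epoch_second,
#                file_tim as epoch_millis, nick_id/user_id/net_id as text
#   mapping_int  (used for internal): ts as epoch_second only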


async def initialise_elasticsearch():
    """
    Initialise the Elasticsearch client.
    """
    auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
    client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
    for index in ("main", "meta", "restricted", "internal"):
        if index == "internal":
            map_dict = mapping_int
        else:
            map_dict = mapping
        if await client.indices.exists(index=index):
            # Update the existing index with the mapping
            await client.indices.put_mapping(
                index=index, properties=map_dict["mappings"]["properties"]
            )
        else:
            await client.indices.create(index=index, mappings=map_dict["mappings"])
    return client


async def store_batch(data):
    """
    Index a batch of messages into Elasticsearch, creating the client on
    first use. Messages are routed to an index based on their type.
    """
    global client
    if not client:
        client = await initialise_elasticsearch()

    indexmap = {}
    for msg in data:
        if msg["type"] in TYPES_MAIN:
            # index = "main"
            index = MAIN_SRC_MAP[msg["src"]]
            # schema = mc_s.schema_main
        elif msg["type"] in TYPES_META:
            index = "meta"
            # schema = mc_s.schema_meta
        elif msg["type"] in TYPES_INT:
            index = "internal"
            # schema = mc_s.schema_int
        INDEX = index

        # if key in schema:
        #     if isinstance(value, int):
        #         if schema[key].startswith("string") or schema[key].startswith(
        #             "text"
        #         ):
        #             msg[key] = str(value)

        # body = orjson.dumps(msg)
        if "ts" not in msg:
            raise Exception("No TS in msg")
        if INDEX not in indexmap:
            indexmap[INDEX] = [msg]
        else:
            indexmap[INDEX].append(msg)

    for index, index_messages in indexmap.items():
        for message in index_messages:
            result = await client.index(index=index, body=message)
            if result["result"] != "created":
                log.error(f"Indexing failed: {result}")
    log.debug(f"Indexed {len(data)} messages in ES")
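
# NOTE (illustrative, not part of the original module): store_batch issues one
# index request per document. If throughput matters, the same indexmap could
# be fed to elasticsearch-py's async bulk helper instead, for example:
#
#   from elasticsearch.helpers import async_bulk
#
#   actions = (
#       {"_index": index, "_source": message}
#       for index, messages in indexmap.items()
#       for message in messages
#   )
#   await async_bulk(client, actions)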


async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)


async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)
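

# --- Illustrative sketch (assumption, not part of the original module) ---
# The functions above only enqueue messages and index them; the worker that
# drains the Redis buffer is assumed to live elsewhere in the monolith. A
# minimal consumer wiring the queue to store_batch could look like this:
async def process_queue_example(chunk_size=1000):
    """
    Pop up to chunk_size queued messages from Redis and index them.
    Illustrative only; the name and batching strategy are assumptions.
    """
    batch = []
    for _ in range(chunk_size):
        raw = await ar.rpop(KEYNAME)  # FIFO counterpart to the lpush above
        if raw is None:
            break
        batch.append(orjson.loads(raw))
    if batch:
        await store_batch(batch)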