import asyncio
import copy
from os import getenv

import aioredis
import msgpack
import orjson
import redis

# Elasticsearch
from elasticsearch import AsyncElasticsearch

import util

trues = ("true", "1", "t", True)

# INDEX = "msg"

log = util.get_logger("db")

# Redis (legacy)
r = redis.from_url("redis://ssdb:1289", db=0)

# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)

# Neptune redis for PubSub
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)

TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
MAIN_SRC_MAP = {
    "dis": "main",
    "irc": "restricted",
    "4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
MESSAGE_KEY = "messages"

ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues

client = None

# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]

mapping_int = {
    "mappings": {
        "properties": {
            "ts": {"type": "date", "format": "epoch_second"},
            "file_tim": {"type": "date", "format": "epoch_millis"},
        }
    }
}
# Deep copy so the additions and deletion below do not affect both mappings
mapping = copy.deepcopy(mapping_int)
for field in keyword_fields:
    mapping["mappings"]["properties"][field] = {"type": "text"}

del mapping_int["mappings"]["properties"]["file_tim"]


async def initialise_elasticsearch():
    """
    Initialise the Elasticsearch client.
    """
    auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
    client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
    for index in ("main", "meta", "restricted", "internal"):
        if index == "internal":
            map_dict = mapping_int
        else:
            map_dict = mapping
        if await client.indices.exists(index=index):
            # Update the existing index with the mapping
            await client.indices.put_mapping(
                index=index, properties=map_dict["mappings"]["properties"]
            )
        else:
            await client.indices.create(index=index, mappings=map_dict["mappings"])
    return client


async def store_batch(data):
    """
    Index a batch of messages in Elasticsearch and publish it to Neptune.
    """
    global client
    if not client:
        client = await initialise_elasticsearch()

    indexmap = {}
    for msg in data:
        if msg["type"] in TYPES_MAIN:
            # index = "main"
            index = MAIN_SRC_MAP[msg["src"]]
            # schema = mc_s.schema_main
        elif msg["type"] in TYPES_META:
            index = "meta"
            # schema = mc_s.schema_meta
        elif msg["type"] in TYPES_INT:
            index = "internal"
            # schema = mc_s.schema_int
        else:
            # Unknown type: skip it rather than reusing the previous index
            log.error(f"Unknown message type: {msg['type']}")
            continue

        INDEX = index

        # if key in schema:
        #     if isinstance(value, int):
        #         if schema[key].startswith("string") or schema[key].startswith(
        #             "text"
        #         ):
        #             msg[key] = str(value)
        # body = orjson.dumps(msg)
        if "ts" not in msg:
            raise Exception("No TS in msg")

        if INDEX not in indexmap:
            indexmap[INDEX] = [msg]
        else:
            indexmap[INDEX].append(msg)

    # Pack the indexmap with msgpack and publish it to Neptune
    packed_index = msgpack.packb(indexmap, use_bin_type=True)
    completed_publish = False
    for i in range(10):
        if completed_publish:
            break
        try:
            await pr.publish(MESSAGE_KEY, packed_index)
            completed_publish = True
        except aioredis.exceptions.ConnectionError:
            await asyncio.sleep(0.1)
    if not completed_publish:
        log.error("Failed to publish to Neptune")

    for index, index_messages in indexmap.items():
        for message in index_messages:
            result = await client.index(index=index, body=message)
            if result["result"] != "created":
                log.error(f"Indexing failed: {result}")
    log.debug(f"Indexed {len(data)} messages in ES")


async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    # TODO: msgpack
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)


async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        # TODO: msgpack
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)
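

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption, not part of the original module):
# shows how a message might be queued on the Redis buffer and then indexed
# directly with store_batch(). The example field values ("msg", "irc",
# "hello world") are hypothetical; a message only needs a "type" from the
# TYPES_* lists, a "src" present in MAIN_SRC_MAP for main-type messages, and
# a "ts" epoch timestamp. Requires reachable Redis and Elasticsearch hosts.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import time

    async def _demo():
        example = {
            "type": "msg",
            "src": "irc",
            "ts": int(time.time()),
            "msg": "hello world",
        }
        # Buffer the message on Redis, then index a one-message batch
        await queue_message(example)
        await store_batch([example])

    asyncio.run(_demo())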