import asyncio
from math import ceil
from os import getenv
from time import sleep

import aioredis
import manticoresearch
import msgpack
import orjson
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis

import util
from schemas import mc_s

configuration = manticoresearch.Configuration(host="http://monolith_db:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)

log = util.get_logger("db")

# Redis (legacy)
# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
r = StrictRedis(
    host="ssdb_monolith",  # Replace with your Redis server's IP address
    port=1289,  # Replace with your Redis server's port
    db=0,  # Database number
)

# AIORedis
# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
ar = aioredis.from_url("redis://ssdb_monolith:1289", db=0)
pr = aioredis.from_url(
    "redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD")
)

KEYNAME = "queue"
MESSAGE_KEY = "messages"

TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]

# def store_message(msg):
#     """
#     Store a message into Manticore
#     :param msg: dict
#     """
#     # Duplicated to avoid extra function call
#     if msg["type"] in TYPES_MAIN:
#         index = "main"
#         schema = mc_s.schema_main
#     elif msg["type"] in TYPES_META:
#         index = "meta"
#         schema = mc_s.schema_meta
#     elif msg["type"] in TYPES_INT:
#         index = "internal"
#         schema = mc_s.schema_int
#     # normalise fields
#     for key, value in list(msg.items()):
#         if value is None:
#             del msg[key]
#         if key in schema:
#             if isinstance(value, int):
#                 if schema[key].startswith("string") or schema[key].startswith("text"):
#                     msg[key] = str(value)
#     body = [{"insert": {"index": index, "doc": msg}}]
#     body_post = ""
#     for item in body:
#         body_post += orjson.dumps(item)
#         body_post += "\n"
#     # print(body_post)
#     try:
#         # Bulk index operations
#         api_response = api_instance.bulk(body_post)  # , async_req=True
#         # print(api_response)
#     except ApiException as e:
#         print("Exception when calling IndexApi->bulk: %s\n" % e)
#         print("ATTEMPT", body_post)


async def store_batch(data):
    """
    Store a batch of messages in Manticore and publish it to Neptune.
    :param data: list of message dicts
    """
    if not data:
        return

    # 10000: maximum inserts we can submit to
    # Manticore as of Sept 2022
    split_posts = array_split(data, ceil(len(data) / 10000))
    for messages in split_posts:
        total = []
        indexmap = {}
        for msg in messages:
            if msg["type"] in TYPES_MAIN:
                index = "main"
                schema = mc_s.schema_main
            elif msg["type"] in TYPES_META:
                index = "meta"
                schema = mc_s.schema_meta
            elif msg["type"] in TYPES_INT:
                index = "internal"
                schema = mc_s.schema_int
            else:
                # Guard: without this branch, index/schema would be unbound
                # for an unrecognised type
                log.error(f"Unknown message type, skipping: {msg['type']}")
                continue

            # normalise fields
            for key, value in list(msg.items()):
                if value is None:
                    del msg[key]
                if key in schema:
                    if isinstance(value, int):
                        if schema[key].startswith("string") or schema[key].startswith(
                            "text"
                        ):
                            msg[key] = str(value)

            body = {"insert": {"index": index, "doc": msg}}
            total.append(body)
            if "ts" not in msg:
                raise Exception("No TS in msg")
            if index not in indexmap:
                indexmap[index] = [msg]
            else:
                indexmap[index].append(msg)
        # END MSG IN MESSAGES

        # Pack the indexmap with msgpack and publish it to Neptune,
        # retrying on connection errors instead of aborting the batch
        packed_index = msgpack.packb(indexmap, use_bin_type=True)
        completed_publish = False
        for _ in range(10):
            if completed_publish:
                break
            try:
                await pr.publish(MESSAGE_KEY, packed_index)
                completed_publish = True
            except aioredis.exceptions.ConnectionError as e:
                log.error(f"Neptune publish failed, retrying: {e}")
                await asyncio.sleep(0.1)
        if not completed_publish:
            log.error("Failed to publish to Neptune")

        body_post = ""
        for item in total:
            print("ITEM", item)
            body_post += orjson.dumps(item).decode("utf-8")
            body_post += "\n"

        # print(body_post)
        try:
            # Bulk index operations
            api_response = api_instance.bulk(body_post)  # , async_req=True
        except ApiException as e:
            print("Exception when calling IndexApi->bulk: %s\n" % e)
        print(f"Completed ingest to MC of length {len(total)}")
    # END MESSAGES IN SPLIT
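
# For reference: the bulk body assembled in store_batch() is
# newline-delimited JSON (NDJSON), one insert operation per line, which is
# the format the Manticore bulk endpoint consumes. An illustrative two-line
# payload for the "main" index (field values are made up for the example):
#
#   {"insert": {"index": "main", "doc": {"type": "msg", "ts": 1662000000, "nick": "alice", "msg": "hello"}}}
#   {"insert": {"index": "main", "doc": {"type": "join", "ts": 1662000001, "nick": "bob"}}}
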
def update_schema():
    pass


def create_index(api_client):
    util_instance = manticoresearch.UtilsApi(api_client)
    schemas = {
        "main": mc_s.schema_main,
        "meta": mc_s.schema_meta,
        "internal": mc_s.schema_int,
    }
    for name, schema in schemas.items():
        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
        create_query = (
            f"create table if not exists {name}({schema_types}) engine='columnar'"
        )
        print("Schema types", create_query)
        util_instance.sql(create_query)


async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    # TODO: msgpack
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)


async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        # TODO: msgpack
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)


# Retry table creation until Manticore is reachable
created = False
while not created:
    try:
        create_index(api_client)
        created = True
    except Exception as e:
        print(f"Error creating index: {e}")
        sleep(1)  # Block the thread, just wait for the DB

update_schema()
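
# Illustrative sketch only (not part of this module): a consumer that drains
# the Redis buffer filled by queue_message()/queue_message_bulk() and hands
# decoded batches to store_batch(). The process_queue name, batch size, and
# poll interval are assumptions, not the real pipeline.
#
# async def process_queue():
#     while True:
#         # lpush + rpop gives FIFO ordering; pop up to 1000 queued messages
#         raw = [await ar.rpop(KEYNAME) for _ in range(1000)]
#         batch = [orjson.loads(item) for item in raw if item is not None]
#         if batch:
#             await store_batch(batch)
#         else:
#             await asyncio.sleep(0.1)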