import asyncio
from math import ceil
from os import getenv

import aiomysql
import aioredis
import manticoresearch
import msgpack
import orjson
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis

import util
from schemas import mc_s

mysql_pool = None

configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)

log = util.get_logger("db")

# Redis (legacy)
# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
r = StrictRedis(
    host="127.0.0.1",  # Replace with your Redis server's IP address
    port=1289,  # Replace with your Redis server's port
    db=0,  # Database number
)

# AIORedis
# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)

# /var/run/neptune-redis.sock
# db = 10
pr = aioredis.from_url("unix:///var/run/neptune-redis.sock", db=10)
# fr = aioredis.from_url("unix:///var/run/fisk-redis.sock", db=10)
fr = aioredis.from_url("unix:///var/run/redis.sock", db=10)
# pr = aioredis.from_url(
#     "redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD")
# )

KEYNAME = "queue"
MESSAGE_KEY = "messages"
OHLC_MESSAGE_KEY = "ohlc"

TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]


async def init_mysql_pool():
    """
    Initialize the MySQL connection pool.
    """
    global mysql_pool
    mysql_pool = await aiomysql.create_pool(
        host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
    )
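
# Usage sketch (illustrative, not part of the original module): the pool must
# be created inside a running event loop before any of the coroutines below
# are awaited, and an aiomysql pool is shut down with close()/wait_closed().
#
#   async def main():
#       await init_mysql_pool()
#       try:
#           ...  # call rts_store_message() / store_batch() here
#       finally:
#           mysql_pool.close()
#           await mysql_pool.wait_closed()
#
#   asyncio.run(main())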

async def rts_store_message(index, data):
    """
    Store an RTS message into MySQL using an existing connection pool.
    Prioritizes instant PubSub delivery, with minimal data storage overhead.

    :param index: str
    :param data: dict
    """
    # Publish to Redis PubSub first so subscribers see the update immediately
    packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
    try:
        await fr.publish(OHLC_MESSAGE_KEY, packed_index)
    except aioredis.exceptions.ConnectionError:
        raise
    await asyncio.sleep(0.1)

    # Insert data into MySQL
    try:
        async with mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Insert data into the table
                query = f"""
                INSERT INTO {index}
                (s, o, c, h, l, v, a, i, t, t2, ts)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                # Bind the values directly
                await cur.execute(
                    query,
                    (
                        data["s"],  # symbol
                        data["o"],  # open
                        data["c"],  # close
                        data["h"],  # high
                        data["l"],  # low
                        data["v"],  # volume_base
                        data["a"],  # volume_quote
                        data["i"],  # interval
                        data["t"],  # start_time
                        data["t2"],  # end_time
                        data["ts"],  # event_time
                    ),
                )
                await conn.commit()
                log.debug(f"Stored data for {data['s']} in MySQL.")
    except aiomysql.Error as e:
        log.error(f"MySQL error: {e}")


async def store_batch(data):
    """
    Store a batch of messages in Manticore.

    :param data: list
    """
    if not data:
        return

    # 10000: maximum inserts we can submit to Manticore as of Sept 2022
    split_posts = array_split(data, ceil(len(data) / 10000))
    for messages in split_posts:
        total = []
        indexmap = {}
        for msg in messages:
            if msg["type"] in TYPES_MAIN:
                index = "main"
                schema = mc_s.schema_main
            elif msg["type"] in TYPES_META:
                index = "meta"
                schema = mc_s.schema_meta
            elif msg["type"] in TYPES_INT:
                index = "internal"
                schema = mc_s.schema_int
            else:
                log.error(f"Unknown message type, skipping: {msg['type']}")
                continue

            # Normalise fields: drop nulls, stringify ints that are destined
            # for string/text columns
            for key, value in list(msg.items()):
                if value is None:
                    del msg[key]
                    continue
                if key in schema and isinstance(value, int):
                    if schema[key].startswith("string") or schema[key].startswith(
                        "text"
                    ):
                        msg[key] = str(value)

            if "ts" not in msg:
                raise Exception("No TS in msg")

            body = {"insert": {"index": index, "doc": msg}}
            total.append(body)

            if index not in indexmap:
                indexmap[index] = [msg]
            else:
                indexmap[index].append(msg)
        # END MSG IN MESSAGES

        # Pack the indexmap with msgpack and publish it to Neptune,
        # retrying on connection errors
        packed_index = msgpack.packb(indexmap, use_bin_type=True)
        completed_publish = False
        for _ in range(10):
            try:
                await pr.publish(MESSAGE_KEY, packed_index)
                completed_publish = True
                break
            except aioredis.exceptions.ConnectionError as e:
                log.error(f"Neptune publish failed, retrying: {e}")
                await asyncio.sleep(0.1)
        if not completed_publish:
            log.error("Failed to publish to Neptune")

        body_post = ""
        for item in total:
            # print("ITEM", item)
            body_post += orjson.dumps(item).decode("utf-8")
            body_post += "\n"

        # print("BODY POST INDEX", index, body_post)
        try:
            # Bulk index operations
            api_response = api_instance.bulk(body_post)  # , async_req=True
        except ApiException as e:
            log.error("Exception when calling IndexApi->bulk: %s\n" % e)
            log.error("body_post attempted to send: %s", body_post)
        log.info(f"Completed ingest to MC of length {len(total)}")
    # END MESSAGES IN SPLIT


def update_schema():
    pass


def create_index(api_client):
    util_instance = manticoresearch.UtilsApi(api_client)
    schemas = {
        "main": mc_s.schema_main,
        "rule_storage": mc_s.schema_rule_storage,
        "meta": mc_s.schema_meta,
        "internal": mc_s.schema_int,
    }
    for name, schema in schemas.items():
        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
        create_query = (
            f"create table if not exists {name}({schema_types}) engine='columnar'"
        )
        print("Schema types", create_query)
        util_instance.sql(create_query)
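
# Example (hypothetical field names) of the DDL create_index() generates for a
# schema like {"ts": "int", "msg": "text"}:
#
#   create table if not exists main(ts int, msg text) engine='columnar'
#
# UtilsApi.sql() sends this through Manticore's /sql endpoint; the
# "if not exists" clause makes the call safe to repeat at every startup.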
""" # TODO: msgpack message = orjson.dumps(msg) await ar.lpush(KEYNAME, message) async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. """ for msg in data: # TODO: msgpack message = orjson.dumps(msg) await ar.lpush(KEYNAME, message)