import manticoresearch
from manticoresearch.rest import ApiException
from redis import StrictRedis
import util
from schemas.mc_s import schema
import ujson
from numpy import array_split
from math import ceil

configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)

log = util.get_logger("db")

r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)

# Upper bound on the number of documents sent in one bulk request by
# store_message_bulk (the data is split into ceil(len / size) sections).
BULK_CHUNK_SIZE = 10000


def _normalise(msg):
    """
    Prepare a message dict for insertion into the "main" table, in place.

    - Keys whose value is None are removed.
    - int values of fields whose schema type starts with "string" are
      converted to str so they match the table definition.

    :param msg: dict, modified in place
    :return: the same dict object
    """
    # Iterate over a snapshot because keys are deleted during the loop.
    for key, value in list(msg.items()):
        if value is None:
            del msg[key]
            continue
        # NOTE: bool is a subclass of int, so True/False also become strings here.
        if key in schema and isinstance(value, int) and schema[key].startswith("string"):
            msg[key] = str(value)
    return msg


def _send_bulk(messages):
    """
    Send messages to the "main" index as one NDJSON bulk insert request.

    :param messages: iterable of already-normalised message dicts
    """
    # One JSON object per line, each line newline-terminated.
    body_post = "".join(
        ujson.dumps({"insert": {"index": "main", "doc": msg}}) + "\n"
        for msg in messages
    )
    try:
        # NOTE(review): with async_req=True the generated client returns a
        # pending result object; HTTP errors presumably surface only when
        # .get() is called on it, so this handler likely only sees errors
        # raised while dispatching the request. Confirm before relying on it.
        api_instance.bulk(body_post, async_req=True)
    except ApiException as e:
        print("Exception when calling IndexApi->bulk: %s\n" % e)


def store_message(msg):
    """
    Store a single message into Manticore.

    :param msg: dict; modified in place (None values dropped, int values of
        string-typed schema fields converted to str)
    """
    # Debug output only: a missing/None "msg" field must not prevent storing.
    print("DISCORD MSGLEN", len(msg.get("msg") or ""))
    _send_bulk([_normalise(msg)])


def store_message_bulk(data, chunk_size=BULK_CHUNK_SIZE):
    """
    Store many messages into Manticore, split into several bulk requests.

    :param data: list of message dicts; each dict is modified in place the
        same way as in store_message
    :param chunk_size: maximum number of messages per bulk request
    """
    print("BULK", len(data))
    # len() rather than truthiness so array-like input also works; an empty
    # input would otherwise ask array_split for 0 sections, which it rejects.
    if len(data) == 0:
        print("BULK FINISH")
        return
    split_posts = array_split(data, ceil(len(data) / chunk_size))
    for messages in split_posts:
        print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
        _send_bulk([_normalise(msg) for msg in messages])
        print("FINISHED PROCESSING SPLIT")
    print("BULK FINISH")


def update_schema():
    """Placeholder for schema migrations; currently does nothing."""
    pass


def create_index(api_client):
    """
    Create the "main" table from `schema` if it does not already exist.

    :param api_client: manticoresearch.ApiClient used to run the SQL statement
    """
    util_instance = manticoresearch.UtilsApi(api_client)
    # schema maps column name -> Manticore type declaration.
    schema_types = ", ".join(f"{k} {v}" for k, v in schema.items())
    create_query = f"create table if not exists main({schema_types}) engine='columnar'"
    print("Schema types", create_query)
    util_instance.sql(create_query)


create_index(api_client)
update_schema()