# Python source, 103 lines, 2.9 KiB
from math import ceil
|
|
|
|
import manticoresearch
|
|
import ujson
|
|
from manticoresearch.rest import ApiException
|
|
from numpy import array_split
|
|
from redis import StrictRedis
|
|
|
|
import util
|
|
from schemas.mc_s import schema
|
|
import aioredis
|
|
# Manticore HTTP endpoint and the API client shared by this module.
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
# IndexApi instance used by the store_* functions for bulk inserts.
api_instance = manticoresearch.IndexApi(api_client)

# Module logger, obtained from the project's util helper under the name "db".
log = util.get_logger("db")

# Redis client over the local unix socket, database 0.
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
# aioredis client created from a URL pointing at the same socket path.
ar = aioredis.from_url("unix:///var/run/redis/redis.sock")
|
|
|
|
def store_message(msg):
    """
    Store a single message into Manticore

    The message is normalised in place before it is sent: keys whose value
    is None are removed, and int values are converted to str when the
    schema type of the field starts with "string".

    API errors are logged and swallowed, so the call is best-effort.

    :param msg: dict of field name -> value; mutated in place
    """
    # normalise fields (iterate over a snapshot because msg is mutated)
    for key, value in list(msg.items()):
        if value is None:
            del msg[key]
            continue
        if (
            isinstance(value, int)
            and key in schema
            and schema[key].startswith("string")
        ):
            msg[key] = str(value)

    # One JSON operation per line, each terminated by a newline
    body_post = ujson.dumps({"insert": {"index": "main", "doc": msg}}) + "\n"

    try:
        # Bulk index operations
        api_instance.bulk(body_post)
    except ApiException as e:
        log.error("Exception when calling IndexApi->bulk: %s", e)
|
|
|
|
|
|
def store_message_bulk(data):
    """
    Store a batch of messages into Manticore

    Messages are submitted in chunks of at most 10000 inserts per bulk
    request. Each message is normalised before it is serialised: keys whose
    value is None are removed, and int values are converted to str when the
    schema type of the field starts with "string".

    API errors are logged and the remaining chunks are still submitted.

    :param data: sequence of message dicts; an empty or None value is a no-op
    """
    if not data:
        return
    # 10000: maximum inserts we can submit to
    # Manticore as of Sept 2022
    split_posts = array_split(data, ceil(len(data) / 10000))
    for messages in split_posts:
        lines = []
        for msg in messages:
            # normalise fields (iterate over a snapshot because msg is mutated)
            for key, value in list(msg.items()):
                if value is None:
                    del msg[key]
                    continue
                if (
                    isinstance(value, int)
                    and key in schema
                    and schema[key].startswith("string")
                ):
                    msg[key] = str(value)

            lines.append(ujson.dumps({"insert": {"index": "main", "doc": msg}}))

        # One JSON operation per line, each terminated by a newline
        body_post = "".join(f"{line}\n" for line in lines)

        try:
            # Bulk index operations
            api_instance.bulk(body_post)
        except ApiException as e:
            log.error("Exception when calling IndexApi->bulk: %s", e)
|
|
|
|
|
|
def update_schema():
    """
    No-op stub: performs no work and returns None
    """
    return None
|
|
|
|
|
|
def create_index(api_client):
    """
    Create the "main" columnar table in Manticore if it does not exist

    Column definitions are generated from the imported schema mapping as
    "name type" pairs. The generated statement is printed before it is run.

    :param api_client: client object passed to manticoresearch.UtilsApi
    """
    utils_api = manticoresearch.UtilsApi(api_client)

    columns = []
    for name, column_type in schema.items():
        columns.append(f"{name} {column_type}")
    schema_types = ", ".join(columns)

    create_query = (
        f"create table if not exists main({schema_types}) engine='columnar'"
    )
    print("Schema types", create_query)
    utils_api.sql(create_query)
|
|
|
|
|
|
# Runs at import time: issue the "create table if not exists" statement for
# the main table, then call the schema update hook (currently a no-op).
create_index(api_client)
update_schema()
|