Run processing in thread

This commit is contained in:
2022-09-04 21:29:00 +01:00
parent db23b31f30
commit 60c43b4eb5
7 changed files with 152 additions and 85 deletions

73
db.py
View File

@@ -1,12 +1,13 @@
import json
from pprint import pprint
import manticoresearch
from manticoresearch.rest import ApiException
from redis import StrictRedis
import util
from schemas.mc_s import schema
import ujson
from numpy import array_split
from math import ceil
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
@@ -39,7 +40,7 @@ def store_message(msg):
]
body_post = ""
for item in body:
body_post += json.dumps(item)
body_post += ujson.dumps(item)
body_post += "\n"
#print(body_post)
@@ -50,43 +51,49 @@ def store_message(msg):
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
async def store_message_bulk(messages):
def store_message_bulk(data):
"""
Store a message into Manticore
:param msg: dict
"""
print("BULK", len(messages))
total = []
for msg in messages:
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string"):
msg[key] = str(value)
print("BULK", len(data))
split_posts = array_split(data, ceil(len(data) / 10000))
for messages in split_posts:
print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
total = []
for msg in messages:
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string"):
msg[key] = str(value)
body = {
"insert": {
"index": "main",
"doc": msg
body = {
"insert": {
"index": "main",
"doc": msg
}
}
}
total.append(body)
total.append(body)
body_post = ""
for item in total:
body_post += json.dumps(item)
body_post += "\n"
body_post = ""
for item in total:
body_post += ujson.dumps(item)
body_post += "\n"
#print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
#print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("FINISHED PROCESSING SPLIT")
print("BULK FINISH")
def update_schema():
pass