Implement threshold writing to Redis and manticore ingesting from Redis
This commit is contained in:
71
db.py
71
db.py
@@ -1,5 +1,6 @@
|
||||
from math import ceil
|
||||
|
||||
import aioredis
|
||||
import manticoresearch
|
||||
import ujson
|
||||
from manticoresearch.rest import ApiException
|
||||
@@ -7,21 +8,51 @@ from numpy import array_split
|
||||
from redis import StrictRedis
|
||||
|
||||
import util
|
||||
from schemas.mc_s import schema
|
||||
import aioredis
|
||||
from schemas import mc_s
|
||||
|
||||
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
|
||||
api_client = manticoresearch.ApiClient(configuration)
|
||||
api_instance = manticoresearch.IndexApi(api_client)
|
||||
|
||||
log = util.get_logger("db")
|
||||
|
||||
# Redis (legacy)
|
||||
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
|
||||
ar = aioredis.from_url("unix:///var/run/redis/redis.sock")
|
||||
|
||||
# AIORedis
|
||||
ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
|
||||
|
||||
TYPES_MAIN = [
|
||||
"msg",
|
||||
"notice",
|
||||
"action",
|
||||
"part",
|
||||
"join",
|
||||
"kick",
|
||||
"quit",
|
||||
"nick",
|
||||
"mode",
|
||||
"topic",
|
||||
]
|
||||
TYPES_META = ["who"]
|
||||
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
|
||||
|
||||
|
||||
def store_message(msg):
|
||||
"""
|
||||
Store a message into Manticore
|
||||
:param msg: dict
|
||||
"""
|
||||
# Duplicated to avoid extra function call
|
||||
if msg["type"] in TYPES_MAIN:
|
||||
index = "main"
|
||||
schema = mc_s.schema_main
|
||||
elif msg["type"] in TYPES_META:
|
||||
index = "meta"
|
||||
schema = mc_s.schema_meta
|
||||
elif msg["type"] in TYPES_INT:
|
||||
index = "internal"
|
||||
schema = mc_s.schema_int
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
@@ -31,7 +62,7 @@ def store_message(msg):
|
||||
if schema[key].startswith("string"):
|
||||
msg[key] = str(value)
|
||||
|
||||
body = [{"insert": {"index": "main", "doc": msg}}]
|
||||
body = [{"insert": {"index": index, "doc": msg}}]
|
||||
body_post = ""
|
||||
for item in body:
|
||||
body_post += ujson.dumps(item)
|
||||
@@ -44,6 +75,7 @@ def store_message(msg):
|
||||
# print(api_response)
|
||||
except ApiException as e:
|
||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||
print("ATTEMPT", body_post)
|
||||
|
||||
|
||||
def store_message_bulk(data):
|
||||
@@ -59,6 +91,16 @@ def store_message_bulk(data):
|
||||
for messages in split_posts:
|
||||
total = []
|
||||
for msg in messages:
|
||||
# Duplicated to avoid extra function call (see above)
|
||||
if msg["type"] in TYPES_MAIN:
|
||||
index = "main"
|
||||
schema = mc_s.schema_main
|
||||
elif msg["type"] in TYPES_META:
|
||||
index = "meta"
|
||||
schema = mc_s.schema_meta
|
||||
elif msg["type"] in TYPES_INT:
|
||||
index = "internal"
|
||||
schema = mc_s.schema_int
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
@@ -68,7 +110,7 @@ def store_message_bulk(data):
|
||||
if schema[key].startswith("string"):
|
||||
msg[key] = str(value)
|
||||
|
||||
body = {"insert": {"index": "main", "doc": msg}}
|
||||
body = {"insert": {"index": index, "doc": msg}}
|
||||
total.append(body)
|
||||
|
||||
body_post = ""
|
||||
@@ -80,9 +122,10 @@ def store_message_bulk(data):
|
||||
try:
|
||||
# Bulk index operations
|
||||
api_response = api_instance.bulk(body_post) # , async_req=True
|
||||
# print(api_response)
|
||||
print(api_response)
|
||||
except ApiException as e:
|
||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||
print("ATTEMPT", body_post)
|
||||
|
||||
|
||||
def update_schema():
|
||||
@@ -91,11 +134,19 @@ def update_schema():
|
||||
|
||||
def create_index(api_client):
|
||||
util_instance = manticoresearch.UtilsApi(api_client)
|
||||
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
|
||||
schemas = {
|
||||
"main": mc_s.schema_main,
|
||||
"meta": mc_s.schema_meta,
|
||||
"internal": mc_s.schema_int,
|
||||
}
|
||||
for name, schema in schemas.items():
|
||||
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
|
||||
|
||||
create_query = f"create table if not exists main({schema_types}) engine='columnar'"
|
||||
print("Schema types", create_query)
|
||||
util_instance.sql(create_query)
|
||||
create_query = (
|
||||
f"create table if not exists {name}({schema_types}) engine='columnar'"
|
||||
)
|
||||
print("Schema types", create_query)
|
||||
util_instance.sql(create_query)
|
||||
|
||||
|
||||
create_index(api_client)
|
||||
|
||||
Reference in New Issue
Block a user