import asyncio
from math import ceil
from os import getenv
from time import sleep

import aioredis
import manticoresearch
import msgpack
import orjson
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis

import util
from schemas import mc_s
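
# Manticore connection: 9308 is Manticore's default HTTP/JSON API port;
# "monolith_db" is assumed to be the database service's hostname on the
# container network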
configuration = manticoresearch.Configuration(host="http://monolith_db:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)

log = util.get_logger("db")

# Redis (legacy)
# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
r = StrictRedis(
    host="ssdb_monolith",  # Redis-protocol host (the SSDB service)
    port=1289,  # Redis-protocol port
    db=0,  # database number
)

# AIORedis
# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
ar = aioredis.from_url(
    "redis://ssdb_monolith:1289",
    db=0,
)

# Neptune relay connection (password-protected Redis instance)
pr = aioredis.from_url(
    "redis://redis_neptune:6379",
    db=10,
    password=getenv("REDIS_PASSWORD"),
)

KEYNAME = "queue"  # Redis list used as the message ingest buffer
MESSAGE_KEY = "messages"  # pub/sub channel for relaying batches to Neptune

TYPES_MAIN = [
    "msg",
    "notice",
    "action",
    "part",
    "join",
    "kick",
    "quit",
    "nick",
    "mode",
    "topic",
    "update",
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
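
# Illustrative routing examples (field names besides "type" and "ts" are
# assumptions; the real columns are defined in schemas.mc_s):
#   {"type": "msg", "ts": 1662500000, "msg": "hello"}  -> "main" index
#   {"type": "who", "ts": 1662500000}                  -> "meta" index
#   {"type": "conn", "ts": 1662500000}                 -> "internal" index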


async def store_batch(data):
    """
    Store a batch of messages in Manticore and publish it to Neptune.

    :param data: list of message dicts
    """
    if not data:
        return
    # 10000: maximum inserts we can submit to
    # Manticore as of Sept 2022
    split_posts = array_split(data, ceil(len(data) / 10000))
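    # numpy's array_split() yields ceil(len(data) / 10000) roughly equal
    # chunks (e.g. 25000 messages become three chunks of ~8333); unlike
    # split(), it tolerates sizes that do not divide evenly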

    for messages in split_posts:
        total = []
        indexmap = {}
        for msg in messages:
            if msg["type"] in TYPES_MAIN:
                index = "main"
                schema = mc_s.schema_main
            elif msg["type"] in TYPES_META:
                index = "meta"
                schema = mc_s.schema_meta
            elif msg["type"] in TYPES_INT:
                index = "internal"
                schema = mc_s.schema_int
            else:
                # Guard against unknown types; otherwise index and schema
                # would be unbound (or stale from a previous iteration)
                log.error(f"Unknown message type, skipping: {msg['type']}")
                continue
            # Normalise fields: drop null values and coerce ints to str where
            # the schema declares a string/text column
            for key, value in list(msg.items()):
                if value is None:
                    del msg[key]
                    continue
                if key in schema:
                    if isinstance(value, int):
                        if schema[key].startswith("string") or schema[key].startswith(
                            "text"
                        ):
                            msg[key] = str(value)

            body = {"insert": {"index": index, "doc": msg}}
            total.append(body)
            if "ts" not in msg:
                raise ValueError(f"Message has no ts field: {msg}")
            indexmap.setdefault(index, []).append(msg)
        # END MSG IN MESSAGES

        # Pack the indexmap with msgpack and publish it to Neptune, retrying
        # briefly if the connection drops
        packed_index = msgpack.packb(indexmap, use_bin_type=True)
        completed_publish = False
        for _ in range(10):
            try:
                await pr.publish(MESSAGE_KEY, packed_index)
                completed_publish = True
                break
            except aioredis.exceptions.ConnectionError as e:
                log.error(f"Connection error publishing to Neptune: {e}")
                await asyncio.sleep(0.1)
        if not completed_publish:
            log.error("Failed to publish to Neptune")

        # Build the newline-delimited JSON payload for the bulk endpoint
        body_post = ""
        for item in total:
            body_post += orjson.dumps(item).decode("utf-8")
            body_post += "\n"

        try:
            # Bulk index operations
            api_instance.bulk(body_post)  # , async_req=True
        except ApiException as e:
            log.error(f"Exception when calling IndexApi->bulk: {e}")
        log.info(f"Completed ingest to MC of length {len(total)}")
        # END MESSAGES IN SPLIT
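

# A minimal usage sketch for store_batch (illustrative; the field names other
# than "type" and "ts" are assumptions):
#
#   await store_batch([
#       {"type": "msg", "ts": 1662500000, "nick": "alice", "msg": "hello"},
#       {"type": "who", "ts": 1662500001, "nick": "bob"},
#   ])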


def update_schema():
    # Placeholder: no schema migrations are performed yet
    pass


def create_index(api_client):
    util_instance = manticoresearch.UtilsApi(api_client)
    schemas = {
        "main": mc_s.schema_main,
        "meta": mc_s.schema_meta,
        "internal": mc_s.schema_int,
    }
    for name, schema in schemas.items():
        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])

        create_query = (
            f"create table if not exists {name}({schema_types}) engine='columnar'"
        )
        log.debug(f"Create query: {create_query}")
        util_instance.sql(create_query)
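

# For illustration, with a hypothetical schema {"ts": "bigint", "msg": "text"}
# create_index would run:
#   create table if not exists main(ts bigint, msg text) engine='columnar'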


async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    # TODO: msgpack
    message = orjson.dumps(msg)
    await ar.lpush(KEYNAME, message)


async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        # TODO: msgpack
        message = orjson.dumps(msg)
        await ar.lpush(KEYNAME, message)
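

# Note: LPUSH is variadic, so the whole batch could be pushed in a single
# round-trip, e.g.:
#   await ar.lpush(KEYNAME, *[orjson.dumps(m) for m in data])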


# Block at import time until the database is up and the tables exist
created = False
while not created:
    try:
        create_index(api_client)
        created = True
    except Exception as e:
        log.error(f"Error creating index: {e}")
        sleep(1)  # Block the thread, just wait for the DB
update_schema()