Ingest into Kafka and queue messages better
db.py
@@ -8,12 +8,21 @@ from numpy import array_split
from redis import StrictRedis

import util
import random

# Manticore schema
from schemas import mc_s

# Manticore
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)

# Kafka
from aiokafka import AIOKafkaProducer

KAFKA_TOPIC = "msg"

log = util.get_logger("db")

# Redis (legacy)
@@ -37,121 +46,201 @@ TYPES_MAIN = [
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYPREFIX = "queue."


async def store_kafka_batch(data):
    print("STORING KAFKA BATCH")
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
    await producer.start()
    batch = producer.create_batch()
    for msg in data:
        if msg["type"] in TYPES_MAIN:
            index = "main"
            schema = mc_s.schema_main
        elif msg["type"] in TYPES_META:
            index = "meta"
            schema = mc_s.schema_meta
        elif msg["type"] in TYPES_INT:
            index = "internal"
            schema = mc_s.schema_int
        # normalise fields
        for key, value in list(msg.items()):
            if value is None:
                del msg[key]
            if key in schema:
                if isinstance(value, int):
                    if schema[key].startswith("string") or schema[key].startswith("text"):
                        msg[key] = str(value)
        message = ujson.dumps(msg)
        body = str.encode(message)
        metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
        if metadata is None:
            # Batch is full: flush it to a random partition, then start a new
            # batch and re-append the message that did not fit.
            partitions = await producer.partitions_for(KAFKA_TOPIC)
            partition = random.choice(tuple(partitions))
            await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
            print(
                "%d messages sent to partition %d"
                % (batch.record_count(), partition)
            )
            batch = producer.create_batch()
            batch.append(key=None, value=body, timestamp=msg["ts"])
            continue

    # Flush whatever is left in the final batch
    partitions = await producer.partitions_for(KAFKA_TOPIC)
    partition = random.choice(tuple(partitions))
    await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
    print(
        "%d messages sent to partition %d"
        % (batch.record_count(), partition)
    )
    await producer.stop()

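# Usage sketch (illustrative only): store_kafka_batch() expects a list of
# message dicts carrying at least "type" and "ts" keys; the field names and
# values below are hypothetical, e.g.:
#
#     sample = [
#         {"type": "msg", "ts": 1663000000, "src": "irc", "nick": "alice", "msg": "hi"},
#         {"type": "who", "ts": 1663000001, "src": "irc", "nick": "bob"},
#     ]
#     await store_kafka_batch(sample)
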
# def store_message(msg):
#     """
#     Store a message into Manticore
#     :param msg: dict
#     """
#     store_kafka(msg)
#     # Duplicated to avoid extra function call
#     if msg["type"] in TYPES_MAIN:
#         index = "main"
#         schema = mc_s.schema_main
#     elif msg["type"] in TYPES_META:
#         index = "meta"
#         schema = mc_s.schema_meta
#     elif msg["type"] in TYPES_INT:
#         index = "internal"
#         schema = mc_s.schema_int
#     # normalise fields
#     for key, value in list(msg.items()):
#         if value is None:
#             del msg[key]
#         if key in schema:
#             if isinstance(value, int):
#                 if schema[key].startswith("string") or schema[key].startswith("text"):
#                     msg[key] = str(value)
#
#     body = [{"insert": {"index": index, "doc": msg}}]
#     body_post = ""
#     for item in body:
#         body_post += ujson.dumps(item)
#         body_post += "\n"
#
#     # print(body_post)
#     try:
#         # Bulk index operations
#         print("FAKE POST")
#         # api_response = api_instance.bulk(body_post)  # , async_req=True
#         # print(api_response)
#     except ApiException as e:
#         print("Exception when calling IndexApi->bulk: %s\n" % e)
#         print("ATTEMPT", body_post)

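# For reference, the commented-out Manticore path above serialises each insert
# as one line of newline-delimited JSON; for a hypothetical document it would
# produce something like:
#
#     {"insert": {"index": "main", "doc": {"type": "msg", "ts": 1663000000, "nick": "alice"}}}
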
async def queue_message(msg):
    """
    Queue a message on the Redis buffer.
    """
    src = msg["src"]
    message = ujson.dumps(msg)

    key = f"{KEYPREFIX}{src}"
    await ar.sadd(key, message)

async def queue_message_bulk(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    if not data:
        return
    for msg in data:
        src = msg["src"]
        message = ujson.dumps(msg)

        key = f"{KEYPREFIX}{src}"
        await ar.sadd(key, message)


# For now, make a normal function until we go full async
def queue_message_bulk_sync(data):
    """
    Queue multiple messages on the Redis buffer.
    """
    for msg in data:
        src = msg["src"]
        message = ujson.dumps(msg)

        key = f"{KEYPREFIX}{src}"
        r.sadd(key, message)

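# The queue_* helpers buffer each message as JSON in a Redis set keyed by
# source, i.e. "queue.<src>" (KEYPREFIX + msg["src"]), via the Redis
# connections (ar / r) assumed to be defined elsewhere in this module.
# A hypothetical consumer could drain one source's buffer like this
# (key and values illustrative):
#
#     while True:
#         raw = r.spop("queue.irc")
#         if raw is None:
#             break
#         msg = ujson.loads(raw)
#         # ...hand the message to the Kafka ingest / indexer...
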
# def store_message_bulk(data):
#     """
#     Store a message into Manticore
#     :param msg: dict
#     """
#     if not data:
#         return
#     for msg in data:
#         store_kafka(msg)
#     # 10000: maximum inserts we can submit to
#     # Manticore as of Sept 2022
#     split_posts = array_split(data, ceil(len(data) / 10000))
#     for messages in split_posts:
#         total = []
#         for msg in messages:
#             # Duplicated to avoid extra function call (see above)
#             if msg["type"] in TYPES_MAIN:
#                 index = "main"
#                 schema = mc_s.schema_main
#             elif msg["type"] in TYPES_META:
#                 index = "meta"
#                 schema = mc_s.schema_meta
#             elif msg["type"] in TYPES_INT:
#                 index = "internal"
#                 schema = mc_s.schema_int
#             # normalise fields
#             for key, value in list(msg.items()):
#                 if value is None:
#                     del msg[key]
#                 if key in schema:
#                     if isinstance(value, int):
#                         if schema[key].startswith("string") or schema[key].startswith(
#                             "text"
#                         ):
#                             msg[key] = str(value)
#
#             body = {"insert": {"index": index, "doc": msg}}
#             total.append(body)
#
#         body_post = ""
#         for item in total:
#             body_post += ujson.dumps(item)
#             body_post += "\n"
#
#         # print(body_post)
#         try:
#             # Bulk index operations
#             print("FAKE POST")
#             # api_response = api_instance.bulk(body_post)  # , async_req=True
#             # print(api_response)
#         except ApiException as e:
#             print("Exception when calling IndexApi->bulk: %s\n" % e)
#             print("ATTEMPT", body_post)


# def update_schema():
#     pass


# def create_index(api_client):
#     util_instance = manticoresearch.UtilsApi(api_client)
#     schemas = {
#         "main": mc_s.schema_main,
#         "meta": mc_s.schema_meta,
#         "internal": mc_s.schema_int,
#     }
#     for name, schema in schemas.items():
#         schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
#
#         create_query = (
#             f"create table if not exists {name}({schema_types}) engine='columnar'"
#         )
#         print("Schema types", create_query)
#         util_instance.sql(create_query)


# create_index(api_client)
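# For reference, with a hypothetical schema such as
# {"ts": "timestamp", "nick": "string", "msg": "text"}, the commented-out
# create_index() above would issue:
#
#     create table if not exists main(ts timestamp, nick string, msg text) engine='columnar'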