Properly process Redis buffered messages and ingest into Kafka

2022-09-14 18:32:32 +01:00
parent fec0d379a6
commit 4ea77ac543
6 changed files with 190 additions and 133 deletions
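In short: writers buffer messages into per-source Redis sets via queue_message / queue_message_bulk, and a separate pass drains those sets and ships the backlog to Kafka in batches through store_kafka_batch (the db.py diff below). A minimal producer-side usage sketch; the message fields are illustrative, not from this commit:

import asyncio
import db

async def main():
    # "src" picks the Redis buffer key ("queue.<src>"); "ts" matters because
    # store_kafka_batch skips any message without a timestamp.
    await db.queue_message({"src": "irc", "type": "msg", "ts": 1663176752})

asyncio.run(main())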

db.py (179 changed lines)

@@ -1,15 +1,15 @@
import random
from math import ceil
import aioredis
import manticoresearch
import ujson
from aiokafka import AIOKafkaProducer
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis
import util
-import random
-from aiokafka import AIOKafkaProducer
# Manticore schema
from schemas import mc_s
@@ -21,6 +21,7 @@ api_instance = manticoresearch.IndexApi(api_client)
# Kafka
from aiokafka import AIOKafkaProducer
KAFKA_TOPIC = "msg"
log = util.get_logger("db")
@@ -51,7 +52,7 @@ KEYPREFIX = "queue."
async def store_kafka_batch(data):
print("STORING KAFKA BATCH")
-producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
+producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
batch = producer.create_batch()
for msg in data:
@@ -70,67 +71,74 @@ async def store_kafka_batch(data):
del msg[key]
if key in schema:
if isinstance(value, int):
-if schema[key].startswith("string") or schema[key].startswith("text"):
+if schema[key].startswith("string") or schema[key].startswith(
+    "text"
+):
msg[key] = str(value)
message = ujson.dumps(msg)
body = str.encode(message)
if "ts" not in msg:
# print("MSG WITHOUT TS", msg)
continue
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None:
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d"
% (batch.record_count(), partition))
print(
"%d messages sent to partition %d" % (batch.record_count(), partition)
)
batch = producer.create_batch()
continue
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d"
% (batch.record_count(), partition))
print("%d messages sent to partition %d" % (batch.record_count(), partition))
await producer.stop()
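Two details of the loop above are worth spelling out. aiokafka's BatchBuilder.append() returns None when the batch has no room left, so that branch flushes the full batch to a randomly chosen partition and starts a new one. Note, though, that the continue skips the message whose append failed, so that message is dropped rather than retried. A sketch of the retry variant (my rearrangement of the commit's loop body, assuming only the documented None-when-full contract):

metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None:
    # Batch is full: flush it to a random partition, then retry the
    # same message on a fresh batch instead of discarding it.
    partitions = await producer.partitions_for(KAFKA_TOPIC)
    partition = random.choice(tuple(partitions))
    await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
    batch = producer.create_batch()
    batch.append(key=None, value=body, timestamp=msg["ts"])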
# def store_message(msg):
# """
# Store a message into Manticore
# :param msg: dict
# """
# store_kafka(msg)
# # Duplicated to avoid extra function call
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith("text"):
# msg[key] = str(value)
# body = [{"insert": {"index": index, "doc": msg}}]
# body_post = ""
# for item in body:
# body_post += ujson.dumps(item)
# body_post += "\n"
# body = [{"insert": {"index": index, "doc": msg}}]
# body_post = ""
# for item in body:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# # print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
async def queue_message(msg):
"""
@@ -139,9 +147,10 @@ async def queue_message(msg):
src = msg["src"]
message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}"
key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message)
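The one-character change above is the substantive fix here: without the f prefix, "{KEYPREFIX}{src}" is a literal string, so every message was SADDed into a single Redis key literally named {KEYPREFIX}{src} instead of its per-source buffer. Interpolated, the keys come out as intended:

KEYPREFIX = "queue."
src = "irc"  # illustrative; real values come from msg["src"]
broken = "{KEYPREFIX}{src}"  # old code: the literal, uninterpolated key
fixed = f"{KEYPREFIX}{src}"  # fixed code: "queue.irc"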
async def queue_message_bulk(data):
"""
Queue multiple messages on the Redis buffer.
@@ -150,7 +159,7 @@ async def queue_message_bulk(data):
src = msg["src"]
message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}"
key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message)
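The consumer side of these buffers lives in the other changed files, but given the set layout a drain loop only needs SPOP per key. A hypothetical sketch (the function name, source list, and batch size are assumptions, not code from this commit):

import ujson

async def process_redis_buffer(sources, batch_max=1000):
    # Pop up to batch_max buffered messages per source set and hand
    # them to the Kafka batch writer (aioredis-style client "ar").
    for src in sources:
        raw = await ar.spop(f"{KEYPREFIX}{src}", batch_max)
        if not raw:
            continue
        await store_kafka_batch([ujson.loads(m) for m in raw])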
@@ -176,50 +185,50 @@ def queue_message_bulk_sync(data):
# return
# for msg in data:
# store_kafka(msg)
# # 10000: maximum inserts we can submit to
# # Manticore as of Sept 2022
# split_posts = array_split(data, ceil(len(data) / 10000))
# for messages in split_posts:
# total = []
# for msg in messages:
# # Duplicated to avoid extra function call (see above)
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = {"insert": {"index": index, "doc": msg}}
# total.append(body)
# body = {"insert": {"index": index, "doc": msg}}
# total.append(body)
# body_post = ""
# for item in total:
# body_post += ujson.dumps(item)
# body_post += "\n"
# body_post = ""
# for item in total:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# #print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
# def update_schema():
@@ -243,5 +252,5 @@ def queue_message_bulk_sync(data):
# util_instance.sql(create_query)
-#create_index(api_client)
-#update_schema()
+# create_index(api_client)
+# update_schema()
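If the commented-out index setup at the end were ever revived, the manticoresearch client's UtilsApi.sql() is the natural vehicle for the create_query it references. A sketch under that assumption, deriving column definitions from the same field-to-type schema dicts used above:

def create_index(api_client):
    # Hypothetical reconstruction: build CREATE TABLE statements from the
    # field -> type mappings in schemas.mc_s and run them over SQL.
    util_instance = manticoresearch.UtilsApi(api_client)
    for name, schema in (
        ("main", mc_s.schema_main),
        ("meta", mc_s.schema_meta),
        ("internal", mc_s.schema_int),
    ):
        fields = ", ".join(f"{field} {ftype}" for field, ftype in schema.items())
        util_instance.sql(f"CREATE TABLE IF NOT EXISTS {name} ({fields})")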