Properly process Redis buffered messages and ingest into Kafka

Mark Veidemanis 2 years ago
parent c5f01c3084
commit f432e9b29e

189
db.py

@ -1,15 +1,15 @@
import random
from math import ceil from math import ceil
import aioredis import aioredis
import manticoresearch import manticoresearch
import ujson import ujson
from aiokafka import AIOKafkaProducer
from manticoresearch.rest import ApiException from manticoresearch.rest import ApiException
from numpy import array_split from numpy import array_split
from redis import StrictRedis from redis import StrictRedis
import util import util
import random
from aiokafka import AIOKafkaProducer
# Manticore schema # Manticore schema
from schemas import mc_s from schemas import mc_s
@ -21,6 +21,7 @@ api_instance = manticoresearch.IndexApi(api_client)
# Kafka # Kafka
from aiokafka import AIOKafkaProducer from aiokafka import AIOKafkaProducer
KAFKA_TOPIC = "msg" KAFKA_TOPIC = "msg"
log = util.get_logger("db") log = util.get_logger("db")
@ -51,7 +52,7 @@ KEYPREFIX = "queue."
async def store_kafka_batch(data): async def store_kafka_batch(data):
print("STORING KAFKA BATCH") print("STORING KAFKA BATCH")
producer = AIOKafkaProducer(bootstrap_servers='kafka:9092') producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start() await producer.start()
batch = producer.create_batch() batch = producer.create_batch()
for msg in data: for msg in data:
@ -70,67 +71,74 @@ async def store_kafka_batch(data):
del msg[key] del msg[key]
if key in schema: if key in schema:
if isinstance(value, int): if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith("text"): if schema[key].startswith("string") or schema[key].startswith(
"text"
):
msg[key] = str(value) msg[key] = str(value)
message = ujson.dumps(msg) message = ujson.dumps(msg)
body = str.encode(message) body = str.encode(message)
if "ts" not in msg:
# print("MSG WITHOUT TS", msg)
continue
metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None: if metadata is None:
partitions = await producer.partitions_for(KAFKA_TOPIC) partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions)) partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d" print(
% (batch.record_count(), partition)) "%d messages sent to partition %d" % (batch.record_count(), partition)
)
batch = producer.create_batch() batch = producer.create_batch()
continue continue
partitions = await producer.partitions_for(KAFKA_TOPIC) partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions)) partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d" print("%d messages sent to partition %d" % (batch.record_count(), partition))
% (batch.record_count(), partition))
await producer.stop() await producer.stop()
# def store_message(msg): # def store_message(msg):
# """ # """
# Store a message into Manticore # Store a message into Manticore
# :param msg: dict # :param msg: dict
# """ # """
# store_kafka(msg) # store_kafka(msg)
# # Duplicated to avoid extra function call # # Duplicated to avoid extra function call
# if msg["type"] in TYPES_MAIN: # if msg["type"] in TYPES_MAIN:
# index = "main" # index = "main"
# schema = mc_s.schema_main # schema = mc_s.schema_main
# elif msg["type"] in TYPES_META: # elif msg["type"] in TYPES_META:
# index = "meta" # index = "meta"
# schema = mc_s.schema_meta # schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT: # elif msg["type"] in TYPES_INT:
# index = "internal" # index = "internal"
# schema = mc_s.schema_int # schema = mc_s.schema_int
# # normalise fields # # normalise fields
# for key, value in list(msg.items()): # for key, value in list(msg.items()):
# if value is None: # if value is None:
# del msg[key] # del msg[key]
# if key in schema: # if key in schema:
# if isinstance(value, int): # if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith("text"): # if schema[key].startswith("string") or schema[key].startswith("text"):
# msg[key] = str(value) # msg[key] = str(value)
# body = [{"insert": {"index": index, "doc": msg}}] # body = [{"insert": {"index": index, "doc": msg}}]
# body_post = "" # body_post = ""
# for item in body: # for item in body:
# body_post += ujson.dumps(item) # body_post += ujson.dumps(item)
# body_post += "\n" # body_post += "\n"
# # print(body_post) # # print(body_post)
# try: # try:
# # Bulk index operations # # Bulk index operations
# print("FAKE POST") # print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True # #api_response = api_instance.bulk(body_post) # , async_req=True
# # print(api_response) # # print(api_response)
# except ApiException as e: # except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e) # print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post) # print("ATTEMPT", body_post)
async def queue_message(msg): async def queue_message(msg):
""" """
@ -139,9 +147,10 @@ async def queue_message(msg):
src = msg["src"] src = msg["src"]
message = ujson.dumps(msg) message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}" key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message) await ar.sadd(key, message)
async def queue_message_bulk(data): async def queue_message_bulk(data):
""" """
Queue multiple messages on the Redis buffer. Queue multiple messages on the Redis buffer.
@ -150,7 +159,7 @@ async def queue_message_bulk(data):
src = msg["src"] src = msg["src"]
message = ujson.dumps(msg) message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}" key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message) await ar.sadd(key, message)
@ -176,50 +185,50 @@ def queue_message_bulk_sync(data):
# return # return
# for msg in data: # for msg in data:
# store_kafka(msg) # store_kafka(msg)
# # 10000: maximum inserts we can submit to # # 10000: maximum inserts we can submit to
# # Manticore as of Sept 2022 # # Manticore as of Sept 2022
# split_posts = array_split(data, ceil(len(data) / 10000)) # split_posts = array_split(data, ceil(len(data) / 10000))
# for messages in split_posts: # for messages in split_posts:
# total = [] # total = []
# for msg in messages: # for msg in messages:
# # Duplicated to avoid extra function call (see above) # # Duplicated to avoid extra function call (see above)
# if msg["type"] in TYPES_MAIN: # if msg["type"] in TYPES_MAIN:
# index = "main" # index = "main"
# schema = mc_s.schema_main # schema = mc_s.schema_main
# elif msg["type"] in TYPES_META: # elif msg["type"] in TYPES_META:
# index = "meta" # index = "meta"
# schema = mc_s.schema_meta # schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT: # elif msg["type"] in TYPES_INT:
# index = "internal" # index = "internal"
# schema = mc_s.schema_int # schema = mc_s.schema_int
# # normalise fields # # normalise fields
# for key, value in list(msg.items()): # for key, value in list(msg.items()):
# if value is None: # if value is None:
# del msg[key] # del msg[key]
# if key in schema: # if key in schema:
# if isinstance(value, int): # if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith( # if schema[key].startswith("string") or schema[key].startswith(
# "text" # "text"
# ): # ):
# msg[key] = str(value) # msg[key] = str(value)
# body = {"insert": {"index": index, "doc": msg}} # body = {"insert": {"index": index, "doc": msg}}
# total.append(body) # total.append(body)
# body_post = "" # body_post = ""
# for item in total: # for item in total:
# body_post += ujson.dumps(item) # body_post += ujson.dumps(item)
# body_post += "\n" # body_post += "\n"
# # print(body_post) # # print(body_post)
# try: # try:
# # Bulk index operations # # Bulk index operations
# print("FAKE POST") # print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True # #api_response = api_instance.bulk(body_post) # , async_req=True
# #print(api_response) # #print(api_response)
# except ApiException as e: # except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e) # print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post) # print("ATTEMPT", body_post)
# def update_schema(): # def update_schema():
@ -243,5 +252,5 @@ def queue_message_bulk_sync(data):
# util_instance.sql(create_query) # util_instance.sql(create_query)
#create_index(api_client) # create_index(api_client)
#update_schema() # update_schema()

@ -19,7 +19,11 @@ services:
- .env - .env
volumes_from: volumes_from:
- tmp - tmp
# depends_on: depends_on:
- broker
- kafka
- tmp
- redis
# - db # - db
threshold: threshold:
@ -52,12 +56,16 @@ services:
- 9093:9090 - 9093:9090
environment: environment:
- DRUID_BROKER_URL=http://broker:8082 - DRUID_BROKER_URL=http://broker:8082
depends_on:
- broker
metabase: metabase:
container_name: metabase container_name: metabase
image: metabase/metabase:latest image: metabase/metabase:latest
ports: ports:
- 3001:3000 - 3001:3000
depends_on:
- broker
postgres: postgres:
container_name: postgres container_name: postgres
@ -82,6 +90,7 @@ services:
image: bitnami/kafka image: bitnami/kafka
depends_on: depends_on:
- zookeeper - zookeeper
- broker
ports: ports:
- 29092:29092 - 29092:29092
- 9092:9092 - 9092:9092

@ -1,11 +1,11 @@
import asyncio import asyncio
from os import getenv from os import getenv
import db
import util import util
from sources.ch4 import Chan4 from sources.ch4 import Chan4
from sources.dis import DiscordClient from sources.dis import DiscordClient
from sources.ingest import Ingest from sources.ingest import Ingest
import db
# For development # For development
# if not getenv("DISCORD_TOKEN", None): # if not getenv("DISCORD_TOKEN", None):
@ -27,7 +27,6 @@ async def main(loop):
log.info("Starting Discord handler.") log.info("Starting Discord handler.")
client = DiscordClient() client = DiscordClient()
loop.create_task(client.start(token)) loop.create_task(client.start(token))
# client.run(token)
log.info("Starting 4chan handler.") log.info("Starting 4chan handler.")
chan = Chan4() chan = Chan4()

@ -1,27 +1,27 @@
from concurrent.futures import ProcessPoolExecutor
import asyncio import asyncio
import os import os
import ujson import random
from siphashc import siphash
import db
import util
# 4chan schema
from schemas.ch4_s import ATTRMAP
# For key generation # For key generation
import string import string
import random from concurrent.futures import ProcessPoolExecutor
# For timestamp processing # For timestamp processing
import datetime from datetime import datetime
from math import ceil
import ujson
# For 4chan message parsing # For 4chan message parsing
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from numpy import array_split from numpy import array_split
from math import ceil from siphashc import siphash
import db
import util
# 4chan schema
from schemas.ch4_s import ATTRMAP
log = util.get_logger("process") log = util.get_logger("process")
@ -30,6 +30,7 @@ CPU_THREADS = os.cpu_count()
p = ProcessPoolExecutor(CPU_THREADS) p = ProcessPoolExecutor(CPU_THREADS)
def get_hash_key(): def get_hash_key():
hash_key = db.r.get("hashing_key") hash_key = db.r.get("hashing_key")
if not hash_key: if not hash_key:
@ -42,33 +43,68 @@ def get_hash_key():
log.debug(f"Decoded hash key: {hash_key}") log.debug(f"Decoded hash key: {hash_key}")
return hash_key return hash_key
hash_key = get_hash_key() hash_key = get_hash_key()
@asyncio.coroutine
async def spawn_processing_threads(data): async def spawn_processing_threads(data):
print("SPAWN", data) loop = asyncio.get_event_loop()
tasks = []
oldts = [x["now"] for x in data if "now" in x]
if len(data) < CPU_THREADS: if len(data) < CPU_THREADS:
split_data = [data] split_data = [data]
else: else:
msg_per_core = int(len(data) / CPU_THREADS) msg_per_core = int(len(data) / CPU_THREADS)
print("MSG PER CORE", msg_per_core) print("MSG PER CORE", msg_per_core)
split_data = array_split(data, ceil(len(data) / msg_per_core)) split_data = array_split(data, ceil(len(data) / msg_per_core))
print("SPLIT DATA", split_data) for index, split in enumerate(split_data):
for split in split_data:
print("DELEGATING TO THREAD", len(split)) print("DELEGATING TO THREAD", len(split))
await process_data_thread(split) future = loop.run_in_executor(p, process_data, data)
# future = p.submit(process_data, split)
tasks.append(future)
# results = [x.result(timeout=50) for x in tasks]
results = await asyncio.gather(*tasks)
print("RESULTS", len(results))
# Join the results back from the split list
flat_list = [item for sublist in results for item in sublist]
print("LENFLAT", len(flat_list))
print("LENDATA", len(data))
newts = [x["ts"] for x in flat_list if "ts" in x]
print("lenoldts", len(oldts))
print("lennewts", len(newts))
allts = all(["ts" in x for x in flat_list])
print("ALLTS", allts)
alllen = [len(x) for x in flat_list]
print("ALLLEN", alllen)
await db.store_kafka_batch(flat_list)
# @asyncio.coroutine
# def process_data_thread(data):
# """
# Helper to spawn threads to process a list of data.
# """
# loop = asyncio.get_event_loop()
# if len(data) < CPU_THREADS:
# split_data = [data]
# else:
# msg_per_core = int(len(data) / CPU_THREADS)
# print("MSG PER CORE", msg_per_core)
# split_data = array_split(data, ceil(len(data) / msg_per_core))
# for index, split in enumerate(split_data):
# print("DELEGATING TO THREAD", len(split))
# #f = process_data_thread(split)
# yield loop.run_in_executor(p, process_data, data)
@asyncio.coroutine
def process_data_thread(data):
"""
Helper to spawn threads to process a list of data.
"""
loop = asyncio.get_event_loop()
yield from loop.run_in_executor(p, process_data, data)
def process_data(data): def process_data(data):
print("PROCESSING DATA", data) print("PROCESS DATA START")
# to_store = []
for index, msg in enumerate(data): for index, msg in enumerate(data):
#print("PROCESSING", msg) # print("PROCESSING", msg)
if msg["src"] == "4ch": if msg["src"] == "4ch":
board = msg["net"] board = msg["net"]
thread = msg["channel"] thread = msg["channel"]
@ -81,15 +117,18 @@ def process_data(data):
if key_content: if key_content:
key_content = key_content.decode("ascii") key_content = key_content.decode("ascii")
if key_content == hash: if key_content == hash:
del data[index]
continue continue
else: else:
data[index][index]["type"] = "update" data[index]["type"] = "update"
db.r.set(redis_key, hash) db.r.set(redis_key, hash)
for key2, value in list(msg.items()): if "now" not in data[index]:
print("NOW NOT IN INDEX", data[index])
for key2, value in list(data[index].items()):
if key2 in ATTRMAP: if key2 in ATTRMAP:
msg[ATTRMAP[key2]] = data[index][key2] data[index][ATTRMAP[key2]] = data[index][key2]
del data[index][key2] del data[index][key2]
if "ts" in msg: if "ts" in data[index]:
old_time = data[index]["ts"] old_time = data[index]["ts"]
# '08/30/22(Tue)02:25:37' # '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":") time_spl = old_time.split(":")
@ -100,7 +139,13 @@ def process_data(data):
# new_ts = old_ts.isoformat() # new_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp()) new_ts = int(old_ts.timestamp())
data[index]["ts"] = new_ts data[index]["ts"] = new_ts
else:
print("MSG WITHOUT TS PROCESS", data[index])
continue
if "msg" in msg: if "msg" in msg:
soup = BeautifulSoup(data[index]["msg"], "html.parser") soup = BeautifulSoup(data[index]["msg"], "html.parser")
msg = soup.get_text(separator="\n") msg = soup.get_text(separator="\n")
data[index]["msg"] = msg data[index]["msg"] = msg
# to_store.append(data[index])
print("FINISHED PROCESSING DATA")
return data

@ -136,7 +136,7 @@ class Chan4(object):
# Split into 10,000 chunks # Split into 10,000 chunks
if not all_posts: if not all_posts:
return return
self.handle_posts(all_posts) await self.handle_posts(all_posts)
# threads_per_core = int(len(all_posts) / CPU_THREADS) # threads_per_core = int(len(all_posts) / CPU_THREADS)
# for i in range(CPU_THREADS): # for i in range(CPU_THREADS):
# new_dict = {} # new_dict = {}
@ -146,8 +146,7 @@ class Chan4(object):
# new_dict[k].append(v) # new_dict[k].append(v)
# else: # else:
# new_dict[k] = [v] # new_dict[k] = [v]
#await self.handle_posts_thread(new_dict) # await self.handle_posts_thread(new_dict)
# print("VAL", ceil(len(all_posts) / threads_per_core)) # print("VAL", ceil(len(all_posts) / threads_per_core))
# split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))

@ -4,24 +4,22 @@ import ujson
import db import db
import util import util
from processing import process from processing import process
SOURCES = ["irc", "dis", "4ch"] SOURCES = ["4ch", "irc", "dis"]
KEYPREFIX = "queue." KEYPREFIX = "queue."
CHUNK_SIZE = 1000 CHUNK_SIZE = 90000
ITER_DELAY = 0.5 ITER_DELAY = 0.5
class Ingest(object): class Ingest(object):
def __init__(self): def __init__(self):
name = self.__class__.__name__ name = self.__class__.__name__
self.log = util.get_logger(name) self.log = util.get_logger(name)
async def run(self): async def run(self):
# items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}]
# await process.spawn_processing_threads(items)
while True: while True:
await self.get_chunk() await self.get_chunk()
await asyncio.sleep(ITER_DELAY) await asyncio.sleep(ITER_DELAY)
@ -33,13 +31,11 @@ class Ingest(object):
chunk = await db.ar.spop(key, CHUNK_SIZE) chunk = await db.ar.spop(key, CHUNK_SIZE)
if not chunk: if not chunk:
continue continue
#self.log.info(f"Got chunk: {chunk}") # self.log.info(f"Got chunk: {chunk}")
for item in chunk: for item in chunk:
item = ujson.loads(item) item = ujson.loads(item)
#self.log.info(f"Got item: {item}") # self.log.info(f"Got item: {item}")
items.append(item) items.append(item)
if items: if items:
print("PROCESSING", len(items)) print("PROCESSING", len(items))
await process.spawn_processing_threads(items) await process.spawn_processing_threads(items)
print("DONE WITH PROCESSING", len(items))
await db.store_kafka_batch(items)

Loading…
Cancel
Save