From 4ea77ac543eb531216f6cf626e681037b3e132fd Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 14 Sep 2022 18:32:32 +0100 Subject: [PATCH] Properly process Redis buffered messages and ingest into Kafka --- db.py | 179 ++++++++++++++++++++++-------------------- docker-compose.yml | 11 ++- monolith.py | 3 +- processing/process.py | 109 +++++++++++++++++-------- sources/ch4.py | 5 +- sources/ingest.py | 16 ++-- 6 files changed, 190 insertions(+), 133 deletions(-) diff --git a/db.py b/db.py index 88676ee..4c92555 100644 --- a/db.py +++ b/db.py @@ -1,15 +1,15 @@ +import random from math import ceil import aioredis import manticoresearch import ujson +from aiokafka import AIOKafkaProducer from manticoresearch.rest import ApiException from numpy import array_split from redis import StrictRedis import util -import random -from aiokafka import AIOKafkaProducer # Manticore schema from schemas import mc_s @@ -21,6 +21,7 @@ api_instance = manticoresearch.IndexApi(api_client) # Kafka from aiokafka import AIOKafkaProducer + KAFKA_TOPIC = "msg" log = util.get_logger("db") @@ -51,7 +52,7 @@ KEYPREFIX = "queue." async def store_kafka_batch(data): print("STORING KAFKA BATCH") - producer = AIOKafkaProducer(bootstrap_servers='kafka:9092') + producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() batch = producer.create_batch() for msg in data: @@ -70,67 +71,74 @@ async def store_kafka_batch(data): del msg[key] if key in schema: if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith("text"): + if schema[key].startswith("string") or schema[key].startswith( + "text" + ): msg[key] = str(value) message = ujson.dumps(msg) body = str.encode(message) + if "ts" not in msg: + # print("MSG WITHOUT TS", msg) + continue metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) if metadata is None: partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" - % (batch.record_count(), partition)) + print( + "%d messages sent to partition %d" % (batch.record_count(), partition) + ) batch = producer.create_batch() continue partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" - % (batch.record_count(), partition)) + print("%d messages sent to partition %d" % (batch.record_count(), partition)) await producer.stop() + # def store_message(msg): # """ # Store a message into Manticore # :param msg: dict # """ # store_kafka(msg) - # # Duplicated to avoid extra function call - # if msg["type"] in TYPES_MAIN: - # index = "main" - # schema = mc_s.schema_main - # elif msg["type"] in TYPES_META: - # index = "meta" - # schema = mc_s.schema_meta - # elif msg["type"] in TYPES_INT: - # index = "internal" - # schema = mc_s.schema_int - # # normalise fields - # for key, value in list(msg.items()): - # if value is None: - # del msg[key] - # if key in schema: - # if isinstance(value, int): - # if schema[key].startswith("string") or schema[key].startswith("text"): - # msg[key] = str(value) +# # Duplicated to avoid extra function call +# if msg["type"] in TYPES_MAIN: +# index = "main" +# schema = mc_s.schema_main +# elif msg["type"] in TYPES_META: +# index = "meta" +# schema = mc_s.schema_meta +# elif msg["type"] in TYPES_INT: +# index = "internal" +# schema = 
mc_s.schema_int +# # normalise fields +# for key, value in list(msg.items()): +# if value is None: +# del msg[key] +# if key in schema: +# if isinstance(value, int): +# if schema[key].startswith("string") or schema[key].startswith("text"): +# msg[key] = str(value) - # body = [{"insert": {"index": index, "doc": msg}}] - # body_post = "" - # for item in body: - # body_post += ujson.dumps(item) - # body_post += "\n" +# body = [{"insert": {"index": index, "doc": msg}}] +# body_post = "" +# for item in body: +# body_post += ujson.dumps(item) +# body_post += "\n" + +# # print(body_post) +# try: +# # Bulk index operations +# print("FAKE POST") +# #api_response = api_instance.bulk(body_post) # , async_req=True +# # print(api_response) +# except ApiException as e: +# print("Exception when calling IndexApi->bulk: %s\n" % e) +# print("ATTEMPT", body_post) - # # print(body_post) - # try: - # # Bulk index operations - # print("FAKE POST") - # #api_response = api_instance.bulk(body_post) # , async_req=True - # # print(api_response) - # except ApiException as e: - # print("Exception when calling IndexApi->bulk: %s\n" % e) - # print("ATTEMPT", body_post) async def queue_message(msg): """ @@ -139,9 +147,10 @@ async def queue_message(msg): src = msg["src"] message = ujson.dumps(msg) - key = "{KEYPREFIX}{src}" + key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) + async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. @@ -150,7 +159,7 @@ async def queue_message_bulk(data): src = msg["src"] message = ujson.dumps(msg) - key = "{KEYPREFIX}{src}" + key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) @@ -176,50 +185,50 @@ def queue_message_bulk_sync(data): # return # for msg in data: # store_kafka(msg) - # # 10000: maximum inserts we can submit to - # # Manticore as of Sept 2022 - # split_posts = array_split(data, ceil(len(data) / 10000)) - # for messages in split_posts: - # total = [] - # for msg in messages: - # # Duplicated to avoid extra function call (see above) - # if msg["type"] in TYPES_MAIN: - # index = "main" - # schema = mc_s.schema_main - # elif msg["type"] in TYPES_META: - # index = "meta" - # schema = mc_s.schema_meta - # elif msg["type"] in TYPES_INT: - # index = "internal" - # schema = mc_s.schema_int - # # normalise fields - # for key, value in list(msg.items()): - # if value is None: - # del msg[key] - # if key in schema: - # if isinstance(value, int): - # if schema[key].startswith("string") or schema[key].startswith( - # "text" - # ): - # msg[key] = str(value) +# # 10000: maximum inserts we can submit to +# # Manticore as of Sept 2022 +# split_posts = array_split(data, ceil(len(data) / 10000)) +# for messages in split_posts: +# total = [] +# for msg in messages: +# # Duplicated to avoid extra function call (see above) +# if msg["type"] in TYPES_MAIN: +# index = "main" +# schema = mc_s.schema_main +# elif msg["type"] in TYPES_META: +# index = "meta" +# schema = mc_s.schema_meta +# elif msg["type"] in TYPES_INT: +# index = "internal" +# schema = mc_s.schema_int +# # normalise fields +# for key, value in list(msg.items()): +# if value is None: +# del msg[key] +# if key in schema: +# if isinstance(value, int): +# if schema[key].startswith("string") or schema[key].startswith( +# "text" +# ): +# msg[key] = str(value) - # body = {"insert": {"index": index, "doc": msg}} - # total.append(body) +# body = {"insert": {"index": index, "doc": msg}} +# total.append(body) - # body_post = "" - # for item in total: - # body_post += ujson.dumps(item) - # body_post += "\n" +# 
body_post = "" +# for item in total: +# body_post += ujson.dumps(item) +# body_post += "\n" - # # print(body_post) - # try: - # # Bulk index operations - # print("FAKE POST") - # #api_response = api_instance.bulk(body_post) # , async_req=True - # #print(api_response) - # except ApiException as e: - # print("Exception when calling IndexApi->bulk: %s\n" % e) - # print("ATTEMPT", body_post) +# # print(body_post) +# try: +# # Bulk index operations +# print("FAKE POST") +# #api_response = api_instance.bulk(body_post) # , async_req=True +# #print(api_response) +# except ApiException as e: +# print("Exception when calling IndexApi->bulk: %s\n" % e) +# print("ATTEMPT", body_post) # def update_schema(): @@ -243,5 +252,5 @@ def queue_message_bulk_sync(data): # util_instance.sql(create_query) -#create_index(api_client) -#update_schema() +# create_index(api_client) +# update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index 8d15443..f82b86b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,11 @@ services: - .env volumes_from: - tmp - # depends_on: + depends_on: + - broker + - kafka + - tmp + - redis # - db threshold: @@ -52,12 +56,16 @@ services: - 9093:9090 environment: - DRUID_BROKER_URL=http://broker:8082 + depends_on: + - broker metabase: container_name: metabase image: metabase/metabase:latest ports: - 3001:3000 + depends_on: + - broker postgres: container_name: postgres @@ -82,6 +90,7 @@ services: image: bitnami/kafka depends_on: - zookeeper + - broker ports: - 29092:29092 - 9092:9092 diff --git a/monolith.py b/monolith.py index ce4c692..ff3b929 100644 --- a/monolith.py +++ b/monolith.py @@ -1,11 +1,11 @@ import asyncio from os import getenv +import db import util from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest -import db # For development # if not getenv("DISCORD_TOKEN", None): @@ -27,7 +27,6 @@ async def main(loop): log.info("Starting Discord handler.") client = DiscordClient() loop.create_task(client.start(token)) - # client.run(token) log.info("Starting 4chan handler.") chan = Chan4() diff --git a/processing/process.py b/processing/process.py index ee45ebc..b547845 100644 --- a/processing/process.py +++ b/processing/process.py @@ -1,7 +1,20 @@ -from concurrent.futures import ProcessPoolExecutor import asyncio import os +import random + +# For key generation +import string +from concurrent.futures import ProcessPoolExecutor + +# For timestamp processing +from datetime import datetime +from math import ceil + import ujson + +# For 4chan message parsing +from bs4 import BeautifulSoup +from numpy import array_split from siphashc import siphash import db @@ -10,19 +23,6 @@ import util # 4chan schema from schemas.ch4_s import ATTRMAP -# For key generation -import string -import random - -# For timestamp processing -import datetime - -# For 4chan message parsing -from bs4 import BeautifulSoup - -from numpy import array_split -from math import ceil - log = util.get_logger("process") # Maximum number of CPU threads to use for post processing @@ -30,6 +30,7 @@ CPU_THREADS = os.cpu_count() p = ProcessPoolExecutor(CPU_THREADS) + def get_hash_key(): hash_key = db.r.get("hashing_key") if not hash_key: @@ -42,33 +43,68 @@ def get_hash_key(): log.debug(f"Decoded hash key: {hash_key}") return hash_key + hash_key = get_hash_key() + +@asyncio.coroutine async def spawn_processing_threads(data): - print("SPAWN", data) + loop = asyncio.get_event_loop() + tasks = [] + oldts = [x["now"] for x in data if "now" in x] if 
len(data) < CPU_THREADS: split_data = [data] else: msg_per_core = int(len(data) / CPU_THREADS) print("MSG PER CORE", msg_per_core) split_data = array_split(data, ceil(len(data) / msg_per_core)) - print("SPLIT DATA", split_data) - for split in split_data: + for index, split in enumerate(split_data): print("DELEGATING TO THREAD", len(split)) - await process_data_thread(split) + future = loop.run_in_executor(p, process_data, data) + # future = p.submit(process_data, split) + tasks.append(future) + # results = [x.result(timeout=50) for x in tasks] + results = await asyncio.gather(*tasks) + print("RESULTS", len(results)) + + # Join the results back from the split list + flat_list = [item for sublist in results for item in sublist] + print("LENFLAT", len(flat_list)) + print("LENDATA", len(data)) + + newts = [x["ts"] for x in flat_list if "ts" in x] + print("lenoldts", len(oldts)) + print("lennewts", len(newts)) + allts = all(["ts" in x for x in flat_list]) + print("ALLTS", allts) + alllen = [len(x) for x in flat_list] + print("ALLLEN", alllen) + await db.store_kafka_batch(flat_list) + + +# @asyncio.coroutine +# def process_data_thread(data): +# """ +# Helper to spawn threads to process a list of data. +# """ +# loop = asyncio.get_event_loop() +# if len(data) < CPU_THREADS: +# split_data = [data] +# else: +# msg_per_core = int(len(data) / CPU_THREADS) +# print("MSG PER CORE", msg_per_core) +# split_data = array_split(data, ceil(len(data) / msg_per_core)) +# for index, split in enumerate(split_data): +# print("DELEGATING TO THREAD", len(split)) +# #f = process_data_thread(split) +# yield loop.run_in_executor(p, process_data, data) -@asyncio.coroutine -def process_data_thread(data): - """ - Helper to spawn threads to process a list of data. - """ - loop = asyncio.get_event_loop() - yield from loop.run_in_executor(p, process_data, data) def process_data(data): - print("PROCESSING DATA", data) + print("PROCESS DATA START") + # to_store = [] for index, msg in enumerate(data): - #print("PROCESSING", msg) + # print("PROCESSING", msg) if msg["src"] == "4ch": board = msg["net"] thread = msg["channel"] @@ -81,15 +117,18 @@ def process_data(data): if key_content: key_content = key_content.decode("ascii") if key_content == hash: + del data[index] continue else: - data[index][index]["type"] = "update" + data[index]["type"] = "update" db.r.set(redis_key, hash) - for key2, value in list(msg.items()): + if "now" not in data[index]: + print("NOW NOT IN INDEX", data[index]) + for key2, value in list(data[index].items()): if key2 in ATTRMAP: - msg[ATTRMAP[key2]] = data[index][key2] + data[index][ATTRMAP[key2]] = data[index][key2] del data[index][key2] - if "ts" in msg: + if "ts" in data[index]: old_time = data[index]["ts"] # '08/30/22(Tue)02:25:37' time_spl = old_time.split(":") @@ -100,7 +139,13 @@ def process_data(data): # new_ts = old_ts.isoformat() new_ts = int(old_ts.timestamp()) data[index]["ts"] = new_ts + else: + print("MSG WITHOUT TS PROCESS", data[index]) + continue if "msg" in msg: soup = BeautifulSoup(data[index]["msg"], "html.parser") msg = soup.get_text(separator="\n") - data[index]["msg"] = msg \ No newline at end of file + data[index]["msg"] = msg + # to_store.append(data[index]) + print("FINISHED PROCESSING DATA") + return data diff --git a/sources/ch4.py b/sources/ch4.py index 7fad264..7640c9b 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -136,7 +136,7 @@ class Chan4(object): # Split into 10,000 chunks if not all_posts: return - self.handle_posts(all_posts) + await 
self.handle_posts(all_posts) # threads_per_core = int(len(all_posts) / CPU_THREADS) # for i in range(CPU_THREADS): # new_dict = {} @@ -146,8 +146,7 @@ class Chan4(object): # new_dict[k].append(v) # else: # new_dict[k] = [v] - #await self.handle_posts_thread(new_dict) - + # await self.handle_posts_thread(new_dict) # print("VAL", ceil(len(all_posts) / threads_per_core)) # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) diff --git a/sources/ingest.py b/sources/ingest.py index 3132797..be7d6ca 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -4,24 +4,22 @@ import ujson import db import util - from processing import process -SOURCES = ["irc", "dis", "4ch"] +SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue." -CHUNK_SIZE = 1000 +CHUNK_SIZE = 90000 ITER_DELAY = 0.5 - - - class Ingest(object): def __init__(self): name = self.__class__.__name__ self.log = util.get_logger(name) async def run(self): + # items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}] + # await process.spawn_processing_threads(items) while True: await self.get_chunk() await asyncio.sleep(ITER_DELAY) @@ -33,13 +31,11 @@ class Ingest(object): chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - #self.log.info(f"Got chunk: {chunk}") + # self.log.info(f"Got chunk: {chunk}") for item in chunk: item = ujson.loads(item) - #self.log.info(f"Got item: {item}") + # self.log.info(f"Got item: {item}") items.append(item) if items: print("PROCESSING", len(items)) await process.spawn_processing_threads(items) - print("DONE WITH PROCESSING", len(items)) - await db.store_kafka_batch(items)
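
With this change, messages flow from the Redis "queue.<src>" sets through spawn_processing_threads() into store_kafka_batch(), which ujson-encodes each dict and appends it to a batch on the "msg" topic at kafka:9092. A minimal, illustrative consumer sketch for checking that batches actually land in the topic is below; it assumes aiokafka is installed and reuses the broker address and topic name from db.py, and the script name is hypothetical, not part of this patch.

# check_ingest.py - illustrative sketch only, not part of this patch.
# Assumes the "msg" topic and the kafka:9092 bootstrap address used in db.py.
import asyncio

import ujson
from aiokafka import AIOKafkaConsumer


async def main():
    consumer = AIOKafkaConsumer(
        "msg",
        bootstrap_servers="kafka:9092",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        async for record in consumer:
            # Each record value is the ujson-encoded message dict produced
            # by store_kafka_batch(); every appended message carries "ts".
            msg = ujson.loads(record.value)
            print(record.partition, record.offset, msg.get("src"), msg.get("ts"))
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())

Fields renamed via ATTRMAP and the integer "ts" computed in process_data() should be visible on each record if the processing and ingest path is working end to end.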