From fec0d379a6fe3103941789d9d21b4e79e7f1b24a Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Tue, 13 Sep 2022 22:17:46 +0100
Subject: [PATCH] Ingest into Kafka and queue messages better

---
 db.py                  | 291 +++++++++++++++++++++++++++--------------
 monolith.py            |   1 +
 processing/__init__.py |   0
 processing/process.py  | 106 +++++++++++++++
 sources/ch4.py         | 102 ++++++++-------
 sources/dis.py         |   2 +-
 sources/ingest.py      |  21 ++-
 7 files changed, 366 insertions(+), 157 deletions(-)
 create mode 100644 processing/__init__.py
 create mode 100644 processing/process.py

diff --git a/db.py b/db.py
index 710e7d3..88676ee 100644
--- a/db.py
+++ b/db.py
@@ -8,12 +8,21 @@ from numpy import array_split
 from redis import StrictRedis
 
 import util
+import random
+from aiokafka import AIOKafkaProducer
+
+# Manticore schema
 from schemas import mc_s
 
+# Manticore
 configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
 api_client = manticoresearch.ApiClient(configuration)
 api_instance = manticoresearch.IndexApi(api_client)
 
+# Kafka
+from aiokafka import AIOKafkaProducer
+KAFKA_TOPIC = "msg"
+
 log = util.get_logger("db")
 
 # Redis (legacy)
@@ -37,121 +46,201 @@ TYPES_MAIN = [
 ]
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
+KEYPREFIX = "queue."
 
 
-def store_message(msg):
+async def store_kafka_batch(data):
+    print("STORING KAFKA BATCH")
+    producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
+    await producer.start()
+    batch = producer.create_batch()
+    for msg in data:
+        if msg["type"] in TYPES_MAIN:
+            index = "main"
+            schema = mc_s.schema_main
+        elif msg["type"] in TYPES_META:
+            index = "meta"
+            schema = mc_s.schema_meta
+        elif msg["type"] in TYPES_INT:
+            index = "internal"
+            schema = mc_s.schema_int
+        # normalise fields
+        for key, value in list(msg.items()):
+            if value is None:
+                del msg[key]
+            if key in schema:
+                if isinstance(value, int):
+                    if schema[key].startswith("string") or schema[key].startswith("text"):
+                        msg[key] = str(value)
+        message = ujson.dumps(msg)
+        body = str.encode(message)
+        metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
+        if metadata is None:
+            partitions = await producer.partitions_for(KAFKA_TOPIC)
+            partition = random.choice(tuple(partitions))
+            await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
+            print("%d messages sent to partition %d"
+                  % (batch.record_count(), partition))
+            batch = producer.create_batch()
+            continue
+
+    partitions = await producer.partitions_for(KAFKA_TOPIC)
+    partition = random.choice(tuple(partitions))
+    await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
+    print("%d messages sent to partition %d"
+          % (batch.record_count(), partition))
+    await producer.stop()
+
+# def store_message(msg):
+#     """
+#     Store a message into Manticore
+#     :param msg: dict
+#     """
+#     store_kafka(msg)
+    # # Duplicated to avoid extra function call
+    # if msg["type"] in TYPES_MAIN:
+    #     index = "main"
+    #     schema = mc_s.schema_main
+    # elif msg["type"] in TYPES_META:
+    #     index = "meta"
+    #     schema = mc_s.schema_meta
+    # elif msg["type"] in TYPES_INT:
+    #     index = "internal"
+    #     schema = mc_s.schema_int
+    # # normalise fields
+    # for key, value in list(msg.items()):
+    #     if value is None:
+    #         del msg[key]
+    #     if key in schema:
+    #         if isinstance(value, int):
+    #             if schema[key].startswith("string") or schema[key].startswith("text"):
+    #                 msg[key] = str(value)
+
+    # body = [{"insert": {"index": index, "doc": msg}}]
+    # body_post = ""
+    # for item in body:
+    #     body_post += ujson.dumps(item)
+    #     body_post += "\n"
+
+    # # print(body_post)
+    # try:
+    #     # Bulk index operations
+    #     print("FAKE POST")
+    #     #api_response = api_instance.bulk(body_post) # , async_req=True
+    #     # print(api_response)
+    # except ApiException as e:
+    #     print("Exception when calling IndexApi->bulk: %s\n" % e)
+    #     print("ATTEMPT", body_post)
+
+async def queue_message(msg):
     """
-    Store a message into Manticore
-    :param msg: dict
+    Queue a message on the Redis buffer.
     """
-    # Duplicated to avoid extra function call
-    if msg["type"] in TYPES_MAIN:
-        index = "main"
-        schema = mc_s.schema_main
-    elif msg["type"] in TYPES_META:
-        index = "meta"
-        schema = mc_s.schema_meta
-    elif msg["type"] in TYPES_INT:
-        index = "internal"
-        schema = mc_s.schema_int
-    # normalise fields
-    for key, value in list(msg.items()):
-        if value is None:
-            del msg[key]
-        if key in schema:
-            if isinstance(value, int):
-                if schema[key].startswith("string") or schema[key].startswith("text"):
-                    msg[key] = str(value)
+    src = msg["src"]
+    message = ujson.dumps(msg)
 
-    body = [{"insert": {"index": index, "doc": msg}}]
-    body_post = ""
-    for item in body:
-        body_post += ujson.dumps(item)
-        body_post += "\n"
+    key = f"{KEYPREFIX}{src}"
+    await ar.sadd(key, message)
 
-    # print(body_post)
-    try:
-        # Bulk index operations
-        print("FAKE POST")
-        #api_response = api_instance.bulk(body_post) # , async_req=True
-        # print(api_response)
-    except ApiException as e:
-        print("Exception when calling IndexApi->bulk: %s\n" % e)
-        print("ATTEMPT", body_post)
-
-
-def store_message_bulk(data):
+async def queue_message_bulk(data):
     """
-    Store a message into Manticore
-    :param msg: dict
+    Queue multiple messages on the Redis buffer.
     """
-    if not data:
-        return
-    # 10000: maximum inserts we can submit to
-    # Manticore as of Sept 2022
-    split_posts = array_split(data, ceil(len(data) / 10000))
-    for messages in split_posts:
-        total = []
-        for msg in messages:
-            # Duplicated to avoid extra function call (see above)
-            if msg["type"] in TYPES_MAIN:
-                index = "main"
-                schema = mc_s.schema_main
-            elif msg["type"] in TYPES_META:
-                index = "meta"
-                schema = mc_s.schema_meta
-            elif msg["type"] in TYPES_INT:
-                index = "internal"
-                schema = mc_s.schema_int
-            # normalise fields
-            for key, value in list(msg.items()):
-                if value is None:
-                    del msg[key]
-                if key in schema:
-                    if isinstance(value, int):
-                        if schema[key].startswith("string") or schema[key].startswith(
-                            "text"
-                        ):
-                            msg[key] = str(value)
+    for msg in data:
+        src = msg["src"]
+        message = ujson.dumps(msg)
 
-            body = {"insert": {"index": index, "doc": msg}}
-            total.append(body)
-
-        body_post = ""
-        for item in total:
-            body_post += ujson.dumps(item)
-            body_post += "\n"
-
-        # print(body_post)
-        try:
-            # Bulk index operations
-            print("FAKE POST")
-            #api_response = api_instance.bulk(body_post) # , async_req=True
-            #print(api_response)
-        except ApiException as e:
-            print("Exception when calling IndexApi->bulk: %s\n" % e)
-            print("ATTEMPT", body_post)
+        key = f"{KEYPREFIX}{src}"
+        await ar.sadd(key, message)
 
 
-def update_schema():
-    pass
+# For now, make a normal function until we go full async
+def queue_message_bulk_sync(data):
+    """
+    Queue multiple messages on the Redis buffer.
+    """
+    for msg in data:
+        src = msg["src"]
+        message = ujson.dumps(msg)
+
+        key = f"{KEYPREFIX}{src}"
+        r.sadd(key, message)
 
 
-def create_index(api_client):
-    util_instance = manticoresearch.UtilsApi(api_client)
-    schemas = {
-        "main": mc_s.schema_main,
-        "meta": mc_s.schema_meta,
-        "internal": mc_s.schema_int,
-    }
-    for name, schema in schemas.items():
-        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
-
-        create_query = (
-            f"create table if not exists {name}({schema_types}) engine='columnar'"
-        )
-        print("Schema types", create_query)
-        util_instance.sql(create_query)
+# def store_message_bulk(data):
+#     """
+#     Store a message into Manticore
+#     :param msg: dict
+#     """
+#     if not data:
+#         return
+#     for msg in data:
+#         store_kafka(msg)
+    # # 10000: maximum inserts we can submit to
+    # # Manticore as of Sept 2022
+    # split_posts = array_split(data, ceil(len(data) / 10000))
+    # for messages in split_posts:
+    #     total = []
+    #     for msg in messages:
+    #         # Duplicated to avoid extra function call (see above)
+    #         if msg["type"] in TYPES_MAIN:
+    #             index = "main"
+    #             schema = mc_s.schema_main
+    #         elif msg["type"] in TYPES_META:
+    #             index = "meta"
+    #             schema = mc_s.schema_meta
+    #         elif msg["type"] in TYPES_INT:
+    #             index = "internal"
+    #             schema = mc_s.schema_int
+    #         # normalise fields
+    #         for key, value in list(msg.items()):
+    #             if value is None:
+    #                 del msg[key]
+    #             if key in schema:
+    #                 if isinstance(value, int):
+    #                     if schema[key].startswith("string") or schema[key].startswith(
+    #                         "text"
+    #                     ):
+    #                         msg[key] = str(value)
+
+    #         body = {"insert": {"index": index, "doc": msg}}
+    #         total.append(body)
+
+    #     body_post = ""
+    #     for item in total:
+    #         body_post += ujson.dumps(item)
+    #         body_post += "\n"
+
+    #     # print(body_post)
+    #     try:
+    #         # Bulk index operations
+    #         print("FAKE POST")
+    #         #api_response = api_instance.bulk(body_post) # , async_req=True
+    #         #print(api_response)
+    #     except ApiException as e:
+    #         print("Exception when calling IndexApi->bulk: %s\n" % e)
+    #         print("ATTEMPT", body_post)
+
+
+# def update_schema():
+#     pass
+
+
+# def create_index(api_client):
+#     util_instance = manticoresearch.UtilsApi(api_client)
+#     schemas = {
+#         "main": mc_s.schema_main,
+#         "meta": mc_s.schema_meta,
+#         "internal": mc_s.schema_int,
+#     }
+#     for name, schema in schemas.items():
+#         schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
+
+#         create_query = (
+#             f"create table if not exists {name}({schema_types}) engine='columnar'"
+#         )
+#         print("Schema types", create_query)
+#         util_instance.sql(create_query)
 
 
 #create_index(api_client)
diff --git a/monolith.py b/monolith.py
index fbead0a..ce4c692 100644
--- a/monolith.py
+++ b/monolith.py
@@ -5,6 +5,7 @@ import util
 from sources.ch4 import Chan4
 from sources.dis import DiscordClient
 from sources.ingest import Ingest
+import db
 
 # For development
 # if not getenv("DISCORD_TOKEN", None):
diff --git a/processing/__init__.py b/processing/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/processing/process.py b/processing/process.py
new file mode 100644
index 0000000..ee45ebc
--- /dev/null
+++ b/processing/process.py
@@ -0,0 +1,106 @@
+from concurrent.futures import ProcessPoolExecutor
+import asyncio
+import os
+import ujson
+from siphashc import siphash
+
+import db
+import util
+
+# 4chan schema
+from schemas.ch4_s import ATTRMAP
+
+# For key generation
+import string
+import random
+
+# For timestamp processing
+from datetime import datetime
+
+# For 4chan message parsing
+from bs4 import BeautifulSoup
+
+from numpy import array_split
+from math import ceil
+
+log = util.get_logger("process")
+
+# Maximum number of CPU threads to use for post processing
+CPU_THREADS = os.cpu_count()
+
+p = ProcessPoolExecutor(CPU_THREADS)
+
+def get_hash_key():
+    hash_key = db.r.get("hashing_key")
+    if not hash_key:
+        letters = string.ascii_lowercase
+        hash_key = "".join(random.choice(letters) for i in range(16))
+        log.debug(f"Created new hash key: {hash_key}")
+        db.r.set("hashing_key", hash_key)
+    else:
+        hash_key = hash_key.decode("ascii")
+        log.debug(f"Decoded hash key: {hash_key}")
+    return hash_key
+
+hash_key = get_hash_key()
+
+async def spawn_processing_threads(data):
+    print("SPAWN", data)
+    if len(data) < CPU_THREADS:
+        split_data = [data]
+    else:
+        msg_per_core = int(len(data) / CPU_THREADS)
+        print("MSG PER CORE", msg_per_core)
+        split_data = array_split(data, ceil(len(data) / msg_per_core))
+    print("SPLIT DATA", split_data)
+    for split in split_data:
+        print("DELEGATING TO THREAD", len(split))
+        await process_data_thread(split)
+
+@asyncio.coroutine
+def process_data_thread(data):
+    """
+    Helper to spawn threads to process a list of data.
+    """
+    loop = asyncio.get_event_loop()
+    yield from loop.run_in_executor(p, process_data, data)
+
+def process_data(data):
+    print("PROCESSING DATA", data)
+    for index, msg in enumerate(data):
+        #print("PROCESSING", msg)
+        if msg["src"] == "4ch":
+            board = msg["net"]
+            thread = msg["channel"]
+
+            # Calculate hash for post
+            post_normalised = ujson.dumps(msg, sort_keys=True)
+            hash = siphash(hash_key, post_normalised)
+            hash = str(hash)
+            redis_key = f"cache.{board}.{thread}.{msg['no']}"
+            key_content = db.r.get(redis_key)
+            if key_content:
+                key_content = key_content.decode("ascii")
+                if key_content == hash:
+                    continue
+                else:
+                    data[index]["type"] = "update"
+            db.r.set(redis_key, hash)
+
+            for key2, value in list(msg.items()):
+                if key2 in ATTRMAP:
+                    msg[ATTRMAP[key2]] = data[index][key2]
+                    del data[index][key2]
+            if "ts" in msg:
+                old_time = data[index]["ts"]
+                # '08/30/22(Tue)02:25:37'
+                time_spl = old_time.split(":")
+                if len(time_spl) == 3:
+                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
+                else:
+                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
+                # new_ts = old_ts.isoformat()
+                new_ts = int(old_ts.timestamp())
+                data[index]["ts"] = new_ts
+            if "msg" in msg:
+                soup = BeautifulSoup(data[index]["msg"], "html.parser")
+                msg = soup.get_text(separator="\n")
+                data[index]["msg"] = msg
\ No newline at end of file
diff --git a/sources/ch4.py b/sources/ch4.py
index 54038ea..7fad264 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -19,19 +19,19 @@ from schemas.ch4_s import ATTRMAP
 
 # CONFIGURATION #
 
 # Number of 4chan threads to request at once
-THREADS_CONCURRENT = 100
+THREADS_CONCURRENT = 1000
 
 # Seconds to wait between every THREADS_CONCURRENT requests
-THREADS_DELAY = 0.8
+THREADS_DELAY = 0.1
 
 # Seconds to wait between crawls
 CRAWL_DELAY = 5
 
 # Semaphore value ?
-THREADS_SEMAPHORE = 100
+THREADS_SEMAPHORE = 1000
 
 # Maximum number of CPU threads to use for post processing
-CPU_THREADS = 1
+CPU_THREADS = 8
 
 # CONFIGURATION END #
 
@@ -95,7 +95,7 @@ class Chan4(object):
                 no = threads["no"]
                 to_get.append((mapped, no))
 
-        self.log.info(f"Got thread list for {mapped}: {len(response)}")
+        self.log.debug(f"Got thread list for {mapped}: {len(response)}")
         if not to_get:
             return
         split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
@@ -136,16 +136,19 @@ class Chan4(object):
         # Split into 10,000 chunks
         if not all_posts:
             return
-        threads_per_core = int(len(all_posts) / CPU_THREADS)
-        for i in range(CPU_THREADS):
-            new_dict = {}
-            pulled_posts = self.take_items(all_posts, threads_per_core)
-            for k, v in pulled_posts:
-                if k in new_dict:
-                    new_dict[k].append(v)
-                else:
-                    new_dict[k] = [v]
-            await self.handle_posts_thread(new_dict)
+        await self.handle_posts(all_posts)
+        # threads_per_core = int(len(all_posts) / CPU_THREADS)
+        # for i in range(CPU_THREADS):
+        #     new_dict = {}
+        #     pulled_posts = self.take_items(all_posts, threads_per_core)
+        #     for k, v in pulled_posts:
+        #         if k in new_dict:
+        #             new_dict[k].append(v)
+        #         else:
+        #             new_dict[k] = [v]
+        #await self.handle_posts_thread(new_dict)
+
         # print("VAL", ceil(len(all_posts) / threads_per_core))
         # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
         # print("THREADS PER CORE SPLIT", len(split_posts))
@@ -161,46 +164,46 @@ class Chan4(object):
         loop = asyncio.get_event_loop()
         yield from loop.run_in_executor(p, self.handle_posts, posts)
 
-    def handle_posts(self, posts):
+    async def handle_posts(self, posts):
         to_store = []
         for key, post_list in posts.items():
             board, thread = key
             for index, post in enumerate(post_list):
                 posts[key][index]["type"] = "msg"
 
-                # Calculate hash for post
-                post_normalised = ujson.dumps(post, sort_keys=True)
-                hash = siphash(self.hash_key, post_normalised)
-                hash = str(hash)
-                redis_key = f"cache.{board}.{thread}.{post['no']}"
-                key_content = db.r.get(redis_key)
-                if key_content:
-                    key_content = key_content.decode("ascii")
-                    if key_content == hash:
-                        continue
-                    else:
-                        posts[key][index]["type"] = "update"
-                db.r.set(redis_key, hash)
+                # # Calculate hash for post
+                # post_normalised = ujson.dumps(post, sort_keys=True)
+                # hash = siphash(self.hash_key, post_normalised)
+                # hash = str(hash)
+                # redis_key = f"cache.{board}.{thread}.{post['no']}"
+                # key_content = db.r.get(redis_key)
+                # if key_content:
+                #     key_content = key_content.decode("ascii")
+                #     if key_content == hash:
+                #         continue
+                #     else:
+                #         posts[key][index]["type"] = "update"
+                # #db.r.set(redis_key, hash)
 
-                for key2, value in list(post.items()):
-                    if key2 in ATTRMAP:
-                        post[ATTRMAP[key2]] = posts[key][index][key2]
-                        del posts[key][index][key2]
-                if "ts" in post:
-                    old_time = posts[key][index]["ts"]
-                    # '08/30/22(Tue)02:25:37'
-                    time_spl = old_time.split(":")
-                    if len(time_spl) == 3:
-                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
-                    else:
-                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-                    # new_ts = old_ts.isoformat()
-                    new_ts = int(old_ts.timestamp())
-                    posts[key][index]["ts"] = new_ts
-                if "msg" in post:
-                    soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
-                    msg = soup.get_text(separator="\n")
-                    posts[key][index]["msg"] = msg
+                # for key2, value in list(post.items()):
+                #     if key2 in ATTRMAP:
+                #         post[ATTRMAP[key2]] = posts[key][index][key2]
+                #         del posts[key][index][key2]
+                # if "ts" in post:
+                #     old_time = posts[key][index]["ts"]
+                #     # '08/30/22(Tue)02:25:37'
+                #     time_spl = old_time.split(":")
+                #     if len(time_spl) == 3:
+                #         old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
+                #     else:
+                #         old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
+                #     # new_ts = old_ts.isoformat()
+                #     new_ts = int(old_ts.timestamp())
+                #     posts[key][index]["ts"] = new_ts
+                # if "msg" in post:
+                #     soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
+                #     msg = soup.get_text(separator="\n")
+                #     posts[key][index]["msg"] = msg
 
                 posts[key][index]["src"] = "4ch"
                 posts[key][index]["net"] = board
@@ -211,7 +214,8 @@ class Chan4(object):
             # print({name_map[name]: val for name, val in post.items()})
             # print(f"Got posts: {len(posts)}")
         if to_store:
-            db.store_message_bulk(to_store)
+            print("STORING", len(to_store))
+            await db.queue_message_bulk(to_store)
 
     async def fetch(self, url, session, mapped):
         async with session.get(url) as response:
diff --git a/sources/dis.py b/sources/dis.py
index 0b8b2f6..c187cf7 100644
--- a/sources/dis.py
+++ b/sources/dis.py
@@ -41,4 +41,4 @@ class DiscordClient(discord.Client):
         a["type"] = "msg"
         a["src"] = "dis"
 
-        db.store_message(a)
+        await db.queue_message(a)
diff --git a/sources/ingest.py b/sources/ingest.py
index 083a085..3132797 100644
--- a/sources/ingest.py
+++ b/sources/ingest.py
@@ -5,12 +5,17 @@ import ujson
 import db
 import util
 
-SOURCES = ["irc"]
+from processing import process
+
+SOURCES = ["irc", "dis", "4ch"]
 KEYPREFIX = "queue."
 CHUNK_SIZE = 1000
 ITER_DELAY = 0.5
 
+
+
+
 class Ingest(object):
     def __init__(self):
         name = self.__class__.__name__
@@ -18,19 +23,23 @@ class Ingest(object):
     async def run(self):
         while True:
-            await self.process_chunk()
+            await self.get_chunk()
             await asyncio.sleep(ITER_DELAY)
 
-    async def process_chunk(self):
+    async def get_chunk(self):
         items = []
         for source in SOURCES:
             key = f"{KEYPREFIX}{source}"
             chunk = await db.ar.spop(key, CHUNK_SIZE)
             if not chunk:
                 continue
-            self.log.info(f"Got chunk: {chunk}")
+            #self.log.info(f"Got chunk: {chunk}")
             for item in chunk:
                 item = ujson.loads(item)
-                self.log.info(f"Got item: {item}")
+                #self.log.info(f"Got item: {item}")
                 items.append(item)
 
-        db.store_message_bulk(items)
+        if items:
+            print("PROCESSING", len(items))
+            await process.spawn_processing_threads(items)
+            print("DONE WITH PROCESSING", len(items))
+            await db.store_kafka_batch(items)
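
Note (not part of the patch): store_kafka_batch() produces ujson-encoded messages to the "msg" topic on kafka:9092, as configured above. The sketch below shows one way a downstream service could consume that topic with aiokafka; the group id, function name, and handle_message() callback are illustrative only and do not exist in this repository.

import asyncio
import ujson
from aiokafka import AIOKafkaConsumer

async def consume_msg_topic(handle_message):
    # Subscribe to the topic that store_kafka_batch() writes to.
    consumer = AIOKafkaConsumer(
        "msg",
        bootstrap_servers="kafka:9092",
        group_id="monolith-consumer",  # illustrative group id, not in the repo
    )
    await consumer.start()
    try:
        async for record in consumer:
            # Values were serialised with ujson.dumps() before batching.
            msg = ujson.loads(record.value)
            handle_message(msg)
    finally:
        await consumer.stop()

if __name__ == "__main__":
    # Example usage: print every message as it arrives.
    asyncio.run(consume_msg_topic(print))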