"""Post-processing of queued messages before they are stored.

Messages are popped from the ``KEYNAME`` list on ``db.r``, cleaned up,
de-duplicated (4chan posts only), annotated with language, sentiment and
tokens, and finally handed to ``db.store_kafka_batch``.
"""
import asyncio
import os
import random

# For key generation
import string

# For timing
import time

# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor

# For timestamp processing
from datetime import datetime
from os import getenv

import orjson
import regex

# For 4chan message parsing
from bs4 import BeautifulSoup

# Tokenisation
from gensim.parsing.preprocessing import (
    preprocess_string,
    remove_stopwords,
    strip_multiple_whitespaces,
    strip_non_alphanum,
    strip_numeric,
    strip_punctuation,
    strip_short,
    strip_tags,
)
from polyglot.detect.base import logger as polyglot_logger

# For NLP
from polyglot.text import Text
from pycld2 import error as cld2_error
from siphashc import siphash

# For sentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

import db
import util

# 4chan schema
from schemas.ch4_s import ATTRMAP

# Values accepted as "true" when parsing boolean environment variables
trues = ("true", "1", "t", True)

# Name of the list on db.r that holds the messages awaiting processing
KEYNAME = "queue"

# When enabled, process_data() logs a per-stage timing summary for each batch
MONOLITH_PROCESS_PERFSTATS = (
    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
)

# Filters applied, in order, by gensim's preprocess_string() to build tokens
CUSTOM_FILTERS = [
    str.lower,
    strip_tags,
    strip_punctuation,
    strip_multiple_whitespaces,
    strip_numeric,
    remove_stopwords,
    strip_short,
    # stem_text,
    strip_non_alphanum,
]

# Control characters (Cc) and surrogates (Cs) are stripped from message text
RE_BAD_CHARS = regex.compile(r"[\p{Cc}\p{Cs}]+")

# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")

log = util.get_logger("process")

# Maximum number of CPU threads to use for post processing.
# os.cpu_count() may return None, so fall back to a single worker.
CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS") or os.cpu_count() or 1)

p = ProcessPoolExecutor(CPU_THREADS)


def get_hash_key():
    """Return the key used to hash 4chan posts, creating it if necessary.

    The key is read from ``hashing_key`` on ``db.r``. When no key is stored
    yet, 16 random lowercase ASCII letters are generated and written back.

    Returns:
        The hashing key as a ``str``.
    """
    stored_key = db.r.get("hashing_key")
    if stored_key:
        # Stored values come back as bytes
        hash_key = stored_key.decode("ascii")
        log.debug(f"Decoded hash key: {hash_key}")
        return hash_key

    hash_key = "".join(random.choice(string.ascii_lowercase) for _ in range(16))
    log.debug(f"Created new hash key: {hash_key}")
    db.r.set("hashing_key", hash_key)
    return hash_key


hash_key = get_hash_key()


async def spawn_processing_threads(chunk, length):
    """Process ``length`` queued messages in the pool and store the results.

    Small batches (fewer than ``CPU_THREADS * 100`` messages) are handled by a
    single worker; larger ones are split evenly across ``CPU_THREADS`` workers.
    The combined output is passed to ``db.store_kafka_batch``.

    Args:
        chunk: Identifier of the batch; only used in log messages here.
        length: Number of messages to process. Must be an ``int``.
    """
    log.debug(f"Spawning processing threads for chunk {chunk} of length {length}")
    loop = asyncio.get_running_loop()

    if length < CPU_THREADS * 100:
        cores = 1
        chunk_size = length
    else:
        cores = CPU_THREADS
        # NOTE(review): the remainder (length % cores) is not delegated to any
        # worker, so those messages stay on the queue - confirm the caller
        # picks them up on a later round.
        chunk_size = length // cores

    tasks = []
    for index in range(cores):
        log.debug(
            f"[{chunk}/{index}] Delegating {chunk_size} messages to thread {index}"
        )
        tasks.append(loop.run_in_executor(p, process_data, chunk, index, chunk_size))

    results = await asyncio.gather(*tasks)

    # Join the results back from the split list
    flat_list = [item for sublist in results for item in sublist]
    log.debug(
        (
            f"[{chunk}/{index}] Results from processing of {length} messages in "
            f"{cores} threads: {len(flat_list)}"
        )
    )
    await db.store_kafka_batch(flat_list)


def process_data(chunk, index, chunk_size):
    """Pop up to ``chunk_size`` messages from the queue and post-process them.

    Per message: ``None`` fields are dropped and control/surrogate characters
    are stripped from ``msg``/``com``. 4chan posts (``src == "4ch"``) are
    additionally hashed and de-duplicated against a cache entry on ``db.r``,
    renamed according to ``ATTRMAP``, given an integer ``ts`` and have HTML
    removed from ``msg``. Messages with a ``msg`` field get ``lang_code`` and
    ``lang_name``; English ones also get ``sentiment`` and ``tokens``.

    Args:
        chunk: Identifier of the batch; only used in log messages.
        index: Worker number within the batch; only used in log messages.
        chunk_size: Maximum number of messages to pop from the queue.

    Returns:
        A list of processed message dicts. Unchanged 4chan posts are omitted.
        The list is shorter than ``chunk_size`` if the queue runs dry.

    Raises:
        Exception: If a 4chan post has no ``ts`` field after renaming.
    """
    log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")

    to_store = []

    # Accumulated CPU time per stage, in milliseconds
    sentiment_time = 0.0
    regex_time = 0.0
    polyglot_time = 0.0
    date_time = 0.0
    nlp_time = 0.0
    normalise_time = 0.0
    hash_time = 0.0
    normal2_time = 0.0
    soup_time = 0.0
    total_time = 0.0

    # Initialise sentiment analyser
    analyzer = SentimentIntensityAnalyzer()

    for _ in range(chunk_size):
        raw = db.r.rpop(KEYNAME)
        if not raw:
            # Queue ran dry: keep what has been processed so far instead of
            # returning None, which the caller cannot iterate over.
            break
        msg = orjson.loads(raw)
        total_start = time.process_time()

        # Normalise fields: drop everything that is None
        start = time.process_time()
        msg = {key: value for key, value in msg.items() if value is not None}
        normalise_time += (time.process_time() - start) * 1000

        # Remove invalid UTF-8 characters
        start = time.process_time()
        # IRC and Discord
        if "msg" in msg:
            msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])
        # 4chan - since we change the attributes below
        if "com" in msg:
            msg["com"] = RE_BAD_CHARS.sub("", msg["com"])
        regex_time += (time.process_time() - start) * 1000

        if msg["src"] == "4ch":
            board = msg["net"]
            thread = msg["channel"]

            # Calculate hash for post
            start = time.process_time()
            post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
            post_hash = str(siphash(hash_key, post_normalised))
            redis_key = (
                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
            )
            cached_hash = db.r.get(redis_key)
            if cached_hash is not None:
                if cached_hash.decode("ascii") == post_hash:
                    # Unchanged post: skip it so it never reaches to_store
                    continue
                msg["type"] = "update"
            db.r.set(redis_key, post_hash)
            hash_time += (time.process_time() - start) * 1000

            # Rename 4chan attributes to the common schema
            start = time.process_time()
            for old_key in list(msg):
                if old_key in ATTRMAP:
                    msg[ATTRMAP[old_key]] = msg[old_key]
                    del msg[old_key]
            normal2_time += (time.process_time() - start) * 1000

            # Convert the textual timestamp to an integer epoch
            start = time.process_time()
            if "ts" not in msg:
                raise Exception("No TS in msg")
            old_time = msg["ts"]
            # '08/30/22(Tue)02:25:37' - the seconds part may be absent
            if len(old_time.split(":")) == 3:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
            else:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
            # NOTE(review): old_ts is naive, so timestamp() interprets it in
            # this process's local timezone - confirm that is intended.
            msg["ts"] = int(old_ts.timestamp())
            date_time += (time.process_time() - start) * 1000

            # Strip HTML from the post body
            start = time.process_time()
            if "msg" in msg:
                soup = BeautifulSoup(msg["msg"], "html.parser")
                msg["msg"] = soup.get_text(separator="\n")
            soup_time += (time.process_time() - start) * 1000

        # Annotate sentiment/NLP
        if "msg" in msg:
            # Language
            start = time.process_time()
            text = Text(msg["msg"])
            try:
                lang_code = text.language.code
                lang_name = text.language.name
                msg["lang_code"] = lang_code
                msg["lang_name"] = lang_name
            except cld2_error as e:
                log.error(f"[{chunk}/{index}] Error detecting language: {e}")
                # So below block doesn't fail
                lang_code = None
            polyglot_time += (time.process_time() - start) * 1000

            # Sentiment and tokens are only computed for English text
            if lang_code == "en":
                # Sentiment
                start = time.process_time()
                scores = analyzer.polarity_scores(str(msg["msg"]))
                msg["sentiment"] = scores["compound"]
                sentiment_time += (time.process_time() - start) * 1000

                # Tokens
                start = time.process_time()
                msg["tokens"] = preprocess_string(msg["msg"], CUSTOM_FILTERS)
                nlp_time += (time.process_time() - start) * 1000

        # Add the mutated message to the return buffer
        to_store.append(msg)
        total_time += (time.process_time() - total_start) * 1000

    if MONOLITH_PROCESS_PERFSTATS:
        stats = (
            ("Chunk", chunk),
            ("Index", index),
            ("Sentiment", sentiment_time),
            ("Regex", regex_time),
            ("Polyglot", polyglot_time),
            ("Date", date_time),
            ("NLP", nlp_time),
            ("Normalise", normalise_time),
            ("Hash", hash_time),
            ("Normal2", normal2_time),
            ("Soup", soup_time),
            ("Total", total_time),
        )
        log.debug("=====================================")
        for label, value in stats:
            log.debug(f"{label}: {value}")
        log.debug("=====================================")

    return to_store