import asyncio
import os
import random

# For key generation
import string

# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor

# For timestamp processing
from datetime import datetime
from math import ceil

import orjson
import regex

# Tokenisation
import spacy

# For 4chan message parsing
from bs4 import BeautifulSoup
from numpy import array_split
from polyglot.detect.base import logger as polyglot_logger

# For NLP
from polyglot.text import Text
from pycld2 import error as cld2_error
from siphashc import siphash

# For sentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

import db
import util

# 4chan schema
from schemas.ch4_s import ATTRMAP

# For tokenisation
# from gensim.parsing.preprocessing import (
#     strip_tags,
#     strip_punctuation,
#     strip_numeric,
#     stem_text,
#     strip_multiple_whitespaces,
#     strip_non_alphanum,
#     remove_stopwords,
#     strip_short,
#     preprocess_string,
# )
# CUSTOM_FILTERS = [
#     lambda x: x.lower(),
#     strip_tags,  #
#     strip_punctuation,  #
#     strip_multiple_whitespaces,
#     strip_numeric,
#     remove_stopwords,
#     strip_short,
#     # stem_text,
#     strip_non_alphanum,  #
# ]

# Matches runs of Unicode control (Cc) and surrogate (Cs) code points
RE_BAD_CHARS = regex.compile(r"[\p{Cc}\p{Cs}]+")

# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")

# Part-of-speech tags whose lemmas are extracted into words_<tag> fields
TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])

log = util.get_logger("process")

# Maximum number of CPU threads to use for post processing.
# os.cpu_count() may return None and the variable may be set but empty, so
# fall back step by step instead of handing int() a None or "".
CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS") or os.cpu_count() or 1)

p = ProcessPoolExecutor(CPU_THREADS)


def get_hash_key():
    """Return the hashing key stored under ``hashing_key`` in ``db.r``.

    When no key is stored yet, a new one made of 16 random lowercase ASCII
    letters is generated, written back to ``db.r`` and returned.

    Returns:
        The hashing key as a ``str``.
    """
    stored_key = db.r.get("hashing_key")
    if stored_key:
        # The stored value comes back as bytes
        hash_key = stored_key.decode("ascii")
        log.debug(f"Decoded hash key: {hash_key}")
        return hash_key

    hash_key = "".join(random.choice(string.ascii_lowercase) for _ in range(16))
    log.debug(f"Created new hash key: {hash_key}")
    db.r.set("hashing_key", hash_key)
    return hash_key


hash_key = get_hash_key()
async def spawn_processing_threads(data):
    """Process a batch of messages in the process pool and store the result.

    Batches smaller than ``CPU_THREADS * 100`` messages are handed to the
    pool as a single chunk; larger batches are split with ``array_split``
    into chunks of roughly ``len(data) / CPU_THREADS`` messages. Each chunk
    is run through ``process_data`` in the executor ``p``, the per-chunk
    results are flattened in chunk order and passed to
    ``db.store_kafka_batch``.

    Args:
        data: Sequence of message dicts to process.
    """
    # NOTE: the legacy ``@asyncio.coroutine`` decorator is intentionally not
    # applied; it is redundant on ``async def`` and no longer exists in
    # Python 3.11+.
    len_data = len(data)
    loop = asyncio.get_event_loop()

    if len_data < CPU_THREADS * 100:
        split_data = [data]
    else:
        msg_per_core = int(len_data / CPU_THREADS)
        split_data = array_split(data, ceil(len_data / msg_per_core))

    tasks = []
    for index, split in enumerate(split_data):
        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
        tasks.append(loop.run_in_executor(p, process_data, split))

    # gather() keeps the results in the same order as the submitted chunks
    results = await asyncio.gather(*tasks)
    log.debug(
        (
            f"Results from processing of {len_data} messages in "
            f"{len(split_data)} threads: {len(results)}"
        )
    )

    # Join the results back from the split list
    flat_list = [item for sublist in results for item in sublist]
    await db.store_kafka_batch(flat_list)


def _parse_4chan_timestamp(raw_ts):
    """Convert a 4chan timestamp string to integer epoch seconds."""
    # Example input: '08/30/22(Tue)02:25:37'; the seconds part may be absent
    if len(raw_ts.split(":")) == 3:
        fmt = "%m/%d/%y(%a)%H:%M:%S"
    else:
        fmt = "%m/%d/%y(%a)%H:%M"
    # NOTE(review): strptime yields a naive datetime, so timestamp() treats
    # it as local time - confirm this matches the timezone of the source.
    return int(datetime.strptime(raw_ts, fmt).timestamp())


def _prepare_4chan_post(msg):
    """Deduplicate and normalise one 4chan post in place.

    Returns False when the post's hash equals the cached one (the post must
    be dropped), True otherwise. Raises ``Exception`` when the post has no
    ``ts`` field after the attribute rename.
    """
    board = msg["net"]
    thread = msg["channel"]

    # Calculate hash for post
    post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
    post_hash = str(siphash(hash_key, post_normalised))
    redis_key = f"cache.{board}.{thread}.{msg['no']}"
    cached_hash = db.r.get(redis_key)
    if cached_hash:
        if cached_hash.decode("ascii") == post_hash:
            # Unchanged post: nothing to store
            return False
        # Seen before with different content
        msg["type"] = "update"
    db.r.set(redis_key, post_hash)

    # Rename the 4chan attributes according to ATTRMAP. Iterate over a
    # snapshot of the keys because the dict is mutated inside the loop.
    for key in list(msg):
        if key in ATTRMAP:
            msg[ATTRMAP[key]] = msg[key]
            del msg[key]

    if "ts" not in msg:
        raise Exception("No TS in msg")
    msg["ts"] = _parse_4chan_timestamp(msg["ts"])

    if "msg" in msg:
        # Reduce the HTML body to plain text, one line per text node
        soup = BeautifulSoup(msg["msg"], "html.parser")
        msg["msg"] = soup.get_text(separator="\n")
    return True


def _annotate_message(msg, analyzer):
    """Add language, sentiment and token fields to ``msg`` in place."""
    # Language
    text = Text(msg["msg"])
    try:
        lang_code = text.language.code
        lang_name = text.language.name
        msg["lang_code"] = lang_code
        msg["lang_name"] = lang_name
    except cld2_error as e:
        log.error(f"Error detecting language: {e}")
        # So below block doesn't fail
        lang_code = None

    # Sentiment and tokens are only computed for English messages
    if lang_code != "en":
        return

    # Sentiment
    scores = analyzer.polarity_scores(str(msg["msg"]))
    msg["sentiment"] = scores["compound"]

    # Tokens: lemmas grouped by part-of-speech tag
    doc = nlp(msg["msg"])
    for tag in TAGS:
        msg[f"words_{tag.lower()}"] = [
            token.lemma_ for token in doc if token.pos_ == tag
        ]


def process_data(data):
    """Normalise and annotate a chunk of messages.

    For every message: drop ``None``-valued fields, strip control and
    surrogate characters from ``msg``/``com``, apply the 4chan-specific
    deduplication and normalisation when ``src`` is ``"4ch"``, then annotate
    messages that carry a ``msg`` field. The message dicts are mutated in
    place.

    Args:
        data: Iterable of message dicts; each must contain a ``src`` key.

    Returns:
        List of processed messages, excluding unchanged 4chan duplicates.

    Raises:
        Exception: If a 4chan post has no ``ts`` field.
    """
    to_store = []

    # Initialise sentiment analyser
    analyzer = SentimentIntensityAnalyzer()
    for msg in data:
        # normalise fields
        for key, value in list(msg.items()):
            if value is None:
                del msg[key]

        # Remove invalid UTF-8 characters
        # IRC and Discord
        if "msg" in msg:
            msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])

        # 4chan - since we change the attributes below
        if "com" in msg:
            msg["com"] = RE_BAD_CHARS.sub("", msg["com"])

        if msg["src"] == "4ch" and not _prepare_4chan_post(msg):
            # Unchanged duplicate: skip it so it is never stored
            continue

        # Annotate sentiment/NLP
        # NOTE: an earlier revision called RE_BAD_CHARS.sub() here and threw
        # the result away. The text was already sanitised above, and
        # assigning the result would also strip the newlines inserted by
        # get_text(), so the dead call was dropped.
        if "msg" in msg:
            _annotate_message(msg, analyzer)

        # Add the mutated message to the return buffer
        to_store.append(msg)
    return to_store