From ced3a251b2e00c21327e546065714bc3fb062f01 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 21 Sep 2022 10:01:12 +0100 Subject: [PATCH] Normalise fields in processing and remove invalid characters --- db.py | 5 +---- monolith.py | 12 ++++++------ processing/process.py | 23 ++++++++++++++++++++++- sources/ingest.py | 4 ++++ 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/db.py b/db.py index 48ba6cb..1f52c48 100644 --- a/db.py +++ b/db.py @@ -61,10 +61,7 @@ async def store_kafka_batch(data): # schema = mc_s.schema_int KAFKA_TOPIC = index - # normalise fields - for key, value in list(msg.items()): - if value is None: - del msg[key] + # if key in schema: # if isinstance(value, int): # if schema[key].startswith("string") or schema[key].startswith( diff --git a/monolith.py b/monolith.py index 1eb559b..5caebf8 100644 --- a/monolith.py +++ b/monolith.py @@ -16,13 +16,13 @@ if not token: async def main(loop): - log.info("Starting Discord handler.") - client = DiscordClient() - loop.create_task(client.start(token)) + # log.info("Starting Discord handler.") + # client = DiscordClient() + # loop.create_task(client.start(token)) - log.info("Starting 4chan handler.") - chan = Chan4() - loop.create_task(chan.run()) + # log.info("Starting 4chan handler.") + # chan = Chan4() + # loop.create_task(chan.run()) log.info("Starting ingest handler.") ingest = Ingest() diff --git a/processing/process.py b/processing/process.py index c55737d..d154b3c 100644 --- a/processing/process.py +++ b/processing/process.py @@ -14,6 +14,7 @@ from datetime import datetime from math import ceil import orjson +import regex # Tokenisation import spacy @@ -62,6 +63,8 @@ from schemas.ch4_s import ATTRMAP # strip_non_alphanum, # # ] +RE_BAD_CHARS = regex.compile(r"[\p{Cc}\p{Cs}]+") + # Squash errors polyglot_logger.setLevel("ERROR") warnings.filterwarnings("ignore", category=UserWarning, module="bs4") @@ -110,7 +113,7 @@ async def spawn_processing_threads(data): split_data = array_split(data, ceil(len(data) / msg_per_core)) for index, split in enumerate(split_data): log.debug(f"Delegating processing of {len(split)} messages to thread {index}") - task = loop.run_in_executor(p, process_data, data) + task = loop.run_in_executor(p, process_data, split) tasks.append(task) results = [await task for task in tasks] @@ -129,6 +132,21 @@ def process_data(data): # Initialise sentiment analyser analyzer = SentimentIntensityAnalyzer() for msg in data: + + # normalise fields + for key, value in list(msg.items()): + if value is None: + del msg[key] + + # Remove invalid UTF-8 characters + # IRC and Discord + if "msg" in msg: + msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"]) + + # 4chan - since we change the attributes below + if "com" in msg: + msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"]) + if msg["src"] == "4ch": board = msg["net"] thread = msg["channel"] @@ -151,6 +169,7 @@ def process_data(data): if key2 in ATTRMAP: msg[ATTRMAP[key2]] = msg[key2] del msg[key2] + if "ts" in msg: old_time = msg["ts"] # '08/30/22(Tue)02:25:37' @@ -168,8 +187,10 @@ def process_data(data): soup = BeautifulSoup(msg["msg"], "html.parser") msg_str = soup.get_text(separator="\n") msg["msg"] = msg_str + # Annotate sentiment/NLP if "msg" in msg: + RE_BAD_CHARS.sub("", msg["msg"]) # Language text = Text(msg["msg"]) try: diff --git a/sources/ingest.py b/sources/ingest.py index 5f6b9e0..4edb496 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -8,6 +8,10 @@ import util from processing import process SOURCES = ["4ch", "irc", "dis"] +# DEBUG CODE REMOVE ME +SOURCES.remove("4ch") +SOURCES.remove("dis") +# DEBUG CODE REMOVE ME KEYPREFIX = "queue." # Chunk size per source (divide by len(SOURCES) for total)