Normalise fields in processing and remove invalid characters

master
Mark Veidemanis 2 years ago
parent 740f93208b
commit ced3a251b2

@ -61,10 +61,7 @@ async def store_kafka_batch(data):
# schema = mc_s.schema_int
KAFKA_TOPIC = index
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(

@ -16,13 +16,13 @@ if not token:
async def main(loop):
log.info("Starting Discord handler.")
client = DiscordClient()
loop.create_task(client.start(token))
# log.info("Starting Discord handler.")
# client = DiscordClient()
# loop.create_task(client.start(token))
log.info("Starting 4chan handler.")
chan = Chan4()
loop.create_task(chan.run())
# log.info("Starting 4chan handler.")
# chan = Chan4()
# loop.create_task(chan.run())
log.info("Starting ingest handler.")
ingest = Ingest()

@ -14,6 +14,7 @@ from datetime import datetime
from math import ceil
import orjson
import regex
# Tokenisation
import spacy
@ -62,6 +63,8 @@ from schemas.ch4_s import ATTRMAP
# strip_non_alphanum, #
# ]
RE_BAD_CHARS = regex.compile(r"[\p{Cc}\p{Cs}]+")
# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
@ -110,7 +113,7 @@ async def spawn_processing_threads(data):
split_data = array_split(data, ceil(len(data) / msg_per_core))
for index, split in enumerate(split_data):
log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
task = loop.run_in_executor(p, process_data, data)
task = loop.run_in_executor(p, process_data, split)
tasks.append(task)
results = [await task for task in tasks]
@ -129,6 +132,21 @@ def process_data(data):
# Initialise sentiment analyser
analyzer = SentimentIntensityAnalyzer()
for msg in data:
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
# Remove invalid UTF-8 characters
# IRC and Discord
if "msg" in msg:
msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])
# 4chan - since we change the attributes below
if "com" in msg:
msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])
if msg["src"] == "4ch":
board = msg["net"]
thread = msg["channel"]
@ -151,6 +169,7 @@ def process_data(data):
if key2 in ATTRMAP:
msg[ATTRMAP[key2]] = msg[key2]
del msg[key2]
if "ts" in msg:
old_time = msg["ts"]
# '08/30/22(Tue)02:25:37'
@ -168,8 +187,10 @@ def process_data(data):
soup = BeautifulSoup(msg["msg"], "html.parser")
msg_str = soup.get_text(separator="\n")
msg["msg"] = msg_str
# Annotate sentiment/NLP
if "msg" in msg:
RE_BAD_CHARS.sub("", msg["msg"])
# Language
text = Text(msg["msg"])
try:

@ -8,6 +8,10 @@ import util
from processing import process
SOURCES = ["4ch", "irc", "dis"]
# DEBUG CODE REMOVE ME
SOURCES.remove("4ch")
SOURCES.remove("dis")
# DEBUG CODE REMOVE ME
KEYPREFIX = "queue."
# Chunk size per source (divide by len(SOURCES) for total)

Loading…
Cancel
Save