Normalise fields in processing and remove invalid characters
This commit is contained in:
parent
48e4c07959
commit
cf4aa45663
5
db.py
5
db.py
|
@ -61,10 +61,7 @@ async def store_kafka_batch(data):
|
|||
# schema = mc_s.schema_int
|
||||
|
||||
KAFKA_TOPIC = index
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
del msg[key]
|
||||
|
||||
# if key in schema:
|
||||
# if isinstance(value, int):
|
||||
# if schema[key].startswith("string") or schema[key].startswith(
|
||||
|
|
12
monolith.py
12
monolith.py
|
@ -16,13 +16,13 @@ if not token:
|
|||
|
||||
|
||||
async def main(loop):
|
||||
log.info("Starting Discord handler.")
|
||||
client = DiscordClient()
|
||||
loop.create_task(client.start(token))
|
||||
# log.info("Starting Discord handler.")
|
||||
# client = DiscordClient()
|
||||
# loop.create_task(client.start(token))
|
||||
|
||||
log.info("Starting 4chan handler.")
|
||||
chan = Chan4()
|
||||
loop.create_task(chan.run())
|
||||
# log.info("Starting 4chan handler.")
|
||||
# chan = Chan4()
|
||||
# loop.create_task(chan.run())
|
||||
|
||||
log.info("Starting ingest handler.")
|
||||
ingest = Ingest()
|
||||
|
|
|
@ -14,6 +14,7 @@ from datetime import datetime
|
|||
from math import ceil
|
||||
|
||||
import orjson
|
||||
import regex
|
||||
|
||||
# Tokenisation
|
||||
import spacy
|
||||
|
@ -62,6 +63,8 @@ from schemas.ch4_s import ATTRMAP
|
|||
# strip_non_alphanum, #
|
||||
# ]
|
||||
|
||||
RE_BAD_CHARS = regex.compile(r"[\p{Cc}\p{Cs}]+")
|
||||
|
||||
# Squash errors
|
||||
polyglot_logger.setLevel("ERROR")
|
||||
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
|
||||
|
@ -110,7 +113,7 @@ async def spawn_processing_threads(data):
|
|||
split_data = array_split(data, ceil(len(data) / msg_per_core))
|
||||
for index, split in enumerate(split_data):
|
||||
log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
|
||||
task = loop.run_in_executor(p, process_data, data)
|
||||
task = loop.run_in_executor(p, process_data, split)
|
||||
tasks.append(task)
|
||||
|
||||
results = [await task for task in tasks]
|
||||
|
@ -129,6 +132,21 @@ def process_data(data):
|
|||
# Initialise sentiment analyser
|
||||
analyzer = SentimentIntensityAnalyzer()
|
||||
for msg in data:
|
||||
|
||||
# normalise fields
|
||||
for key, value in list(msg.items()):
|
||||
if value is None:
|
||||
del msg[key]
|
||||
|
||||
# Remove invalid UTF-8 characters
|
||||
# IRC and Discord
|
||||
if "msg" in msg:
|
||||
msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])
|
||||
|
||||
# 4chan - since we change the attributes below
|
||||
if "com" in msg:
|
||||
msg["msg"] = RE_BAD_CHARS.sub("", msg["msg"])
|
||||
|
||||
if msg["src"] == "4ch":
|
||||
board = msg["net"]
|
||||
thread = msg["channel"]
|
||||
|
@ -151,6 +169,7 @@ def process_data(data):
|
|||
if key2 in ATTRMAP:
|
||||
msg[ATTRMAP[key2]] = msg[key2]
|
||||
del msg[key2]
|
||||
|
||||
if "ts" in msg:
|
||||
old_time = msg["ts"]
|
||||
# '08/30/22(Tue)02:25:37'
|
||||
|
@ -168,8 +187,10 @@ def process_data(data):
|
|||
soup = BeautifulSoup(msg["msg"], "html.parser")
|
||||
msg_str = soup.get_text(separator="\n")
|
||||
msg["msg"] = msg_str
|
||||
|
||||
# Annotate sentiment/NLP
|
||||
if "msg" in msg:
|
||||
RE_BAD_CHARS.sub("", msg["msg"])
|
||||
# Language
|
||||
text = Text(msg["msg"])
|
||||
try:
|
||||
|
|
|
@ -8,6 +8,10 @@ import util
|
|||
from processing import process
|
||||
|
||||
SOURCES = ["4ch", "irc", "dis"]
|
||||
# DEBUG CODE REMOVE ME
|
||||
SOURCES.remove("4ch")
|
||||
SOURCES.remove("dis")
|
||||
# DEBUG CODE REMOVE ME
|
||||
KEYPREFIX = "queue."
|
||||
|
||||
# Chunk size per source (divide by len(SOURCES) for total)
|
||||
|
|
Loading…
Reference in New Issue