Implement sentiment/NLP annotation and optimise processing

Mark Veidemanis 2 years ago
parent f432e9b29e
commit a89b5a8b6f

198
db.py

@ -1,28 +1,15 @@
import random import random
from math import ceil
import aioredis import aioredis
import manticoresearch import orjson
import ujson
# Kafka
from aiokafka import AIOKafkaProducer from aiokafka import AIOKafkaProducer
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis from redis import StrictRedis
import util import util
# Manticore schema # KAFKA_TOPIC = "msg"
from schemas import mc_s
# Manticore
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
# Kafka
from aiokafka import AIOKafkaProducer
KAFKA_TOPIC = "msg"
log = util.get_logger("db") log = util.get_logger("db")
@ -51,103 +38,62 @@ KEYPREFIX = "queue."
async def store_kafka_batch(data): async def store_kafka_batch(data):
print("STORING KAFKA BATCH") log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start() await producer.start()
batch = producer.create_batch() batch = producer.create_batch()
for msg in data: for msg in data:
if msg["type"] in TYPES_MAIN: if msg["type"] in TYPES_MAIN:
index = "main" index = "main"
schema = mc_s.schema_main # schema = mc_s.schema_main
elif msg["type"] in TYPES_META: elif msg["type"] in TYPES_META:
index = "meta" index = "meta"
schema = mc_s.schema_meta # schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT: elif msg["type"] in TYPES_INT:
index = "internal" index = "internal"
schema = mc_s.schema_int # schema = mc_s.schema_int
KAFKA_TOPIC = index
# normalise fields # normalise fields
for key, value in list(msg.items()): for key, value in list(msg.items()):
if value is None: if value is None:
del msg[key] del msg[key]
if key in schema: # if key in schema:
if isinstance(value, int): # if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith( # if schema[key].startswith("string") or schema[key].startswith(
"text" # "text"
): # ):
msg[key] = str(value) # msg[key] = str(value)
message = ujson.dumps(msg) body = orjson.dumps(msg)
body = str.encode(message) # orjson returns bytes
# body = str.encode(message)
if "ts" not in msg: if "ts" not in msg:
# print("MSG WITHOUT TS", msg) raise Exception("No TS in msg")
continue
metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None: if metadata is None:
partitions = await producer.partitions_for(KAFKA_TOPIC) partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions)) partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print( log.debug(f"{batch.record_count()} messages sent to partition {partition}")
"%d messages sent to partition %d" % (batch.record_count(), partition)
)
batch = producer.create_batch() batch = producer.create_batch()
continue continue
partitions = await producer.partitions_for(KAFKA_TOPIC) partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions)) partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d" % (batch.record_count(), partition)) log.debug(f"{batch.record_count()} messages sent to partition {partition}")
await producer.stop() await producer.stop()
# def store_message(msg):
# """
# Store a message into Manticore
# :param msg: dict
# """
# store_kafka(msg)
# # Duplicated to avoid extra function call
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith("text"):
# msg[key] = str(value)
# body = [{"insert": {"index": index, "doc": msg}}]
# body_post = ""
# for item in body:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# # print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
async def queue_message(msg): async def queue_message(msg):
""" """
Queue a message on the Redis buffer. Queue a message on the Redis buffer.
""" """
src = msg["src"] src = msg["src"]
message = ujson.dumps(msg) message = orjson.dumps(msg)
key = f"{KEYPREFIX}{src}" key = f"{KEYPREFIX}{src}"
# log.debug(f"Queueing single message of string length {len(message)}")
await ar.sadd(key, message) await ar.sadd(key, message)
@ -155,102 +101,10 @@ async def queue_message_bulk(data):
""" """
Queue multiple messages on the Redis buffer. Queue multiple messages on the Redis buffer.
""" """
# log.debug(f"Queueing message batch of length {len(data)}")
for msg in data: for msg in data:
src = msg["src"] src = msg["src"]
message = ujson.dumps(msg) message = orjson.dumps(msg)
key = f"{KEYPREFIX}{src}" key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message) await ar.sadd(key, message)
# For now, make a normal function until we go full async
def queue_message_bulk_sync(data):
"""
Queue multiple messages on the Redis buffer.
"""
for msg in data:
src = msg["src"]
message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}"
r.sadd(key, message)
# def store_message_bulk(data):
# """
# Store a message into Manticore
# :param msg: dict
# """
# if not data:
# return
# for msg in data:
# store_kafka(msg)
# # 10000: maximum inserts we can submit to
# # Manticore as of Sept 2022
# split_posts = array_split(data, ceil(len(data) / 10000))
# for messages in split_posts:
# total = []
# for msg in messages:
# # Duplicated to avoid extra function call (see above)
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = {"insert": {"index": index, "doc": msg}}
# total.append(body)
# body_post = ""
# for item in total:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# #print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
# def update_schema():
# pass
# def create_index(api_client):
# util_instance = manticoresearch.UtilsApi(api_client)
# schemas = {
# "main": mc_s.schema_main,
# "meta": mc_s.schema_meta,
# "internal": mc_s.schema_int,
# }
# for name, schema in schemas.items():
# schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
# create_query = (
# f"create table if not exists {name}({schema_types}) engine='columnar'"
# )
# print("Schema types", create_query)
# util_instance.sql(create_query)
# create_index(api_client)
# update_schema()

@ -20,10 +20,14 @@ services:
volumes_from: volumes_from:
- tmp - tmp
depends_on: depends_on:
- broker broker:
- kafka condition: service_started
- tmp kafka:
- redis condition: service_healthy
tmp:
condition: service_started
redis:
condition: service_healthy
# - db # - db
threshold: threshold:
@ -46,8 +50,10 @@ services:
volumes_from: volumes_from:
- tmp - tmp
depends_on: depends_on:
- tmp tmp:
- redis condition: service_started
redis:
condition: service_healthy
turnilo: turnilo:
container_name: turnilo container_name: turnilo
@ -102,6 +108,17 @@ services:
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: yes ALLOW_PLAINTEXT_LISTENER: yes
# healthcheck:
# test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"]
# interval: 2s
# timeout: 2s
# retries: 15
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
start_period: 15s
interval: 2s
timeout: 5s
retries: 30
coordinator: coordinator:
image: apache/druid:0.23.0 image: apache/druid:0.23.0
@ -230,6 +247,11 @@ services:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from: volumes_from:
- tmp - tmp
healthcheck:
test: "redis-cli -s /var/run/redis/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks: networks:
default: default:

@ -16,7 +16,7 @@ COPY requirements.txt /code/
COPY discord-patched.tgz /code/ COPY discord-patched.tgz /code/
RUN python -m venv /venv RUN python -m venv /venv
RUN . /venv/bin/activate && pip install -r requirements.txt RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm
RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages

@ -4,8 +4,18 @@ redis
siphashc siphashc
aiohttp[speedups] aiohttp[speedups]
python-dotenv python-dotenv
manticoresearch #manticoresearch
numpy numpy
ujson ujson
aioredis[hiredis] aioredis[hiredis]
aiokafka aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson

@ -1,19 +1,11 @@
import asyncio import asyncio
from os import getenv from os import getenv
import db
import util import util
from sources.ch4 import Chan4 from sources.ch4 import Chan4
from sources.dis import DiscordClient from sources.dis import DiscordClient
from sources.ingest import Ingest from sources.ingest import Ingest
# For development
# if not getenv("DISCORD_TOKEN", None):
# print("Could not get Discord token, attempting load from .env")
# from dotenv import load_dotenv
# load_dotenv()
log = util.get_logger("monolith") log = util.get_logger("monolith")
modules_enabled = getenv("MODULES_ENABLED", False) modules_enabled = getenv("MODULES_ENABLED", False)

@ -4,25 +4,73 @@ import random
# For key generation # For key generation
import string import string
# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
# For timestamp processing # For timestamp processing
from datetime import datetime from datetime import datetime
from math import ceil from math import ceil
import ujson import orjson
# Tokenisation
import spacy
# For 4chan message parsing # For 4chan message parsing
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from numpy import array_split from numpy import array_split
from polyglot.detect.base import logger as polyglot_logger
# For NLP
from polyglot.text import Text
from pycld2 import error as cld2_error
from siphashc import siphash from siphashc import siphash
# For sentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import db import db
import util import util
# 4chan schema # 4chan schema
from schemas.ch4_s import ATTRMAP from schemas.ch4_s import ATTRMAP
# For tokenisation
# from gensim.parsing.preprocessing import (
# strip_tags,
# strip_punctuation,
# strip_numeric,
# stem_text,
# strip_multiple_whitespaces,
# strip_non_alphanum,
# remove_stopwords,
# strip_short,
# preprocess_string,
# )
# CUSTOM_FILTERS = [
# lambda x: x.lower(),
# strip_tags, #
# strip_punctuation, #
# strip_multiple_whitespaces,
# strip_numeric,
# remove_stopwords,
# strip_short,
# #stem_text,
# strip_non_alphanum, #
# ]
# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
log = util.get_logger("process") log = util.get_logger("process")
# Maximum number of CPU threads to use for post processing # Maximum number of CPU threads to use for post processing
@ -49,67 +97,44 @@ hash_key = get_hash_key()
@asyncio.coroutine @asyncio.coroutine
async def spawn_processing_threads(data): async def spawn_processing_threads(data):
len_data = len(data)
log.debug(f"Spawning processing threads for batch of {len_data} messages")
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
tasks = [] tasks = []
oldts = [x["now"] for x in data if "now" in x]
if len(data) < CPU_THREADS: if len(data) < CPU_THREADS:
split_data = [data] split_data = [data]
else: else:
msg_per_core = int(len(data) / CPU_THREADS) msg_per_core = int(len(data) / CPU_THREADS)
print("MSG PER CORE", msg_per_core)
split_data = array_split(data, ceil(len(data) / msg_per_core)) split_data = array_split(data, ceil(len(data) / msg_per_core))
for index, split in enumerate(split_data): for index, split in enumerate(split_data):
print("DELEGATING TO THREAD", len(split)) log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
future = loop.run_in_executor(p, process_data, data) task = loop.run_in_executor(p, process_data, data)
# future = p.submit(process_data, split) tasks.append(task)
tasks.append(future)
# results = [x.result(timeout=50) for x in tasks] results = [await task for task in tasks]
results = await asyncio.gather(*tasks) log.debug(f"Results from processing of {len_data} messages: {len(results)}")
print("RESULTS", len(results))
# Join the results back from the split list # Join the results back from the split list
flat_list = [item for sublist in results for item in sublist] flat_list = [item for sublist in results for item in sublist]
print("LENFLAT", len(flat_list))
print("LENDATA", len(data))
newts = [x["ts"] for x in flat_list if "ts" in x]
print("lenoldts", len(oldts))
print("lennewts", len(newts))
allts = all(["ts" in x for x in flat_list])
print("ALLTS", allts)
alllen = [len(x) for x in flat_list]
print("ALLLEN", alllen)
await db.store_kafka_batch(flat_list) await db.store_kafka_batch(flat_list)
log.debug(f"Finished processing {len_data} messages")
# @asyncio.coroutine
# def process_data_thread(data):
# """
# Helper to spawn threads to process a list of data.
# """
# loop = asyncio.get_event_loop()
# if len(data) < CPU_THREADS:
# split_data = [data]
# else:
# msg_per_core = int(len(data) / CPU_THREADS)
# print("MSG PER CORE", msg_per_core)
# split_data = array_split(data, ceil(len(data) / msg_per_core))
# for index, split in enumerate(split_data):
# print("DELEGATING TO THREAD", len(split))
# #f = process_data_thread(split)
# yield loop.run_in_executor(p, process_data, data)
def process_data(data): def process_data(data):
print("PROCESS DATA START") to_store = []
# to_store = []
for index, msg in enumerate(data): # Initialise sentiment analyser
# print("PROCESSING", msg) analyzer = SentimentIntensityAnalyzer()
for msg in data:
if msg["src"] == "4ch": if msg["src"] == "4ch":
board = msg["net"] board = msg["net"]
thread = msg["channel"] thread = msg["channel"]
# Calculate hash for post # Calculate hash for post
post_normalised = ujson.dumps(msg, sort_keys=True) post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
hash = siphash(hash_key, post_normalised) hash = siphash(hash_key, post_normalised)
hash = str(hash) hash = str(hash)
redis_key = f"cache.{board}.{thread}.{msg['no']}" redis_key = f"cache.{board}.{thread}.{msg['no']}"
@ -117,19 +142,17 @@ def process_data(data):
if key_content: if key_content:
key_content = key_content.decode("ascii") key_content = key_content.decode("ascii")
if key_content == hash: if key_content == hash:
del data[index] # This deletes the message since the append at the end won't be hit
continue continue
else: else:
data[index]["type"] = "update" msg["type"] = "update"
db.r.set(redis_key, hash) db.r.set(redis_key, hash)
if "now" not in data[index]: for key2, value in list(msg.items()):
print("NOW NOT IN INDEX", data[index])
for key2, value in list(data[index].items()):
if key2 in ATTRMAP: if key2 in ATTRMAP:
data[index][ATTRMAP[key2]] = data[index][key2] msg[ATTRMAP[key2]] = msg[key2]
del data[index][key2] del msg[key2]
if "ts" in data[index]: if "ts" in msg:
old_time = data[index]["ts"] old_time = msg["ts"]
# '08/30/22(Tue)02:25:37' # '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":") time_spl = old_time.split(":")
if len(time_spl) == 3: if len(time_spl) == 3:
@ -138,14 +161,42 @@ def process_data(data):
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# new_ts = old_ts.isoformat() # new_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp()) new_ts = int(old_ts.timestamp())
data[index]["ts"] = new_ts msg["ts"] = new_ts
else: else:
print("MSG WITHOUT TS PROCESS", data[index]) raise Exception("No TS in msg")
continue
if "msg" in msg: if "msg" in msg:
soup = BeautifulSoup(data[index]["msg"], "html.parser") soup = BeautifulSoup(msg["msg"], "html.parser")
msg = soup.get_text(separator="\n") msg_str = soup.get_text(separator="\n")
data[index]["msg"] = msg msg["msg"] = msg_str
# to_store.append(data[index]) # Annotate sentiment/NLP
print("FINISHED PROCESSING DATA") if "msg" in msg:
return data # Language
text = Text(msg["msg"])
try:
lang_code = text.language.code
lang_name = text.language.name
msg["lang_code"] = lang_code
msg["lang_name"] = lang_name
except cld2_error as e:
log.error(f"Error detecting language: {e}")
# So below block doesn't fail
lang_code = None
# Blatant discrimination
if lang_code == "en":
# Sentiment
vs = analyzer.polarity_scores(str(msg["msg"]))
addendum = vs["compound"]
msg["sentiment"] = addendum
# Tokens
n = nlp(msg["msg"])
for tag in TAGS:
tag_name = tag.lower()
tags_flag = [token.lemma_ for token in n if token.pos_ == tag]
msg[f"words_{tag_name}"] = tags_flag
# Add the mutated message to the return buffer
to_store.append(msg)
return to_store

@ -5,8 +5,18 @@ redis
siphashc siphashc
aiohttp[speedups] aiohttp[speedups]
python-dotenv python-dotenv
manticoresearch #manticoresearch
numpy numpy
ujson ujson
aioredis[hiredis] aioredis[hiredis]
aiokafka aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson

@ -2,19 +2,13 @@
import asyncio import asyncio
import random import random
import string import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from math import ceil from math import ceil
import aiohttp import aiohttp
import ujson
from bs4 import BeautifulSoup
from numpy import array_split from numpy import array_split
from siphashc import siphash
import db import db
import util import util
from schemas.ch4_s import ATTRMAP
# CONFIGURATION # # CONFIGURATION #
@ -30,13 +24,8 @@ CRAWL_DELAY = 5
# Semaphore value ? # Semaphore value ?
THREADS_SEMAPHORE = 1000 THREADS_SEMAPHORE = 1000
# Maximum number of CPU threads to use for post processing
CPU_THREADS = 8
# CONFIGURATION END # # CONFIGURATION END #
p = ProcessPoolExecutor(CPU_THREADS)
class Chan4(object): class Chan4(object):
""" """
@ -83,10 +72,12 @@ class Chan4(object):
self.log.debug(f"Got boards: {self.boards}") self.log.debug(f"Got boards: {self.boards}")
async def get_thread_lists(self, boards): async def get_thread_lists(self, boards):
self.log.debug(f"Getting thread list for {boards}") # self.log.debug(f"Getting thread list for {boards}")
board_urls = {board: f"{board}/catalog.json" for board in boards} board_urls = {board: f"{board}/catalog.json" for board in boards}
responses = await self.api_call(board_urls) responses = await self.api_call(board_urls)
to_get = [] to_get = []
flat_map = [board for board, thread in responses]
self.log.debug(f"Got thread list for {flat_map}: {len(responses)}")
for mapped, response in responses: for mapped, response in responses:
if not response: if not response:
continue continue
@ -95,7 +86,6 @@ class Chan4(object):
no = threads["no"] no = threads["no"]
to_get.append((mapped, no)) to_get.append((mapped, no))
self.log.debug(f"Got thread list for {mapped}: {len(response)}")
if not to_get: if not to_get:
return return
split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
@ -122,46 +112,20 @@ class Chan4(object):
(board, thread): f"{board}/thread/{thread}.json" (board, thread): f"{board}/thread/{thread}.json"
for board, thread in thread_list for board, thread in thread_list
} }
self.log.debug(f"Getting information for threads: {thread_urls}") # self.log.debug(f"Getting information for threads: {thread_urls}")
responses = await self.api_call(thread_urls) responses = await self.api_call(thread_urls)
self.log.debug(f"Got information for threads: {thread_urls}") self.log.debug(f"Got information for {len(responses)} threads")
all_posts = {} all_posts = {}
for mapped, response in responses: for mapped, response in responses:
if not response: if not response:
continue continue
board, thread = mapped board, thread = mapped
self.log.debug(f"Got thread content for thread {thread} on board {board}")
all_posts[mapped] = response["posts"] all_posts[mapped] = response["posts"]
# Split into 10,000 chunks
if not all_posts: if not all_posts:
return return
await self.handle_posts(all_posts) await self.handle_posts(all_posts)
# threads_per_core = int(len(all_posts) / CPU_THREADS)
# for i in range(CPU_THREADS):
# new_dict = {}
# pulled_posts = self.take_items(all_posts, threads_per_core)
# for k, v in pulled_posts:
# if k in new_dict:
# new_dict[k].append(v)
# else:
# new_dict[k] = [v]
# await self.handle_posts_thread(new_dict)
# print("VAL", ceil(len(all_posts) / threads_per_core))
# split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
# print("THREADS PER CORE SPLIT", len(split_posts))
# # print("SPLIT CHUNK", len(split_posts))
# for posts in split_posts:
# print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS")
# await self.handle_posts_thread(posts)
# await self.handle_posts_thread(all_posts)
@asyncio.coroutine
def handle_posts_thread(self, posts):
loop = asyncio.get_event_loop()
yield from loop.run_in_executor(p, self.handle_posts, posts)
async def handle_posts(self, posts): async def handle_posts(self, posts):
to_store = [] to_store = []
@ -170,50 +134,13 @@ class Chan4(object):
for index, post in enumerate(post_list): for index, post in enumerate(post_list):
posts[key][index]["type"] = "msg" posts[key][index]["type"] = "msg"
# # Calculate hash for post
# post_normalised = ujson.dumps(post, sort_keys=True)
# hash = siphash(self.hash_key, post_normalised)
# hash = str(hash)
# redis_key = f"cache.{board}.{thread}.{post['no']}"
# key_content = db.r.get(redis_key)
# if key_content:
# key_content = key_content.decode("ascii")
# if key_content == hash:
# continue
# else:
# posts[key][index]["type"] = "update"
# #db.r.set(redis_key, hash)
# for key2, value in list(post.items()):
# if key2 in ATTRMAP:
# post[ATTRMAP[key2]] = posts[key][index][key2]
# del posts[key][index][key2]
# if "ts" in post:
# old_time = posts[key][index]["ts"]
# # '08/30/22(Tue)02:25:37'
# time_spl = old_time.split(":")
# if len(time_spl) == 3:
# old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
# else:
# old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# # new_ts = old_ts.isoformat()
# new_ts = int(old_ts.timestamp())
# posts[key][index]["ts"] = new_ts
# if "msg" in post:
# soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
# msg = soup.get_text(separator="\n")
# posts[key][index]["msg"] = msg
posts[key][index]["src"] = "4ch" posts[key][index]["src"] = "4ch"
posts[key][index]["net"] = board posts[key][index]["net"] = board
posts[key][index]["channel"] = thread posts[key][index]["channel"] = thread
to_store.append(posts[key][index]) to_store.append(posts[key][index])
# print({name_map[name]: val for name, val in post.items()})
# print(f"Got posts: {len(posts)}")
if to_store: if to_store:
print("STORING", len(to_store))
await db.queue_message_bulk(to_store) await db.queue_message_bulk(to_store)
async def fetch(self, url, session, mapped): async def fetch(self, url, session, mapped):
@ -238,7 +165,7 @@ class Chan4(object):
async with aiohttp.ClientSession(connector=connector) as session: async with aiohttp.ClientSession(connector=connector) as session:
for mapped, method in methods.items(): for mapped, method in methods.items():
url = f"{self.api_endpoint}/{method}" url = f"{self.api_endpoint}/{method}"
self.log.debug(f"GET {url}") # self.log.debug(f"GET {url}")
task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
# task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) # task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
tasks.append(task) tasks.append(task)

@ -1,6 +1,6 @@
import asyncio import asyncio
import ujson import orjson
import db import db
import util import util
@ -8,9 +8,13 @@ from processing import process
SOURCES = ["4ch", "irc", "dis"] SOURCES = ["4ch", "irc", "dis"]
KEYPREFIX = "queue." KEYPREFIX = "queue."
CHUNK_SIZE = 90000
# Chunk size per source (divide by len(SOURCES) for total)
CHUNK_SIZE = 9000
ITER_DELAY = 0.5 ITER_DELAY = 0.5
log = util.get_logger("ingest")
class Ingest(object): class Ingest(object):
def __init__(self): def __init__(self):
@ -18,8 +22,6 @@ class Ingest(object):
self.log = util.get_logger(name) self.log = util.get_logger(name)
async def run(self): async def run(self):
# items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}]
# await process.spawn_processing_threads(items)
while True: while True:
await self.get_chunk() await self.get_chunk()
await asyncio.sleep(ITER_DELAY) await asyncio.sleep(ITER_DELAY)
@ -31,11 +33,8 @@ class Ingest(object):
chunk = await db.ar.spop(key, CHUNK_SIZE) chunk = await db.ar.spop(key, CHUNK_SIZE)
if not chunk: if not chunk:
continue continue
# self.log.info(f"Got chunk: {chunk}")
for item in chunk: for item in chunk:
item = ujson.loads(item) item = orjson.loads(item)
# self.log.info(f"Got item: {item}")
items.append(item) items.append(item)
if items: if items:
print("PROCESSING", len(items))
await process.spawn_processing_threads(items) await process.spawn_processing_threads(items)

@ -3,7 +3,7 @@ import logging
log = logging.getLogger("util") log = logging.getLogger("util")
debug = False debug = True
# Color definitions # Color definitions
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)

Loading…
Cancel
Save