diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
index b81597d..dac8c23 100644
--- a/docker/docker-compose.prod.yml
+++ b/docker/docker-compose.prod.yml
@@ -19,6 +19,8 @@ services:
       - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
       #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
       - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+    volumes_from:
+      - tmp
     ports:
       - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
      - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
@@ -37,8 +39,37 @@ services:
     environment:
       - SSDB_PORT=1289
 
+  tmp:
+    image: busybox
+    container_name: tmp_monolith
+    command: chmod -R 777 /var/run/socks
+    volumes:
+      - /var/run/socks
+
+  redis:
+    image: redis
+    container_name: redis_monolith
+    command: redis-server /etc/redis.conf
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+    volumes:
+      - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
+      - redis_data:/data
+    volumes_from:
+      - tmp
+    healthcheck:
+      test: "redis-cli -s /var/run/socks/redis.sock ping"
+      interval: 2s
+      timeout: 2s
+      retries: 15
+
 networks:
   default:
     external:
       name: pathogen
 
+volumes:
+  redis_data:
diff --git a/docker/redis.conf b/docker/redis.conf
index 46366bf..424a612 100644
--- a/docker/redis.conf
+++ b/docker/redis.conf
@@ -1,2 +1,2 @@
-unixsocket /var/run/redis/redis.sock
+unixsocket /var/run/socks/redis.sock
 unixsocketperm 777
\ No newline at end of file
diff --git a/legacy/conf/templates/config.json b/legacy/conf/templates/config.json
index e5c24ca..500431a 100644
--- a/legacy/conf/templates/config.json
+++ b/legacy/conf/templates/config.json
@@ -17,7 +17,7 @@
     },
     "Key": "key.pem",
     "Certificate": "cert.pem",
-    "RedisSocket": "/var/run/redis/redis.sock",
+    "RedisSocket": "/var/run/socks/redis.sock",
     "RedisDBEphemeral": 1,
     "RedisDBPersistent": 0,
     "UsePassword": false,
diff --git a/processing/process.py b/processing/process.py
index da6a218..e9802c2 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -14,7 +14,6 @@ from concurrent.futures import ProcessPoolExecutor
 
 # For timestamp processing
 from datetime import datetime
-from math import ceil
 from os import getenv
 
 import orjson
@@ -35,7 +34,6 @@ from gensim.parsing.preprocessing import (
     # stem_text,
     strip_short,
     strip_tags,
 )
-from numpy import array_split
 from polyglot.detect.base import logger as polyglot_logger
 # For NLP
@@ -54,6 +52,8 @@ from schemas.ch4_s import ATTRMAP
 
 trues = ("true", "1", "t", True)
 
+KEYNAME = "queue"
+
 MONOLITH_PROCESS_PERFSTATS = (
     getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
 )
@@ -106,20 +106,23 @@ hash_key = get_hash_key()
 
 
 @asyncio.coroutine
-async def spawn_processing_threads(data):
-    len_data = len(data)
-
+async def spawn_processing_threads(chunk, length):
+    log.debug(f"Spawning processing threads for chunk {chunk} of length {length}")
     loop = asyncio.get_event_loop()
     tasks = []
 
-    if len(data) < CPU_THREADS * 100:
-        split_data = [data]
+    if length < CPU_THREADS * 100:
+        cores = 1
+        chunk_size = length
     else:
-        msg_per_core = int(len(data) / CPU_THREADS)
-        split_data = array_split(data, ceil(len(data) / msg_per_core))
-    for index, split in enumerate(split_data):
-        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
-        task = loop.run_in_executor(p, process_data, split)
+        cores = CPU_THREADS
+        chunk_size = int(length / cores)
+
+    for index in range(cores):
+        log.debug(
+            f"[{chunk}/{index}] Delegating {chunk_size} messages to thread {index}"
+        )
+        task = loop.run_in_executor(p, process_data, chunk, index, chunk_size)
         tasks.append(task)
 
     results = [await task for task in tasks]
@@ -128,8 +131,8 @@ async def spawn_processing_threads(data):
     flat_list = [item for sublist in results for item in sublist]
     log.debug(
         (
-            f"Results from processing of {len_data} messages in "
-            f"{len(split_data)} threads: {len(flat_list)}"
+            f"[{chunk}/{index}] Results from processing of {length} messages in "
+            f"{cores} threads: {len(flat_list)}"
         )
     )
     await db.store_kafka_batch(flat_list)
@@ -137,7 +140,8 @@ async def spawn_processing_threads(data):
     # log.debug(f"Finished processing {len_data} messages")
 
 
-def process_data(data):
+def process_data(chunk, index, chunk_size):
+    log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
     to_store = []
 
     sentiment_time = 0.0
@@ -154,7 +158,11 @@ def process_data(data):
 
     # Initialise sentiment analyser
     analyzer = SentimentIntensityAnalyzer()
-    for msg in data:
+    for msg_index in range(chunk_size):
+        msg = db.r.rpop(KEYNAME)
+        if not msg:
+            return
+        msg = orjson.loads(msg)
         total_start = time.process_time()
         # normalise fields
         start = time.process_time()
@@ -185,13 +193,16 @@ def process_data(data):
             post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
             hash = siphash(hash_key, post_normalised)
             hash = str(hash)
-            redis_key = f"cache.{board}.{thread}.{msg['no']}"
+            redis_key = (
+                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
+            )
             key_content = db.r.get(redis_key)
-            if key_content:
+            if key_content is not None:
                 key_content = key_content.decode("ascii")
                 if key_content == hash:
                     # This deletes the message since the append at the end won't be hit
                     continue
+                    # pass
                 else:
                     msg["type"] = "update"
             db.r.set(redis_key, hash)
@@ -243,7 +254,7 @@ def process_data(data):
                 msg["lang_code"] = lang_code
                 msg["lang_name"] = lang_name
             except cld2_error as e:
-                log.error(f"Error detecting language: {e}")
+                log.error(f"[{chunk}/{index}] Error detecting language: {e}")
                 # So below block doesn't fail
                 lang_code = None
         time_took = (time.process_time() - start) * 1000
@@ -277,6 +288,8 @@ def process_data(data):
 
     if MONOLITH_PROCESS_PERFSTATS:
         log.debug("=====================================")
+        log.debug(f"Chunk: {chunk}")
+        log.debug(f"Index: {index}")
         log.debug(f"Sentiment: {sentiment_time}")
         log.debug(f"Regex: {regex_time}")
         log.debug(f"Polyglot: {polyglot_time}")
diff --git a/sources/ch4.py b/sources/ch4.py
index 88b65fa..7adec89 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -74,26 +74,28 @@ class Chan4(object):
 
     async def get_thread_lists(self, boards):
         # self.log.debug(f"Getting thread list for {boards}")
-        board_urls = {board: f"{board}/catalog.json" for board in boards}
+        board_urls = {board: f"{board}/threads.json" for board in boards}
         responses = await self.api_call(board_urls)
         to_get = []
         flat_map = [board for board, thread in responses]
-        self.log.debug(f"Got thread list for {flat_map}: {len(responses)}")
-        for mapped, response in responses:
+        self.log.debug(f"Got thread list for {len(responses)} boards: {flat_map}")
+        for board, response in responses:
             if not response:
                 continue
             for page in response:
                 for threads in page["threads"]:
                     no = threads["no"]
-                    to_get.append((mapped, no))
+                    to_get.append((board, no))
 
         if not to_get:
             return
+        self.log.debug(f"Got {len(to_get)} threads to fetch")
         split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
-        for threads in split_threads:
-            await self.get_threads_content(threads)
+        self.log.debug(f"Split threads into {len(split_threads)} series")
+        for index, thr in enumerate(split_threads):
+            self.log.debug(f"Series {index} - getting {len(thr)} threads")
+            await self.get_threads_content(thr)
             await asyncio.sleep(THREADS_DELAY)
-        # await self.get_threads_content(to_get)
 
     def take_items(self, dict_list, n):
         i = 0
@@ -132,14 +134,14 @@ class Chan4(object):
         to_store = []
         for key, post_list in posts.items():
             board, thread = key
-            for index, post in enumerate(post_list):
-                posts[key][index]["type"] = "msg"
+            for post in post_list:
+                post["type"] = "msg"
 
-                posts[key][index]["src"] = "4ch"
-                posts[key][index]["net"] = board
-                posts[key][index]["channel"] = thread
+                post["src"] = "4ch"
+                post["net"] = board
+                post["channel"] = thread
 
-                to_store.append(posts[key][index])
+                to_store.append(post)
 
         if to_store:
             await db.queue_message_bulk(to_store)
diff --git a/sources/ingest.py b/sources/ingest.py
index aae58df..a04c1f8 100644
--- a/sources/ingest.py
+++ b/sources/ingest.py
@@ -1,8 +1,6 @@
 import asyncio
 from os import getenv
 
-import orjson
-
 import db
 import util
 from processing import process
@@ -20,6 +18,7 @@ class Ingest(object):
     def __init__(self):
         name = self.__class__.__name__
         self.log = util.get_logger(name)
+        self.current_chunk = 0
         self.log.info(
             (
                 "Starting ingest handler for chunk size of "
@@ -30,20 +29,14 @@ class Ingest(object):
     async def run(self):
         while True:
             await self.get_chunk()
+            self.log.debug(f"Ingest chunk {self.current_chunk} complete")
+            self.current_chunk += 1
             await asyncio.sleep(ITER_DELAY)
 
     async def get_chunk(self):
-        items = []
-        # for source in SOURCES:
-        #     key = f"{KEYPREFIX}{source}"
         length = await db.ar.llen(KEYNAME)
-        start_num = length - CHUNK_SIZE
-        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
-        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
-        if not chunk:
+        if length > CHUNK_SIZE:
+            length = CHUNK_SIZE
+        if not length:
             return
-        for item in chunk:
-            item = orjson.loads(item)
-            items.append(item)
-        if items:
-            await process.spawn_processing_threads(items)
+        await process.spawn_processing_threads(self.current_chunk, length)