From 54ecfbae645c29bcf69488039129341713624f1a Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Fri, 24 Jan 2025 12:17:22 +0000
Subject: [PATCH] Add dynamic CPU throttling to processing and the 4chan crawler

---
 docker-compose.prod.yml | 23 +++++++++++++++++++++
 processing/process.py   | 34 +++++++++++++++++++++++++++++-
 sources/ch4.py          | 46 ++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 101 insertions(+), 2 deletions(-)
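
Both TARGET_CPU_USAGE knobs below default to 50.0 when unset. A purely
illustrative .env fragment for the variables wired through
docker-compose.prod.yml (example values, not recommendations):

    MONOLITH_PROCESS_TARGET_CPU_USAGE=60.0
    MONOLITH_CH4_TARGET_CPU_USAGE=40.0
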
diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml
index b2f5576..4ad5baa 100644
--- a/docker-compose.prod.yml
+++ b/docker-compose.prod.yml
@@ -1,9 +1,30 @@
 version: "2.2"
 
 services:
+  rts:
+    image: xf/monolith:latest
+    container_name: rts_monolith
+    command: sh -c '. /venv/bin/activate && exec python rts.py'
+    build: .
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+      - type: bind
+        source: /code/run
+        target: /var/run
+    environment:
+      PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+      MODULES_ENABLED: "${MODULES_ENABLED}"
+    deploy:
+      resources:
+        limits:
+          cpus: '0.5'
+          memory: 1.0G
+    network_mode: host
+
   app:
     image: xf/monolith:latest
     container_name: monolith
+    #command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
     build: .
     volumes:
       - ${PORTAINER_GIT_DIR}:/code
@@ -44,6 +65,8 @@ services:
       MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
       # Enable performance metrics after message processing
       MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+      MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
+      MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
       MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
       REDIS_PASSWORD: "${REDIS_PASSWORD}"
       MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
diff --git a/processing/process.py b/processing/process.py
index 5443da8..949f819 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -8,6 +8,9 @@ import string
 # For timing
 import time
 
+# For throttling
+import psutil
+
 # Squash errors
 import warnings
 from concurrent.futures import ProcessPoolExecutor
@@ -57,6 +60,9 @@ KEYNAME = "queue"
 MONOLITH_PROCESS_PERFSTATS = (
     getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
 )
+TARGET_CPU_USAGE = float(getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
+
+SLEEP_INTERVAL = 0.0
 
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
@@ -143,6 +149,7 @@ async def spawn_processing_threads(chunk, length):
 
 
 def process_data(chunk, index, chunk_size):
+    global SLEEP_INTERVAL
     log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
 
     to_store = []
@@ -155,11 +162,13 @@ def process_data(chunk, index, chunk_size):
     hash_time = 0.0
     normal2_time = 0.0
     soup_time = 0.0
+    sleep_time = 0.0
     total_time = 0.0
 
     # Initialise sentiment analyser
     analyzer = SentimentIntensityAnalyzer()
+
     for msg_index in range(chunk_size):
         msg = db.r.rpop(KEYNAME)
         if not msg:
@@ -207,7 +216,9 @@ def process_data(chunk, index, chunk_size):
                 continue
                 # pass
             else:
-                msg["type"] = "update"
+                # msg["type"] = "update"
+                # Updates are almost entirely spam, so drop them outright
+                continue
             db.r.set(redis_key, hash)
             time_took = (time.process_time() - start) * 1000
             hash_time += time_took
@@ -289,6 +300,26 @@ def process_data(chunk, index, chunk_size):
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
 
+        # Dynamic throttling to keep CPU usage near TARGET_CPU_USAGE
+        if msg_index % 5 == 0:
+            current_cpu_usage = psutil.cpu_percent(interval=0.2)
+            if current_cpu_usage > TARGET_CPU_USAGE:
+                SLEEP_INTERVAL += 0.02
+                if SLEEP_INTERVAL > 0.5:
+                    SLEEP_INTERVAL = 0.5
+                log.info(
+                    f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
+                    f"=> sleep {SLEEP_INTERVAL:.3f}s"
+                )
+            elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
+                SLEEP_INTERVAL -= 0.01
+                log.info(
+                    f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
+                    f"=> sleep {SLEEP_INTERVAL:.3f}s"
+                )
+        time.sleep(SLEEP_INTERVAL)
+        sleep_time += SLEEP_INTERVAL
+
     if MONOLITH_PROCESS_PERFSTATS:
         log.debug("=====================================")
         log.debug(f"Chunk: {chunk}")
@@ -303,6 +334,7 @@ def process_data(chunk, index, chunk_size):
         log.debug(f"Normal2: {normal2_time}")
         log.debug(f"Soup: {soup_time}")
         log.debug(f"Total: {total_time}")
+        log.debug(f"Throttling: {sleep_time}")
         log.debug("=====================================")
 
     return to_store
diff --git a/sources/ch4.py b/sources/ch4.py
index 7659387..98a34d7 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -8,6 +8,8 @@ from os import getenv
 import aiohttp
 from numpy import array_split
 
+import psutil
+
 import db
 import util
 
@@ -25,12 +27,14 @@
 CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
 # Semaphore value ?
 THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
 
+# Target CPU usage percentage
+TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
+
 # Boards to crawl
 BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
 
 # CONFIGURATION END #
-
 class Chan4(object):
     """
     4chan indexer, crawler and ingester.
@@ -40,6 +44,8 @@ class Chan4(object):
         name = self.__class__.__name__
         self.log = util.get_logger(name)
 
+        self.sleep_interval = 0.0
+
         self.api_endpoint = "https://a.4cdn.org"
         # self.boards = ["out", "g", "a", "3", "pol"]
         # self.boards = []
@@ -59,6 +65,33 @@ class Chan4(object):
             self.hash_key = self.hash_key.decode("ascii")
         self.log.debug(f"Decoded hash key: {self.hash_key}")
 
+    async def dynamic_throttle(self):
+        """
+        Sleep before a request if CPU usage is above our target, and
+        gradually reduce the sleep time while CPU usage is below it.
+        Caps the sleep interval at 0.1s.
+        Logs CPU usage and the sleep interval, as process.py does.
+        """
+        current_cpu_usage = psutil.cpu_percent(interval=None)  # non-blocking; usage since last call
+
+        if current_cpu_usage > TARGET_CPU_USAGE:
+            self.sleep_interval += 0.01
+            if self.sleep_interval > 0.1:
+                self.sleep_interval = 0.1
+            self.log.info(
+                f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
+                f"=> sleep {self.sleep_interval:.3f}s"
+            )
+        elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
+            self.sleep_interval -= 0.01
+            self.log.info(
+                f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
+                f"=> sleep {self.sleep_interval:.3f}s"
+            )
+
+        if self.sleep_interval > 0:
+            await asyncio.sleep(self.sleep_interval)
+
     async def run(self):
         if "ALL" in BOARDS:
             await self.get_board_list()
@@ -76,6 +109,8 @@ class Chan4(object):
         for board in response["boards"]:
             self.boards.append(board["board"])
         self.log.debug(f"Got boards: {self.boards}")
+        # await self.dynamic_throttle()
+        # TODO: enable throttling here as well?
 
     async def get_thread_lists(self, boards):
         # self.log.debug(f"Getting thread list for {boards}")
@@ -91,6 +126,8 @@ class Chan4(object):
             for threads in page["threads"]:
                 no = threads["no"]
                 to_get.append((board, no))
+            # await self.dynamic_throttle()
+            # TODO: enable throttling here as well?
 
         if not to_get:
             return
@@ -100,6 +137,8 @@ class Chan4(object):
         for index, thr in enumerate(split_threads):
             self.log.debug(f"Series {index} - getting {len(thr)} threads")
             await self.get_threads_content(thr)
+            # await self.dynamic_throttle()
+            # TODO: enable throttling here as well?
             await asyncio.sleep(THREADS_DELAY)
 
     def take_items(self, dict_list, n):
@@ -130,6 +169,8 @@ class Chan4(object):
                 continue
             board, thread = mapped
             all_posts[mapped] = response["posts"]
+            # await self.dynamic_throttle()
+            # TODO: enable throttling here as well?
 
         if not all_posts:
             return
@@ -147,6 +188,8 @@ class Chan4(object):
             post["channel"] = thread
 
             to_store.append(post)
+            # await self.dynamic_throttle()
+            # TODO: enable throttling here as well?
 
         if to_store:
             await db.queue_message_bulk(to_store)
@@ -161,6 +204,7 @@ class Chan4(object):
     async def bound_fetch(self, sem, url, session, mapped):
        # Getter function with semaphore.
         async with sem:
+            await self.dynamic_throttle()
             try:
                 return await self.fetch(url, session, mapped)
             except:  # noqa
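
-- 
The throttle in both files is the same additive-increase / additive-decrease
loop around psutil.cpu_percent(); only the step sizes and caps differ
(0.02 per step up to 0.5s in process.py, 0.01 up to 0.1s in ch4.py). A
minimal standalone sketch of the idea, where the loop body and the 50.0
target are placeholders rather than part of the patch:

    import time

    import psutil

    TARGET_CPU_USAGE = 50.0  # illustrative; matches both modules' default
    sleep_interval = 0.0

    for _ in range(100):  # stands in for the real message/request loop
        # ... one unit of work would go here ...
        cpu = psutil.cpu_percent(interval=0.2)  # CPU usage over the last 0.2s
        if cpu > TARGET_CPU_USAGE:
            # Over target: back off a little more each round, up to a cap
            sleep_interval = min(sleep_interval + 0.02, 0.5)
        elif sleep_interval > 0.01:
            # Under target: relax the delay gradually
            sleep_interval -= 0.01
        time.sleep(sleep_interval)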