Add throttling for performance
This commit is contained in:
@@ -1,9 +1,30 @@
|
||||
version: "2.2"
|
||||
|
||||
services:
|
||||
rts:
|
||||
image: xf/monolith:latest
|
||||
container_name: rts_monolith
|
||||
command: sh -c '. /venv/bin/activate && exec python rts.py'
|
||||
build: .
|
||||
volumes:
|
||||
- ${PORTAINER_GIT_DIR}:/code
|
||||
- type: bind
|
||||
source: /code/run
|
||||
target: /var/run
|
||||
environment:
|
||||
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
|
||||
MODULES_ENABLED: "${MODULES_ENABLED}"
|
||||
deploy:
|
||||
resources:
|
||||
limits:
|
||||
cpus: '0.5'
|
||||
memory: 1.0G
|
||||
network_mode: host
|
||||
|
||||
app:
|
||||
image: xf/monolith:latest
|
||||
container_name: monolith
|
||||
#command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
|
||||
build: .
|
||||
volumes:
|
||||
- ${PORTAINER_GIT_DIR}:/code
|
||||
@@ -44,6 +65,8 @@ services:
|
||||
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
|
||||
# Enable performance metrics after message processing
|
||||
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
|
||||
MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
|
||||
MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
|
||||
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
|
||||
REDIS_PASSWORD: "${REDIS_PASSWORD}"
|
||||
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
|
||||
|
||||
@@ -8,6 +8,9 @@ import string
|
||||
# For timing
|
||||
import time
|
||||
|
||||
# For throttling
|
||||
import psutil
|
||||
|
||||
# Squash errors
|
||||
import warnings
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
@@ -57,6 +60,9 @@ KEYNAME = "queue"
|
||||
MONOLITH_PROCESS_PERFSTATS = (
|
||||
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
|
||||
)
|
||||
TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
|
||||
|
||||
SLEEP_INTERVAL = 0.0
|
||||
|
||||
CUSTOM_FILTERS = [
|
||||
lambda x: x.lower(),
|
||||
@@ -143,6 +149,7 @@ async def spawn_processing_threads(chunk, length):
|
||||
|
||||
|
||||
def process_data(chunk, index, chunk_size):
|
||||
global SLEEP_INTERVAL
|
||||
log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
|
||||
to_store = []
|
||||
|
||||
@@ -155,11 +162,13 @@ def process_data(chunk, index, chunk_size):
|
||||
hash_time = 0.0
|
||||
normal2_time = 0.0
|
||||
soup_time = 0.0
|
||||
sleep_time = 0.0
|
||||
|
||||
total_time = 0.0
|
||||
|
||||
# Initialise sentiment analyser
|
||||
analyzer = SentimentIntensityAnalyzer()
|
||||
|
||||
for msg_index in range(chunk_size):
|
||||
msg = db.r.rpop(KEYNAME)
|
||||
if not msg:
|
||||
@@ -207,7 +216,9 @@ def process_data(chunk, index, chunk_size):
|
||||
continue
|
||||
# pass
|
||||
else:
|
||||
msg["type"] = "update"
|
||||
# msg["type"] = "update"
|
||||
# Fuck it, updates just brew spam
|
||||
continue
|
||||
db.r.set(redis_key, hash)
|
||||
time_took = (time.process_time() - start) * 1000
|
||||
hash_time += time_took
|
||||
@@ -289,6 +300,26 @@ def process_data(chunk, index, chunk_size):
|
||||
to_store.append(msg)
|
||||
total_time += (time.process_time() - total_start) * 1000
|
||||
|
||||
# Dynamic throttling to reduce CPU usage
|
||||
if msg_index % 5 == 0:
|
||||
current_cpu_usage = psutil.cpu_percent(interval=0.2)
|
||||
if current_cpu_usage > TARGET_CPU_USAGE:
|
||||
SLEEP_INTERVAL += 0.02
|
||||
if SLEEP_INTERVAL > 0.5:
|
||||
SLEEP_INTERVAL = 0.5
|
||||
log.info(
|
||||
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
|
||||
f"=> sleep {SLEEP_INTERVAL:.3f}s"
|
||||
)
|
||||
elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
|
||||
SLEEP_INTERVAL -= 0.01
|
||||
log.info(
|
||||
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
|
||||
f"=> sleep {SLEEP_INTERVAL:.3f}s"
|
||||
)
|
||||
time.sleep(SLEEP_INTERVAL)
|
||||
sleep_time += SLEEP_INTERVAL
|
||||
|
||||
if MONOLITH_PROCESS_PERFSTATS:
|
||||
log.debug("=====================================")
|
||||
log.debug(f"Chunk: {chunk}")
|
||||
@@ -303,6 +334,7 @@ def process_data(chunk, index, chunk_size):
|
||||
log.debug(f"Normal2: {normal2_time}")
|
||||
log.debug(f"Soup: {soup_time}")
|
||||
log.debug(f"Total: {total_time}")
|
||||
log.debug(f"Throttling: {sleep_time}")
|
||||
log.debug("=====================================")
|
||||
|
||||
return to_store
|
||||
|
||||
@@ -8,6 +8,8 @@ from os import getenv
|
||||
import aiohttp
|
||||
from numpy import array_split
|
||||
|
||||
import psutil
|
||||
|
||||
import db
|
||||
import util
|
||||
|
||||
@@ -25,12 +27,14 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
|
||||
# Semaphore value ?
|
||||
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
|
||||
|
||||
# Target CPU usage percentage
|
||||
TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
|
||||
|
||||
# Boards to crawl
|
||||
BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
|
||||
|
||||
# CONFIGURATION END #
|
||||
|
||||
|
||||
class Chan4(object):
|
||||
"""
|
||||
4chan indexer, crawler and ingester.
|
||||
@@ -40,6 +44,8 @@ class Chan4(object):
|
||||
name = self.__class__.__name__
|
||||
self.log = util.get_logger(name)
|
||||
|
||||
self.sleep_interval = 0.0
|
||||
|
||||
self.api_endpoint = "https://a.4cdn.org"
|
||||
# self.boards = ["out", "g", "a", "3", "pol"] #
|
||||
self.boards = []
|
||||
@@ -59,6 +65,33 @@ class Chan4(object):
|
||||
self.hash_key = self.hash_key.decode("ascii")
|
||||
self.log.debug(f"Decoded hash key: {self.hash_key}")
|
||||
|
||||
async def dynamic_throttle(self):
|
||||
"""
|
||||
Dynamically sleeps before a request if CPU usage is above our target.
|
||||
Also, if CPU usage is far below the target, reduce the sleep time.
|
||||
Caps the sleep interval at 0.2s.
|
||||
Prints CPU usage and sleep interval like process.py.
|
||||
"""
|
||||
current_cpu_usage = psutil.cpu_percent(interval=0.2)
|
||||
|
||||
if current_cpu_usage > TARGET_CPU_USAGE:
|
||||
self.sleep_interval += 0.01
|
||||
if self.sleep_interval > 0.1:
|
||||
self.sleep_interval = 0.1
|
||||
self.log.info(
|
||||
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
|
||||
f"=> sleep {self.sleep_interval:.3f}s"
|
||||
)
|
||||
elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
|
||||
self.sleep_interval -= 0.01
|
||||
self.log.info(
|
||||
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
|
||||
f"=> sleep {self.sleep_interval:.3f}s"
|
||||
)
|
||||
|
||||
if self.sleep_interval > 0:
|
||||
await asyncio.sleep(self.sleep_interval)
|
||||
|
||||
async def run(self):
|
||||
if "ALL" in BOARDS:
|
||||
await self.get_board_list()
|
||||
@@ -76,6 +109,8 @@ class Chan4(object):
|
||||
for board in response["boards"]:
|
||||
self.boards.append(board["board"])
|
||||
self.log.debug(f"Got boards: {self.boards}")
|
||||
# await self.dynamic_throttle()
|
||||
# TODO
|
||||
|
||||
async def get_thread_lists(self, boards):
|
||||
# self.log.debug(f"Getting thread list for {boards}")
|
||||
@@ -91,6 +126,8 @@ class Chan4(object):
|
||||
for threads in page["threads"]:
|
||||
no = threads["no"]
|
||||
to_get.append((board, no))
|
||||
# await self.dynamic_throttle()
|
||||
# TODO
|
||||
|
||||
if not to_get:
|
||||
return
|
||||
@@ -100,6 +137,8 @@ class Chan4(object):
|
||||
for index, thr in enumerate(split_threads):
|
||||
self.log.debug(f"Series {index} - getting {len(thr)} threads")
|
||||
await self.get_threads_content(thr)
|
||||
# await self.dynamic_throttle()
|
||||
# TODO
|
||||
await asyncio.sleep(THREADS_DELAY)
|
||||
|
||||
def take_items(self, dict_list, n):
|
||||
@@ -130,6 +169,8 @@ class Chan4(object):
|
||||
continue
|
||||
board, thread = mapped
|
||||
all_posts[mapped] = response["posts"]
|
||||
# await self.dynamic_throttle()
|
||||
# TODO
|
||||
|
||||
if not all_posts:
|
||||
return
|
||||
@@ -147,6 +188,8 @@ class Chan4(object):
|
||||
post["channel"] = thread
|
||||
|
||||
to_store.append(post)
|
||||
# await self.dynamic_throttle()
|
||||
# TODO
|
||||
|
||||
if to_store:
|
||||
await db.queue_message_bulk(to_store)
|
||||
@@ -161,6 +204,7 @@ class Chan4(object):
|
||||
async def bound_fetch(self, sem, url, session, mapped):
|
||||
# Getter function with semaphore.
|
||||
async with sem:
|
||||
await self.dynamic_throttle()
|
||||
try:
|
||||
return await self.fetch(url, session, mapped)
|
||||
except: # noqa
|
||||
|
||||
Reference in New Issue
Block a user