Compare commits

...

3 Commits

5 changed files with 181 additions and 2 deletions

View File

@@ -1,9 +1,30 @@
version: "2.2" version: "2.2"
services: services:
rts:
image: xf/monolith:latest
container_name: rts_monolith
command: sh -c '. /venv/bin/activate && exec python rts.py'
build: .
volumes:
- ${PORTAINER_GIT_DIR}:/code
- type: bind
source: /code/run
target: /var/run
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
app: app:
image: xf/monolith:latest image: xf/monolith:latest
container_name: monolith container_name: monolith
#command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
build: . build: .
volumes: volumes:
- ${PORTAINER_GIT_DIR}:/code - ${PORTAINER_GIT_DIR}:/code
@@ -44,6 +65,8 @@ services:
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}" MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing # Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}" MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}" MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}" REDIS_PASSWORD: "${REDIS_PASSWORD}"
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}" MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"

79
perf/throttle.py Normal file
View File

@@ -0,0 +1,79 @@
import psutil
import asyncio
import time
import util
class DynamicThrottle(object):
def __init__(self, **kwargs):
self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
self.sleep_interval = 0.0
self.sleep_increment = kwargs.get("sleep_increment", 0.01)
self.sleep_decrement = kwargs.get("sleep_decrement", 0.01)
self.sleep_max = kwargs.get("sleep_max", 0.1)
self.sleep_min = kwargs.get("sleep_min", 0.01)
self.psutil_interval = kwargs.get("psutil_interval", 0.1)
self.log = kwargs.get("log", util.get_logger(self.__class__.__name__))
self.consecutive_increments = 0
self.consecutive_decrements = 0
self.last_was_increment = False
if kwargs.get("async"):
self.dynamic_throttle = self.dynamic_throttle_async
else:
self.dynamic_throttle = self.dynamic_throttle
async def dynamic_throttle_async(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
self.sleep_interval += self.sleep_increment
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
self.log.info(
f"CPU {current_cpu_usage}% > {self.target_cpu_usage}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
elif current_cpu_usage < self.target_cpu_usage and self.sleep_interval > self.sleep_min:
self.sleep_interval -= self.sleep_decrement
self.log.info(
f"CPU {current_cpu_usage}% < {self.target_cpu_usage}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
if self.sleep_interval > 0:
await asyncio.sleep(self.sleep_interval)
def dynamic_throttle(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
self.sleep_interval += self.sleep_increment
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
self.log.info(
f"CPU {current_cpu_usage}% > {self.target_cpu_usage}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
elif current_cpu_usage < self.target_cpu_usage and self.sleep_interval > self.sleep_min:
self.sleep_interval -= self.sleep_decrement
self.log.info(
f"CPU {current_cpu_usage}% < {self.target_cpu_usage}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
if self.sleep_interval > 0:
time.sleep(self.sleep_interval)

View File

@@ -8,6 +8,9 @@ import string
# For timing # For timing
import time import time
# For throttling
import psutil
# Squash errors # Squash errors
import warnings import warnings
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
@@ -57,6 +60,9 @@ KEYNAME = "queue"
MONOLITH_PROCESS_PERFSTATS = ( MONOLITH_PROCESS_PERFSTATS = (
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
) )
TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
SLEEP_INTERVAL = 0.0
CUSTOM_FILTERS = [ CUSTOM_FILTERS = [
lambda x: x.lower(), lambda x: x.lower(),
@@ -143,6 +149,7 @@ async def spawn_processing_threads(chunk, length):
def process_data(chunk, index, chunk_size): def process_data(chunk, index, chunk_size):
global SLEEP_INTERVAL
log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages") log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
to_store = [] to_store = []
@@ -155,11 +162,13 @@ def process_data(chunk, index, chunk_size):
hash_time = 0.0 hash_time = 0.0
normal2_time = 0.0 normal2_time = 0.0
soup_time = 0.0 soup_time = 0.0
sleep_time = 0.0
total_time = 0.0 total_time = 0.0
# Initialise sentiment analyser # Initialise sentiment analyser
analyzer = SentimentIntensityAnalyzer() analyzer = SentimentIntensityAnalyzer()
for msg_index in range(chunk_size): for msg_index in range(chunk_size):
msg = db.r.rpop(KEYNAME) msg = db.r.rpop(KEYNAME)
if not msg: if not msg:
@@ -207,7 +216,9 @@ def process_data(chunk, index, chunk_size):
continue continue
# pass # pass
else: else:
msg["type"] = "update" # msg["type"] = "update"
# Fuck it, updates just brew spam
continue
db.r.set(redis_key, hash) db.r.set(redis_key, hash)
time_took = (time.process_time() - start) * 1000 time_took = (time.process_time() - start) * 1000
hash_time += time_took hash_time += time_took
@@ -289,6 +300,26 @@ def process_data(chunk, index, chunk_size):
to_store.append(msg) to_store.append(msg)
total_time += (time.process_time() - total_start) * 1000 total_time += (time.process_time() - total_start) * 1000
# Dynamic throttling to reduce CPU usage
if msg_index % 5 == 0:
current_cpu_usage = psutil.cpu_percent(interval=0.2)
if current_cpu_usage > TARGET_CPU_USAGE:
SLEEP_INTERVAL += 0.02
if SLEEP_INTERVAL > 0.5:
SLEEP_INTERVAL = 0.5
log.info(
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
f"=> sleep {SLEEP_INTERVAL:.3f}s"
)
elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
SLEEP_INTERVAL -= 0.01
log.info(
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
f"=> sleep {SLEEP_INTERVAL:.3f}s"
)
time.sleep(SLEEP_INTERVAL)
sleep_time += SLEEP_INTERVAL
if MONOLITH_PROCESS_PERFSTATS: if MONOLITH_PROCESS_PERFSTATS:
log.debug("=====================================") log.debug("=====================================")
log.debug(f"Chunk: {chunk}") log.debug(f"Chunk: {chunk}")
@@ -303,6 +334,7 @@ def process_data(chunk, index, chunk_size):
log.debug(f"Normal2: {normal2_time}") log.debug(f"Normal2: {normal2_time}")
log.debug(f"Soup: {soup_time}") log.debug(f"Soup: {soup_time}")
log.debug(f"Total: {total_time}") log.debug(f"Total: {total_time}")
log.debug(f"Throttling: {sleep_time}")
log.debug("=====================================") log.debug("=====================================")
return to_store return to_store

View File

@@ -23,3 +23,4 @@ uvloop
elasticsearch[async] elasticsearch[async]
msgpack msgpack
# flpc # flpc
psutil

View File

@@ -8,6 +8,8 @@ from os import getenv
import aiohttp import aiohttp
from numpy import array_split from numpy import array_split
import psutil
import db import db
import util import util
@@ -25,12 +27,14 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
# Semaphore value ? # Semaphore value ?
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000)) THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
# Target CPU usage percentage
TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
# Boards to crawl # Boards to crawl
BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",") BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
# CONFIGURATION END # # CONFIGURATION END #
class Chan4(object): class Chan4(object):
""" """
4chan indexer, crawler and ingester. 4chan indexer, crawler and ingester.
@@ -40,6 +44,8 @@ class Chan4(object):
name = self.__class__.__name__ name = self.__class__.__name__
self.log = util.get_logger(name) self.log = util.get_logger(name)
self.sleep_interval = 0.0
self.api_endpoint = "https://a.4cdn.org" self.api_endpoint = "https://a.4cdn.org"
# self.boards = ["out", "g", "a", "3", "pol"] # # self.boards = ["out", "g", "a", "3", "pol"] #
self.boards = [] self.boards = []
@@ -59,6 +65,33 @@ class Chan4(object):
self.hash_key = self.hash_key.decode("ascii") self.hash_key = self.hash_key.decode("ascii")
self.log.debug(f"Decoded hash key: {self.hash_key}") self.log.debug(f"Decoded hash key: {self.hash_key}")
async def dynamic_throttle(self):
    """
    Dynamically sleep before a request if CPU usage is above our target.

    Samples system-wide CPU usage, then adjusts self.sleep_interval:
    above TARGET_CPU_USAGE it grows by 0.01s, capped at 0.1s (the original
    docstring said 0.2s, but the code caps at 0.1s — 0.2 is the sampling
    window, not the cap); below target, while above the 0.01s floor, it
    shrinks by 0.01s.  Finally awaits the current interval when positive.
    Logs CPU usage and sleep interval like process.py.
    """
    # NOTE(review): psutil.cpu_percent with interval=0.2 BLOCKS the calling
    # thread for 0.2s inside a coroutine, stalling the whole event loop on
    # every bound_fetch — confirm this is acceptable or move to an executor.
    current_cpu_usage = psutil.cpu_percent(interval=0.2)
    if current_cpu_usage > TARGET_CPU_USAGE:
        self.sleep_interval += 0.01
        # Hard cap so throttling never delays a request by more than 0.1s.
        if self.sleep_interval > 0.1:
            self.sleep_interval = 0.1
        self.log.info(
            f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
            f"=> sleep {self.sleep_interval:.3f}s"
        )
    elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
        # Ease off gently; the > 0.01 guard stops decrementing at the floor.
        self.sleep_interval -= 0.01
        self.log.info(
            f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
            f"=> sleep {self.sleep_interval:.3f}s"
        )
    # Non-blocking sleep so other tasks run while this request is throttled.
    if self.sleep_interval > 0:
        await asyncio.sleep(self.sleep_interval)
async def run(self): async def run(self):
if "ALL" in BOARDS: if "ALL" in BOARDS:
await self.get_board_list() await self.get_board_list()
@@ -76,6 +109,8 @@ class Chan4(object):
for board in response["boards"]: for board in response["boards"]:
self.boards.append(board["board"]) self.boards.append(board["board"])
self.log.debug(f"Got boards: {self.boards}") self.log.debug(f"Got boards: {self.boards}")
# await self.dynamic_throttle()
# TODO
async def get_thread_lists(self, boards): async def get_thread_lists(self, boards):
# self.log.debug(f"Getting thread list for {boards}") # self.log.debug(f"Getting thread list for {boards}")
@@ -91,6 +126,8 @@ class Chan4(object):
for threads in page["threads"]: for threads in page["threads"]:
no = threads["no"] no = threads["no"]
to_get.append((board, no)) to_get.append((board, no))
# await self.dynamic_throttle()
# TODO
if not to_get: if not to_get:
return return
@@ -100,6 +137,8 @@ class Chan4(object):
for index, thr in enumerate(split_threads): for index, thr in enumerate(split_threads):
self.log.debug(f"Series {index} - getting {len(thr)} threads") self.log.debug(f"Series {index} - getting {len(thr)} threads")
await self.get_threads_content(thr) await self.get_threads_content(thr)
# await self.dynamic_throttle()
# TODO
await asyncio.sleep(THREADS_DELAY) await asyncio.sleep(THREADS_DELAY)
def take_items(self, dict_list, n): def take_items(self, dict_list, n):
@@ -130,6 +169,8 @@ class Chan4(object):
continue continue
board, thread = mapped board, thread = mapped
all_posts[mapped] = response["posts"] all_posts[mapped] = response["posts"]
# await self.dynamic_throttle()
# TODO
if not all_posts: if not all_posts:
return return
@@ -147,6 +188,8 @@ class Chan4(object):
post["channel"] = thread post["channel"] = thread
to_store.append(post) to_store.append(post)
# await self.dynamic_throttle()
# TODO
if to_store: if to_store:
await db.queue_message_bulk(to_store) await db.queue_message_bulk(to_store)
@@ -161,6 +204,7 @@ class Chan4(object):
async def bound_fetch(self, sem, url, session, mapped): async def bound_fetch(self, sem, url, session, mapped):
# Getter function with semaphore. # Getter function with semaphore.
async with sem: async with sem:
await self.dynamic_throttle()
try: try:
return await self.fetch(url, session, mapped) return await self.fetch(url, session, mapped)
except: # noqa except: # noqa