Compare commits
3 Commits
352909bec0
...
dc533f266f
| Author | SHA1 | Date | |
|---|---|---|---|
|
dc533f266f
|
|||
|
ea4b5e6321
|
|||
|
54ecfbae64
|
@@ -1,9 +1,30 @@
|
|||||||
version: "2.2"
|
version: "2.2"
|
||||||
|
|
||||||
services:
|
services:
|
||||||
|
rts:
|
||||||
|
image: xf/monolith:latest
|
||||||
|
container_name: rts_monolith
|
||||||
|
command: sh -c '. /venv/bin/activate && exec python rts.py'
|
||||||
|
build: .
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}:/code
|
||||||
|
- type: bind
|
||||||
|
source: /code/run
|
||||||
|
target: /var/run
|
||||||
|
environment:
|
||||||
|
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
|
||||||
|
MODULES_ENABLED: "${MODULES_ENABLED}"
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: '0.5'
|
||||||
|
memory: 1.0G
|
||||||
|
network_mode: host
|
||||||
|
|
||||||
app:
|
app:
|
||||||
image: xf/monolith:latest
|
image: xf/monolith:latest
|
||||||
container_name: monolith
|
container_name: monolith
|
||||||
|
#command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
|
||||||
build: .
|
build: .
|
||||||
volumes:
|
volumes:
|
||||||
- ${PORTAINER_GIT_DIR}:/code
|
- ${PORTAINER_GIT_DIR}:/code
|
||||||
@@ -44,6 +65,8 @@ services:
|
|||||||
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
|
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
|
||||||
# Enable performance metrics after message processing
|
# Enable performance metrics after message processing
|
||||||
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
|
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
|
||||||
|
MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
|
||||||
|
MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
|
||||||
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
|
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
|
||||||
REDIS_PASSWORD: "${REDIS_PASSWORD}"
|
REDIS_PASSWORD: "${REDIS_PASSWORD}"
|
||||||
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
|
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
|
||||||
|
|||||||
79
perf/throttle.py
Normal file
79
perf/throttle.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import psutil
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
import util
|
||||||
|
|
||||||
|
class DynamicThrottle(object):
    """Adaptive CPU-based throttle.

    Samples system-wide CPU usage via psutil and steers a sleep interval
    up or down so that usage converges on ``target_cpu_usage``.  Callers
    invoke ``dynamic_throttle()`` before each unit of work; depending on
    the ``async`` flag given at construction this either blocks with
    ``time.sleep`` or awaits ``asyncio.sleep``.
    """

    def __init__(self, **kwargs):
        """Create a throttle.

        Keyword Args:
            target_cpu_usage (float): CPU percentage to aim for. Default 50.
            sleep_increment (float): added to the interval when CPU usage is
                above target. Default 0.01.
            sleep_decrement (float): subtracted from the interval when CPU
                usage is below target. Default 0.01.
            sleep_max (float): upper bound on the interval. Default 0.1.
            sleep_min (float): floor for the interval once throttling has
                begun. Default 0.01.
            psutil_interval (float): sampling window (seconds) passed to
                psutil.cpu_percent(). Default 0.1.
            log: logger instance; defaults to util.get_logger(<class name>).
            async (bool): select the coroutine variant of dynamic_throttle.
                NOTE: "async" is a Python keyword, so callers must supply it
                via dict unpacking, e.g. DynamicThrottle(**{"async": True}).
        """
        self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
        self.sleep_interval = 0.0

        self.sleep_increment = kwargs.get("sleep_increment", 0.01)
        self.sleep_decrement = kwargs.get("sleep_decrement", 0.01)

        self.sleep_max = kwargs.get("sleep_max", 0.1)
        self.sleep_min = kwargs.get("sleep_min", 0.01)

        self.psutil_interval = kwargs.get("psutil_interval", 0.1)

        # Create the default logger lazily: the original
        # kwargs.get("log", util.get_logger(...)) evaluated the fallback
        # eagerly on every construction, even when a logger was supplied.
        self.log = kwargs.get("log") or util.get_logger(
            self.__class__.__name__
        )

        # Bookkeeping fields; not consulted by the adjustment logic yet,
        # kept so existing callers that read them keep working.
        self.consecutive_increments = 0
        self.consecutive_decrements = 0
        self.last_was_increment = False

        # Select the coroutine variant when requested.  (The original
        # else-branch reassigned dynamic_throttle to itself — a no-op —
        # so no else is needed: the sync method is already bound.)
        if kwargs.get("async"):
            self.dynamic_throttle = self.dynamic_throttle_async

    def _adjust_interval(self):
        """Sample CPU usage once and nudge sleep_interval toward target.

        Shared by the sync and async variants; logs every adjustment.
        """
        current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)

        if current_cpu_usage > self.target_cpu_usage:
            # Back off harder, but never beyond sleep_max.
            self.sleep_interval = min(
                self.sleep_interval + self.sleep_increment, self.sleep_max
            )
            self.log.info(
                f"CPU {current_cpu_usage}% > {self.target_cpu_usage}%, "
                f"=> sleep {self.sleep_interval:.3f}s"
            )
        elif (
            current_cpu_usage < self.target_cpu_usage
            and self.sleep_interval > self.sleep_min
        ):
            # Ease off, clamping at sleep_min: the original could
            # undershoot the floor when sleep_decrement was larger than
            # the remaining headroom above sleep_min.
            self.sleep_interval = max(
                self.sleep_interval - self.sleep_decrement, self.sleep_min
            )
            self.log.info(
                f"CPU {current_cpu_usage}% < {self.target_cpu_usage}%, "
                f"=> sleep {self.sleep_interval:.3f}s"
            )

    async def dynamic_throttle_async(self):
        """
        Dynamically sleeps before a request if CPU usage is above our target.
        """
        self._adjust_interval()

        if self.sleep_interval > 0:
            await asyncio.sleep(self.sleep_interval)

    def dynamic_throttle(self):
        """
        Dynamically sleeps before a request if CPU usage is above our target.
        """
        self._adjust_interval()

        if self.sleep_interval > 0:
            time.sleep(self.sleep_interval)
|
||||||
@@ -8,6 +8,9 @@ import string
|
|||||||
# For timing
|
# For timing
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
# For throttling
|
||||||
|
import psutil
|
||||||
|
|
||||||
# Squash errors
|
# Squash errors
|
||||||
import warnings
|
import warnings
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
@@ -57,6 +60,9 @@ KEYNAME = "queue"
|
|||||||
MONOLITH_PROCESS_PERFSTATS = (
|
MONOLITH_PROCESS_PERFSTATS = (
|
||||||
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
|
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
|
||||||
)
|
)
|
||||||
|
TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
|
||||||
|
|
||||||
|
SLEEP_INTERVAL = 0.0
|
||||||
|
|
||||||
CUSTOM_FILTERS = [
|
CUSTOM_FILTERS = [
|
||||||
lambda x: x.lower(),
|
lambda x: x.lower(),
|
||||||
@@ -143,6 +149,7 @@ async def spawn_processing_threads(chunk, length):
|
|||||||
|
|
||||||
|
|
||||||
def process_data(chunk, index, chunk_size):
|
def process_data(chunk, index, chunk_size):
|
||||||
|
global SLEEP_INTERVAL
|
||||||
log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
|
log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
|
||||||
to_store = []
|
to_store = []
|
||||||
|
|
||||||
@@ -155,11 +162,13 @@ def process_data(chunk, index, chunk_size):
|
|||||||
hash_time = 0.0
|
hash_time = 0.0
|
||||||
normal2_time = 0.0
|
normal2_time = 0.0
|
||||||
soup_time = 0.0
|
soup_time = 0.0
|
||||||
|
sleep_time = 0.0
|
||||||
|
|
||||||
total_time = 0.0
|
total_time = 0.0
|
||||||
|
|
||||||
# Initialise sentiment analyser
|
# Initialise sentiment analyser
|
||||||
analyzer = SentimentIntensityAnalyzer()
|
analyzer = SentimentIntensityAnalyzer()
|
||||||
|
|
||||||
for msg_index in range(chunk_size):
|
for msg_index in range(chunk_size):
|
||||||
msg = db.r.rpop(KEYNAME)
|
msg = db.r.rpop(KEYNAME)
|
||||||
if not msg:
|
if not msg:
|
||||||
@@ -207,7 +216,9 @@ def process_data(chunk, index, chunk_size):
|
|||||||
continue
|
continue
|
||||||
# pass
|
# pass
|
||||||
else:
|
else:
|
||||||
msg["type"] = "update"
|
# msg["type"] = "update"
|
||||||
|
# Fuck it, updates just brew spam
|
||||||
|
continue
|
||||||
db.r.set(redis_key, hash)
|
db.r.set(redis_key, hash)
|
||||||
time_took = (time.process_time() - start) * 1000
|
time_took = (time.process_time() - start) * 1000
|
||||||
hash_time += time_took
|
hash_time += time_took
|
||||||
@@ -289,6 +300,26 @@ def process_data(chunk, index, chunk_size):
|
|||||||
to_store.append(msg)
|
to_store.append(msg)
|
||||||
total_time += (time.process_time() - total_start) * 1000
|
total_time += (time.process_time() - total_start) * 1000
|
||||||
|
|
||||||
|
# Dynamic throttling to reduce CPU usage
|
||||||
|
if msg_index % 5 == 0:
|
||||||
|
current_cpu_usage = psutil.cpu_percent(interval=0.2)
|
||||||
|
if current_cpu_usage > TARGET_CPU_USAGE:
|
||||||
|
SLEEP_INTERVAL += 0.02
|
||||||
|
if SLEEP_INTERVAL > 0.5:
|
||||||
|
SLEEP_INTERVAL = 0.5
|
||||||
|
log.info(
|
||||||
|
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
|
||||||
|
f"=> sleep {SLEEP_INTERVAL:.3f}s"
|
||||||
|
)
|
||||||
|
elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
|
||||||
|
SLEEP_INTERVAL -= 0.01
|
||||||
|
log.info(
|
||||||
|
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
|
||||||
|
f"=> sleep {SLEEP_INTERVAL:.3f}s"
|
||||||
|
)
|
||||||
|
time.sleep(SLEEP_INTERVAL)
|
||||||
|
sleep_time += SLEEP_INTERVAL
|
||||||
|
|
||||||
if MONOLITH_PROCESS_PERFSTATS:
|
if MONOLITH_PROCESS_PERFSTATS:
|
||||||
log.debug("=====================================")
|
log.debug("=====================================")
|
||||||
log.debug(f"Chunk: {chunk}")
|
log.debug(f"Chunk: {chunk}")
|
||||||
@@ -303,6 +334,7 @@ def process_data(chunk, index, chunk_size):
|
|||||||
log.debug(f"Normal2: {normal2_time}")
|
log.debug(f"Normal2: {normal2_time}")
|
||||||
log.debug(f"Soup: {soup_time}")
|
log.debug(f"Soup: {soup_time}")
|
||||||
log.debug(f"Total: {total_time}")
|
log.debug(f"Total: {total_time}")
|
||||||
|
log.debug(f"Throttling: {sleep_time}")
|
||||||
log.debug("=====================================")
|
log.debug("=====================================")
|
||||||
|
|
||||||
return to_store
|
return to_store
|
||||||
|
|||||||
@@ -23,3 +23,4 @@ uvloop
|
|||||||
elasticsearch[async]
|
elasticsearch[async]
|
||||||
msgpack
|
msgpack
|
||||||
# flpc
|
# flpc
|
||||||
|
psutil
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ from os import getenv
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
from numpy import array_split
|
from numpy import array_split
|
||||||
|
|
||||||
|
import psutil
|
||||||
|
|
||||||
import db
|
import db
|
||||||
import util
|
import util
|
||||||
|
|
||||||
@@ -25,12 +27,14 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
|
|||||||
# Semaphore value ?
|
# Semaphore value ?
|
||||||
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
|
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
|
||||||
|
|
||||||
|
# Target CPU usage percentage
|
||||||
|
TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
|
||||||
|
|
||||||
# Boards to crawl
|
# Boards to crawl
|
||||||
BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
|
BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
|
||||||
|
|
||||||
# CONFIGURATION END #
|
# CONFIGURATION END #
|
||||||
|
|
||||||
|
|
||||||
class Chan4(object):
|
class Chan4(object):
|
||||||
"""
|
"""
|
||||||
4chan indexer, crawler and ingester.
|
4chan indexer, crawler and ingester.
|
||||||
@@ -40,6 +44,8 @@ class Chan4(object):
|
|||||||
name = self.__class__.__name__
|
name = self.__class__.__name__
|
||||||
self.log = util.get_logger(name)
|
self.log = util.get_logger(name)
|
||||||
|
|
||||||
|
self.sleep_interval = 0.0
|
||||||
|
|
||||||
self.api_endpoint = "https://a.4cdn.org"
|
self.api_endpoint = "https://a.4cdn.org"
|
||||||
# self.boards = ["out", "g", "a", "3", "pol"] #
|
# self.boards = ["out", "g", "a", "3", "pol"] #
|
||||||
self.boards = []
|
self.boards = []
|
||||||
@@ -59,6 +65,33 @@ class Chan4(object):
|
|||||||
self.hash_key = self.hash_key.decode("ascii")
|
self.hash_key = self.hash_key.decode("ascii")
|
||||||
self.log.debug(f"Decoded hash key: {self.hash_key}")
|
self.log.debug(f"Decoded hash key: {self.hash_key}")
|
||||||
|
|
||||||
|
async def dynamic_throttle(self):
    """
    Adaptively pause before a request based on system CPU load.

    Samples CPU usage over a 0.2s window.  Above TARGET_CPU_USAGE the
    sleep interval grows by 0.01s, capped at 0.1s; below the target it
    shrinks by 0.01s while it still exceeds 0.01s.  Each adjustment is
    logged, and the current interval (if non-zero) is then awaited.
    """
    cpu_now = psutil.cpu_percent(interval=0.2)

    if cpu_now > TARGET_CPU_USAGE:
        # Raise the back-off, clamped to the 0.1s ceiling.
        self.sleep_interval = min(self.sleep_interval + 0.01, 0.1)
        self.log.info(
            f"CPU {cpu_now}% > {TARGET_CPU_USAGE}%, "
            f"=> sleep {self.sleep_interval:.3f}s"
        )
    elif cpu_now < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
        # Plenty of headroom: ease the back-off down.
        self.sleep_interval -= 0.01
        self.log.info(
            f"CPU {cpu_now}% < {TARGET_CPU_USAGE}%, "
            f"=> sleep {self.sleep_interval:.3f}s"
        )

    if self.sleep_interval > 0:
        await asyncio.sleep(self.sleep_interval)
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
if "ALL" in BOARDS:
|
if "ALL" in BOARDS:
|
||||||
await self.get_board_list()
|
await self.get_board_list()
|
||||||
@@ -76,6 +109,8 @@ class Chan4(object):
|
|||||||
for board in response["boards"]:
|
for board in response["boards"]:
|
||||||
self.boards.append(board["board"])
|
self.boards.append(board["board"])
|
||||||
self.log.debug(f"Got boards: {self.boards}")
|
self.log.debug(f"Got boards: {self.boards}")
|
||||||
|
# await self.dynamic_throttle()
|
||||||
|
# TODO
|
||||||
|
|
||||||
async def get_thread_lists(self, boards):
|
async def get_thread_lists(self, boards):
|
||||||
# self.log.debug(f"Getting thread list for {boards}")
|
# self.log.debug(f"Getting thread list for {boards}")
|
||||||
@@ -91,6 +126,8 @@ class Chan4(object):
|
|||||||
for threads in page["threads"]:
|
for threads in page["threads"]:
|
||||||
no = threads["no"]
|
no = threads["no"]
|
||||||
to_get.append((board, no))
|
to_get.append((board, no))
|
||||||
|
# await self.dynamic_throttle()
|
||||||
|
# TODO
|
||||||
|
|
||||||
if not to_get:
|
if not to_get:
|
||||||
return
|
return
|
||||||
@@ -100,6 +137,8 @@ class Chan4(object):
|
|||||||
for index, thr in enumerate(split_threads):
|
for index, thr in enumerate(split_threads):
|
||||||
self.log.debug(f"Series {index} - getting {len(thr)} threads")
|
self.log.debug(f"Series {index} - getting {len(thr)} threads")
|
||||||
await self.get_threads_content(thr)
|
await self.get_threads_content(thr)
|
||||||
|
# await self.dynamic_throttle()
|
||||||
|
# TODO
|
||||||
await asyncio.sleep(THREADS_DELAY)
|
await asyncio.sleep(THREADS_DELAY)
|
||||||
|
|
||||||
def take_items(self, dict_list, n):
|
def take_items(self, dict_list, n):
|
||||||
@@ -130,6 +169,8 @@ class Chan4(object):
|
|||||||
continue
|
continue
|
||||||
board, thread = mapped
|
board, thread = mapped
|
||||||
all_posts[mapped] = response["posts"]
|
all_posts[mapped] = response["posts"]
|
||||||
|
# await self.dynamic_throttle()
|
||||||
|
# TODO
|
||||||
|
|
||||||
if not all_posts:
|
if not all_posts:
|
||||||
return
|
return
|
||||||
@@ -147,6 +188,8 @@ class Chan4(object):
|
|||||||
post["channel"] = thread
|
post["channel"] = thread
|
||||||
|
|
||||||
to_store.append(post)
|
to_store.append(post)
|
||||||
|
# await self.dynamic_throttle()
|
||||||
|
# TODO
|
||||||
|
|
||||||
if to_store:
|
if to_store:
|
||||||
await db.queue_message_bulk(to_store)
|
await db.queue_message_bulk(to_store)
|
||||||
@@ -161,6 +204,7 @@ class Chan4(object):
|
|||||||
async def bound_fetch(self, sem, url, session, mapped):
|
async def bound_fetch(self, sem, url, session, mapped):
|
||||||
# Getter function with semaphore.
|
# Getter function with semaphore.
|
||||||
async with sem:
|
async with sem:
|
||||||
|
await self.dynamic_throttle()
|
||||||
try:
|
try:
|
||||||
return await self.fetch(url, session, mapped)
|
return await self.fetch(url, session, mapped)
|
||||||
except: # noqa
|
except: # noqa
|
||||||
|
|||||||
Reference in New Issue
Block a user