Compare commits


4 Commits

14 changed files with 578 additions and 183 deletions

clients/mexc.py (new file, 0 lines)

db.py (136 lines changed)

@@ -1,18 +1,22 @@
+import asyncio
 from math import ceil
+from os import getenv
+from time import sleep
+
+import aiomysql
 import aioredis
 import manticoresearch
+import msgpack
 import orjson
 from manticoresearch.rest import ApiException
 from numpy import array_split
 from redis import StrictRedis
-import msgpack
-import asyncio
 import util
 from schemas import mc_s
-from os import getenv
-from time import sleep
+
+mysql_pool = None

 configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
 api_client = manticoresearch.ApiClient(configuration)
@@ -24,24 +28,25 @@ log = util.get_logger("db")
 # r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
 r = StrictRedis(
     host="127.0.0.1",  # Replace with your Redis server's IP address
     port=1289,  # Replace with your Redis server's port
-    db=0  # Database number
+    db=0,  # Database number
 )

 # AIORedis
 # ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
-ar = aioredis.from_url(
-    "redis://127.0.0.1:1289",
-    db=0
-)
+ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)

 # /var/run/neptune-redis.sock
 # db = 10
 pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
-#pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
+# fr = aioredis.from_url("unix://var/run/fisk-redis.sock", db=10)
+fr = aioredis.from_url("unix://var/run/redis.sock", db=10)
+# pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))

 KEYNAME = "queue"
 MESSAGE_KEY = "messages"
+OHLC_MESSAGE_KEY = "ohlc"

 TYPES_MAIN = [
     "msg",
@@ -60,50 +65,68 @@ TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]

-# def store_message(msg):
-#     """
-#     Store a message into Manticore
-#     :param msg: dict
-#     """
-#     # Duplicated to avoid extra function call
-#     if msg["type"] in TYPES_MAIN:
-#         index = "main"
-#         schema = mc_s.schema_main
-#     elif msg["type"] in TYPES_META:
-#         index = "meta"
-#         schema = mc_s.schema_meta
-#     elif msg["type"] in TYPES_INT:
-#         index = "internal"
-#         schema = mc_s.schema_int
-#     # normalise fields
-#     for key, value in list(msg.items()):
-#         if value is None:
-#             del msg[key]
-#         if key in schema:
-#             if isinstance(value, int):
-#                 if schema[key].startswith("string") or schema[key].startswith("text"):
-#                     msg[key] = str(value)
-#     body = [{"insert": {"index": index, "doc": msg}}]
-#     body_post = ""
-#     for item in body:
-#         body_post += orjson.dumps(item)
-#         body_post += "\n"
-#     # print(body_post)
-#     try:
-#         # Bulk index operations
-#         api_response = api_instance.bulk(body_post)  # , async_req=True
-#         # print(api_response)
-#     except ApiException as e:
-#         print("Exception when calling IndexApi->bulk: %s\n" % e)
-#         print("ATTEMPT", body_post)
+async def init_mysql_pool():
+    """
+    Initialize the MySQL connection pool.
+    """
+    global mysql_pool
+    mysql_pool = await aiomysql.create_pool(
+        host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
+    )
+
+
+async def rts_store_message(index, data):
+    """
+    Store a RTS message into MySQL using an existing connection pool.
+    Prioritizes instant PubSub delivery, with minimal data storage overhead.
+    :param index: str
+    :param data: dict
+    """
+    # Publish to Redis PubSub
+    packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
+    try:
+        await fr.publish(OHLC_MESSAGE_KEY, packed_index)
+    except aioredis.exceptions.ConnectionError as e:
+        raise e
+    await asyncio.sleep(0.1)
+
+    # Insert data into MySQL
+    try:
+        async with mysql_pool.acquire() as conn:
+            async with conn.cursor() as cur:
+                # Insert data into the table
+                query = f"""
+                INSERT INTO {index} (s, o, c, h, l, v, a, i, t, t2, ts)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+                """
+                # Bind the values directly
+                await cur.execute(
+                    query,
+                    (
+                        data["s"],  # symbol
+                        data["o"],  # open
+                        data["c"],  # close
+                        data["h"],  # high
+                        data["l"],  # low
+                        data["v"],  # volume_base
+                        data["a"],  # volume_quote
+                        data["i"],  # interval
+                        data["t"],  # start_time
+                        data["t2"],  # end_time
+                        data["ts"],  # event_time
+                    ),
+                )
+                await conn.commit()
+                log.debug(f"Stored data for {data['s']} in MySQL.")
+    except aiomysql.Error as e:
+        log.error(f"MySQL error: {e}")
+
+
 async def store_batch(data):
     """
     Store a message into Manticore
-    :param msg: dict
+    :param data: list
     """
     if not data:
         return
@@ -161,12 +184,11 @@ async def store_batch(data):
         body_post = ""
         for item in total:
-            #print("ITEM", item)
+            # print("ITEM", item)
             body_post += orjson.dumps(item).decode("utf-8")
             body_post += "\n"

-        #print("BODY POST INDEX", index, body_post)
+        # print("BODY POST INDEX", index, body_post)
         try:
             # Bulk index operations
@@ -186,6 +208,7 @@ def create_index(api_client):
     util_instance = manticoresearch.UtilsApi(api_client)
     schemas = {
         "main": mc_s.schema_main,
+        "rule_storage": mc_s.schema_rule_storage,
         "meta": mc_s.schema_meta,
         "internal": mc_s.schema_int,
     }
@@ -216,14 +239,3 @@ async def queue_message_bulk(data):
         # TODO: msgpack
         message = orjson.dumps(msg)
         await ar.lpush(KEYNAME, message)
-
-
-created = False
-while not created:
-    try:
-        create_index(api_client)
-        created = True
-    except Exception as e:
-        print(f"Error creating index: {e}")
-        sleep(1)  # Block the thread, just wait for the DB
-update_schema()
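For reference, a minimal consumer for the candles that rts_store_message publishes might look like the sketch below. This is hypothetical and not part of this changeset; it assumes the same fr unix socket, "ohlc" channel and msgpack framing used above, plus aioredis 2.x PubSub semantics.

import asyncio

import aioredis
import msgpack

OHLC_MESSAGE_KEY = "ohlc"
fr = aioredis.from_url("unix://var/run/redis.sock", db=10)


async def consume_ohlc():
    # Subscribe to the channel rts_store_message publishes to
    pubsub = fr.pubsub()
    await pubsub.subscribe(OHLC_MESSAGE_KEY)
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe/unsubscribe notifications
        payload = msgpack.unpackb(message["data"], raw=False)
        print(payload["index"], payload["data"])


if __name__ == "__main__":
    asyncio.run(consume_ohlc())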


@@ -1,9 +1,32 @@
 version: "2.2"

 services:
+  rts:
+    image: xf/monolith:latest
+    container_name: rts_monolith
+    command: sh -c '. /venv/bin/activate && exec python rts.py'
+    build: .
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+      - type: bind
+        source: /code/run
+        target: /var/run
+    environment:
+      PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+      MODULES_ENABLED: "${MODULES_ENABLED}"
+      MONOLITH_RTS_MEXC_API_ACCESS_KEY: "${MONOLITH_RTS_MEXC_API_ACCESS_KEY}"
+      MONOLITH_RTS_MEXC_API_SECRET_KEY: "${MONOLITH_RTS_MEXC_API_SECRET_KEY}"
+    deploy:
+      resources:
+        limits:
+          cpus: '0.5'
+          memory: 1.0G
+    network_mode: host
+
   app:
     image: xf/monolith:latest
     container_name: monolith
+    #command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
     build: .
     volumes:
       - ${PORTAINER_GIT_DIR}:/code
@@ -44,6 +67,8 @@ services:
       MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
       # Enable performance metrics after message processing
       MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+      MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
+      MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
       MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
       REDIS_PASSWORD: "${REDIS_PASSWORD}"
       MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
@@ -60,64 +85,61 @@ services:
     network_mode: host

-  # threshold:
-  #   image: xf/threshold:latest
-  #   container_name: threshold
-  #   build: legacy/docker
-  #   volumes:
-  #     - ${PORTAINER_GIT_DIR}:/code
-  #     - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-  #     #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
-  #     - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
-  #   volumes_from:
-  #     - tmp
-  #   ports:
-  #     - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
-  #     - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
-  #     - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
-  #   environment:
-  #     PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
-  #     MODULES_ENABLED: "${MODULES_ENABLED}"
-  #     DISCORD_TOKEN: "${DISCORD_TOKEN}"
-  #     THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
-  #     THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
-  #     THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
-  #     THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
-  #     THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
-  #     THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
-  #     THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
-  #     THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
-  #     THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
-  #     THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
-  #     THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
-  #     #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
-  #     THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
-  #     # How many messages to ingest at once from Redis
-  #     MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
-  #     # Time to wait between polling Redis again
-  #     MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
-  #     # Number of 4chan threads to request at once
-  #     MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
-  #     # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
-  #     MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
-  #     # Time to wait after finishing a crawl before starting again
-  #     MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
-  #     # Semaphore value
-  #     MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
-  #     # Threads to use for data processing
-  #     # Leave uncommented to use all available threads
-  #     MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
-  #     # Enable performance metrics after message processing
-  #     MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
-  #     MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
-  #     REDIS_PASSWORD: "${REDIS_PASSWORD}"
-  #   # for development
-  #   extra_hosts:
-  #     - "host.docker.internal:host-gateway"
-  #   networks:
-  #     - default
-  #     - xf
-  #     - db
+  threshold:
+    image: xf/threshold:latest
+    container_name: threshold
+    build: legacy/docker
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+      - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
+      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+    volumes_from:
+      - tmp
+    ports:
+      - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
+      - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
+      - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
+    environment:
+      PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+      MODULES_ENABLED: "${MODULES_ENABLED}"
+      DISCORD_TOKEN: "${DISCORD_TOKEN}"
+      THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
+      THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
+      THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
+      THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
+      THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
+      THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
+      THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
+      THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
+      THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
+      THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
+      THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
+      #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
+      THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
+      # How many messages to ingest at once from Redis
+      MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
+      # Time to wait between polling Redis again
+      MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
+      # Number of 4chan threads to request at once
+      MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
+      # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
+      MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
+      # Time to wait after finishing a crawl before starting again
+      MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
+      # Semaphore value
+      MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
+      # Threads to use for data processing
+      # Leave uncommented to use all available threads
+      MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
+      # Enable performance metrics after message processing
+      MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+      MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
+      REDIS_PASSWORD: "${REDIS_PASSWORD}"
+    # for development
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    network_mode: host

   ssdb:
     image: tsl0922/ssdb


@@ -1,8 +1,10 @@
 import asyncio
 from os import getenv
+from time import sleep

 import uvloop

+import db
 import util
 from sources.ch4 import Chan4
 from sources.dis import DiscordClient
@@ -34,6 +36,17 @@ async def main(loop):
     loop.create_task(chan.run())

+    created = False
+    while not created:
+        try:
+            db.create_index(db.api_client)
+            created = True
+        except Exception as e:
+            print(f"Error creating index: {e}")
+            sleep(1)  # Block the thread, just wait for the DB
+
+    db.update_schema()
+

 loop = asyncio.get_event_loop()
 loop.create_task(main(loop))

oom (new file, 0 lines)

perf/__init__.py (new file, 0 lines)

perf/throttle.py (new file, 134 lines)

@@ -0,0 +1,134 @@
import asyncio
import time

import psutil

import util


class DynamicThrottle(object):
    def __init__(self, **kwargs):
        self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
        self.sleep_interval = 0.0
        self.sleep_increment = kwargs.get("sleep_increment", 0.01)
        self.sleep_decrement = kwargs.get("sleep_decrement", 0.01)
        self.sleep_max = kwargs.get("sleep_max", 0.1)
        self.sleep_min = kwargs.get("sleep_min", 0.01)
        self.psutil_interval = kwargs.get("psutil_interval", 0.1)
        self.log = kwargs.get("log", util.get_logger(self.__class__.__name__))
        self.consecutive_increments = 0
        self.consecutive_decrements = 0
        self.consecutive_divisor = kwargs.get("consecutive_divisor", 1)
        self.last_was_increment = kwargs.get("start_increment", True)

        if kwargs.get("use_async"):
            self.wait = self.dynamic_throttle_async
        else:
            self.wait = self.dynamic_throttle

    async def dynamic_throttle_async(self):
        """
        Dynamically sleeps before a request if CPU usage is above our target.
        """
        current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
        if current_cpu_usage > self.target_cpu_usage:
            if self.last_was_increment:
                self.consecutive_increments += 1
                # self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
            else:
                self.consecutive_increments = 0  # ?
                self.consecutive_decrements = 0  # ?
                # self.log.debug(f"High CPU alert reset.")
            self.sleep_interval += self.sleep_increment * (
                max(1, self.consecutive_increments) / self.consecutive_divisor
            )
            self.last_was_increment = True
            if self.sleep_interval > self.sleep_max:
                self.sleep_interval = self.sleep_max
                # self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
            # self.log.debug(
            #     f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
            #     f"=> sleep {self.sleep_interval:.3f}s"
            # )
        elif current_cpu_usage < self.target_cpu_usage:
            if not self.last_was_increment:
                self.consecutive_decrements += 1
                # self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
            else:
                self.consecutive_decrements = 0  # ?
                self.consecutive_increments = 0  # ?
                # self.log.debug(f"Low CPU alert reset.")
            self.sleep_interval -= self.sleep_decrement * (
                max(1, self.consecutive_decrements) / self.consecutive_divisor
            )
            self.last_was_increment = False
            if self.sleep_interval < self.sleep_min:
                self.sleep_interval = self.sleep_min
                # self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
            # self.log.debug(
            #     f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
            #     f"=> sleep {self.sleep_interval:.3f}s"
            # )
        if self.sleep_interval > 0:
            await asyncio.sleep(self.sleep_interval)
            return self.sleep_interval
        return 0.0

    def dynamic_throttle(self):
        """
        Dynamically sleeps before a request if CPU usage is above our target.
        """
        current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
        if current_cpu_usage > self.target_cpu_usage:
            if self.last_was_increment:
                self.consecutive_increments += 1
                # self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
            else:
                self.consecutive_increments = 0  # ?
                self.consecutive_decrements = 0  # ?
                # self.log.debug(f"High CPU alert reset.")
            self.sleep_interval += self.sleep_increment * (
                max(1, self.consecutive_increments) / self.consecutive_divisor
            )
            self.last_was_increment = True
            if self.sleep_interval > self.sleep_max:
                self.sleep_interval = self.sleep_max
                # self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
            # self.log.debug(
            #     f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
            #     f"=> sleep {self.sleep_interval:.3f}s"
            # )
        elif current_cpu_usage < self.target_cpu_usage:
            if not self.last_was_increment:
                self.consecutive_decrements += 1
                # self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
            else:
                self.consecutive_decrements = 0  # ?
                self.consecutive_increments = 0  # ?
                # self.log.debug(f"Low CPU alert reset.")
            self.sleep_interval -= self.sleep_decrement * (
                max(1, self.consecutive_decrements) / self.consecutive_divisor
            )
            self.last_was_increment = False
            if self.sleep_interval < self.sleep_min:
                self.sleep_interval = self.sleep_min
                # self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
            # self.log.debug(
            #     f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
            #     f"=> sleep {self.sleep_interval:.3f}s"
            # )
        if self.sleep_interval > 0:
            time.sleep(self.sleep_interval)
            return self.sleep_interval
        return 0.0
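For reference, a minimal usage sketch of DynamicThrottle (hypothetical, not part of the file above; the message-processing loop and the Chan4 crawler below wire it up with the same kwargs):

import util
from perf.throttle import DynamicThrottle

log = util.get_logger("throttle_example")

# Synchronous variant: wait() blocks with time.sleep() when CPU is above target
throttle = DynamicThrottle(
    target_cpu_usage=50,
    sleep_increment=0.02,
    sleep_max=0.5,
    sleep_min=0,
    log=log,
    use_async=False,
)

total_slept = 0.0
for item in range(1000):
    _ = item * item  # placeholder for real per-item work
    if item % 5 == 0:
        # wait() returns the time actually slept (0.0 if no sleep was needed)
        total_slept += throttle.wait()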

processing/ohlc.py (new file, 1 line)

@@ -0,0 +1 @@
# Resample 1Min into 5Min, 15Min, 30Min, 1H, 4H, 1D, 1W, 1M, 1Y
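A sketch of what that resampling could look like. This is an assumption rather than part of the commit: it presumes pandas and the o/h/l/c/v/a column names used by rts_store_message.

import pandas as pd

# How each 1Min column folds into a larger window
AGGREGATION = {
    "o": "first",  # open of the window is the first 1Min open
    "h": "max",    # high is the max of highs
    "l": "min",    # low is the min of lows
    "c": "last",   # close is the last 1Min close
    "v": "sum",    # base volume sums
    "a": "sum",    # quote volume sums
}


def resample_ohlc(df: pd.DataFrame, rule: str) -> pd.DataFrame:
    """Resample a 1Min OHLCV frame with a DatetimeIndex to e.g. "5Min" or "1H"."""
    return df.resample(rule).agg(AGGREGATION).dropna()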


@@ -47,6 +47,9 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
 import db
 import util

+# For throttling
+from perf.throttle import DynamicThrottle
+
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
@@ -57,6 +60,7 @@ KEYNAME = "queue"
 MONOLITH_PROCESS_PERFSTATS = (
     getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
 )
+TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))

 CUSTOM_FILTERS = [
     lambda x: x.lower(),
@@ -88,6 +92,19 @@ CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS", os.cpu_count()))
 p = ProcessPoolExecutor(CPU_THREADS)

+throttle = DynamicThrottle(
+    target_cpu_usage=TARGET_CPU_USAGE,
+    sleep_increment=0.02,
+    sleep_decrement=0.01,
+    sleep_max=0.5,
+    sleep_min=0,
+    psutil_interval=0.1,
+    consecutive_divisor=2,
+    log=log,
+    start_increment=True,
+    use_async=False,
+)
+

 def get_hash_key():
     hash_key = db.r.get("hashing_key")
@@ -130,7 +147,7 @@ async def spawn_processing_threads(chunk, length):
     # Join the results back from the split list
     flat_list = [item for sublist in results for item in sublist]
     total_messages = len(flat_list)
-    log.debug(
+    log.info(
         (
             f"[{chunk}/{index}] Results from processing of {length} messages in "
             f"{cores} threads: {len(flat_list)}"
@@ -152,20 +169,38 @@ def process_data(chunk, index, chunk_size):
     date_time = 0.0
     nlp_time = 0.0
     normalise_time = 0.0
-    hash_time = 0.0
     normal2_time = 0.0
     soup_time = 0.0
+    sleep_time = 0.0
     total_time = 0.0

     # Initialise sentiment analyser
     analyzer = SentimentIntensityAnalyzer()
     for msg_index in range(chunk_size):
+        # Print percentage of msg_index relative to chunk_size
+        if msg_index % 10 == 0:
+            percentage_done = (msg_index / chunk_size) * 100
+            log.debug(
+                f"[{chunk}/{index}] {percentage_done:.2f}% done ({msg_index}/{chunk_size})"
+            )
         msg = db.r.rpop(KEYNAME)
         if not msg:
             return
+        # TODO: msgpack
         msg = orjson.loads(msg)
+
+        if msg["src"] == "4ch":
+            board = msg["net"]
+            thread = msg["channel"]
+            redis_key = (
+                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
+            )
+            key_content = db.r.get(redis_key)
+            if key_content is not None:
+                continue
+            db.r.set(redis_key, "1")
+
         total_start = time.process_time()
         # normalise fields
         start = time.process_time()
@@ -191,27 +226,6 @@ def process_data(chunk, index, chunk_size):
         board = msg["net"]
         thread = msg["channel"]

-        # Calculate hash for post
-        start = time.process_time()
-        post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
-        hash = siphash(hash_key, post_normalised)
-        hash = str(hash)
-        redis_key = (
-            f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
-        )
-        key_content = db.r.get(redis_key)
-        if key_content is not None:
-            key_content = key_content.decode("ascii")
-            if key_content == hash:
-                # This deletes the message since the append at the end won't be hit
-                continue
-                # pass
-            else:
-                msg["type"] = "update"
-        db.r.set(redis_key, hash)
-        time_took = (time.process_time() - start) * 1000
-        hash_time += time_took
-
         start = time.process_time()
         for key2, value in list(msg.items()):
             if key2 in ATTRMAP:
@@ -229,9 +243,10 @@ def process_data(chunk, index, chunk_size):
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
            else:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-            # new_ts = old_ts.isoformat()
+            # iso_ts = old_ts.isoformat()
             new_ts = int(old_ts.timestamp())
             msg["ts"] = new_ts
+            # msg["iso"] = iso_ts
         else:
             raise Exception("No TS in msg")
         time_took = (time.process_time() - start) * 1000
@@ -289,20 +304,24 @@ def process_data(chunk, index, chunk_size):
             to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000

+        # Dynamic throttling to reduce CPU usage
+        if msg_index % 5 == 0:
+            sleep_time += throttle.wait()
+
     if MONOLITH_PROCESS_PERFSTATS:
-        log.debug("=====================================")
-        log.debug(f"Chunk: {chunk}")
-        log.debug(f"Index: {index}")
-        log.debug(f"Sentiment: {sentiment_time}")
-        log.debug(f"Regex: {regex_time}")
-        log.debug(f"Polyglot: {polyglot_time}")
-        log.debug(f"Date: {date_time}")
-        log.debug(f"NLP: {nlp_time}")
-        log.debug(f"Normalise: {normalise_time}")
-        log.debug(f"Hash: {hash_time}")
-        log.debug(f"Normal2: {normal2_time}")
-        log.debug(f"Soup: {soup_time}")
-        log.debug(f"Total: {total_time}")
-        log.debug("=====================================")
+        log.info("=====================================")
+        log.info(f"Chunk: {chunk}")
+        log.info(f"Index: {index}")
+        log.info(f"Sentiment: {sentiment_time}")
+        log.info(f"Regex: {regex_time}")
+        log.info(f"Polyglot: {polyglot_time}")
+        log.info(f"Date: {date_time}")
+        log.info(f"NLP: {nlp_time}")
+        log.info(f"Normalise: {normalise_time}")
+        log.info(f"Normal2: {normal2_time}")
+        log.info(f"Soup: {soup_time}")
+        log.info(f"Total: {total_time}")
+        log.info(f"Throttling: {sleep_time}")
+        log.info("=====================================")
     return to_store


@@ -23,3 +23,7 @@ uvloop
 elasticsearch[async]
 msgpack
 # flpc
+psutil
+pymexc
+websockets
+aiomysql

rts.py (193 lines changed)

@@ -1,31 +1,186 @@
 import asyncio
+import logging
 from os import getenv

-import uvloop
-
-import util
-
-# Use UVLoop
-asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
-
-log = util.get_logger("rts")
-
-modules_enabled = getenv("MODULES_ENABLED", False)
-if "rts" not in modules_enabled:
-    log.info("RTS disabled.")
-    exit(0)
-
-
-async def main(loop):
-    log.info("RTS started.")
-
-
-loop = asyncio.get_event_loop()
-loop.create_task(main(loop))
-
-try:
-    loop.run_forever()
-except KeyboardInterrupt:
-    log.info("RTS process terminating")
-finally:
-    loop.close()
+import orjson
+import websockets
+
+import db
+
+# Logger setup
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger("RTS")
+
+# Environment variables
+MONOLITH_RTS_MEXC_API_ACCESS_KEY = getenv("MONOLITH_RTS_MEXC_API_ACCESS_KEY", None)
+MONOLITH_RTS_MEXC_API_SECRET_KEY = getenv("MONOLITH_RTS_MEXC_API_SECRET_KEY", None)
+
+# WebSocket endpoint
+MEXC_WS_URL = "wss://wbs.mexc.com/ws"
+
+{
+    "d": {
+        "e": "spot@public.kline.v3.api",
+        "k": {
+            "t": 1737901140,  # TS
+            "o": "684.4",  # Open
+            "c": "684.5",  # Close
+            "h": "684.5",  # High
+            "l": "684.4",  # Low
+            "v": "0.173",  # Volume of the base
+            "a": "118.41",  # Volume of the quote (Quantity)
+            "T": 1737901200,  # ?
+            "i": "Min1",  # ?
+        },
+    },
+    "c": "spot@public.kline.v3.api@BNBUSDT@Min1",  # Channel
+    "t": 1737901159239,
+    "s": "BNBUSDT",  # Symbol
+}
+
+# Scan DB for last endtime (T)
+# Request Kline data from last endtime (T) to now
+
+# Check Server Time
+# Response
+# {
+#     "serverTime" : 1645539742000
+# }
+# GET /api/v3/time
+# Weight(IP): 1
+# Parameter:
+# NONE
+
+# Kline/Candlestick Data
+# Response
+# [
+#     [
+#         1640804880000,
+#         "47482.36",
+#         "47482.36",
+#         "47416.57",
+#         "47436.1",
+#         "3.550717",
+#         1640804940000,
+#         "168387.3"
+#     ]
+# ]
+# GET /api/v3/klines
+# Weight(IP): 1
+# Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
+# Parameters:
+# Name       Type     Mandatory  Description
+# symbol     string   YES
+# interval   ENUM     YES        ENUM: Kline Interval
+# startTime  long     NO
+# endTime    long     NO
+# limit      integer  NO         Default 500; max 1000.
+
+# Scrub function:
+# For each record, ensure there are no time gaps
+# When the 1m window goes over, the next t is always the last T.
+# Check for gaps, and request all klines between those gaps to ensure a full DB, even with restarts.
+
+# Idle jitter function - compare our time with server time.
+# Compare ts with our time and print jitter. Add jitter warning to log and OHLC.
+# High jitter may prevent us from getting the correct data for trading.
+
+
+async def mex_handle(data):
+    message = orjson.loads(data)
+    # print(orjson.dumps(message, option=orjson.OPT_INDENT_2).decode("utf-8"))
+    if "code" in message:
+        if message["code"] == 0:
+            log.info("Control message received")
+            return
+
+    symbol = message["s"]
+    open = message["d"]["k"]["o"]
+    close = message["d"]["k"]["c"]
+    high = message["d"]["k"]["h"]
+    low = message["d"]["k"]["l"]
+    volume_base = message["d"]["k"]["v"]  # ERROR IN API DOCS
+    volume_quote = message["d"]["k"]["a"]  # > a  bigDecimal  volume
+    interval = message["d"]["k"]["i"]
+    start_time = message["d"]["k"]["t"]  # > t  long  stratTime
+    end_time = message["d"]["k"]["T"]  # > T  long  endTime
+    event_time = message["t"]  # t  long  eventTime
+
+    index = f"mex_ohlc_{symbol.lower()}"
+    reformatted = {
+        "s": symbol,
+        "o": float(open),
+        "c": float(close),
+        "h": float(high),
+        "l": float(low),
+        "v": float(volume_base),
+        "a": float(volume_quote),
+        "i": interval,
+        "t": int(start_time),
+        "t2": int(end_time),
+        "ts": int(event_time),
+    }
+    await db.rts_store_message(index, reformatted)
+    print(index)
+    print(orjson.dumps(reformatted, option=orjson.OPT_INDENT_2).decode("utf-8"))
+    print()
+
+
+# Kline WebSocket handler
+async def mex_main():
+    await db.init_mysql_pool()
+    async with websockets.connect(MEXC_WS_URL) as websocket:
+        log.info("WebSocket connected")
+
+        # Define symbols and intervals
+        symbols = ["BTCUSDT"]  # Add more symbols as needed
+        interval = "Min1"  # Kline interval
+
+        # Prepare subscription requests for Kline streams
+        # Request: spot@public.kline.v3.api@<symbol>@<interval>
+        subscriptions = [
+            f"spot@public.kline.v3.api@{symbol}@{interval}" for symbol in symbols
+        ]
+
+        # Send subscription requests
+        subscribe_request = {
+            "method": "SUBSCRIPTION",
+            "params": subscriptions,
+            # "id": 1,
+        }
+        await websocket.send(orjson.dumps(subscribe_request).decode("utf-8"))
+        log.info(f"Subscribed to: {subscriptions}")
+
+        # Listen for messages
+        while True:
+            try:
+                message = await websocket.recv()
+                await mex_handle(message)
+            except websockets.exceptions.ConnectionClosed as e:
+                log.error(f"WebSocket connection closed: {e}")
+                break
+
+
+# Entry point
+if __name__ == "__main__":
+    try:
+        asyncio.run(mex_main())
+    except KeyboardInterrupt:
+        log.info("RTS process terminated.")
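The scrub function described in the comments is not implemented in this commit. A hypothetical sketch of its gap-detection half, reusing the t/t2 keys from rts_store_message (the missing ranges would then be backfilled via GET /api/v3/klines):

def find_gaps(klines):
    """
    Given stored klines sorted by start time, return (missing_start, missing_end)
    ranges where the next candle's start (t) does not pick up at the previous
    candle's end (t2).
    """
    gaps = []
    for prev, curr in zip(klines, klines[1:]):
        # When the 1m window rolls over, the next t should equal the last t2
        if curr["t"] != prev["t2"]:
            gaps.append((prev["t2"], curr["t"]))
    return gaps


# Example: one missing minute between these two stored candles
rows = [
    {"t": 1737901140, "t2": 1737901200},
    {"t": 1737901260, "t2": 1737901320},
]
assert find_gaps(rows) == [(1737901200, 1737901260)]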


@@ -137,9 +137,11 @@ schema_main = {
     "rule_id": "bigint",
     "index": "string indexed attribute",
     "meta": "text",
+    # "iso": "string indexed attribute",
 }

+schema_rule_storage = schema_main
+
 schema_meta = {
     "id": "bigint",
     # 393598265, #main, Rust Programmer's Club


@@ -10,6 +10,7 @@ from numpy import array_split
 import db
 import util
+from perf.throttle import DynamicThrottle

 # CONFIGURATION #
@@ -25,6 +26,9 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
 # Semaphore value ?
 THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))

+# Target CPU usage percentage
+TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
+
 # Boards to crawl
 BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
@@ -40,6 +44,19 @@ class Chan4(object):
         name = self.__class__.__name__
         self.log = util.get_logger(name)

+        self.throttle = DynamicThrottle(
+            target_cpu_usage=TARGET_CPU_USAGE,
+            sleep_increment=0.01,
+            sleep_decrement=0.01,
+            sleep_max=0.1,
+            sleep_min=0,
+            psutil_interval=0.1,
+            log=self.log,
+            start_increment=False,
+            use_async=True,
+        )
+        self.wait = self.throttle.wait
+
         self.api_endpoint = "https://a.4cdn.org"
         # self.boards = ["out", "g", "a", "3", "pol"]  #
         self.boards = []
@@ -76,6 +93,8 @@ class Chan4(object):
         for board in response["boards"]:
             self.boards.append(board["board"])
         self.log.debug(f"Got boards: {self.boards}")
+        # await self.dynamic_throttle()
+        # TODO

     async def get_thread_lists(self, boards):
         # self.log.debug(f"Getting thread list for {boards}")
@@ -91,6 +110,8 @@ class Chan4(object):
             for threads in page["threads"]:
                 no = threads["no"]
                 to_get.append((board, no))
+                # await self.dynamic_throttle()
+                # TODO

         if not to_get:
             return
@@ -100,6 +121,8 @@ class Chan4(object):
         for index, thr in enumerate(split_threads):
             self.log.debug(f"Series {index} - getting {len(thr)} threads")
             await self.get_threads_content(thr)
+            # await self.dynamic_throttle()
+            # TODO
             await asyncio.sleep(THREADS_DELAY)

     def take_items(self, dict_list, n):
@@ -130,6 +153,8 @@ class Chan4(object):
                 continue
             board, thread = mapped
             all_posts[mapped] = response["posts"]
+            # await self.dynamic_throttle()
+            # TODO

         if not all_posts:
             return
@@ -147,6 +172,8 @@ class Chan4(object):
                 post["channel"] = thread
                 to_store.append(post)
+                # await self.dynamic_throttle()
+                # TODO

         if to_store:
             await db.queue_message_bulk(to_store)
@@ -161,6 +188,7 @@ class Chan4(object):
     async def bound_fetch(self, sem, url, session, mapped):
         # Getter function with semaphore.
         async with sem:
+            await self.wait()
             try:
                 return await self.fetch(url, session, mapped)
             except:  # noqa


@@ -21,6 +21,7 @@ log = util.get_logger("ingest")
 INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
 INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))

+
 class Ingest(object):
     def __init__(self):
         name = self.__class__.__name__
@@ -51,7 +52,7 @@ class Ingest(object):
         if ingested < INGEST_INCREASE_BELOW:
             if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
-                self.log.info(
+                self.log.debug(
                     (
                         f"Increasing chunk size to "
                         f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
@@ -60,11 +61,13 @@ class Ingest(object):
                 )
                 CHUNK_SIZE += INGEST_INCREASE_BY
             else:
-                log.info(f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}")
+                log.debug(
+                    f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}"
+                )
         elif ingested > INGEST_DECREASE_ABOVE:
             if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
-                self.log.info(
+                self.log.debug(
                     (
                         f"Decreasing chunk size to "
                         f"{CHUNK_SIZE - INGEST_DECREASE_BY}"
@@ -73,4 +76,6 @@ class Ingest(object):
                 )
                 CHUNK_SIZE -= INGEST_DECREASE_BY
             else:
-                log.info(f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}")
+                log.debug(
+                    f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}"
+                )