Compare commits

...

1 Commit

Author SHA1 Message Date
81f05d4263 Begin implementing RTS 2026-02-17 12:14:29 +00:00
14 changed files with 484 additions and 268 deletions

clients/mexc.py (new empty file)

db.py (136 changed lines)

@@ -1,18 +1,22 @@
import asyncio
from math import ceil
from os import getenv
from time import sleep
import aiomysql
import aioredis
import manticoresearch
import msgpack
import orjson
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis
import util
from schemas import mc_s
mysql_pool = None
configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
api_client = manticoresearch.ApiClient(configuration)
@@ -24,24 +28,25 @@ log = util.get_logger("db")
# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
r = StrictRedis(
host="127.0.0.1", # Replace with your Redis server's IP address
port=1289, # Replace with your Redis server's port
db=0, # Database number
)
# AIORedis
# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)
# /var/run/neptune-redis.sock
# db = 10
pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
# fr = aioredis.from_url("unix://var/run/fisk-redis.sock", db=10)
fr = aioredis.from_url("unix://var/run/redis.sock", db=10)
# pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
KEYNAME = "queue"
MESSAGE_KEY = "messages"
OHLC_MESSAGE_KEY = "ohlc"
TYPES_MAIN = [
"msg",
@@ -60,50 +65,68 @@ TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
# def store_message(msg):
# """
# Store a message into Manticore
# :param msg: dict
# """
# # Duplicated to avoid extra function call
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith("text"):
# msg[key] = str(value)
async def init_mysql_pool():
"""
Initialize the MySQL connection pool.
"""
global mysql_pool
mysql_pool = await aiomysql.create_pool(
host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
)
# body = [{"insert": {"index": index, "doc": msg}}]
# body_post = ""
# for item in body:
# body_post += orjson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# api_response = api_instance.bulk(body_post) # , async_req=True
# # print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
async def rts_store_message(index, data):
"""
Store an RTS message into MySQL using an existing connection pool.
Prioritizes instant PubSub delivery with minimal data storage overhead.
:param index: str
:param data: dict
"""
# Publish to Redis PubSub
packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
try:
await fr.publish(OHLC_MESSAGE_KEY, packed_index)
except aioredis.exceptions.ConnectionError as e:
raise e
await asyncio.sleep(0.1)
# Insert data into MySQL
try:
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
# Insert data into the table
query = f"""
INSERT INTO {index} (s, o, c, h, l, v, a, i, t, t2, ts)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# Bind the values directly
await cur.execute(
query,
(
data["s"], # symbol
data["o"], # open
data["c"], # close
data["h"], # high
data["l"], # low
data["v"], # volume_base
data["a"], # volume_quote
data["i"], # interval
data["t"], # start_time
data["t2"], # end_time
data["ts"], # event_time
),
)
await conn.commit()
log.debug(f"Stored data for {data['s']} in MySQL.")
except aiomysql.Error as e:
log.error(f"MySQL error: {e}")
async def store_batch(data):
"""
Store a message into Manticore
:param data: list
"""
if not data:
return
@@ -161,12 +184,11 @@ async def store_batch(data):
body_post = ""
for item in total:
#print("ITEM", item)
# print("ITEM", item)
body_post += orjson.dumps(item).decode("utf-8")
body_post += "\n"
#print("BODY POST INDEX", index, body_post)
# print("BODY POST INDEX", index, body_post)
try:
# Bulk index operations
@@ -186,6 +208,7 @@ def create_index(api_client):
util_instance = manticoresearch.UtilsApi(api_client)
schemas = {
"main": mc_s.schema_main,
"rule_storage": mc_s.schema_rule_storage,
"meta": mc_s.schema_meta,
"internal": mc_s.schema_int,
}
@@ -216,14 +239,3 @@ async def queue_message_bulk(data):
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
created = False
while not created:
try:
create_index(api_client)
created = True
except Exception as e:
print(f"Error creating index: {e}")
sleep(1) # Block the thread, just wait for the DB
update_schema()


@@ -14,6 +14,8 @@ services:
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
MONOLITH_RTS_MEXC_API_ACCESS_KEY: "${MONOLITH_RTS_MEXC_API_ACCESS_KEY}"
MONOLITH_RTS_MEXC_API_SECRET_KEY: "${MONOLITH_RTS_MEXC_API_SECRET_KEY}"
deploy:
resources:
limits:
@@ -83,64 +85,61 @@ services:
network_mode: host
# threshold:
# image: xf/threshold:latest
# container_name: threshold
# build: legacy/docker
# volumes:
# - ${PORTAINER_GIT_DIR}:/code
# - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
# #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
# - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
# volumes_from:
# - tmp
# ports:
# - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
# - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
# - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
# environment:
# PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
# MODULES_ENABLED: "${MODULES_ENABLED}"
# DISCORD_TOKEN: "${DISCORD_TOKEN}"
# THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
# THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
# THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
# THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
# THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
# THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
# THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
# THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
# THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
# THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
# THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
# #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
# THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# # How many messages to ingest at once from Redis
# MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# # Time to wait between polling Redis again
# MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# # Number of 4chan threads to request at once
# MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
# MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# # Time to wait after finishing a crawl before starting again
# MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# # Semaphore value
# MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# # Threads to use for data processing
# # Leave uncommented to use all available threads
# MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# # Enable performance metrics after message processing
# MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
# MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
# REDIS_PASSWORD: "${REDIS_PASSWORD}"
# # for development
# extra_hosts:
# - "host.docker.internal:host-gateway"
# networks:
# - default
# - xf
# - db
threshold:
image: xf/threshold:latest
container_name: threshold
build: legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
volumes_from:
- tmp
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
DISCORD_TOKEN: "${DISCORD_TOKEN}"
THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
#THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# Threads to use for data processing
# Leave uncommented to use all available threads
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
network_mode: host
ssdb:
image: tsl0922/ssdb


@@ -1,8 +1,10 @@
import asyncio
from os import getenv
from time import sleep
import uvloop
import db
import util
from sources.ch4 import Chan4
from sources.dis import DiscordClient
@@ -34,6 +36,17 @@ async def main(loop):
loop.create_task(chan.run())
created = False
while not created:
try:
db.create_index(db.api_client)
created = True
except Exception as e:
print(f"Error creating index: {e}")
sleep(1) # Block the thread, just wait for the DB
db.update_schema()
loop = asyncio.get_event_loop()
loop.create_task(main(loop))

oom (new empty file)

perf/__init__.py (new empty file)


@@ -1,10 +1,12 @@
import asyncio
import time
import psutil
import util
class DynamicThrottle(object):
def __init__(self, **kwargs):
self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
self.sleep_interval = 0.0
@@ -22,13 +24,14 @@ class DynamicThrottle(object):
self.consecutive_increments = 0
self.consecutive_decrements = 0
self.consecutive_divisor = kwargs.get("consecutive_divisor", 1)
self.last_was_increment = kwargs.get("start_increment", True)
if kwargs.get("use_async"):
self.wait = self.dynamic_throttle_async
else:
self.wait = self.dynamic_throttle
async def dynamic_throttle_async(self):
"""
@@ -37,22 +40,48 @@ class DynamicThrottle(object):
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
await asyncio.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
def dynamic_throttle(self):
"""
@@ -61,19 +90,45 @@ class DynamicThrottle(object):
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
time.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
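A short synchronous usage sketch of this class (not part of this commit); the constructor arguments mirror the call added to the processing module later in this diff, and the busy loop is only a stand-in workload:

import util
from perf.throttle import DynamicThrottle

throttle = DynamicThrottle(
    target_cpu_usage=50.0,
    sleep_increment=0.02,
    sleep_decrement=0.01,
    sleep_max=0.5,
    sleep_min=0,
    psutil_interval=0.1,
    consecutive_divisor=2,
    log=util.get_logger("throttle_demo"),
    start_increment=True,
    use_async=False,
)

for _ in range(100):
    sum(i * i for i in range(50_000))  # stand-in for real per-item work
    throttle.wait()  # backs off while CPU sits above target, relaxes once it drops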

processing/ohlc.py (new file, 1 line)

@@ -0,0 +1 @@
# Resample 1Min into 5Min, 15Min, 30Min, 1H, 4H, 1D, 1W, 1M, 1Y
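One possible shape for this resampling step (not part of this commit), assuming the 1Min candles end up in a pandas DataFrame with the o/h/l/c/v/a columns used elsewhere in this diff; pandas itself is not among the requirements added here:

import pandas as pd

def resample_ohlc(df: pd.DataFrame, rule: str) -> pd.DataFrame:
    """Aggregate 1Min candles (DatetimeIndex, columns o/h/l/c/v/a) into a coarser window."""
    agg = {
        "o": "first",  # open of the window is the first 1Min open
        "h": "max",    # high is the highest 1Min high
        "l": "min",    # low is the lowest 1Min low
        "c": "last",   # close is the last 1Min close
        "v": "sum",    # base volume adds up
        "a": "sum",    # quote volume adds up
    }
    return df.resample(rule).agg(agg).dropna(subset=["o"])

# e.g. five_min = resample_ohlc(one_min, "5min"); half_hour = resample_ohlc(one_min, "30min")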


@@ -8,9 +8,6 @@ import string
# For timing
import time
# For throttling
import psutil
# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor
@@ -50,6 +47,9 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import db
import util
# For throttling
from perf.throttle import DynamicThrottle
# 4chan schema
from schemas.ch4_s import ATTRMAP
@@ -62,8 +62,6 @@ MONOLITH_PROCESS_PERFSTATS = (
)
TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
SLEEP_INTERVAL = 0.0
CUSTOM_FILTERS = [
lambda x: x.lower(),
strip_tags, #
@@ -94,6 +92,19 @@ CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS", os.cpu_count()))
p = ProcessPoolExecutor(CPU_THREADS)
throttle = DynamicThrottle(
target_cpu_usage=TARGET_CPU_USAGE,
sleep_increment=0.02,
sleep_decrement=0.01,
sleep_max=0.5,
sleep_min=0,
psutil_interval=0.1,
consecutive_divisor=2,
log=log,
start_increment=True,
use_async=False,
)
def get_hash_key():
hash_key = db.r.get("hashing_key")
@@ -136,7 +147,7 @@ async def spawn_processing_threads(chunk, length):
# Join the results back from the split list
flat_list = [item for sublist in results for item in sublist]
total_messages = len(flat_list)
log.info(
(
f"[{chunk}/{index}] Results from processing of {length} messages in "
f"{cores} threads: {len(flat_list)}"
@@ -149,7 +160,6 @@ async def spawn_processing_threads(chunk, length):
def process_data(chunk, index, chunk_size):
global SLEEP_INTERVAL
log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
to_store = []
@@ -159,7 +169,6 @@ def process_data(chunk, index, chunk_size):
date_time = 0.0
nlp_time = 0.0
normalise_time = 0.0
hash_time = 0.0
normal2_time = 0.0
soup_time = 0.0
sleep_time = 0.0
@@ -170,11 +179,28 @@ def process_data(chunk, index, chunk_size):
analyzer = SentimentIntensityAnalyzer()
for msg_index in range(chunk_size):
# Print percentage of msg_index relative to chunk_size
if msg_index % 10 == 0:
percentage_done = (msg_index / chunk_size) * 100
log.debug(
f"[{chunk}/{index}] {percentage_done:.2f}% done ({msg_index}/{chunk_size})"
)
msg = db.r.rpop(KEYNAME)
if not msg:
return
# TODO: msgpack
msg = orjson.loads(msg)
if msg["src"] == "4ch":
board = msg["net"]
thread = msg["channel"]
redis_key = (
f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
)
key_content = db.r.get(redis_key)
if key_content is not None:
continue
db.r.set(redis_key, "1")
total_start = time.process_time()
# normalise fields
start = time.process_time()
@@ -200,29 +226,6 @@ def process_data(chunk, index, chunk_size):
board = msg["net"]
thread = msg["channel"]
# Calculate hash for post
start = time.process_time()
post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
hash = siphash(hash_key, post_normalised)
hash = str(hash)
redis_key = (
f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
)
key_content = db.r.get(redis_key)
if key_content is not None:
key_content = key_content.decode("ascii")
if key_content == hash:
# This deletes the message since the append at the end won't be hit
continue
# pass
else:
# msg["type"] = "update"
# Fuck it, updates just brew spam
continue
db.r.set(redis_key, hash)
time_took = (time.process_time() - start) * 1000
hash_time += time_took
start = time.process_time()
for key2, value in list(msg.items()):
if key2 in ATTRMAP:
@@ -240,9 +243,10 @@ def process_data(chunk, index, chunk_size):
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
else:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# iso_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp())
msg["ts"] = new_ts
# msg["iso"] = iso_ts
else:
raise Exception("No TS in msg")
time_took = (time.process_time() - start) * 1000
@@ -302,39 +306,22 @@ def process_data(chunk, index, chunk_size):
# Dynamic throttling to reduce CPU usage
if msg_index % 5 == 0:
current_cpu_usage = psutil.cpu_percent(interval=0.2)
if current_cpu_usage > TARGET_CPU_USAGE:
SLEEP_INTERVAL += 0.02
if SLEEP_INTERVAL > 0.5:
SLEEP_INTERVAL = 0.5
log.info(
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
f"=> sleep {SLEEP_INTERVAL:.3f}s"
)
elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
SLEEP_INTERVAL -= 0.01
log.info(
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
f"=> sleep {SLEEP_INTERVAL:.3f}s"
)
time.sleep(SLEEP_INTERVAL)
sleep_time += SLEEP_INTERVAL
sleep_time += throttle.wait()
if MONOLITH_PROCESS_PERFSTATS:
log.debug("=====================================")
log.debug(f"Chunk: {chunk}")
log.debug(f"Index: {index}")
log.debug(f"Sentiment: {sentiment_time}")
log.debug(f"Regex: {regex_time}")
log.debug(f"Polyglot: {polyglot_time}")
log.debug(f"Date: {date_time}")
log.debug(f"NLP: {nlp_time}")
log.debug(f"Normalise: {normalise_time}")
log.debug(f"Hash: {hash_time}")
log.debug(f"Normal2: {normal2_time}")
log.debug(f"Soup: {soup_time}")
log.debug(f"Total: {total_time}")
log.debug(f"Throttling: {sleep_time}")
log.debug("=====================================")
log.info("=====================================")
log.info(f"Chunk: {chunk}")
log.info(f"Index: {index}")
log.info(f"Sentiment: {sentiment_time}")
log.info(f"Regex: {regex_time}")
log.info(f"Polyglot: {polyglot_time}")
log.info(f"Date: {date_time}")
log.info(f"NLP: {nlp_time}")
log.info(f"Normalise: {normalise_time}")
log.info(f"Normal2: {normal2_time}")
log.info(f"Soup: {soup_time}")
log.info(f"Total: {total_time}")
log.info(f"Throttling: {sleep_time}")
log.info("=====================================")
return to_store


@@ -24,3 +24,6 @@ elasticsearch[async]
msgpack
# flpc
psutil
pymexc
websockets
aiomysql

rts.py (193 changed lines)

@@ -1,31 +1,186 @@
import asyncio
import logging
from os import getenv
import uvloop
import orjson
import websockets
import util
import db
# Use UVLoop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Logger setup
log = util.get_logger("rts")
# Environment variables
MONOLITH_RTS_MEXC_API_ACCESS_KEY = getenv("MONOLITH_RTS_MEXC_API_ACCESS_KEY", None)
MONOLITH_RTS_MEXC_API_SECRET_KEY = getenv("MONOLITH_RTS_MEXC_API_SECRET_KEY", None)
modules_enabled = getenv("MODULES_ENABLED", "")
if "rts" not in modules_enabled:
log.info("RTS disabled.")
exit(0)
# WebSocket endpoint
MEXC_WS_URL = "wss://wbs.mexc.com/ws"
{
"d": {
"e": "spot@public.kline.v3.api",
"k": {
"t": 1737901140, # TS
"o": "684.4", # Open
"c": "684.5", # Close
"h": "684.5", # High
"l": "684.4", # Low
"v": "0.173", # Volume of the base
"a": "118.41", # Volume of the quote (Quantity)
"T": 1737901200, # ?
"i": "Min1", # ?
},
},
"c": "spot@public.kline.v3.api@BNBUSDT@Min1", # Channel
"t": 1737901159239,
"s": "BNBUSDT", # Symbol
}
# Scan DB for last endtime (T)
# Request Kline data from last endtime (T) to now
# Check Server Time
# Response
# {
# "serverTime" : 1645539742000
# }
# GET /api/v3/time
# Weight(IP): 1
# Parameter:
# NONE
# Kline/Candlestick Data
# Response
# [
# [
# 1640804880000,
# "47482.36",
# "47482.36",
# "47416.57",
# "47436.1",
# "3.550717",
# 1640804940000,
# "168387.3"
# ]
# ]
# GET /api/v3/klines
# Weight(IP): 1
# Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
# Parameters:
# Name Type Mandatory Description
# symbol string YES
# interval ENUM YES ENUM: Kline Interval
# startTime long NO
# endTime long NO
# limit integer NO Default 500; max 1000.
# Scrub function:
# For each record, ensure there are no time gaps
# When the 1m window goes over, the next t is always the last T.
# Check for gaps, and request all klines between those gaps to ensure a full DB, even with restarts.
# Idle jitter function - compare our time with server time.
# Compare ts with our time and print jitter. Add jitter warning to log and OHLC.
# High jitter may prevent us from getting the correct data for trading.
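A rough sketch of the gap check and jitter measurement described above (not part of this commit), assuming the /api/v3/klines row layout shown earlier, where index 0 is the open time and index 6 the close time, both in milliseconds:

import time

def find_gaps(klines):
    """Return (end, next_start) pairs where consecutive klines do not line up."""
    gaps = []
    for prev, nxt in zip(klines, klines[1:]):
        if nxt[0] != prev[6]:  # next open time should equal the previous close time
            gaps.append((prev[6], nxt[0]))  # re-request this window to backfill the DB
    return gaps

def jitter_ms(event_time_ms):
    """How far our clock is from the event timestamp carried in a message."""
    return time.time() * 1000 - event_time_ms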
async def mex_handle(data):
message = orjson.loads(data)
# print(orjson.dumps(message, option=orjson.OPT_INDENT_2).decode("utf-8"))
if "code" in message:
if message["code"] == 0:
log.info("Control message received")
return
symbol = message["s"]
open = message["d"]["k"]["o"]
close = message["d"]["k"]["c"]
high = message["d"]["k"]["h"]
low = message["d"]["k"]["l"]
volume_base = message["d"]["k"]["v"] # ERROR IN API DOCS
volume_quote = message["d"]["k"]["a"] # > a bigDecimal volume
interval = message["d"]["k"]["i"]
start_time = message["d"]["k"]["t"] # > t long stratTime
end_time = message["d"]["k"]["T"] # > T long endTime
event_time = message["t"] # t long eventTime
index = f"mex_ohlc_{symbol.lower()}"
reformatted = {
"s": symbol,
"o": float(open),
"c": float(close),
"h": float(high),
"l": float(low),
"v": float(volume_base),
"a": float(volume_quote),
"i": interval,
"t": int(start_time),
"t2": int(end_time),
"ts": int(event_time),
}
await db.rts_store_message(index, reformatted)
print(index)
print(orjson.dumps(reformatted, option=orjson.OPT_INDENT_2).decode("utf-8"))
print()
# Kline WebSocket handler
async def mex_main():
await db.init_mysql_pool()
async with websockets.connect(MEXC_WS_URL) as websocket:
log.info("WebSocket connected")
# Define symbols and intervals
symbols = ["BTCUSDT"] # Add more symbols as needed
interval = "Min1" # Kline interval
# Prepare subscription requests for Kline streams
# Request: spot@public.kline.v3.api@<symbol>@<interval>
subscriptions = [
f"spot@public.kline.v3.api@{symbol}@{interval}" for symbol in symbols
]
# Send subscription requests
subscribe_request = {
"method": "SUBSCRIPTION",
"params": subscriptions,
# "id": 1,
}
await websocket.send(orjson.dumps(subscribe_request).decode("utf-8"))
log.info(f"Subscribed to: {subscriptions}")
# Listen for messages
while True:
try:
message = await websocket.recv()
await mex_handle(message)
except websockets.exceptions.ConnectionClosed as e:
log.error(f"WebSocket connection closed: {e}")
break
# Entry point
if __name__ == "__main__":
try:
asyncio.run(mex_main())
except KeyboardInterrupt:
log.info("RTS process terminated.")


@@ -137,9 +137,11 @@ schema_main = {
"rule_id": "bigint",
"index": "string indexed attribute",
"meta": "text",
# "iso": "string indexed attribute",
}
schema_rule_storage = schema_main
schema_meta = {
"id": "bigint",
# 393598265, #main, Rust Programmer's Club


@@ -8,10 +8,9 @@ from os import getenv
import aiohttp
from numpy import array_split
import psutil
import db
import util
from perf.throttle import DynamicThrottle
# CONFIGURATION #
@@ -35,6 +34,7 @@ BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
# CONFIGURATION END #
class Chan4(object):
"""
4chan indexer, crawler and ingester.
@@ -44,7 +44,18 @@ class Chan4(object):
name = self.__class__.__name__
self.log = util.get_logger(name)
self.sleep_interval = 0.0
self.throttle = DynamicThrottle(
target_cpu_usage=TARGET_CPU_USAGE,
sleep_increment=0.01,
sleep_decrement=0.01,
sleep_max=0.1,
sleep_min=0,
psutil_interval=0.1,
log=self.log,
start_increment=False,
use_async=True,
)
self.wait = self.throttle.wait
self.api_endpoint = "https://a.4cdn.org"
# self.boards = ["out", "g", "a", "3", "pol"] #
@@ -65,33 +76,6 @@ class Chan4(object):
self.hash_key = self.hash_key.decode("ascii")
self.log.debug(f"Decoded hash key: {self.hash_key}")
async def dynamic_throttle(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
Also, if CPU usage is far below the target, reduce the sleep time.
Caps the sleep interval at 0.2s.
Prints CPU usage and sleep interval like process.py.
"""
current_cpu_usage = psutil.cpu_percent(interval=0.2)
if current_cpu_usage > TARGET_CPU_USAGE:
self.sleep_interval += 0.01
if self.sleep_interval > 0.1:
self.sleep_interval = 0.1
self.log.info(
f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
self.sleep_interval -= 0.01
self.log.info(
f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
f"=> sleep {self.sleep_interval:.3f}s"
)
if self.sleep_interval > 0:
await asyncio.sleep(self.sleep_interval)
async def run(self):
if "ALL" in BOARDS:
await self.get_board_list()
@@ -204,7 +188,7 @@ class Chan4(object):
async def bound_fetch(self, sem, url, session, mapped):
# Getter function with semaphore.
async with sem:
await self.wait()
try:
return await self.fetch(url, session, mapped)
except: # noqa


@@ -21,6 +21,7 @@ log = util.get_logger("ingest")
INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))
class Ingest(object):
def __init__(self):
name = self.__class__.__name__
@@ -51,7 +52,7 @@ class Ingest(object):
if ingested < INGEST_INCREASE_BELOW:
if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
self.log.debug(
(
f"Increasing chunk size to "
f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
@@ -60,11 +61,13 @@ class Ingest(object):
)
CHUNK_SIZE += INGEST_INCREASE_BY
else:
log.info(f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}")
log.debug(
f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}"
)
elif ingested > INGEST_DECREASE_ABOVE:
if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
self.log.debug(
(
f"Decreasing chunk size to "
f"{CHUNK_SIZE - INGEST_DECREASE_BY}"
@@ -73,4 +76,6 @@ class Ingest(object):
)
CHUNK_SIZE -= INGEST_DECREASE_BY
else:
log.info(f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}")
log.debug(
f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}"
)