diff --git a/clients/mexc.py b/clients/mexc.py
new file mode 100644
index 0000000..e69de29
diff --git a/db.py b/db.py
index 83701c6..bb448f4 100644
--- a/db.py
+++ b/db.py
@@ -1,18 +1,22 @@
+import asyncio
 from math import ceil
+from os import getenv
+from time import sleep
+
+import aiomysql
 import aioredis
 import manticoresearch
+import msgpack
 import orjson
 from manticoresearch.rest import ApiException
 from numpy import array_split
 from redis import StrictRedis
-import msgpack
-import asyncio
 import util
 from schemas import mc_s
-from os import getenv
-from time import sleep
+
+mysql_pool = None
+
 configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
 api_client = manticoresearch.ApiClient(configuration)
@@ -24,24 +28,25 @@ log = util.get_logger("db")
 # r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
 r = StrictRedis(
     host="127.0.0.1",  # Replace with your Redis server's IP address
-    port=1289, # Replace with your Redis server's port
-    db=0 # Database number
+    port=1289,  # Replace with your Redis server's port
+    db=0,  # Database number
 )
 
 # AIORedis
 # ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
-ar = aioredis.from_url(
-    "redis://127.0.0.1:1289",
-    db=0
-)
+ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)
 
 # /var/run/neptune-redis.sock
 # db = 10
 pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
-#pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
+# fr = aioredis.from_url("unix://var/run/fisk-redis.sock", db=10)
+fr = aioredis.from_url("unix://var/run/redis.sock", db=10)
+# pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
 
 KEYNAME = "queue"
 MESSAGE_KEY = "messages"
+OHLC_MESSAGE_KEY = "ohlc"
+
 TYPES_MAIN = [
     "msg",
@@ -60,50 +65,68 @@ TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
 
-# def store_message(msg):
-#     """
-#     Store a message into Manticore
-#     :param msg: dict
-#     """
-#     # Duplicated to avoid extra function call
-#     if msg["type"] in TYPES_MAIN:
-#         index = "main"
-#         schema = mc_s.schema_main
-#     elif msg["type"] in TYPES_META:
-#         index = "meta"
-#         schema = mc_s.schema_meta
-#     elif msg["type"] in TYPES_INT:
-#         index = "internal"
-#         schema = mc_s.schema_int
-#     # normalise fields
-#     for key, value in list(msg.items()):
-#         if value is None:
-#             del msg[key]
-#         if key in schema:
-#             if isinstance(value, int):
-#                 if schema[key].startswith("string") or schema[key].startswith("text"):
-#                     msg[key] = str(value)
+async def init_mysql_pool():
+    """
+    Initialize the MySQL connection pool.
+    """
+    global mysql_pool
+    mysql_pool = await aiomysql.create_pool(
+        host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
+    )
 
-#     body = [{"insert": {"index": index, "doc": msg}}]
-#     body_post = ""
-#     for item in body:
-#         body_post += orjson.dumps(item)
-#         body_post += "\n"
-#     # print(body_post)
-#     try:
-#         # Bulk index operations
-#         api_response = api_instance.bulk(body_post)  # , async_req=True
-#         # print(api_response)
-#     except ApiException as e:
-#         print("Exception when calling IndexApi->bulk: %s\n" % e)
-#         print("ATTEMPT", body_post)
+async def rts_store_message(index, data):
+    """
+    Store a RTS message into MySQL using an existing connection pool.
+    Prioritizes instant PubSub delivery, with minimal data storage overhead.
+    :param index: str
+    :param data: dict
+    """
+    # Publish to Redis PubSub
+    packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
+
+    try:
+        await fr.publish(OHLC_MESSAGE_KEY, packed_index)
+    except aioredis.exceptions.ConnectionError as e:
+        raise e
+    await asyncio.sleep(0.1)
+
+    # Insert data into MySQL
+    try:
+        async with mysql_pool.acquire() as conn:
+            async with conn.cursor() as cur:
+                # Insert data into the table
+                query = f"""
+                INSERT INTO {index} (s, o, c, h, l, v, a, i, t, t2, ts)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+                """
+                # Bind the values directly
+                await cur.execute(
+                    query,
+                    (
+                        data["s"],  # symbol
+                        data["o"],  # open
+                        data["c"],  # close
+                        data["h"],  # high
+                        data["l"],  # low
+                        data["v"],  # volume_base
+                        data["a"],  # volume_quote
+                        data["i"],  # interval
+                        data["t"],  # start_time
+                        data["t2"],  # end_time
+                        data["ts"],  # event_time
+                    ),
+                )
+                await conn.commit()
+        log.debug(f"Stored data for {data['s']} in MySQL.")
+    except aiomysql.Error as e:
+        log.error(f"MySQL error: {e}")
 
 
 async def store_batch(data):
     """
     Store a message into Manticore
-    :param msg: dict
+    :param data: list
     """
     if not data:
         return
@@ -161,12 +184,11 @@ async def store_batch(data):
 
     body_post = ""
     for item in total:
-        #print("ITEM", item)
+        # print("ITEM", item)
         body_post += orjson.dumps(item).decode("utf-8")
        body_post += "\n"
 
-    #print("BODY POST INDEX", index, body_post)
-
+    # print("BODY POST INDEX", index, body_post)
 
     try:
         # Bulk index operations
@@ -186,6 +208,7 @@ def create_index(api_client):
     util_instance = manticoresearch.UtilsApi(api_client)
     schemas = {
         "main": mc_s.schema_main,
+        "rule_storage": mc_s.schema_rule_storage,
         "meta": mc_s.schema_meta,
         "internal": mc_s.schema_int,
     }
@@ -216,14 +239,3 @@ async def queue_message_bulk(data):
         # TODO: msgpack
         message = orjson.dumps(msg)
         await ar.lpush(KEYNAME, message)
-
-
-created = False
-while not created:
-    try:
-        create_index(api_client)
-        created = True
-    except Exception as e:
-        print(f"Error creating index: {e}")
-        sleep(1)  # Block the thread, just wait for the DB
-update_schema()
diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml
index 4ad5baa..f1282ee 100644
--- a/docker-compose.prod.yml
+++ b/docker-compose.prod.yml
@@ -14,6 +14,8 @@ services:
     environment:
       PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
       MODULES_ENABLED: "${MODULES_ENABLED}"
+      MONOLITH_RTS_MEXC_API_ACCESS_KEY: "${MONOLITH_RTS_MEXC_API_ACCESS_KEY}"
+      MONOLITH_RTS_MEXC_API_SECRET_KEY: "${MONOLITH_RTS_MEXC_API_SECRET_KEY}"
     deploy:
       resources:
         limits:
@@ -83,64 +85,61 @@ services:
     network_mode: host
 
-  # threshold:
-  #   image: xf/threshold:latest
-  #   container_name: threshold
-  #   build: legacy/docker
-  #   volumes:
-  #     - ${PORTAINER_GIT_DIR}:/code
-  #     - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-  #     #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
-  #     - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
-  #   volumes_from:
-  #     - tmp
-  #   ports:
-  #     - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
-  #     - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
-  #     - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
-  #   environment:
-  #     PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
-  #     MODULES_ENABLED: "${MODULES_ENABLED}"
-  #     DISCORD_TOKEN: "${DISCORD_TOKEN}"
-  #     THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
-  #     THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
-  #     THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
-  #     THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
-  #     THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
-  #     THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
-  #     THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
-  #     THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
-  #     THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
-  #     THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
-  #     THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
-  #     #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
-  #     THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
-  #     # How many messages to ingest at once from Redis
-  #     MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
-  #     # Time to wait between polling Redis again
-  #     MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
-  #     # Number of 4chan threads to request at once
-  #     MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
-  #     # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
-  #     MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
-  #     # Time to wait after finishing a crawl before starting again
-  #     MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
-  #     # Semaphore value
-  #     MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
-  #     # Threads to use for data processing
-  #     # Leave uncommented to use all available threads
-  #     MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
-  #     # Enable performance metrics after message processing
-  #     MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
-  #     MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
-  #     REDIS_PASSWORD: "${REDIS_PASSWORD}"
-  #     # for development
-  #   extra_hosts:
-  #     - "host.docker.internal:host-gateway"
-  #   networks:
-  #     - default
-  #     - xf
-  #     - db
+  threshold:
+    image: xf/threshold:latest
+    container_name: threshold
+    build: legacy/docker
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+      - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
+      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+    volumes_from:
+      - tmp
+    ports:
+      - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
+      - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
+      - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
+    environment:
+      PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+      MODULES_ENABLED: "${MODULES_ENABLED}"
+      DISCORD_TOKEN: "${DISCORD_TOKEN}"
+      THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
+      THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
+      THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
+      THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
+      THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
+      THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
+      THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
+      THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
+      THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
+      THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
+      THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
+      #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
+      THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
+      # How many messages to ingest at once from Redis
+      MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
+      # Time to wait between polling Redis again
+      MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
+      # Number of 4chan threads to request at once
+      MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
+      # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
+      MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
+      # Time to wait after finishing a crawl before starting again
+      MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
+      # Semaphore value
+      MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
+      # Threads to use for data processing
+      # Leave uncommented to use all available threads
+      MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
+      # Enable performance metrics after message processing
+      MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+      MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
+      REDIS_PASSWORD: "${REDIS_PASSWORD}"
+      # for development
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    network_mode: host
 
   ssdb:
     image: tsl0922/ssdb
diff --git a/monolith.py b/monolith.py
index a04f7d7..76a2309 100644
--- a/monolith.py
+++ b/monolith.py
@@ -1,8 +1,10 @@
 import asyncio
 from os import getenv
+from time import sleep
 
 import uvloop
 
+import db
 import util
 from sources.ch4 import Chan4
 from sources.dis import DiscordClient
@@ -34,6 +36,17 @@ async def main(loop):
         loop.create_task(chan.run())
 
 
+created = False
+while not created:
+    try:
+        db.create_index(db.api_client)
+        created = True
+    except Exception as e:
+        print(f"Error creating index: {e}")
+        sleep(1)  # Block the thread, just wait for the DB
+db.update_schema()
+
+
 loop = asyncio.get_event_loop()
 loop.create_task(main(loop))
diff --git a/oom b/oom
new file mode 100644
index 0000000..e69de29
diff --git a/perf/__init__.py b/perf/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/perf/throttle.py b/perf/throttle.py
index b909382..7be34a7 100644
--- a/perf/throttle.py
+++ b/perf/throttle.py
@@ -1,10 +1,12 @@
-import psutil
 import asyncio
 import time
+
+import psutil
+
 import util
 
 
-class DynamicThrottle(object):
+class DynamicThrottle(object):
     def __init__(self, **kwargs):
         self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
         self.sleep_interval = 0.0
@@ -22,13 +24,14 @@ class DynamicThrottle(object):
         self.consecutive_increments = 0
         self.consecutive_decrements = 0
 
-        self.last_was_increment = False
+        self.consecutive_divisor = kwargs.get("consecutive_divisor", 1)
 
-        if kwargs.get("async"):
-            self.dynamic_throttle = self.dynamic_throttle_async
+        self.last_was_increment = kwargs.get("start_increment", True)
+
+        if kwargs.get("use_async"):
+            self.wait = self.dynamic_throttle_async
         else:
-            self.dynamic_throttle = self.dynamic_throttle
-
+            self.wait = self.dynamic_throttle
 
     async def dynamic_throttle_async(self):
         """
@@ -37,22 +40,48 @@ class DynamicThrottle(object):
         current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
 
         if current_cpu_usage > self.target_cpu_usage:
-            self.sleep_interval += self.sleep_increment
+            if self.last_was_increment:
+                self.consecutive_increments += 1
+                # self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
+            else:
+                self.consecutive_increments = 0  # ?
+                self.consecutive_decrements = 0  # ?
+                # self.log.debug(f"High CPU alert reset.")
+            self.sleep_interval += self.sleep_increment * (
+                max(1, self.consecutive_increments) / self.consecutive_divisor
+            )
+            self.last_was_increment = True
             if self.sleep_interval > self.sleep_max:
                 self.sleep_interval = self.sleep_max
-            self.log.info(
-                f"CPU {current_cpu_usage}% > {self.target_cpu_usage}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
-            )
-        elif current_cpu_usage < self.target_cpu_usage and self.sleep_interval > self.sleep_min:
-            self.sleep_interval -= self.sleep_decrement
-            self.log.info(
-                f"CPU {current_cpu_usage}% < {self.target_cpu_usage}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
+                # self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
+            # self.log.debug(
+            #     f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
+            #     f"=> sleep {self.sleep_interval:.3f}s"
+            # )
+        elif current_cpu_usage < self.target_cpu_usage:
+            if not self.last_was_increment:
+                self.consecutive_decrements += 1
+                # self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
+            else:
+                self.consecutive_decrements = 0  # ?
+                self.consecutive_increments = 0  # ?
+                # self.log.debug(f"Low CPU alert reset.")
+            self.sleep_interval -= self.sleep_decrement * (
+                max(1, self.consecutive_decrements) / self.consecutive_divisor
             )
+            self.last_was_increment = False
+            if self.sleep_interval < self.sleep_min:
+                self.sleep_interval = self.sleep_min
+                # self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
+            # self.log.debug(
+            #     f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
+            #     f"=> sleep {self.sleep_interval:.3f}s"
+            # )
 
         if self.sleep_interval > 0:
             await asyncio.sleep(self.sleep_interval)
+            return self.sleep_interval
+        return 0.0
 
     def dynamic_throttle(self):
         """
@@ -61,19 +90,45 @@ class DynamicThrottle(object):
         current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
 
         if current_cpu_usage > self.target_cpu_usage:
-            self.sleep_interval += self.sleep_increment
+            if self.last_was_increment:
+                self.consecutive_increments += 1
+                # self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
+            else:
+                self.consecutive_increments = 0  # ?
+                self.consecutive_decrements = 0  # ?
+                # self.log.debug(f"High CPU alert reset.")
+            self.sleep_interval += self.sleep_increment * (
+                max(1, self.consecutive_increments) / self.consecutive_divisor
+            )
+            self.last_was_increment = True
             if self.sleep_interval > self.sleep_max:
                 self.sleep_interval = self.sleep_max
-            self.log.info(
-                f"CPU {current_cpu_usage}% > {self.target_cpu_usage}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
-            )
-        elif current_cpu_usage < self.target_cpu_usage and self.sleep_interval > self.sleep_min:
-            self.sleep_interval -= self.sleep_decrement
-            self.log.info(
-                f"CPU {current_cpu_usage}% < {self.target_cpu_usage}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
+                # self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
+            # self.log.debug(
+            #     f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
+            #     f"=> sleep {self.sleep_interval:.3f}s"
+            # )
+        elif current_cpu_usage < self.target_cpu_usage:
+            if not self.last_was_increment:
+                self.consecutive_decrements += 1
+                # self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
+            else:
+                self.consecutive_decrements = 0  # ?
+                self.consecutive_increments = 0  # ?
+                # self.log.debug(f"Low CPU alert reset.")
+            self.sleep_interval -= self.sleep_decrement * (
+                max(1, self.consecutive_decrements) / self.consecutive_divisor
             )
+            self.last_was_increment = False
+            if self.sleep_interval < self.sleep_min:
+                self.sleep_interval = self.sleep_min
+                # self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
+            # self.log.debug(
+            #     f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
+            #     f"=> sleep {self.sleep_interval:.3f}s"
+            # )
 
         if self.sleep_interval > 0:
-            time.sleep(self.sleep_interval)
\ No newline at end of file
+            time.sleep(self.sleep_interval)
+            return self.sleep_interval
+        return 0.0
diff --git a/processing/ohlc.py b/processing/ohlc.py
new file mode 100644
index 0000000..63d3973
--- /dev/null
+++ b/processing/ohlc.py
@@ -0,0 +1 @@
+# Resample 1Min into 5Min, 15Min, 30Min, 1H, 4H, 1D, 1W, 1M, 1Y
diff --git a/processing/process.py b/processing/process.py
index 949f819..1f810eb 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -8,9 +8,6 @@ import string
 # For timing
 import time
 
-# For throttling
-import psutil
-
 # Squash errors
 import warnings
 from concurrent.futures import ProcessPoolExecutor
@@ -50,6 +47,9 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
 
 import db
 import util
 
+# For throttling
+from perf.throttle import DynamicThrottle
+
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
@@ -62,8 +62,6 @@ MONOLITH_PROCESS_PERFSTATS = (
 )
 TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
 
-SLEEP_INTERVAL = 0.0
-
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
     strip_tags,  #
@@ -94,6 +92,19 @@ CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS", os.cpu_count()))
 
 p = ProcessPoolExecutor(CPU_THREADS)
 
+throttle = DynamicThrottle(
+    target_cpu_usage=TARGET_CPU_USAGE,
+    sleep_increment=0.02,
+    sleep_decrement=0.01,
+    sleep_max=0.5,
+    sleep_min=0,
+    psutil_interval=0.1,
+    consecutive_divisor=2,
+    log=log,
+    start_increment=True,
+    use_async=False,
+)
+
 
 def get_hash_key():
     hash_key = db.r.get("hashing_key")
@@ -136,7 +147,7 @@ async def spawn_processing_threads(chunk, length):
     # Join the results back from the split list
     flat_list = [item for sublist in results for item in sublist]
     total_messages = len(flat_list)
-    log.debug(
+    log.info(
         (
             f"[{chunk}/{index}] Results from processing of {length} messages in "
            f"{cores} threads: {len(flat_list)}"
         )
@@ -149,7 +160,6 @@
 
 
 def process_data(chunk, index, chunk_size):
-    global SLEEP_INTERVAL
     log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
 
     to_store = []
@@ -159,7 +169,6 @@ def process_data(chunk, index, chunk_size):
     date_time = 0.0
     nlp_time = 0.0
     normalise_time = 0.0
-    hash_time = 0.0
     normal2_time = 0.0
     soup_time = 0.0
     sleep_time = 0.0
@@ -170,11 +179,28 @@ def process_data(chunk, index, chunk_size):
     analyzer = SentimentIntensityAnalyzer()
 
     for msg_index in range(chunk_size):
+        # Print percentage of msg_index relative to chunk_size
+        if msg_index % 10 == 0:
+            percentage_done = (msg_index / chunk_size) * 100
+            log.debug(
+                f"[{chunk}/{index}] {percentage_done:.2f}% done ({msg_index}/{chunk_size})"
+            )
+
         msg = db.r.rpop(KEYNAME)
         if not msg:
             return
-        # TODO: msgpack
         msg = orjson.loads(msg)
+        if msg["src"] == "4ch":
+            board = msg["net"]
+            thread = msg["channel"]
+            redis_key = (
+                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
+            )
+            key_content = db.r.get(redis_key)
+            if key_content is not None:
+                continue
+            db.r.set(redis_key, "1")
+
         total_start = time.process_time()
         # normalise fields
         start = time.process_time()
@@ -200,29 +226,6 @@ def process_data(chunk, index, chunk_size):
             board = msg["net"]
             thread = msg["channel"]
 
-            # Calculate hash for post
-            start = time.process_time()
-            post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
-            hash = siphash(hash_key, post_normalised)
-            hash = str(hash)
-            redis_key = (
-                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
-            )
-            key_content = db.r.get(redis_key)
-            if key_content is not None:
-                key_content = key_content.decode("ascii")
-                if key_content == hash:
-                    # This deletes the message since the append at the end won't be hit
-                    continue
-                    # pass
-                else:
-                    # msg["type"] = "update"
-                    # Fuck it, updates just brew spam
-                    continue
-            db.r.set(redis_key, hash)
-            time_took = (time.process_time() - start) * 1000
-            hash_time += time_took
-
         start = time.process_time()
         for key2, value in list(msg.items()):
             if key2 in ATTRMAP:
@@ -240,9 +243,10 @@ def process_data(chunk, index, chunk_size):
                 old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
             else:
                 old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-            # new_ts = old_ts.isoformat()
+            # iso_ts = old_ts.isoformat()
             new_ts = int(old_ts.timestamp())
             msg["ts"] = new_ts
+            # msg["iso"] = iso_ts
         else:
             raise Exception("No TS in msg")
         time_took = (time.process_time() - start) * 1000
@@ -302,39 +306,22 @@ def process_data(chunk, index, chunk_size):
 
         # Dynamic throttling to reduce CPU usage
         if msg_index % 5 == 0:
-            current_cpu_usage = psutil.cpu_percent(interval=0.2)
-            if current_cpu_usage > TARGET_CPU_USAGE:
-                SLEEP_INTERVAL += 0.02
-                if SLEEP_INTERVAL > 0.5:
-                    SLEEP_INTERVAL = 0.5
-                log.info(
-                    f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
-                    f"=> sleep {SLEEP_INTERVAL:.3f}s"
-                )
-            elif current_cpu_usage < TARGET_CPU_USAGE and SLEEP_INTERVAL > 0.01:
-                SLEEP_INTERVAL -= 0.01
-                log.info(
-                    f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
-                    f"=> sleep {SLEEP_INTERVAL:.3f}s"
-                )
-            time.sleep(SLEEP_INTERVAL)
-            sleep_time += SLEEP_INTERVAL
+            sleep_time += throttle.wait()
 
     if MONOLITH_PROCESS_PERFSTATS:
-        log.debug("=====================================")
-        log.debug(f"Chunk: {chunk}")
-        log.debug(f"Index: {index}")
-        log.debug(f"Sentiment: {sentiment_time}")
-        log.debug(f"Regex: {regex_time}")
-        log.debug(f"Polyglot: {polyglot_time}")
-        log.debug(f"Date: {date_time}")
-        log.debug(f"NLP: {nlp_time}")
-        log.debug(f"Normalise: {normalise_time}")
-        log.debug(f"Hash: {hash_time}")
-        log.debug(f"Normal2: {normal2_time}")
-        log.debug(f"Soup: {soup_time}")
-        log.debug(f"Total: {total_time}")
-        log.debug(f"Throttling: {sleep_time}")
-        log.debug("=====================================")
+        log.info("=====================================")
+        log.info(f"Chunk: {chunk}")
+        log.info(f"Index: {index}")
+        log.info(f"Sentiment: {sentiment_time}")
+        log.info(f"Regex: {regex_time}")
+        log.info(f"Polyglot: {polyglot_time}")
+        log.info(f"Date: {date_time}")
+        log.info(f"NLP: {nlp_time}")
+        log.info(f"Normalise: {normalise_time}")
+        log.info(f"Normal2: {normal2_time}")
+        log.info(f"Soup: {soup_time}")
+        log.info(f"Total: {total_time}")
+        log.info(f"Throttling: {sleep_time}")
+        log.info("=====================================")
 
     return to_store
diff --git a/requirements.txt b/requirements.txt
index 2e3a048..11065a1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,6 @@ elasticsearch[async]
 msgpack
 # flpc
 psutil
+pymexc
+websockets
+aiomysql
diff --git a/rts.py b/rts.py
index 2693df1..eb51d8b 100644
--- a/rts.py
+++ b/rts.py
@@ -1,31 +1,186 @@
 import asyncio
+import logging
 from os import getenv
 
-import uvloop
+import orjson
+import websockets
 
-import util
+import db
 
-# Use UVLoop
-asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+# Logger setup
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger("RTS")
 
-log = util.get_logger("rts")
+# Environment variables
+MONOLITH_RTS_MEXC_API_ACCESS_KEY = getenv("MONOLITH_RTS_MEXC_API_ACCESS_KEY", None)
+MONOLITH_RTS_MEXC_API_SECRET_KEY = getenv("MONOLITH_RTS_MEXC_API_SECRET_KEY", None)
 
-modules_enabled = getenv("MODULES_ENABLED", False)
-if "rts" not in modules_enabled:
-    log.info("RTS disabled.")
-    exit(0)
+# WebSocket endpoint
+MEXC_WS_URL = "wss://wbs.mexc.com/ws"
+
+{
+    "d": {
+        "e": "spot@public.kline.v3.api",
+        "k": {
+            "t": 1737901140,  # TS
+            "o": "684.4",  # Open
+            "c": "684.5",  # Close
+            "h": "684.5",  # High
+            "l": "684.4",  # Low
+            "v": "0.173",  # Volume of the base
+            "a": "118.41",  # Volume of the quote (Quantity)
+            "T": 1737901200,  # ?
+            "i": "Min1",  # ?
+        },
+    },
+    "c": "spot@public.kline.v3.api@BNBUSDT@Min1",  # Channel
+    "t": 1737901159239,
+    "s": "BNBUSDT",  # Symbol
+}
+
+# Scan DB for last endtime (T)
+# Request Kline data from last endtime (T) to now
 
-async def main(loop):
-    log.info("RTS started.")
+
+# Check Server Time
+
+# Response
+
+# {
+#   "serverTime" : 1645539742000
+# }
+
+# GET /api/v3/time
+
+# Weight(IP): 1
+
+# Parameter:
+
+# NONE
+
+# Kline/Candlestick Data
+
+# Response
+
+# [
+#   [
+#     1640804880000,
+#     "47482.36",
+#     "47482.36",
+#     "47416.57",
+#     "47436.1",
+#     "3.550717",
+#     1640804940000,
+#     "168387.3"
+#   ]
+# ]
+
+# GET /api/v3/klines
+
+# Weight(IP): 1
+
+# Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
+
+# Parameters:
+# Name       Type     Mandatory   Description
+# symbol     string   YES
+# interval   ENUM     YES         ENUM: Kline Interval
+# startTime  long     NO
+# endTime    long     NO
+# limit      integer  NO          Default 500; max 1000.
+
+# Scrub function:
+# For each record, ensure there are no time gaps
+# When the 1m window goes over, the next t is always the last T.
+# Check for gaps, and request all klines between those gaps to ensure a full DB, even with restarts.
 
-loop = asyncio.get_event_loop()
-loop.create_task(main(loop))
+
+# Idle jitter function - compare our time with server time.
+# Compare ts with our time and print jitter. Add jitter warning to log and OHLC.
+# High jitter may prevent us from getting the correct data for trading.
+
+
+async def mex_handle(data):
+    message = orjson.loads(data)
+    # print(orjson.dumps(message, option=orjson.OPT_INDENT_2).decode("utf-8"))
+    if "code" in message:
+        if message["code"] == 0:
+            log.info("Control message received")
+        return
-try:
-    loop.run_forever()
-except KeyboardInterrupt:
-    log.info("RTS process terminating")
-finally:
-    loop.close()
+    symbol = message["s"]
+    open = message["d"]["k"]["o"]
+    close = message["d"]["k"]["c"]
+    high = message["d"]["k"]["h"]
+    low = message["d"]["k"]["l"]
+    volume_base = message["d"]["k"]["v"]  # ERROR IN API DOCS
+    volume_quote = message["d"]["k"]["a"]  # > a bigDecimal volume
+
+    interval = message["d"]["k"]["i"]
+
+    start_time = message["d"]["k"]["t"]  # > t long stratTime
+    end_time = message["d"]["k"]["T"]  # > T long endTime
+    event_time = message["t"]  # t long eventTime
+
+    index = f"mex_ohlc_{symbol.lower()}"
+
+    reformatted = {
+        "s": symbol,
+        "o": float(open),
+        "c": float(close),
+        "h": float(high),
+        "l": float(low),
+        "v": float(volume_base),
+        "a": float(volume_quote),
+        "i": interval,
+        "t": int(start_time),
+        "t2": int(end_time),
+        "ts": int(event_time),
+    }
+
+    await db.rts_store_message(index, reformatted)
+    print(index)
+    print(orjson.dumps(reformatted, option=orjson.OPT_INDENT_2).decode("utf-8"))
+    print()
+
+
+# Kline WebSocket handler
+async def mex_main():
+    await db.init_mysql_pool()
+    async with websockets.connect(MEXC_WS_URL) as websocket:
+        log.info("WebSocket connected")
+
+        # Define symbols and intervals
+        symbols = ["BTCUSDT"]  # Add more symbols as needed
+        interval = "Min1"  # Kline interval
+
+        # Prepare subscription requests for Kline streams
+        # Request: spot@public.kline.v3.api@@
+        subscriptions = [
+            f"spot@public.kline.v3.api@{symbol}@{interval}" for symbol in symbols
+        ]
+
+        # Send subscription requests
+        subscribe_request = {
+            "method": "SUBSCRIPTION",
+            "params": subscriptions,
+            # "id": 1,
+        }
+        await websocket.send(orjson.dumps(subscribe_request).decode("utf-8"))
+
+        log.info(f"Subscribed to: {subscriptions}")
+
+        # Listen for messages
+        while True:
+            try:
+                message = await websocket.recv()
+                await mex_handle(message)
+            except websockets.exceptions.ConnectionClosed as e:
+                log.error(f"WebSocket connection closed: {e}")
+                break
+
+
+# Entry point
+if __name__ == "__main__":
+    try:
+        asyncio.run(mex_main())
+    except KeyboardInterrupt:
+        log.info("RTS process terminated.")
diff --git a/schemas/mc_s.py b/schemas/mc_s.py
index 14d7950..c5c63d9 100644
--- a/schemas/mc_s.py
+++ b/schemas/mc_s.py
@@ -137,9 +137,11 @@ schema_main = {
     "rule_id": "bigint",
     "index": "string indexed attribute",
     "meta": "text",
-
+    # "iso": "string indexed attribute",
 }
 
+schema_rule_storage = schema_main
+
 schema_meta = {
     "id": "bigint",
     # 393598265, #main, Rust Programmer's Club
diff --git a/sources/ch4.py b/sources/ch4.py
index 98a34d7..2e60897 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -8,10 +8,9 @@ from os import getenv
 import aiohttp
 from numpy import array_split
 
-import psutil
-
 import db
 import util
+from perf.throttle import DynamicThrottle
 
 # CONFIGURATION #
 
@@ -35,6 +34,7 @@ BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
 
 # CONFIGURATION END #
 
+
 class Chan4(object):
     """
     4chan indexer, crawler and ingester.
@@ -44,7 +44,18 @@ class Chan4(object):
         name = self.__class__.__name__
         self.log = util.get_logger(name)
 
-        self.sleep_interval = 0.0
+        self.throttle = DynamicThrottle(
+            target_cpu_usage=TARGET_CPU_USAGE,
+            sleep_increment=0.01,
+            sleep_decrement=0.01,
+            sleep_max=0.1,
+            sleep_min=0,
+            psutil_interval=0.1,
+            log=self.log,
+            start_increment=False,
+            use_async=True,
+        )
+        self.wait = self.throttle.wait
 
         self.api_endpoint = "https://a.4cdn.org"
         # self.boards = ["out", "g", "a", "3", "pol"]  #
@@ -65,33 +76,6 @@ class Chan4(object):
             self.hash_key = self.hash_key.decode("ascii")
             self.log.debug(f"Decoded hash key: {self.hash_key}")
 
-    async def dynamic_throttle(self):
-        """
-        Dynamically sleeps before a request if CPU usage is above our target.
-        Also, if CPU usage is far below the target, reduce the sleep time.
-        Caps the sleep interval at 0.2s.
-        Prints CPU usage and sleep interval like process.py.
-        """
-        current_cpu_usage = psutil.cpu_percent(interval=0.2)
-
-        if current_cpu_usage > TARGET_CPU_USAGE:
-            self.sleep_interval += 0.01
-            if self.sleep_interval > 0.1:
-                self.sleep_interval = 0.1
-            self.log.info(
-                f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
-            )
-        elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01:
-            self.sleep_interval -= 0.01
-            self.log.info(
-                f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, "
-                f"=> sleep {self.sleep_interval:.3f}s"
-            )
-
-        if self.sleep_interval > 0:
-            await asyncio.sleep(self.sleep_interval)
-
     async def run(self):
         if "ALL" in BOARDS:
             await self.get_board_list()
@@ -204,7 +188,7 @@ class Chan4(object):
     async def bound_fetch(self, sem, url, session, mapped):
         # Getter function with semaphore.
         async with sem:
-            await self.dynamic_throttle()
+            await self.wait()
             try:
                 return await self.fetch(url, session, mapped)
             except:  # noqa
diff --git a/sources/ingest.py b/sources/ingest.py
index 603fdf0..8d6534f 100644
--- a/sources/ingest.py
+++ b/sources/ingest.py
@@ -21,6 +21,7 @@ log = util.get_logger("ingest")
 INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
 INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))
 
+
 class Ingest(object):
     def __init__(self):
         name = self.__class__.__name__
@@ -51,7 +52,7 @@ class Ingest(object):
 
         if ingested < INGEST_INCREASE_BELOW:
             if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
-                self.log.info(
+                self.log.debug(
                     (
                         f"Increasing chunk size to "
                         f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
@@ -60,11 +61,13 @@ class Ingest(object):
                     )
                 )
                 CHUNK_SIZE += INGEST_INCREASE_BY
             else:
-                log.info(f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}")
+                log.debug(
+                    f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}"
+                )
 
         elif ingested > INGEST_DECREASE_ABOVE:
             if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
-                self.log.info(
+                self.log.debug(
                     (
                         f"Decreasing chunk size to "
                         f"{CHUNK_SIZE - INGEST_DECREASE_BY}"
@@ -73,4 +76,6 @@ class Ingest(object):
                     )
                 )
                 CHUNK_SIZE -= INGEST_DECREASE_BY
             else:
-                log.info(f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}")
+                log.debug(
+                    f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}"
+                )