Compare commits

..

31 Commits

SHA1 Message Date
81f05d4263 Begin implementing RTS 2026-02-17 12:14:29 +00:00
dc533f266f Add psutil to requirements 2025-01-24 12:18:17 +00:00
ea4b5e6321 Begin implementation of dynamic throttling framework 2025-01-24 12:17:43 +00:00
54ecfbae64 Add throttling for performance 2025-01-24 12:17:22 +00:00
352909bec0 Begin implementing RTS process 2025-01-23 11:30:01 +00:00
1cc2ef629e Automatic optimisation for ingest chunking based on processed messages 2025-01-23 11:29:48 +00:00
1aeadaf3b7 Stringify tokens and return message number from processing 2025-01-23 11:29:07 +00:00
ba8c33d8fc Update socket and database URLs 2025-01-23 11:27:51 +00:00
2ef2249be7 Update container files to work with Podman 2025-01-23 11:26:58 +00:00
054e9caca0 Update to run with Podman 2024-12-29 17:35:57 +00:00
5ea4e5f460 Bump versions in pre-commit config 2023-02-09 07:20:13 +00:00
210237b50a Update pre-commit versions 2023-02-09 07:20:13 +00:00
87b81ac236 Retry Redis ingest if it failed 2023-01-13 07:20:27 +00:00
02071758b5 Send messages to Neptune Redis via PubSub 2023-01-12 07:20:43 +00:00
0ab67becff Give option for only crawling some boards 2022-12-22 07:20:26 +00:00
ebaf8c765d Allow disabling modules in environment variables 2022-12-22 07:20:26 +00:00
ce2d7684bc Run ingest task first 2022-12-22 10:11:48 +00:00
508b00e471 Pre-create meta index 2022-11-23 19:02:31 +00:00
9d1e4b44e8 Add pathogen network 2022-11-23 18:23:20 +00:00
371bce1094 Remove print statements 2022-11-22 21:43:56 +00:00
be0cf231b4 Fix mapping and make Threshold talk to SSDB 2022-11-22 21:42:35 +00:00
1993c8f1d2 Use Portainer Git directory for Redis config 2022-11-22 21:19:13 +00:00
9d35930b3b Re-add Portainer Git dir 2022-11-22 21:15:19 +00:00
34346006ab Remove leftover Docker files in legacy 2022-11-22 21:11:46 +00:00
42a3f5da03 Remove tarred Docker definition 2022-11-22 07:20:52 +00:00
42657aeee0 Lower Python version due to gensim incompatibility 2022-11-22 21:07:34 +00:00
1c34aa4a01 Clean up legacy and debugging code 2022-11-22 07:20:27 +00:00
6f3db61532 Add debugging statements for Portainer 2022-11-22 20:37:58 +00:00
3c18858c48 Use relative path for code 2022-11-22 20:35:13 +00:00
78c6ef96d2 Use relative path for build directory 2022-11-22 20:30:41 +00:00
d2a174c1c4 Remove Portainer Git volume 2022-11-22 20:28:44 +00:00
32 changed files with 1090 additions and 731 deletions

View File

@@ -1,18 +1,20 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 22.6.0
+    rev: 23.1.0
     hooks:
       - id: black
+        exclude: ^core/migrations
   - repo: https://github.com/PyCQA/isort
-    rev: 5.10.1
+    rev: 5.11.5
     hooks:
       - id: isort
         args: ["--profile", "black"]
   - repo: https://github.com/PyCQA/flake8
-    rev: 4.0.1
+    rev: 6.0.0
     hooks:
       - id: flake8
         args: [--max-line-length=88]
+        exclude: ^core/migrations
   - repo: https://github.com/sirwart/ripsecrets.git
     rev: v0.1.5
     hooks:

View File

@@ -1,19 +1,19 @@
 # syntax=docker/dockerfile:1
-FROM python:3
-RUN useradd -d /code pathogen
+FROM python:3.10
+RUN useradd -d /code xf
 RUN mkdir /code
-RUN chown pathogen:pathogen /code
+RUN chown xf:xf /code
 RUN mkdir /venv
-RUN chown pathogen:pathogen /venv
-USER pathogen
+RUN chown xf:xf /venv
+USER xf
 ENV PYTHONDONTWRITEBYTECODE=1
 ENV PYTHONUNBUFFERED=1
 WORKDIR /code
 COPY requirements.txt /code/
-COPY discord-patched.tgz /code/
+COPY docker/discord-patched.tgz /code/
 RUN python -m venv /venv
 RUN . /venv/bin/activate && pip install -r requirements.txt

View File

@@ -1,20 +1,20 @@
 run:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env up -d
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env up -d
 build:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env build
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env build
 stop:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env down
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env down
 log:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env logs -f
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env logs -f --names
 run-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env up -d
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env up -d
 stop-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env down
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env down
 log-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env logs -f
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env logs -f

0
clients/mexc.py Normal file
View File

273
db.py
View File

@@ -1,25 +1,52 @@
+import asyncio
+from math import ceil
 from os import getenv
+from time import sleep
+import aiomysql
 import aioredis
+import manticoresearch
+import msgpack
 import orjson
-import redis
+from manticoresearch.rest import ApiException
+from numpy import array_split
-# Elasticsearch
-from elasticsearch import AsyncElasticsearch
+from redis import StrictRedis
 import util
+from schemas import mc_s
-trues = ("true", "1", "t", True)
-# INDEX = "msg"
+mysql_pool = None
+configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
+api_client = manticoresearch.ApiClient(configuration)
+api_instance = manticoresearch.IndexApi(api_client)
 log = util.get_logger("db")
 # Redis (legacy)
-r = redis.from_url("redis://ssdb:1289", db=0)
+# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
+r = StrictRedis(
+    host="127.0.0.1",  # Replace with your Redis server's IP address
+    port=1289,  # Replace with your Redis server's port
+    db=0,  # Database number
+)
 # AIORedis
-ar = aioredis.from_url("redis://ssdb:1289", db=0)
+# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
+ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)
+# /var/run/neptune-redis.sock
+# db = 10
+pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
+# fr = aioredis.from_url("unix://var/run/fisk-redis.sock", db=10)
+fr = aioredis.from_url("unix://var/run/redis.sock", db=10)
+# pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
+KEYNAME = "queue"
+MESSAGE_KEY = "messages"
+OHLC_MESSAGE_KEY = "ohlc"
@@ -34,103 +61,172 @@ TYPES_MAIN = [
     "topic",
     "update",
 ]
-MAIN_SRC_MAP = {
-    "dis": "main",
-    "irc": "restricted",
-    "4ch": "main",
-}
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
-KEYNAME = "queue"
-ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
-ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
-ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
-ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
-client = None
-# These are sometimes numeric, sometimes strings.
-# If they are seen to be numeric first, ES will erroneously
-# index them as "long" and then subsequently fail to index messages
-# with strings in the field.
-keyword_fields = ["nick_id", "user_id", "net_id"]
-mapping = {
-    "mappings": {
-        "properties": {
-            "ts": {"type": "date", "format": "epoch_second"},
-            "file_tim": {"type": "date", "format": "epoch_millis"},
-        }
-    }
-}
-for field in keyword_fields:
-    mapping["mappings"]["properties"][field] = {"type": "text"}
-async def initialise_elasticsearch():
+async def init_mysql_pool():
     """
-    Initialise the Elasticsearch client.
+    Initialize the MySQL connection pool.
     """
-    auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
-    client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
-    for index in ("main", "restricted"):
-        if await client.indices.exists(index=index):
-            # update index with mapping
-            await client.indices.put_mapping(
-                index=index, properties=mapping["mappings"]["properties"]
-            )
-        else:
-            await client.indices.create(index=index, mappings=mapping["mappings"])
-    return client
+    global mysql_pool
+    mysql_pool = await aiomysql.create_pool(
+        host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
+    )
+async def rts_store_message(index, data):
+    """
+    Store a RTS message into MySQL using an existing connection pool.
+    Prioritizes instant PubSub delivery, with minimal data storage overhead.
+    :param index: str
+    :param data: dict
+    """
+    # Publish to Redis PubSub
+    packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
+    try:
+        await fr.publish(OHLC_MESSAGE_KEY, packed_index)
+    except aioredis.exceptions.ConnectionError as e:
+        raise e
+    await asyncio.sleep(0.1)
+    # Insert data into MySQL
+    try:
+        async with mysql_pool.acquire() as conn:
+            async with conn.cursor() as cur:
+                # Insert data into the table
+                query = f"""
+                INSERT INTO {index} (s, o, c, h, l, v, a, i, t, t2, ts)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+                """
+                # Bind the values directly
+                await cur.execute(
+                    query,
+                    (
+                        data["s"],  # symbol
+                        data["o"],  # open
+                        data["c"],  # close
+                        data["h"],  # high
+                        data["l"],  # low
+                        data["v"],  # volume_base
+                        data["a"],  # volume_quote
+                        data["i"],  # interval
+                        data["t"],  # start_time
+                        data["t2"],  # end_time
+                        data["ts"],  # event_time
+                    ),
+                )
+                await conn.commit()
+        log.debug(f"Stored data for {data['s']} in MySQL.")
+    except aiomysql.Error as e:
+        log.error(f"MySQL error: {e}")
 async def store_batch(data):
-    global client
-    if not client:
-        client = await initialise_elasticsearch()
-    indexmap = {}
-    for msg in data:
-        if msg["type"] in TYPES_MAIN:
-            # index = "main"
-            index = MAIN_SRC_MAP[msg["src"]]
-            # schema = mc_s.schema_main
-        elif msg["type"] in TYPES_META:
-            index = "meta"
-            # schema = mc_s.schema_meta
-        elif msg["type"] in TYPES_INT:
-            index = "internal"
-            # schema = mc_s.schema_int
-        INDEX = index
-        # if key in schema:
-        #     if isinstance(value, int):
-        #         if schema[key].startswith("string") or schema[key].startswith(
-        #             "text"
-        #         ):
-        #             msg[key] = str(value)
-        # body = orjson.dumps(msg)
-        if "ts" not in msg:
-            raise Exception("No TS in msg")
-        if INDEX not in indexmap:
-            indexmap[INDEX] = [msg]
-        else:
-            indexmap[INDEX].append(msg)
-    for index, index_messages in indexmap.items():
-        for message in index_messages:
-            result = await client.index(index=index, body=message)
-            if not result["result"] == "created":
-                log.error(f"Indexing failed: {result}")
-    log.debug(f"Indexed {len(data)} messages in ES")
+    """
+    Store a message into Manticore
+    :param data: list
+    """
+    if not data:
+        return
+    # 10000: maximum inserts we can submit to
+    # Manticore as of Sept 2022
+    split_posts = array_split(data, ceil(len(data) / 10000))
+    for messages in split_posts:
+        total = []
+        indexmap = {}
+        for msg in messages:
+            if msg["type"] in TYPES_MAIN:
+                index = "main"
+                schema = mc_s.schema_main
+            elif msg["type"] in TYPES_META:
+                index = "meta"
+                schema = mc_s.schema_meta
+            elif msg["type"] in TYPES_INT:
+                index = "internal"
+                schema = mc_s.schema_int
+            # normalise fields
+            for key, value in list(msg.items()):
+                if value is None:
+                    del msg[key]
+                if key in schema:
+                    if isinstance(value, int):
+                        if schema[key].startswith("string") or schema[key].startswith(
+                            "text"
+                        ):
+                            msg[key] = str(value)
+            body = {"insert": {"index": index, "doc": msg}}
+            total.append(body)
+            if "ts" not in msg:
+                raise Exception("No TS in msg")
+            if index not in indexmap:
+                indexmap[index] = [msg]
+            else:
+                indexmap[index].append(msg)
+        # END MSG IN MESSAGES
+        # Pack the indexmap with msgpack and publish it to Neptune
+        packed_index = msgpack.packb(indexmap, use_bin_type=True)
+        completed_publish = False
+        for i in range(10):
+            if completed_publish:
+                break
+            try:
+                await pr.publish(MESSAGE_KEY, packed_index)
+                completed_publish = True
+            except aioredis.exceptions.ConnectionError as e:
+                raise e
+            await asyncio.sleep(0.1)
+        if not completed_publish:
+            log.error("Failed to publish to Neptune")
+        body_post = ""
+        for item in total:
+            # print("ITEM", item)
+            body_post += orjson.dumps(item).decode("utf-8")
+            body_post += "\n"
+        # print("BODY POST INDEX", index, body_post)
+        try:
+            # Bulk index operations
+            api_response = api_instance.bulk(body_post)  # , async_req=True
+        except ApiException as e:
+            log.error("Exception when calling IndexApi->bulk: %s\n" % e)
+            log.error("body_post attempted to send", body_post)
+        log.info(f"Completed ingest to MC of length {len(total)}")
+    # END MESSAGES IN SPLIT
+def update_schema():
+    pass
+def create_index(api_client):
+    util_instance = manticoresearch.UtilsApi(api_client)
+    schemas = {
+        "main": mc_s.schema_main,
+        "rule_storage": mc_s.schema_rule_storage,
+        "meta": mc_s.schema_meta,
+        "internal": mc_s.schema_int,
+    }
+    for name, schema in schemas.items():
+        schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
+        create_query = (
+            f"create table if not exists {name}({schema_types}) engine='columnar'"
+        )
+        print("Schema types", create_query)
+        util_instance.sql(create_query)
 async def queue_message(msg):
     """
     Queue a message on the Redis buffer.
     """
+    # TODO: msgpack
     message = orjson.dumps(msg)
     await ar.lpush(KEYNAME, message)
@@ -140,5 +236,6 @@ async def queue_message_bulk(data):
     Queue multiple messages on the Redis buffer.
     """
     for msg in data:
+        # TODO: msgpack
         message = orjson.dumps(msg)
         await ar.lpush(KEYNAME, message)
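The rewritten store_batch packs the per-index message map with msgpack and publishes it on the "messages" channel of the Neptune Redis socket before bulk-inserting into Manticore. The subscriber side is not part of this compare; the sketch below shows one way such a consumer could look, reusing only the socket URL, database number, and channel name from db.py (everything else is illustrative).

import asyncio

import aioredis
import msgpack


async def consume_neptune():
    # Socket path, db number and channel name are taken from db.py above.
    pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
    pubsub = pr.pubsub()
    await pubsub.subscribe("messages")
    async for event in pubsub.listen():
        if event["type"] != "message":
            continue
        indexmap = msgpack.unpackb(event["data"], raw=False)
        for index, messages in indexmap.items():
            print(index, len(messages))  # hand off to the real consumer here


if __name__ == "__main__":
    asyncio.run(consume_neptune())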

174
db_old_ref.py Normal file
View File

@@ -0,0 +1,174 @@
import asyncio
from os import getenv
import aioredis
import msgpack
import orjson
import redis
# Elasticsearch
from elasticsearch import AsyncElasticsearch
import util
trues = ("true", "1", "t", True)
# INDEX = "msg"
log = util.get_logger("db")
# Redis (legacy)
# r = redis.from_url("redis://ssdb:1289", db=0)
# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)
# Neptune redis for PubSub
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
TYPES_MAIN = [
"msg",
"notice",
"action",
"part",
"join",
"kick",
"quit",
"nick",
"mode",
"topic",
"update",
]
MAIN_SRC_MAP = {
"dis": "main",
"irc": "restricted",
"4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
MESSAGE_KEY = "messages"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
client = None
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping_int = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"file_tim": {"type": "date", "format": "epoch_millis"},
}
}
}
mapping = dict(mapping_int)
for field in keyword_fields:
mapping["mappings"]["properties"][field] = {"type": "text"}
del mapping_int["mappings"]["properties"]["file_tim"]
async def initialise_elasticsearch():
"""
Initialise the Elasticsearch client.
"""
auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
for index in ("main", "meta", "restricted", "internal"):
if index == "internal":
map_dict = mapping_int
else:
map_dict = mapping
if await client.indices.exists(index=index):
# update index with mapping
await client.indices.put_mapping(
index=index, properties=map_dict["mappings"]["properties"]
)
else:
await client.indices.create(index=index, mappings=map_dict["mappings"])
return client
async def store_batch(data):
global client
if not client:
client = await initialise_elasticsearch()
indexmap = {}
for msg in data:
if msg["type"] in TYPES_MAIN:
# index = "main"
index = MAIN_SRC_MAP[msg["src"]]
# schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
# schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
# schema = mc_s.schema_int
INDEX = index
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = orjson.dumps(msg)
if "ts" not in msg:
raise Exception("No TS in msg")
if INDEX not in indexmap:
indexmap[INDEX] = [msg]
else:
indexmap[INDEX].append(msg)
# Pack the indexmap with msgpack and publish it to Neptune
packed_index = msgpack.packb(indexmap, use_bin_type=True)
completed_publish = False
for i in range(10):
if completed_publish:
break
try:
await pr.publish(MESSAGE_KEY, packed_index)
completed_publish = True
except aioredis.exceptions.ConnectionError:
await asyncio.sleep(0.1)
if not completed_publish:
log.error("Failed to publish to Neptune")
for index, index_messages in indexmap.items():
for message in index_messages:
result = await client.index(index=index, body=message)
if not result["result"] == "created":
log.error(f"Indexing failed: {result}")
log.debug(f"Indexed {len(data)} messages in ES")
async def queue_message(msg):
"""
Queue a message on the Redis buffer.
"""
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
async def queue_message_bulk(data):
"""
Queue multiple messages on the Redis buffer.
"""
for msg in data:
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)

206
docker-compose.prod.yml Normal file
View File

@@ -0,0 +1,206 @@
version: "2.2"
services:
rts:
image: xf/monolith:latest
container_name: rts_monolith
command: sh -c '. /venv/bin/activate && exec python rts.py'
build: .
volumes:
- ${PORTAINER_GIT_DIR}:/code
- type: bind
source: /code/run
target: /var/run
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
MONOLITH_RTS_MEXC_API_ACCESS_KEY: "${MONOLITH_RTS_MEXC_API_ACCESS_KEY}"
MONOLITH_RTS_MEXC_API_SECRET_KEY: "${MONOLITH_RTS_MEXC_API_SECRET_KEY}"
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
app:
image: xf/monolith:latest
container_name: monolith
#command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
build: .
volumes:
- ${PORTAINER_GIT_DIR}:/code
- type: bind
source: /code/run
target: /var/run
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
DISCORD_TOKEN: "${DISCORD_TOKEN}"
THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
#THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# Threads to use for data processing
# Leave uncommented to use all available threads
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
MONOLITH_INGEST_INCREASE_BY: "${MONOLITH_INGEST_INCREASE_BY}"
MONOLITH_INGEST_DECREASE_ABOVE: "${MONOLITH_INGEST_DECREASE_ABOVE}"
MONOLITH_INGEST_DECREASE_BY: "${MONOLITH_INGEST_DECREASE_BY}"
MONOLITH_INGEST_MAX: "${MONOLITH_INGEST_MAX}"
MONOLITH_INGEST_MIN: "${MONOLITH_INGEST_MIN}"
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
threshold:
image: xf/threshold:latest
container_name: threshold
build: legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
volumes_from:
- tmp
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
DISCORD_TOKEN: "${DISCORD_TOKEN}"
THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
#THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# Threads to use for data processing
# Leave uncommented to use all available threads
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
network_mode: host
ssdb:
image: tsl0922/ssdb
container_name: ssdb_monolith
ports:
- "1289:1289"
environment:
- SSDB_PORT=1289
volumes:
- monolith_ssdb_data:/var/lib/ssdb
# networks:
# - default
# - db
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
redis:
image: redis
container_name: redis_monolith
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- monolith_redis_data:/data
- type: bind
source: /code/run
target: /var/run
# volumes_from:
# - tmp
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 2s
retries: 15
# networks:
# - default
# - xf
# - db
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
# networks:
# default:
# driver: bridge
# xf:
# external: true
# db:
# external: true
volumes:
monolith_redis_data:
monolith_ssdb_data:

View File

@@ -1,86 +0,0 @@
version: "2.2"
services:
app:
image: pathogen/monolith:latest
container_name: monolith
build: ${PORTAINER_GIT_DIR}/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
env_file:
- ../stack.env
networks:
- default
- pathogen
- elastic
threshold:
image: pathogen/threshold:latest
container_name: threshold
build: ./legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
volumes_from:
- tmp
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- ../stack.env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
networks:
- default
ssdb:
image: tsl0922/ssdb
container_name: ssdb_monolith
ports:
- "1289:1289"
environment:
- SSDB_PORT=1289
networks:
- default
tmp:
image: busybox
container_name: tmp_monolith
command: chmod -R 777 /var/run/socks
volumes:
- /var/run/socks
redis:
image: redis
container_name: redis_monolith
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- redis_data:/data
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/socks/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks:
default:
driver: bridge
pathogen:
external: true
elastic:
external: true
volumes:
redis_data:

View File

@@ -1,87 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Java tuning
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=1g
#druid_emitter_logging_logLevel=debug
#druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"]
#druid_zk_service_host=zookeeper
#druid_metadata_storage_host=
#druid_metadata_storage_type=postgresql
#druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
#druid_metadata_storage_connector_user=druid
#druid_metadata_storage_connector_password=FoolishPassword
#druid_coordinator_balancer_strategy=cachingCost
#druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
#druid_indexer_fork_property_druid_processing_buffer_sizeBytes=128MiB
#druid_processing_buffer_sizeBytes=268435456 # 256MiB
#druid_storage_type=local
#druid_storage_storageDirectory=/opt/shared/segments
#druid_indexer_logs_type=file
#druid_indexer_logs_directory=/opt/shared/indexing-logs
#druid_processing_numThreads=1
#druid_processing_numMergeBuffers=1
#DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
# Java tuning
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=6172m
DRUID_SINGLE_NODE_CONF=nano-quickstart
druid_emitter_logging_logLevel=debug
druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"]
druid_zk_service_host=zookeeper
druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword
druid_coordinator_balancer_strategy=cachingCost
druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs
druid_processing_numThreads=2
druid_processing_numMergeBuffers=2
DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>

View File

@@ -1,265 +0,0 @@
#!/bin/sh
ip=`hostname -i|rev|cut -d\ -f 1|rev`
cat << EOF
searchd {
# https://manual.manticoresearch.com/Server_settings/Searchd#access_plain_attrs
# access_plain_attrs = mmap_preread
# https://manual.manticoresearch.com/Server_settings/Searchd#access_blob_attrs
# access_blob_attrs = mmap_preread
# https://manual.manticoresearch.com/Server_settings/Searchd#access_doclists
# access_doclists = file
# https://manual.manticoresearch.com/Server_settings/Searchd#access_hitlists
# access_hitlists = file
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_connect_timeout
# agent_connect_timeout =
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_query_timeout
# agent_query_timeout =
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_retry_count
# agent_retry_count = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_retry_delay
# agent_retry_delay = 500
# https://manual.manticoresearch.com/Server_settings/Searchd#attr_flush_period
# attr_flush_period = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_flush
# binlog_flush = 2
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_max_log_size
# binlog_max_log_size = 268435456
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_path
# binlog_path =
# https://manual.manticoresearch.com/Server_settings/Searchd#client_timeout
# client_timeout = 300
# https://manual.manticoresearch.com/Server_settings/Searchd#collation_libc_locale
# collation_libc_locale = C
# https://manual.manticoresearch.com/Server_settings/Searchd#collation_server
# collation_server = libc_ci
# https://manual.manticoresearch.com/Server_settings/Searchd#data_dir
data_dir = /var/lib/manticore
# https://manual.manticoresearch.com/Server_settings/Searchd#docstore_cache_size
# docstore_cache_size = 16m
# https://manual.manticoresearch.com/Server_settings/Searchd#expansion_limit
# expansion_limit = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#grouping_in_utc
# grouping_in_utc = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#ha_period_karma
# ha_period_karma = 60
# https://manual.manticoresearch.com/Server_settings/Searchd#ha_ping_interval
# ha_ping_interval = 1000
# https://manual.manticoresearch.com/Server_settings/Searchd#hostname_lookup
# hostname_lookup =
# https://manual.manticoresearch.com/Server_settings/Searchd#jobs_queue_size
# jobs_queue_size =
# https://manual.manticoresearch.com/Server_settings/Searchd#listen_backlog
# listen_backlog = 5
# https://manual.manticoresearch.com/Server_settings/Searchd#listen
# listen_env = this directive allows to append listeners from environment variables
listen = 9306:mysql41
listen = /var/run/mysqld/mysqld.sock:mysql41
listen = $ip:9312
listen = 9308:http
listen = $ip:9315-9325:replication
# https://manual.manticoresearch.com/Server_settings/Searchd#listen_tfo
# listen_tfo = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#log
log = /var/log/manticore/searchd.log
# https://manual.manticoresearch.com/Server_settings/Searchd#max_batch_queries
# max_batch_queries = 32
# https://manual.manticoresearch.com/Server_settings/Searchd#threads
# threads =
# https://manual.manticoresearch.com/Server_settings/Searchd#max_filters
# max_filters = 256
# https://manual.manticoresearch.com/Server_settings/Searchd#max_filter_values
# max_filter_values = 4096
# https://manual.manticoresearch.com/Server_settings/Searchd#max_open_files
# max_open_files = max
# https://manual.manticoresearch.com/Server_settings/Searchd#max_packet_size
max_packet_size = 128M
# https://manual.manticoresearch.com/Server_settings/Searchd#mysql_version_string
# mysql_version_string =
# https://manual.manticoresearch.com/Server_settings/Searchd#net_workers
# net_workers = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#net_wait_tm
# net_wait_tm = -1
# https://manual.manticoresearch.com/Server_settings/Searchd#net_throttle_accept
# net_throttle_accept = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#net_throttle_action
# net_throttle_action = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#node_address
# node_address =
# https://manual.manticoresearch.com/Server_settings/Searchd#ondisk_attrs_default
# ondisk_attrs_default = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#persistent_connections_limit
# persistent_connections_limit =
# https://manual.manticoresearch.com/Server_settings/Searchd#pid_file
pid_file = /var/run/manticore/searchd.pid
# https://manual.manticoresearch.com/Server_settings/Searchd#predicted_time_costs
# predicted_time_costs = doc=64, hit=48, skip=2048, match=64
# https://manual.manticoresearch.com/Server_settings/Searchd#preopen_indexes
# preopen_indexes = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_max_bytes
qcache_max_bytes = 128Mb
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_thresh_msec
qcache_thresh_msec = 150
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_ttl_sec
qcache_ttl_sec = 120
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_format
query_log_format = sphinxql
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_min_msec
# query_log_min_msec = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log
# query_log = /var/log/manticore/query.log
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_mode
# query_log_mode = 600
# https://manual.manticoresearch.com/Server_settings/Searchd#max_connections
# max_connections =
# https://manual.manticoresearch.com/Server_settings/Searchd#network_timeout
# network_timeout = 5
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer
# read_buffer = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer_docs
# read_buffer_docs = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer_hits
# read_buffer_hits = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_unhinted
# read_unhinted 32K
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_flush_period
# rt_flush_period =
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_merge_iops
# rt_merge_iops = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_merge_maxiosize
# rt_merge_maxiosize = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#seamless_rotate
# seamless_rotate = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#server_id
# server_id =
# https://manual.manticoresearch.com/Server_settings/Searchd#shutdown_timeout
# shutdown_timeout = 3
# https://manual.manticoresearch.com/Server_settings/Searchd#shutdown_token
# shutdown_token =
# https://manual.manticoresearch.com/Server_settings/Searchd#snippets_file_prefix
# snippets_file_prefix =
# https://manual.manticoresearch.com/Server_settings/Searchd#sphinxql_state
# sphinxql_state =
# https://manual.manticoresearch.com/Server_settings/Searchd#sphinxql_timeout
# sphinxql_timeout = 900
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_ca
# ssl_ca =
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_cert
# ssl_cert =
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_key
# ssl_key =
# https://manual.manticoresearch.com/Server_settings/Searchd#subtree_docs_cache
# subtree_docs_cache = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#subtree_hits_cache
# subtree_hits_cache = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#thread_stack
# thread_stack =
# https://manual.manticoresearch.com/Server_settings/Searchd#unlink_old
# unlink_old = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#watchdog
# watchdog = 1
}
common {
# https://manual.manticoresearch.com/Server_settings/Common#lemmatizer_base
# lemmatizer_base = /usr/local/share
# https://manual.manticoresearch.com/Server_settings/Common#progressive_merge
# progressive_merge =
# https://manual.manticoresearch.com/Server_settings/Common#json_autoconv_keynames
# json_autoconv_keynames =
# https://manual.manticoresearch.com/Server_settings/Common#json_autoconv_numbers
# json_autoconv_numbers = 0
# https://manual.manticoresearch.com/Server_settings/Common#on_json_attr_error
# on_json_attr_error = ignore_attr
# https://manual.manticoresearch.com/Server_settings/Common#plugin_dir
# plugin_dir =
}
# indexer {
# lemmatizer_cache = 1024M
# max_iops = 0
# max_iosize = 0
# mem_limit = 1024M
# }
EOF

View File

@@ -1,2 +1,5 @@
-unixsocket /var/run/socks/redis.sock
+unixsocket /var/run/monolith-redis.sock
 unixsocketperm 777
+port 0
+# port 6379
+# requirepass changeme
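With port 0 the server listens only on the Unix socket, so clients must use the socket path. A quick connectivity check, a sketch assuming redis-py and that the socket is bind-mounted at the configured path inside the container:

from redis import StrictRedis

r = StrictRedis(unix_socket_path="/var/run/monolith-redis.sock", db=0)
print(r.ping())  # True if the server is reachable over the socket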

View File

@@ -1,24 +0,0 @@
wheel
beautifulsoup4
redis
siphashc
aiohttp[speedups]
python-dotenv
#manticoresearch
numpy
aioredis[hiredis]
#aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
#spacy
gensim
python-Levenshtein
orjson
uvloop
numba
elasticsearch[async]

View File

@@ -1,3 +0,0 @@
clusters:
- name: druid
guardDataCubes: true

View File

@@ -1,17 +0,0 @@
repos:
- repo: https://github.com/psf/black
rev: 22.6.0
hooks:
- id: black
args:
- --line-length=120
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
hooks:
- id: flake8
args: [--max-line-length=120]

View File

@@ -1,41 +0,0 @@
version: "2"
services:
app:
image: pathogen/threshold:latest
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- .env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
volumes_from:
- tmp
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
networks:
default:
external:
name: pathogen

View File

@@ -1,38 +0,0 @@
version: "2"
services:
app:
image: pathogen/threshold:latest
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- ../stack.env
volumes_from:
- tmp
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
networks:
default:
external:
name: pathogen

View File

@@ -4,6 +4,7 @@ from os import urandom
 from os.path import exists
 from string import digits
+import redis
 from redis import StrictRedis
 # List of errors ZNC can give us
@@ -121,7 +122,7 @@ def initConf():
 def initMain():
-    global r, g
+    global r, g, x
     initConf()
     r = StrictRedis(
         unix_socket_path=config["RedisSocket"], db=config["RedisDBEphemeral"]  # noqa
@@ -129,3 +130,5 @@ def initMain():
     g = StrictRedis(
         unix_socket_path=config["RedisSocket"], db=config["RedisDBPersistent"]
     )  # noqa
+    # SSDB for communication with Monolith
+    x = redis.from_url("redis://ssdb:1289", db=0)

View File

@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 def queue_message(c):
     message = json.dumps(c)
-    main.g.lpush("queue", message)
+    main.x.lpush("queue", message)
 def event(
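These two hunks point Threshold at SSDB directly: a new client x is created from redis://ssdb:1289 and queue_message pushes onto it instead of the local handle g. The monolith side drains the same "queue" key (KEYNAME in db.py and processing/process.py). A sketch of that handoff, with the calls below being illustrative rather than the production loop:

import json

import redis

ssdb = redis.from_url("redis://ssdb:1289", db=0)

# Threshold side (main.x): enqueue a JSON-encoded event
ssdb.lpush("queue", json.dumps({"type": "msg", "msg": "hello"}))

# Monolith side (db.r): pop from the same list
raw = ssdb.rpop("queue")
if raw is not None:
    print(json.loads(raw))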

View File

@@ -1,9 +0,0 @@
wheel
pre-commit
twisted
pyOpenSSL
redis
pyYaML
service_identity
siphashc
Klein

View File

@@ -98,7 +98,6 @@ class IRCRelayFactory(ReconnectingClientFactory):
         self.relayCommands, self.user, self.stage2 = relayCommands, user, stage2
     def buildProtocol(self, addr):
         entry = IRCRelay(self.num, self.relayCommands, self.user, self.stage2)
-
         self.client = entry

View File

@@ -1,8 +1,10 @@
 import asyncio
 from os import getenv
+from time import sleep
 import uvloop
+import db
 import util
 from sources.ch4 import Chan4
 from sources.dis import DiscordClient
@@ -21,14 +23,28 @@ if not token:
 async def main(loop):
-    client = DiscordClient()
-    loop.create_task(client.start(token))
-    chan = Chan4()
-    loop.create_task(chan.run())
-    ingest = Ingest()
-    loop.create_task(ingest.run())
+    if "ingest" in modules_enabled:
+        ingest = Ingest()
+        loop.create_task(ingest.run())
+    if "dis" in modules_enabled:
+        client = DiscordClient()
+        loop.create_task(client.start(token))
+    if "ch4" in modules_enabled:
+        chan = Chan4()
+        loop.create_task(chan.run())
+created = False
+while not created:
+    try:
+        db.create_index(db.api_client)
+        created = True
+    except Exception as e:
+        print(f"Error creating index: {e}")
+        sleep(1)  # Block the thread, just wait for the DB
+db.update_schema()
 loop = asyncio.get_event_loop()
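main() now starts only the modules named in modules_enabled, matching the MODULES_ENABLED variable passed through docker-compose.prod.yml. The parsing itself is outside this hunk; one plausible reading of the variable, treated here as an assumption, is a comma-separated list:

from os import getenv

# Hypothetical helper; the real parsing is not shown in this compare.
modules_enabled = [
    m.strip() for m in getenv("MODULES_ENABLED", "").split(",") if m.strip()
]
# e.g. MODULES_ENABLED="ingest,dis,ch4" -> ["ingest", "dis", "ch4"]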

0
oom Normal file
View File

0
perf/__init__.py Normal file
View File

134
perf/throttle.py Normal file
View File

@@ -0,0 +1,134 @@
import asyncio
import time
import psutil
import util
class DynamicThrottle(object):
def __init__(self, **kwargs):
self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
self.sleep_interval = 0.0
self.sleep_increment = kwargs.get("sleep_increment", 0.01)
self.sleep_decrement = kwargs.get("sleep_decrement", 0.01)
self.sleep_max = kwargs.get("sleep_max", 0.1)
self.sleep_min = kwargs.get("sleep_min", 0.01)
self.psutil_interval = kwargs.get("psutil_interval", 0.1)
self.log = kwargs.get("log", util.get_logger(self.__class__.__name__))
self.consecutive_increments = 0
self.consecutive_decrements = 0
self.consecutive_divisor = kwargs.get("consecutive_divisor", 1)
self.last_was_increment = kwargs.get("start_increment", True)
if kwargs.get("use_async"):
self.wait = self.dynamic_throttle_async
else:
self.wait = self.dynamic_throttle
async def dynamic_throttle_async(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
await asyncio.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
def dynamic_throttle(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
time.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
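processing/process.py and sources/ch4.py wire this class in further down the compare; for reference, a minimal synchronous usage sketch (the parameter values here are illustrative, not the production settings):

from perf.throttle import DynamicThrottle

throttle = DynamicThrottle(
    target_cpu_usage=50,   # back off once CPU rises above 50%
    sleep_increment=0.02,
    sleep_decrement=0.01,
    sleep_max=0.5,
    sleep_min=0,
    use_async=False,       # throttle.wait is the blocking variant
)

for i in range(100):
    _ = sum(range(10_000))       # stand-in for real per-message work
    if i % 5 == 0:
        slept = throttle.wait()  # sleeps only while CPU is above target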

1
processing/ohlc.py Normal file
View File

@@ -0,0 +1 @@
# Resample 1Min into 5Min, 15Min, 30Min, 1H, 4H, 1D, 1W, 1M, 1Y
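processing/ohlc.py is only this one-line stub so far. A sketch of the resampling it describes, assuming pandas (not currently in requirements.txt) and a DataFrame of 1-minute candles indexed by timestamp with the o/h/l/c/v/a columns stored by rts.py:

import pandas as pd

OHLC_AGG = {"o": "first", "h": "max", "l": "min", "c": "last", "v": "sum", "a": "sum"}


def resample_ohlc(df_1m: pd.DataFrame, rule: str) -> pd.DataFrame:
    """Aggregate 1-minute candles into a coarser interval, e.g. '5min' or '4h'."""
    return df_1m.resample(rule).agg(OHLC_AGG).dropna(how="all")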

View File

@@ -47,6 +47,9 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
 import db
 import util
+# For throttling
+from perf.throttle import DynamicThrottle
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
@@ -57,6 +60,7 @@ KEYNAME = "queue"
 MONOLITH_PROCESS_PERFSTATS = (
     getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
 )
+TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
@@ -88,6 +92,19 @@ CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS", os.cpu_count()))
 p = ProcessPoolExecutor(CPU_THREADS)
+throttle = DynamicThrottle(
+    target_cpu_usage=TARGET_CPU_USAGE,
+    sleep_increment=0.02,
+    sleep_decrement=0.01,
+    sleep_max=0.5,
+    sleep_min=0,
+    psutil_interval=0.1,
+    consecutive_divisor=2,
+    log=log,
+    start_increment=True,
+    use_async=False,
+)
 def get_hash_key():
     hash_key = db.r.get("hashing_key")
@@ -129,13 +146,15 @@ async def spawn_processing_threads(chunk, length):
     # Join the results back from the split list
     flat_list = [item for sublist in results for item in sublist]
-    log.debug(
+    total_messages = len(flat_list)
+    log.info(
         (
             f"[{chunk}/{index}] Results from processing of {length} messages in "
             f"{cores} threads: {len(flat_list)}"
         )
     )
     await db.store_batch(flat_list)
+    return total_messages
     # log.debug(f"Finished processing {len_data} messages")
@@ -150,19 +169,38 @@ def process_data(chunk, index, chunk_size):
     date_time = 0.0
     nlp_time = 0.0
     normalise_time = 0.0
-    hash_time = 0.0
     normal2_time = 0.0
     soup_time = 0.0
+    sleep_time = 0.0
     total_time = 0.0
     # Initialise sentiment analyser
     analyzer = SentimentIntensityAnalyzer()
     for msg_index in range(chunk_size):
+        # Print percentage of msg_index relative to chunk_size
+        if msg_index % 10 == 0:
+            percentage_done = (msg_index / chunk_size) * 100
+            log.debug(
+                f"[{chunk}/{index}] {percentage_done:.2f}% done ({msg_index}/{chunk_size})"
+            )
         msg = db.r.rpop(KEYNAME)
        if not msg:
             return
         msg = orjson.loads(msg)
+        if msg["src"] == "4ch":
+            board = msg["net"]
+            thread = msg["channel"]
+            redis_key = (
+                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
+            )
+            key_content = db.r.get(redis_key)
+            if key_content is not None:
+                continue
+            db.r.set(redis_key, "1")
         total_start = time.process_time()
         # normalise fields
         start = time.process_time()
@@ -188,27 +226,6 @@ def process_data(chunk, index, chunk_size):
             board = msg["net"]
             thread = msg["channel"]
-            # Calculate hash for post
-            start = time.process_time()
-            post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
-            hash = siphash(hash_key, post_normalised)
-            hash = str(hash)
-            redis_key = (
-                f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
-            )
-            key_content = db.r.get(redis_key)
-            if key_content is not None:
-                key_content = key_content.decode("ascii")
-                if key_content == hash:
-                    # This deletes the message since the append at the end won't be hit
-                    continue
-                    # pass
-                else:
-                    msg["type"] = "update"
-            db.r.set(redis_key, hash)
-            time_took = (time.process_time() - start) * 1000
-            hash_time += time_took
             start = time.process_time()
             for key2, value in list(msg.items()):
                 if key2 in ATTRMAP:
@@ -226,9 +243,10 @@ def process_data(chunk, index, chunk_size):
                 old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
             else:
                 old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-            # new_ts = old_ts.isoformat()
+            # iso_ts = old_ts.isoformat()
             new_ts = int(old_ts.timestamp())
             msg["ts"] = new_ts
+            # msg["iso"] = iso_ts
         else:
             raise Exception("No TS in msg")
         time_took = (time.process_time() - start) * 1000
@@ -273,7 +291,7 @@ def process_data(chunk, index, chunk_size):
         # Tokens
         start = time.process_time()
         tokens = preprocess_string(msg["msg"], CUSTOM_FILTERS)
-        msg["tokens"] = tokens
+        msg["tokens"] = str(tokens)
         # n = nlp(msg["msg"])
         # for tag in TAGS:
         #     tag_name = tag.lower()
@@ -286,20 +304,24 @@ def process_data(chunk, index, chunk_size):
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
+        # Dynamic throttling to reduce CPU usage
+        if msg_index % 5 == 0:
+            sleep_time += throttle.wait()
     if MONOLITH_PROCESS_PERFSTATS:
-        log.debug("=====================================")
-        log.debug(f"Chunk: {chunk}")
-        log.debug(f"Index: {index}")
-        log.debug(f"Sentiment: {sentiment_time}")
-        log.debug(f"Regex: {regex_time}")
-        log.debug(f"Polyglot: {polyglot_time}")
-        log.debug(f"Date: {date_time}")
-        log.debug(f"NLP: {nlp_time}")
-        log.debug(f"Normalise: {normalise_time}")
-        log.debug(f"Hash: {hash_time}")
-        log.debug(f"Normal2: {normal2_time}")
-        log.debug(f"Soup: {soup_time}")
-        log.debug(f"Total: {total_time}")
-        log.debug("=====================================")
+        log.info("=====================================")
+        log.info(f"Chunk: {chunk}")
+        log.info(f"Index: {index}")
+        log.info(f"Sentiment: {sentiment_time}")
+        log.info(f"Regex: {regex_time}")
+        log.info(f"Polyglot: {polyglot_time}")
+        log.info(f"Date: {date_time}")
+        log.info(f"NLP: {nlp_time}")
+        log.info(f"Normalise: {normalise_time}")
+        log.info(f"Normal2: {normal2_time}")
+        log.info(f"Soup: {soup_time}")
+        log.info(f"Total: {total_time}")
+        log.info(f"Throttling: {sleep_time}")
+        log.info("=====================================")
     return to_store

View File

@@ -1,11 +1,10 @@
 wheel
-pre-commit
 beautifulsoup4
 redis
 siphashc
 aiohttp[speedups]
 python-dotenv
-#manticoresearch
+manticoresearch
 numpy
 aioredis[hiredis]
 #aiokafka
@@ -21,5 +20,10 @@ gensim
 python-Levenshtein
 orjson
 uvloop
-numba
 elasticsearch[async]
+msgpack
+# flpc
+psutil
+pymexc
+websockets
+aiomysql

186
rts.py Normal file
View File

@@ -0,0 +1,186 @@
import asyncio
import logging
from os import getenv
import orjson
import websockets
import db
# Logger setup
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("RTS")
# Environment variables
MONOLITH_RTS_MEXC_API_ACCESS_KEY = getenv("MONOLITH_RTS_MEXC_API_ACCESS_KEY", None)
MONOLITH_RTS_MEXC_API_SECRET_KEY = getenv("MONOLITH_RTS_MEXC_API_SECRET_KEY", None)
# WebSocket endpoint
MEXC_WS_URL = "wss://wbs.mexc.com/ws"
{
"d": {
"e": "spot@public.kline.v3.api",
"k": {
"t": 1737901140, # TS
"o": "684.4", # Open
"c": "684.5", # Close
"h": "684.5", # High
"l": "684.4", # Low
"v": "0.173", # Volume of the base
"a": "118.41", # Volume of the quote (Quantity)
"T": 1737901200, # ?
"i": "Min1", # ?
},
},
"c": "spot@public.kline.v3.api@BNBUSDT@Min1", # Channel
"t": 1737901159239,
"s": "BNBUSDT", # Symbol
}
# Scan DB for last endtime (T)
# Request Kline data from last endtime (T) to now
# Check Server Time
# GET /api/v3/time
# Weight(IP): 1
# Parameters: NONE
# Response:
# {
#     "serverTime": 1645539742000
# }
# Kline/Candlestick Data
# GET /api/v3/klines
# Weight(IP): 1
# Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
# Parameters:
# Name       Type     Mandatory  Description
# symbol     string   YES
# interval   ENUM     YES        ENUM: Kline Interval
# startTime  long     NO
# endTime    long     NO
# limit      integer  NO         Default 500; max 1000.
# Response:
# [
#   [
#     1640804880000,
#     "47482.36",
#     "47482.36",
#     "47416.57",
#     "47436.1",
#     "3.550717",
#     1640804940000,
#     "168387.3"
#   ]
# ]
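# A rough sketch of the kline backfill request described above. It is not wired in
# yet; the helper name and the REST base URL are assumptions (only the WS URL is
# defined in this file), and the REST interval values may not match the WS "Min1"
# naming. aiohttp is already in requirements.
import aiohttp

MEXC_REST_URL = "https://api.mexc.com"  # Assumed REST base URL


async def fetch_klines(symbol, interval, start_time=None, end_time=None, limit=1000):
    # Fetch historical klines for one symbol between two millisecond timestamps
    params = {"symbol": symbol, "interval": interval, "limit": str(limit)}
    if start_time is not None:
        params["startTime"] = str(start_time)
    if end_time is not None:
        params["endTime"] = str(end_time)
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{MEXC_REST_URL}/api/v3/klines", params=params) as resp:
            return await resp.json()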
# Scrub function:
# For each record, ensure there are no time gaps.
# When the 1m window rolls over, the next kline's start time (t) is always the
# previous kline's end time (T).
# Check for gaps, and request all klines covering those gaps to ensure a full DB,
# even across restarts.
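# Minimal sketch of the gap check described above. find_gaps is an illustrative
# helper (not part of the module yet); it assumes klines shaped like the
# "reformatted" dicts stored by mex_handle below, i.e. with "t" (start) and
# "t2" (end) fields.
def find_gaps(klines):
    # Return (gap_start, gap_end) pairs wherever a kline's start time does not
    # line up with the previous kline's end time
    gaps = []
    previous_end = None
    for kline in sorted(klines, key=lambda k: k["t"]):
        if previous_end is not None and kline["t"] > previous_end:
            gaps.append((previous_end, kline["t"]))
        previous_end = kline["t2"]
    return gaps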
# Idle jitter function - compare our time with the server time.
# Compare the event timestamp (ts) with our time and log the jitter. Add a jitter
# warning to the log and the OHLC data.
# High jitter may prevent us from getting the correct data for trading.
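# Sketch of the jitter check described above. log_jitter and its 1000 ms warning
# threshold are placeholders; event_time_ms corresponds to the message's top-level
# "t" field (milliseconds since epoch).
import time


def log_jitter(event_time_ms, warn_threshold_ms=1000):
    jitter_ms = int(time.time() * 1000) - int(event_time_ms)
    if abs(jitter_ms) > warn_threshold_ms:
        log.warning(f"High jitter vs event time: {jitter_ms} ms")
    else:
        log.debug(f"Jitter vs event time: {jitter_ms} ms")
    return jitter_ms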
async def mex_handle(data):
message = orjson.loads(data)
# print(orjson.dumps(message, option=orjson.OPT_INDENT_2).decode("utf-8"))
if "code" in message:
if message["code"] == 0:
log.info("Control message received")
return
symbol = message["s"]
open = message["d"]["k"]["o"]
close = message["d"]["k"]["c"]
high = message["d"]["k"]["h"]
low = message["d"]["k"]["l"]
volume_base = message["d"]["k"]["v"] # ERROR IN API DOCS
volume_quote = message["d"]["k"]["a"] # > a bigDecimal volume
interval = message["d"]["k"]["i"]
start_time = message["d"]["k"]["t"] # > t long stratTime
end_time = message["d"]["k"]["T"] # > T long endTime
event_time = message["t"] # t long eventTime
index = f"mex_ohlc_{symbol.lower()}"
reformatted = {
"s": symbol,
"o": float(open),
"c": float(close),
"h": float(high),
"l": float(low),
"v": float(volume_base),
"a": float(volume_quote),
"i": interval,
"t": int(start_time),
"t2": int(end_time),
"ts": int(event_time),
}
await db.rts_store_message(index, reformatted)
log.debug(index)
log.debug(orjson.dumps(reformatted, option=orjson.OPT_INDENT_2).decode("utf-8"))
# Kline WebSocket handler
async def mex_main():
await db.init_mysql_pool()
async with websockets.connect(MEXC_WS_URL) as websocket:
log.info("WebSocket connected")
# Define symbols and intervals
symbols = ["BTCUSDT"] # Add more symbols as needed
interval = "Min1" # Kline interval
# Prepare subscription requests for Kline streams
# Request: spot@public.kline.v3.api@<symbol>@<interval>
subscriptions = [
f"spot@public.kline.v3.api@{symbol}@{interval}" for symbol in symbols
]
# Send subscription requests
subscribe_request = {
"method": "SUBSCRIPTION",
"params": subscriptions,
# "id": 1,
}
await websocket.send(orjson.dumps(subscribe_request).decode("utf-8"))
log.info(f"Subscribed to: {subscriptions}")
# Listen for messages
while True:
try:
message = await websocket.recv()
await mex_handle(message)
except websockets.exceptions.ConnectionClosed as e:
log.error(f"WebSocket connection closed: {e}")
break
# Entry point
if __name__ == "__main__":
try:
asyncio.run(mex_main())
except KeyboardInterrupt:
log.info("RTS process terminated.")

View File

@@ -129,8 +129,19 @@ schema_main = {
"version_sentiment": "int", "version_sentiment": "int",
# 1, 2 # 1, 2
"version_tokens": "int", "version_tokens": "int",
# en, ru
"lang_code": "string indexed attribute",
"lang_name": "text",
"match_ts": "timestamp",
"batch_id": "bigint",
"rule_id": "bigint",
"index": "string indexed attribute",
"meta": "text",
# "iso": "string indexed attribute",
} }
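# Rule storage reuses the main schema (note: this binds the same dict object, not a copy)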
schema_rule_storage = schema_main
schema_meta = { schema_meta = {
"id": "bigint", "id": "bigint",
# 393598265, #main, Rust Programmer's Club # 393598265, #main, Rust Programmer's Club

View File

@@ -10,6 +10,7 @@ from numpy import array_split
import db import db
import util import util
from perf.throttle import DynamicThrottle
# CONFIGURATION # # CONFIGURATION #
@@ -25,6 +26,12 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
# Semaphore value ? # Semaphore value ?
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000)) THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
# Target CPU usage percentage
TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
# Boards to crawl: a comma-separated list (e.g. "g,pol"), or "ALL" to fetch the full board list
BOARDS = getenv("MONOLITH_CH4_BOARDS", "ALL").split(",")
# CONFIGURATION END # # CONFIGURATION END #
@@ -37,6 +44,19 @@ class Chan4(object):
name = self.__class__.__name__ name = self.__class__.__name__
self.log = util.get_logger(name) self.log = util.get_logger(name)
self.throttle = DynamicThrottle(
target_cpu_usage=TARGET_CPU_USAGE,
sleep_increment=0.01,
sleep_decrement=0.01,
sleep_max=0.1,
sleep_min=0,
psutil_interval=0.1,
log=self.log,
start_increment=False,
use_async=True,
)
self.wait = self.throttle.wait
self.api_endpoint = "https://a.4cdn.org" self.api_endpoint = "https://a.4cdn.org"
# self.boards = ["out", "g", "a", "3", "pol"] # # self.boards = ["out", "g", "a", "3", "pol"] #
self.boards = [] self.boards = []
@@ -53,12 +73,14 @@ class Chan4(object):
self.log.debug(f"Created new hash key: {self.hash_key}") self.log.debug(f"Created new hash key: {self.hash_key}")
db.r.set("hashing_key", self.hash_key) db.r.set("hashing_key", self.hash_key)
else: else:
self.hash_key = self.hash_key.decode("ascii") self.hash_key = self.hash_key.decode("ascii")
self.log.debug(f"Decoded hash key: {self.hash_key}") self.log.debug(f"Decoded hash key: {self.hash_key}")
async def run(self): async def run(self):
await self.get_board_list() if "ALL" in BOARDS:
await self.get_board_list()
else:
self.boards = BOARDS
while True: while True:
await self.get_thread_lists(self.boards) await self.get_thread_lists(self.boards)
await asyncio.sleep(CRAWL_DELAY) await asyncio.sleep(CRAWL_DELAY)
@@ -71,6 +93,8 @@ class Chan4(object):
for board in response["boards"]: for board in response["boards"]:
self.boards.append(board["board"]) self.boards.append(board["board"])
self.log.debug(f"Got boards: {self.boards}") self.log.debug(f"Got boards: {self.boards}")
# await self.dynamic_throttle()
# TODO
async def get_thread_lists(self, boards): async def get_thread_lists(self, boards):
# self.log.debug(f"Getting thread list for {boards}") # self.log.debug(f"Getting thread list for {boards}")
@@ -86,6 +110,8 @@ class Chan4(object):
for threads in page["threads"]: for threads in page["threads"]:
no = threads["no"] no = threads["no"]
to_get.append((board, no)) to_get.append((board, no))
# await self.dynamic_throttle()
# TODO
if not to_get: if not to_get:
return return
@@ -95,6 +121,8 @@ class Chan4(object):
for index, thr in enumerate(split_threads): for index, thr in enumerate(split_threads):
self.log.debug(f"Series {index} - getting {len(thr)} threads") self.log.debug(f"Series {index} - getting {len(thr)} threads")
await self.get_threads_content(thr) await self.get_threads_content(thr)
# await self.dynamic_throttle()
# TODO
await asyncio.sleep(THREADS_DELAY) await asyncio.sleep(THREADS_DELAY)
def take_items(self, dict_list, n): def take_items(self, dict_list, n):
@@ -125,6 +153,8 @@ class Chan4(object):
continue continue
board, thread = mapped board, thread = mapped
all_posts[mapped] = response["posts"] all_posts[mapped] = response["posts"]
# await self.dynamic_throttle()
# TODO
if not all_posts: if not all_posts:
return return
@@ -142,6 +172,8 @@ class Chan4(object):
post["channel"] = thread post["channel"] = thread
to_store.append(post) to_store.append(post)
# await self.dynamic_throttle()
# TODO
if to_store: if to_store:
await db.queue_message_bulk(to_store) await db.queue_message_bulk(to_store)
@@ -156,6 +188,7 @@ class Chan4(object):
async def bound_fetch(self, sem, url, session, mapped): async def bound_fetch(self, sem, url, session, mapped):
# Getter function with semaphore. # Getter function with semaphore.
async with sem: async with sem:
await self.wait()
try: try:
return await self.fetch(url, session, mapped) return await self.fetch(url, session, mapped)
except: # noqa except: # noqa
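perf/throttle.py itself is not part of this diff. For reference, a minimal, hypothetical sketch of a CPU-targeting throttle with the same constructor signature and wait() contract (wait() returns the time slept and is awaitable when use_async is set) could look like the following; the real implementation may differ.
import asyncio
import time

import psutil


class DynamicThrottle:
    def __init__(self, target_cpu_usage, sleep_increment, sleep_decrement,
                 sleep_max, sleep_min, psutil_interval, log,
                 start_increment=False, use_async=False):
        self.target_cpu_usage = target_cpu_usage
        self.sleep_increment = sleep_increment
        self.sleep_decrement = sleep_decrement
        self.sleep_max = sleep_max
        self.sleep_min = sleep_min
        self.psutil_interval = psutil_interval
        self.log = log
        self.use_async = use_async
        self.sleep_time = sleep_increment if start_increment else sleep_min

    def _adjust(self):
        # Sample CPU usage (blocks for psutil_interval) and nudge the sleep
        # interval towards the target CPU usage
        cpu = psutil.cpu_percent(interval=self.psutil_interval)
        if cpu > self.target_cpu_usage:
            self.sleep_time = min(self.sleep_time + self.sleep_increment, self.sleep_max)
        else:
            self.sleep_time = max(self.sleep_time - self.sleep_decrement, self.sleep_min)
        return self.sleep_time

    async def _wait_async(self):
        sleep_time = self._adjust()
        await asyncio.sleep(sleep_time)
        return sleep_time

    def _wait_sync(self):
        sleep_time = self._adjust()
        time.sleep(sleep_time)
        return sleep_time

    def wait(self):
        # Awaitable when use_async is set (as in Chan4); otherwise sleeps inline
        # and returns the slept time (as used in process_data)
        if self.use_async:
            return self._wait_async()
        return self._wait_sync()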

View File

@@ -11,8 +11,16 @@ KEYNAME = "queue"
CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900")) CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))
ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5")) ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5"))
INGEST_INCREASE_BELOW = int(getenv("MONOLITH_INGEST_INCREASE_BELOW", "2500"))
INGEST_DECREASE_ABOVE = int(getenv("MONOLITH_INGEST_DECREASE_ABOVE", "10000"))
INGEST_INCREASE_BY = int(getenv("MONOLITH_INGEST_INCREASE_BY", "100"))
INGEST_DECREASE_BY = int(getenv("MONOLITH_INGEST_DECREASE_BY", "100"))
log = util.get_logger("ingest") log = util.get_logger("ingest")
INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))
class Ingest(object): class Ingest(object):
def __init__(self): def __init__(self):
@@ -34,9 +42,40 @@ class Ingest(object):
await asyncio.sleep(ITER_DELAY) await asyncio.sleep(ITER_DELAY)
async def get_chunk(self): async def get_chunk(self):
global CHUNK_SIZE
length = await db.ar.llen(KEYNAME) length = await db.ar.llen(KEYNAME)
if length > CHUNK_SIZE: if length > CHUNK_SIZE:
length = CHUNK_SIZE length = CHUNK_SIZE
if not length: if not length:
return return
await process.spawn_processing_threads(self.current_chunk, length) ingested = await process.spawn_processing_threads(self.current_chunk, length)
if ingested < INGEST_INCREASE_BELOW:
if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
self.log.debug(
(
f"Increasing chunk size to "
f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
f"due to low ingestion ({ingested})"
)
)
CHUNK_SIZE += INGEST_INCREASE_BY
else:
self.log.debug(
    f"Chunk size ({CHUNK_SIZE}) at maximum, "
    f"not increasing above: {INGEST_MAX}"
)
elif ingested > INGEST_DECREASE_ABOVE:
if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
self.log.debug(
(
f"Decreasing chunk size to "
f"{CHUNK_SIZE - INGEST_DECREASE_BY}"
f"due to high ingestion ({ingested})"
)
)
CHUNK_SIZE -= INGEST_DECREASE_BY
else:
self.log.debug(
    f"Chunk size ({CHUNK_SIZE}) at minimum, "
    f"not decreasing below: {INGEST_MIN}"
)

View File

@@ -43,7 +43,6 @@ class ColoredFormatter(logging.Formatter):
def get_logger(name): def get_logger(name):
# Define the logging format # Define the logging format
FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s" FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s"
COLOR_FORMAT = formatter_message(FORMAT, True) COLOR_FORMAT = formatter_message(FORMAT, True)