Compare commits

...

34 Commits

SHA1 Message Date
81f05d4263 Begin implementing RTS 2026-02-17 12:14:29 +00:00
dc533f266f Add psutil to requirements 2025-01-24 12:18:17 +00:00
ea4b5e6321 Begin implementation of dynamic throttling framework 2025-01-24 12:17:43 +00:00
54ecfbae64 Add throttling for performance 2025-01-24 12:17:22 +00:00
352909bec0 Begin implementing RTS process 2025-01-23 11:30:01 +00:00
1cc2ef629e Automatic optimisation for ingest chunking based on processed messages 2025-01-23 11:29:48 +00:00
1aeadaf3b7 Stringify tokens and return message number from processing 2025-01-23 11:29:07 +00:00
ba8c33d8fc Update socket and database URLs 2025-01-23 11:27:51 +00:00
2ef2249be7 Update container files to work with Podman 2025-01-23 11:26:58 +00:00
054e9caca0 Update to run with Podman 2024-12-29 17:35:57 +00:00
5ea4e5f460 Bump versions in pre-commit config 2023-02-09 07:20:13 +00:00
210237b50a Update pre-commit versions 2023-02-09 07:20:13 +00:00
87b81ac236 Retry Redis ingest if it failed 2023-01-13 07:20:27 +00:00
02071758b5 Send messages to Neptune Redis via PubSub 2023-01-12 07:20:43 +00:00
0ab67becff Give option for only crawling some boards 2022-12-22 07:20:26 +00:00
ebaf8c765d Allow disabling modules in environment variables 2022-12-22 07:20:26 +00:00
ce2d7684bc Run ingest task first 2022-12-22 10:11:48 +00:00
508b00e471 Pre-create meta index 2022-11-23 19:02:31 +00:00
9d1e4b44e8 Add pathogen network 2022-11-23 18:23:20 +00:00
371bce1094 Remove print statements 2022-11-22 21:43:56 +00:00
be0cf231b4 Fix mapping and make Threshold talk to SSDB 2022-11-22 21:42:35 +00:00
1993c8f1d2 Use Portainer Git directory for Redis config 2022-11-22 21:19:13 +00:00
9d35930b3b Re-add Portainer Git dir 2022-11-22 21:15:19 +00:00
34346006ab Remove leftover Docker files in legacy 2022-11-22 21:11:46 +00:00
42a3f5da03 Remove tarred Docker definition 2022-11-22 07:20:52 +00:00
42657aeee0 Lower Python version due to gensim incompatibility 2022-11-22 21:07:34 +00:00
1c34aa4a01 Clean up legacy and debugging code 2022-11-22 07:20:27 +00:00
6f3db61532 Add debugging statements for Portainer 2022-11-22 20:37:58 +00:00
3c18858c48 Use relative path for code 2022-11-22 20:35:13 +00:00
78c6ef96d2 Use relative path for build directory 2022-11-22 20:30:41 +00:00
d2a174c1c4 Remove Portainer Git volume 2022-11-22 20:28:44 +00:00
c53438d07b Remove port variable 2022-11-22 20:17:51 +00:00
93353f34e7 Update env example file 2022-11-22 20:17:40 +00:00
6b1604b724 Remove old compose file 2022-11-22 20:17:28 +00:00
34 changed files with 1108 additions and 1120 deletions

View File

@@ -1,18 +1,20 @@
repos:
- repo: https://github.com/psf/black
rev: 22.6.0
rev: 23.1.0
hooks:
- id: black
exclude: ^core/migrations
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
rev: 5.11.5
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
rev: 6.0.0
hooks:
- id: flake8
args: [--max-line-length=88]
exclude: ^core/migrations
- repo: https://github.com/sirwart/ripsecrets.git
rev: v0.1.5
hooks:

View File

@@ -1,19 +1,19 @@
# syntax=docker/dockerfile:1
FROM python:3
FROM python:3.10
RUN useradd -d /code pathogen
RUN useradd -d /code xf
RUN mkdir /code
RUN chown pathogen:pathogen /code
RUN chown xf:xf /code
RUN mkdir /venv
RUN chown pathogen:pathogen /venv
RUN chown xf:xf /venv
USER pathogen
USER xf
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY requirements.txt /code/
COPY discord-patched.tgz /code/
COPY docker/discord-patched.tgz /code/
RUN python -m venv /venv
RUN . /venv/bin/activate && pip install -r requirements.txt

View File

@@ -1,20 +1,20 @@
run:
docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env up -d
docker-compose -f docker-compose.prod.yml --env-file=stack.env up -d
build:
docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env build
docker-compose -f docker-compose.prod.yml --env-file=stack.env build
stop:
docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env down
docker-compose -f docker-compose.prod.yml --env-file=stack.env down
log:
docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env logs -f
docker-compose -f docker-compose.prod.yml --env-file=stack.env logs -f --names
run-infra:
docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env up -d
docker-compose -f docker-compose.infra.yml --env-file=stack.env up -d
stop-infra:
docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env down
docker-compose -f docker-compose.infra.yml --env-file=stack.env down
log-infra:
docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env logs -f
docker-compose -f docker-compose.infra.yml --env-file=stack.env logs -f

0
clients/mexc.py Normal file
View File

274
db.py
View File

@@ -1,25 +1,52 @@
import asyncio
from math import ceil
from os import getenv
from time import sleep
import aiomysql
import aioredis
import manticoresearch
import msgpack
import orjson
import redis
# Elasticsearch
from elasticsearch import AsyncElasticsearch
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis
import util
from schemas import mc_s
trues = ("true", "1", "t", True)
mysql_pool = None
# INDEX = "msg"
configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
log = util.get_logger("db")
# Redis (legacy)
r = redis.from_url("redis://ssdb:1289", db=0)
# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
r = StrictRedis(
host="127.0.0.1", # Replace with your Redis server's IP address
port=1289, # Replace with your Redis server's port
db=0, # Database number
)
# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)
# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
ar = aioredis.from_url("redis://127.0.0.1:1289", db=0)
# /var/run/neptune-redis.sock
# db = 10
pr = aioredis.from_url("unix://var/run/neptune-redis.sock", db=10)
# fr = aioredis.from_url("unix://var/run/fisk-redis.sock", db=10)
fr = aioredis.from_url("unix://var/run/redis.sock", db=10)
# pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
KEYNAME = "queue"
MESSAGE_KEY = "messages"
OHLC_MESSAGE_KEY = "ohlc"
TYPES_MAIN = [
"msg",
@@ -34,104 +61,172 @@ TYPES_MAIN = [
"topic",
"update",
]
MAIN_SRC_MAP = {
"dis": "main",
"irc": "restricted",
"4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_PORT = int(getenv("ELASTICSEARCH_PORT", "9200"))
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
client = None
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"file_tim": {"type": "date", "format": "epoch_millis"},
}
}
}
for field in keyword_fields:
mapping["mappings"]["properties"][field] = {"type": "text"}
async def initialise_elasticsearch():
async def init_mysql_pool():
"""
Initialise the Elasticsearch client.
Initialize the MySQL connection pool.
"""
auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
for index in ("main", "restricted"):
if await client.indices.exists(index=index):
# update index with mapping
await client.indices.put_mapping(
index=index, properties=mapping["mappings"]["properties"]
)
else:
await client.indices.create(index=index, mappings=mapping["mappings"])
return client
global mysql_pool
mysql_pool = await aiomysql.create_pool(
host="127.0.0.1", port=9306, db="Manticore", minsize=1, maxsize=10
)
async def rts_store_message(index, data):
"""
Store an RTS message into MySQL using an existing connection pool.
Prioritizes instant PubSub delivery, with minimal data storage overhead.
:param index: str
:param data: dict
"""
# Publish to Redis PubSub
packed_index = msgpack.packb({"index": index, "data": data}, use_bin_type=True)
try:
await fr.publish(OHLC_MESSAGE_KEY, packed_index)
except aioredis.exceptions.ConnectionError as e:
raise e
await asyncio.sleep(0.1)
# Insert data into MySQL
try:
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
# Insert data into the table
query = f"""
INSERT INTO {index} (s, o, c, h, l, v, a, i, t, t2, ts)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# Bind the values directly
await cur.execute(
query,
(
data["s"], # symbol
data["o"], # open
data["c"], # close
data["h"], # high
data["l"], # low
data["v"], # volume_base
data["a"], # volume_quote
data["i"], # interval
data["t"], # start_time
data["t2"], # end_time
data["ts"], # event_time
),
)
await conn.commit()
log.debug(f"Stored data for {data['s']} in MySQL.")
except aiomysql.Error as e:
log.error(f"MySQL error: {e}")
async def store_batch(data):
global client
if not client:
client = await initialise_elasticsearch()
indexmap = {}
for msg in data:
if msg["type"] in TYPES_MAIN:
# index = "main"
index = MAIN_SRC_MAP[msg["src"]]
# schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
# schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
# schema = mc_s.schema_int
"""
Store a message into Manticore
:param data: list
"""
if not data:
return
# 10000: maximum inserts we can submit to
# Manticore as of Sept 2022
split_posts = array_split(data, ceil(len(data) / 10000))
for messages in split_posts:
total = []
indexmap = {}
for msg in messages:
if msg["type"] in TYPES_MAIN:
index = "main"
schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith(
"text"
):
msg[key] = str(value)
INDEX = index
body = {"insert": {"index": index, "doc": msg}}
total.append(body)
if "ts" not in msg:
raise Exception("No TS in msg")
if index not in indexmap:
indexmap[index] = [msg]
else:
indexmap[index].append(msg)
# END MSG IN MESSAGES
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = orjson.dumps(msg)
if "ts" not in msg:
raise Exception("No TS in msg")
if INDEX not in indexmap:
indexmap[INDEX] = [msg]
else:
indexmap[INDEX].append(msg)
# Pack the indexmap with msgpack and publish it to Neptune
packed_index = msgpack.packb(indexmap, use_bin_type=True)
completed_publish = False
for i in range(10):
if completed_publish:
break
try:
await pr.publish(MESSAGE_KEY, packed_index)
completed_publish = True
except aioredis.exceptions.ConnectionError as e:
raise e
await asyncio.sleep(0.1)
if not completed_publish:
log.error("Failed to publish to Neptune")
for index, index_messages in indexmap.items():
for message in index_messages:
result = await client.index(index=index, body=message)
if not result["result"] == "created":
log.error(f"Indexing failed: {result}")
log.debug(f"Indexed {len(data)} messages in ES")
body_post = ""
for item in total:
# print("ITEM", item)
body_post += orjson.dumps(item).decode("utf-8")
body_post += "\n"
# print("BODY POST INDEX", index, body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
except ApiException as e:
log.error("Exception when calling IndexApi->bulk: %s\n" % e)
log.error("body_post attempted to send", body_post)
log.info(f"Completed ingest to MC of length {len(total)}")
# END MESSAGES IN SPLIT
def update_schema():
pass
def create_index(api_client):
util_instance = manticoresearch.UtilsApi(api_client)
schemas = {
"main": mc_s.schema_main,
"rule_storage": mc_s.schema_rule_storage,
"meta": mc_s.schema_meta,
"internal": mc_s.schema_int,
}
for name, schema in schemas.items():
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
create_query = (
f"create table if not exists {name}({schema_types}) engine='columnar'"
)
print("Schema types", create_query)
util_instance.sql(create_query)
async def queue_message(msg):
"""
Queue a message on the Redis buffer.
"""
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
@@ -141,5 +236,6 @@ async def queue_message_bulk(data):
Queue multiple messages on the Redis buffer.
"""
for msg in data:
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
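As a standalone illustration of the chunked bulk-insert pattern store_batch uses above — split the batch into chunks of at most 10,000 documents (the Manticore limit noted in the comment) and send each chunk as newline-delimited JSON to the bulk endpoint — here is a minimal sketch; the host, index name and example document are placeholders:

    from math import ceil

    import manticoresearch
    import orjson
    from numpy import array_split

    # Same connection style as db.py above; the host is a placeholder.
    configuration = manticoresearch.Configuration(host="http://127.0.0.1:9308")
    api_client = manticoresearch.ApiClient(configuration)
    index_api = manticoresearch.IndexApi(api_client)

    MAX_BULK = 10000  # maximum inserts per bulk call, per the comment in store_batch


    def bulk_insert(index, docs):
        """Insert docs into `index` in chunks of at most MAX_BULK documents."""
        if not docs:
            return
        for chunk in array_split(docs, ceil(len(docs) / MAX_BULK)):
            # Manticore's bulk API takes one JSON command per line.
            body = "\n".join(
                orjson.dumps({"insert": {"index": index, "doc": doc}}).decode("utf-8")
                for doc in chunk
            )
            index_api.bulk(body)


    # Hypothetical usage:
    # bulk_insert("main", [{"msg": "hello", "ts": 1669071600}])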

174
db_old_ref.py Normal file
View File

@@ -0,0 +1,174 @@
import asyncio
from os import getenv
import aioredis
import msgpack
import orjson
import redis
# Elasticsearch
from elasticsearch import AsyncElasticsearch
import util
trues = ("true", "1", "t", True)
# INDEX = "msg"
log = util.get_logger("db")
# Redis (legacy)
# r = redis.from_url("redis://ssdb:1289", db=0)
# AIORedis
ar = aioredis.from_url("redis://ssdb:1289", db=0)
# Neptune redis for PubSub
pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
TYPES_MAIN = [
"msg",
"notice",
"action",
"part",
"join",
"kick",
"quit",
"nick",
"mode",
"topic",
"update",
]
MAIN_SRC_MAP = {
"dis": "main",
"irc": "restricted",
"4ch": "main",
}
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
MESSAGE_KEY = "messages"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
client = None
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping_int = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"file_tim": {"type": "date", "format": "epoch_millis"},
}
}
}
mapping = dict(mapping_int)
for field in keyword_fields:
mapping["mappings"]["properties"][field] = {"type": "text"}
del mapping_int["mappings"]["properties"]["file_tim"]
async def initialise_elasticsearch():
"""
Initialise the Elasticsearch client.
"""
auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
for index in ("main", "meta", "restricted", "internal"):
if index == "internal":
map_dict = mapping_int
else:
map_dict = mapping
if await client.indices.exists(index=index):
# update index with mapping
await client.indices.put_mapping(
index=index, properties=map_dict["mappings"]["properties"]
)
else:
await client.indices.create(index=index, mappings=map_dict["mappings"])
return client
async def store_batch(data):
global client
if not client:
client = await initialise_elasticsearch()
indexmap = {}
for msg in data:
if msg["type"] in TYPES_MAIN:
# index = "main"
index = MAIN_SRC_MAP[msg["src"]]
# schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
# schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
# schema = mc_s.schema_int
INDEX = index
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = orjson.dumps(msg)
if "ts" not in msg:
raise Exception("No TS in msg")
if INDEX not in indexmap:
indexmap[INDEX] = [msg]
else:
indexmap[INDEX].append(msg)
# Pack the indexmap with msgpack and publish it to Neptune
packed_index = msgpack.packb(indexmap, use_bin_type=True)
completed_publish = False
for i in range(10):
if completed_publish:
break
try:
await pr.publish(MESSAGE_KEY, packed_index)
completed_publish = True
except aioredis.exceptions.ConnectionError:
await asyncio.sleep(0.1)
if not completed_publish:
log.error("Failed to publish to Neptune")
for index, index_messages in indexmap.items():
for message in index_messages:
result = await client.index(index=index, body=message)
if not result["result"] == "created":
log.error(f"Indexing failed: {result}")
log.debug(f"Indexed {len(data)} messages in ES")
async def queue_message(msg):
"""
Queue a message on the Redis buffer.
"""
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
async def queue_message_bulk(data):
"""
Queue multiple messages on the Redis buffer.
"""
for msg in data:
# TODO: msgpack
message = orjson.dumps(msg)
await ar.lpush(KEYNAME, message)
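The publish-to-Neptune step above — pack the per-index message map with msgpack, publish it on the "messages" channel, and retry a bounded number of times on connection errors — as a small self-contained sketch; the Redis URL mirrors the default in this file:

    import asyncio

    import aioredis
    import msgpack

    MESSAGE_KEY = "messages"  # PubSub channel name used above
    pr = aioredis.from_url("redis://redis_neptune:6379", db=10)


    async def publish_indexmap(indexmap, attempts=10):
        """Publish {index: [messages]} to Neptune, retrying on connection errors."""
        packed = msgpack.packb(indexmap, use_bin_type=True)
        for _ in range(attempts):
            try:
                await pr.publish(MESSAGE_KEY, packed)
                return True
            except aioredis.exceptions.ConnectionError:
                await asyncio.sleep(0.1)
        return False


    # Hypothetical usage:
    # asyncio.run(publish_indexmap({"main": [{"msg": "hi", "ts": 1669071600}]}))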

206
docker-compose.prod.yml Normal file
View File

@@ -0,0 +1,206 @@
version: "2.2"
services:
rts:
image: xf/monolith:latest
container_name: rts_monolith
command: sh -c '. /venv/bin/activate && exec python rts.py'
build: .
volumes:
- ${PORTAINER_GIT_DIR}:/code
- type: bind
source: /code/run
target: /var/run
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
MONOLITH_RTS_MEXC_API_ACCESS_KEY: "${MONOLITH_RTS_MEXC_API_ACCESS_KEY}"
MONOLITH_RTS_MEXC_API_SECRET_KEY: "${MONOLITH_RTS_MEXC_API_SECRET_KEY}"
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
app:
image: xf/monolith:latest
container_name: monolith
#command: sh -c '. /venv/bin/activate && exec python -m cProfile -o /tmp/profile.out monolith.py'
build: .
volumes:
- ${PORTAINER_GIT_DIR}:/code
- type: bind
source: /code/run
target: /var/run
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
DISCORD_TOKEN: "${DISCORD_TOKEN}"
THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
#THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# Threads to use for data processing
# Leave uncommented to use all available threads
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_PROCESS_TARGET_CPU_USAGE: "${MONOLITH_PROCESS_TARGET_CPU_USAGE}"
MONOLITH_CH4_TARGET_CPU_USAGE: "${MONOLITH_CH4_TARGET_CPU_USAGE}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}"
MONOLITH_INGEST_INCREASE_BY: "${MONOLITH_INGEST_INCREASE_BY}"
MONOLITH_INGEST_DECREASE_ABOVE: "${MONOLITH_INGEST_DECREASE_ABOVE}"
MONOLITH_INGEST_DECREASE_BY: "${MONOLITH_INGEST_DECREASE_BY}"
MONOLITH_INGEST_MAX: "${MONOLITH_INGEST_MAX}"
MONOLITH_INGEST_MIN: "${MONOLITH_INGEST_MIN}"
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
threshold:
image: xf/threshold:latest
container_name: threshold
build: legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
volumes_from:
- tmp
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
environment:
PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
MODULES_ENABLED: "${MODULES_ENABLED}"
DISCORD_TOKEN: "${DISCORD_TOKEN}"
THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
#THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
# Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
# Threads to use for data processing
# Leave uncommented to use all available threads
MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
network_mode: host
ssdb:
image: tsl0922/ssdb
container_name: ssdb_monolith
ports:
- "1289:1289"
environment:
- SSDB_PORT=1289
volumes:
- monolith_ssdb_data:/var/lib/ssdb
# networks:
# - default
# - db
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
redis:
image: redis
container_name: redis_monolith
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- monolith_redis_data:/data
- type: bind
source: /code/run
target: /var/run
# volumes_from:
# - tmp
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 2s
retries: 15
# networks:
# - default
# - xf
# - db
deploy:
resources:
limits:
cpus: '0.5'
memory: 1.0G
network_mode: host
# networks:
# default:
# driver: bridge
# xf:
# external: true
# db:
# external: true
volumes:
monolith_redis_data:
monolith_ssdb_data:
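The MONOLITH_INGEST_INCREASE_*/DECREASE_*/MAX/MIN variables above belong to the "automatic optimisation for ingest chunking" commit, but the ingest code that consumes them is not part of this diff. Purely as a hypothetical sketch of how such thresholds could drive the chunk size (every name except the environment variables is made up, and the defaults are guesses):

    from os import getenv

    INCREASE_BELOW = int(getenv("MONOLITH_INGEST_INCREASE_BELOW", "1000"))
    INCREASE_BY = int(getenv("MONOLITH_INGEST_INCREASE_BY", "100"))
    DECREASE_ABOVE = int(getenv("MONOLITH_INGEST_DECREASE_ABOVE", "10000"))
    DECREASE_BY = int(getenv("MONOLITH_INGEST_DECREASE_BY", "100"))
    CHUNK_MAX = int(getenv("MONOLITH_INGEST_MAX", "100000"))
    CHUNK_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))


    def adjust_chunk_size(chunk_size, processed_messages):
        """Hypothetical: grow the chunk when the last batch processed few messages,
        shrink it when it processed many, clamped to [CHUNK_MIN, CHUNK_MAX]."""
        if processed_messages < INCREASE_BELOW:
            chunk_size += INCREASE_BY
        elif processed_messages > DECREASE_ABOVE:
            chunk_size -= DECREASE_BY
        return max(CHUNK_MIN, min(CHUNK_MAX, chunk_size))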

View File

@@ -1,379 +0,0 @@
version: "2.2"
x-superset-image: &superset-image apache/superset:${TAG:-latest-dev}
x-superset-depends-on: &superset-depends-on
- db
- redis_superset
x-superset-volumes: &superset-volumes
# /app/pythonpath_docker will be appended to the PYTHONPATH in the final container
- ${PORTAINER_GIT_DIR}/docker/superset:/app/docker
- superset_home:/app/superset_home
services:
app:
image: pathogen/monolith:latest
container_name: monolith
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
env_file:
- .env
volumes_from:
- tmp
depends_on:
druid:
condition: service_started
kafka:
condition: service_healthy
tmp:
condition: service_started
redis:
condition: service_healthy
# - db
threshold:
image: pathogen/threshold:latest
container_name: threshold
build: ./legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- .env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
volumes_from:
- tmp
depends_on:
tmp:
condition: service_started
redis:
condition: service_healthy
# db:
#image: pathogen/manticore:kibana
# image: manticoresearch/manticore:latest
#build:
# context: ./docker/manticore
# args:
# DEV: 1
# restart: always
# turnilo:
# container_name: turnilo
# image: uchhatre/turnilo:latest
# ports:
# - 9093:9090
# environment:
# - DRUID_BROKER_URL=http://broker:8082
# - CONFIG_FILE=/config.yaml
# volumes:
# - ${PORTAINER_GIT_DIR}/docker/turnilo.yaml:/config.yaml
# depends_on:
# - broker
# metabase:
# container_name: metabase
# image: metabase/metabase:latest
# ports:
# - 3096:3000
# environment:
# JAVA_OPTS: -Xmx1g
# MB_DB_TYPE: postgres
# MB_DB_DBNAME: metabase
# MB_DB_PORT: 5432
# MB_DB_USER: druid
# MB_DB_PASS: FoolishPassword
# MB_DB_HOST: postgres
# depends_on:
# - broker
redis_superset:
image: redis:latest
container_name: superset_cache
restart: unless-stopped
volumes:
- redis_superset:/data
db:
env_file: docker/.env-non-dev
image: postgres:10
container_name: superset_db
restart: unless-stopped
volumes:
- db_home:/var/lib/postgresql/data
superset:
env_file: docker/.env-non-dev
image: *superset-image
container_name: superset_app
command: ["/app/docker/docker-bootstrap.sh", "app-gunicorn"]
user: "root"
restart: unless-stopped
ports:
- 8088:8088
depends_on: *superset-depends-on
volumes: *superset-volumes
superset-init:
image: *superset-image
container_name: superset_init
command: ["/app/docker/docker-init.sh"]
env_file: docker/.env-non-dev
depends_on: *superset-depends-on
user: "root"
volumes: *superset-volumes
superset-worker:
image: *superset-image
container_name: superset_worker
command: ["/app/docker/docker-bootstrap.sh", "worker"]
env_file: docker/.env-non-dev
restart: unless-stopped
depends_on: *superset-depends-on
user: "root"
volumes: *superset-volumes
superset-worker-beat:
image: *superset-image
container_name: superset_worker_beat
command: ["/app/docker/docker-bootstrap.sh", "beat"]
env_file: docker/.env-non-dev
restart: unless-stopped
depends_on: *superset-depends-on
user: "root"
volumes: *superset-volumes
postgres:
container_name: postgres
image: postgres:latest
volumes:
- metadata_data:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: FoolishPassword
POSTGRES_USER: druid
POSTGRES_DB: druid
# Need 3.5 or later for container nodes
zookeeper:
container_name: zookeeper
image: zookeeper:3.5
ports:
- "2181:2181"
environment:
- ZOO_MY_ID=1
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- 9092:9092
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: 2000000
#KAFKA_HEAP_OPTS: -Xmx2g
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
start_period: 15s
interval: 30s
timeout: 30s
retries: 45
druid:
image: pathogen/druid:0.23.0
build: ./docker/druid/
container_name: druid
volumes:
- druid_shared:/opt/shared
- druid_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
ports:
- "8081:8081"
- "8082:8082"
- "8083:8083"
- "8888:8888"
env_file:
- environment
# coordinator:
# #image: apache/druid:0.23.0
# image: pathogen/druid:0.23.0
# build: ./docker/druid/
# container_name: coordinator
# volumes:
# - druid_shared:/opt/shared
# - coordinator_var:/opt/druid/var
# depends_on:
# - zookeeper
# - postgres
# ports:
# - "8081:8081"
# command:
# - coordinator
# env_file:
# - environment
# broker:
# #image: apache/druid:0.23.0
# image: pathogen/druid:0.23.0
# build: ./docker/druid/
# container_name: broker
# volumes:
# - broker_var:/opt/druid/var
# depends_on:
# - zookeeper
# - postgres
# - coordinator
# ports:
# - "8082:8082"
# command:
# - broker
# env_file:
# - environment
# historical:
# #image: apache/druid:0.23.0
# image: pathogen/druid:0.23.0
# build: ./docker/druid/
# container_name: historical
# volumes:
# - druid_shared:/opt/shared
# - historical_var:/opt/druid/var
# depends_on:
# - zookeeper
# - postgres
# - coordinator
# ports:
# - "8083:8083"
# command:
# - historical
# env_file:
# - environment
# middlemanager:
# #image: apache/druid:0.23.0
# image: pathogen/druid:0.23.0
# build: ./docker/druid/
# container_name: middlemanager
# volumes:
# - druid_shared:/opt/shared
# - middle_var:/opt/druid/var
# depends_on:
# - zookeeper
# - postgres
# - coordinator
# ports:
# - "8091:8091"
# - "8100-8105:8100-8105"
# command:
# - middleManager
# env_file:
# - environment
# router:
# #image: apache/druid:0.23.0
# image: pathogen/druid:0.23.0
# build: ./docker/druid/
# container_name: router
# volumes:
# - router_var:/opt/druid/var
# depends_on:
# - zookeeper
# - postgres
# - coordinator
# ports:
# - "8888:8888"
# command:
# - router
# env_file:
# - environment
# db:
# #image: pathogen/manticore:kibana
# image: manticoresearch/manticore:dev
# #build:
# # context: ./docker/manticore
# # args:
# # DEV: 1
# restart: always
# ports:
# - 9308
# - 9312
# - 9306
# ulimits:
# nproc: 65535
# nofile:
# soft: 65535
# hard: 65535
# memlock:
# soft: -1
# hard: -1
# environment:
# - MCL=1
# volumes:
# - ./docker/data:/var/lib/manticore
# - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- redis_data:/data
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/redis/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks:
default:
external:
name: pathogen
volumes:
superset_home:
external: false
db_home:
external: false
redis_superset:
external: false
redis_data: {}
metadata_data: {}
# middle_var: {}
# historical_var: {}
# broker_var: {}
# coordinator_var: {}
druid_var: {}
druid_shared: {}

View File

@@ -1,86 +0,0 @@
version: "2.2"
services:
app:
image: pathogen/monolith:latest
container_name: monolith
build: ${PORTAINER_GIT_DIR}/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
env_file:
- ../stack.env
networks:
- default
- pathogen
- elastic
threshold:
image: pathogen/threshold:latest
container_name: threshold
build: ./legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
volumes_from:
- tmp
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- ../stack.env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
networks:
- default
ssdb:
image: tsl0922/ssdb
container_name: ssdb_monolith
ports:
- "1289:1289"
environment:
- SSDB_PORT=1289
networks:
- default
tmp:
image: busybox
container_name: tmp_monolith
command: chmod -R 777 /var/run/socks
volumes:
- /var/run/socks
redis:
image: redis
container_name: redis_monolith
command: redis-server /etc/redis.conf
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- redis_data:/data
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/socks/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks:
default:
driver: bridge
pathogen:
external: true
elastic:
external: true
volumes:
redis_data:

View File

@@ -1,87 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Java tuning
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=1g
#druid_emitter_logging_logLevel=debug
#druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"]
#druid_zk_service_host=zookeeper
#druid_metadata_storage_host=
#druid_metadata_storage_type=postgresql
#druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
#druid_metadata_storage_connector_user=druid
#druid_metadata_storage_connector_password=FoolishPassword
#druid_coordinator_balancer_strategy=cachingCost
#druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
#druid_indexer_fork_property_druid_processing_buffer_sizeBytes=128MiB
#druid_processing_buffer_sizeBytes=268435456 # 256MiB
#druid_storage_type=local
#druid_storage_storageDirectory=/opt/shared/segments
#druid_indexer_logs_type=file
#druid_indexer_logs_directory=/opt/shared/indexing-logs
#druid_processing_numThreads=1
#druid_processing_numMergeBuffers=1
#DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
# Java tuning
#DRUID_XMX=1g
#DRUID_XMS=1g
#DRUID_MAXNEWSIZE=250m
#DRUID_NEWSIZE=250m
#DRUID_MAXDIRECTMEMORYSIZE=6172m
DRUID_SINGLE_NODE_CONF=nano-quickstart
druid_emitter_logging_logLevel=debug
druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"]
druid_zk_service_host=zookeeper
druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword
druid_coordinator_balancer_strategy=cachingCost
druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs
druid_processing_numThreads=2
druid_processing_numMergeBuffers=2
DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>

View File

@@ -1,265 +0,0 @@
#!/bin/sh
ip=`hostname -i|rev|cut -d\ -f 1|rev`
cat << EOF
searchd {
# https://manual.manticoresearch.com/Server_settings/Searchd#access_plain_attrs
# access_plain_attrs = mmap_preread
# https://manual.manticoresearch.com/Server_settings/Searchd#access_blob_attrs
# access_blob_attrs = mmap_preread
# https://manual.manticoresearch.com/Server_settings/Searchd#access_doclists
# access_doclists = file
# https://manual.manticoresearch.com/Server_settings/Searchd#access_hitlists
# access_hitlists = file
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_connect_timeout
# agent_connect_timeout =
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_query_timeout
# agent_query_timeout =
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_retry_count
# agent_retry_count = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#agent_retry_delay
# agent_retry_delay = 500
# https://manual.manticoresearch.com/Server_settings/Searchd#attr_flush_period
# attr_flush_period = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_flush
# binlog_flush = 2
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_max_log_size
# binlog_max_log_size = 268435456
# https://manual.manticoresearch.com/Server_settings/Searchd#binlog_path
# binlog_path =
# https://manual.manticoresearch.com/Server_settings/Searchd#client_timeout
# client_timeout = 300
# https://manual.manticoresearch.com/Server_settings/Searchd#collation_libc_locale
# collation_libc_locale = C
# https://manual.manticoresearch.com/Server_settings/Searchd#collation_server
# collation_server = libc_ci
# https://manual.manticoresearch.com/Server_settings/Searchd#data_dir
data_dir = /var/lib/manticore
# https://manual.manticoresearch.com/Server_settings/Searchd#docstore_cache_size
# docstore_cache_size = 16m
# https://manual.manticoresearch.com/Server_settings/Searchd#expansion_limit
# expansion_limit = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#grouping_in_utc
# grouping_in_utc = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#ha_period_karma
# ha_period_karma = 60
# https://manual.manticoresearch.com/Server_settings/Searchd#ha_ping_interval
# ha_ping_interval = 1000
# https://manual.manticoresearch.com/Server_settings/Searchd#hostname_lookup
# hostname_lookup =
# https://manual.manticoresearch.com/Server_settings/Searchd#jobs_queue_size
# jobs_queue_size =
# https://manual.manticoresearch.com/Server_settings/Searchd#listen_backlog
# listen_backlog = 5
# https://manual.manticoresearch.com/Server_settings/Searchd#listen
# listen_env = this directive allows to append listeners from environment variables
listen = 9306:mysql41
listen = /var/run/mysqld/mysqld.sock:mysql41
listen = $ip:9312
listen = 9308:http
listen = $ip:9315-9325:replication
# https://manual.manticoresearch.com/Server_settings/Searchd#listen_tfo
# listen_tfo = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#log
log = /var/log/manticore/searchd.log
# https://manual.manticoresearch.com/Server_settings/Searchd#max_batch_queries
# max_batch_queries = 32
# https://manual.manticoresearch.com/Server_settings/Searchd#threads
# threads =
# https://manual.manticoresearch.com/Server_settings/Searchd#max_filters
# max_filters = 256
# https://manual.manticoresearch.com/Server_settings/Searchd#max_filter_values
# max_filter_values = 4096
# https://manual.manticoresearch.com/Server_settings/Searchd#max_open_files
# max_open_files = max
# https://manual.manticoresearch.com/Server_settings/Searchd#max_packet_size
max_packet_size = 128M
# https://manual.manticoresearch.com/Server_settings/Searchd#mysql_version_string
# mysql_version_string =
# https://manual.manticoresearch.com/Server_settings/Searchd#net_workers
# net_workers = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#net_wait_tm
# net_wait_tm = -1
# https://manual.manticoresearch.com/Server_settings/Searchd#net_throttle_accept
# net_throttle_accept = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#net_throttle_action
# net_throttle_action = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#node_address
# node_address =
# https://manual.manticoresearch.com/Server_settings/Searchd#ondisk_attrs_default
# ondisk_attrs_default = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#persistent_connections_limit
# persistent_connections_limit =
# https://manual.manticoresearch.com/Server_settings/Searchd#pid_file
pid_file = /var/run/manticore/searchd.pid
# https://manual.manticoresearch.com/Server_settings/Searchd#predicted_time_costs
# predicted_time_costs = doc=64, hit=48, skip=2048, match=64
# https://manual.manticoresearch.com/Server_settings/Searchd#preopen_indexes
# preopen_indexes = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_max_bytes
qcache_max_bytes = 128Mb
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_thresh_msec
qcache_thresh_msec = 150
# https://manual.manticoresearch.com/Server_settings/Searchd#qcache_ttl_sec
qcache_ttl_sec = 120
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_format
query_log_format = sphinxql
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_min_msec
# query_log_min_msec = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log
# query_log = /var/log/manticore/query.log
# https://manual.manticoresearch.com/Server_settings/Searchd#query_log_mode
# query_log_mode = 600
# https://manual.manticoresearch.com/Server_settings/Searchd#max_connections
# max_connections =
# https://manual.manticoresearch.com/Server_settings/Searchd#network_timeout
# network_timeout = 5
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer
# read_buffer = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer_docs
# read_buffer_docs = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_buffer_hits
# read_buffer_hits = 256K
# https://manual.manticoresearch.com/Server_settings/Searchd#read_unhinted
# read_unhinted 32K
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_flush_period
# rt_flush_period =
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_merge_iops
# rt_merge_iops = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#rt_merge_maxiosize
# rt_merge_maxiosize = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#seamless_rotate
# seamless_rotate = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#server_id
# server_id =
# https://manual.manticoresearch.com/Server_settings/Searchd#shutdown_timeout
# shutdown_timeout = 3
# https://manual.manticoresearch.com/Server_settings/Searchd#shutdown_token
# shutdown_token =
# https://manual.manticoresearch.com/Server_settings/Searchd#snippets_file_prefix
# snippets_file_prefix =
# https://manual.manticoresearch.com/Server_settings/Searchd#sphinxql_state
# sphinxql_state =
# https://manual.manticoresearch.com/Server_settings/Searchd#sphinxql_timeout
# sphinxql_timeout = 900
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_ca
# ssl_ca =
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_cert
# ssl_cert =
# https://manual.manticoresearch.com/Server_settings/Searchd#ssl_key
# ssl_key =
# https://manual.manticoresearch.com/Server_settings/Searchd#subtree_docs_cache
# subtree_docs_cache = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#subtree_hits_cache
# subtree_hits_cache = 0
# https://manual.manticoresearch.com/Server_settings/Searchd#thread_stack
# thread_stack =
# https://manual.manticoresearch.com/Server_settings/Searchd#unlink_old
# unlink_old = 1
# https://manual.manticoresearch.com/Server_settings/Searchd#watchdog
# watchdog = 1
}
common {
# https://manual.manticoresearch.com/Server_settings/Common#lemmatizer_base
# lemmatizer_base = /usr/local/share
# https://manual.manticoresearch.com/Server_settings/Common#progressive_merge
# progressive_merge =
# https://manual.manticoresearch.com/Server_settings/Common#json_autoconv_keynames
# json_autoconv_keynames =
# https://manual.manticoresearch.com/Server_settings/Common#json_autoconv_numbers
# json_autoconv_numbers = 0
# https://manual.manticoresearch.com/Server_settings/Common#on_json_attr_error
# on_json_attr_error = ignore_attr
# https://manual.manticoresearch.com/Server_settings/Common#plugin_dir
# plugin_dir =
}
# indexer {
# lemmatizer_cache = 1024M
# max_iops = 0
# max_iosize = 0
# mem_limit = 1024M
# }
EOF

View File

@@ -1,2 +1,5 @@
unixsocket /var/run/socks/redis.sock
unixsocket /var/run/monolith-redis.sock
unixsocketperm 777
port 0
# port 6379
# requirepass changeme
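With port 0 above, this Redis instance is reachable only over the unix socket, so clients connect the way db.py does elsewhere in the diff; a minimal sketch using the socket path configured here:

    import asyncio

    import aioredis

    # Socket path from the redis.conf above; TCP is disabled by `port 0`.
    ar = aioredis.from_url("unix:///var/run/monolith-redis.sock", db=0)


    async def check():
        # Simple round-trip over the socket.
        return await ar.ping()


    # asyncio.run(check())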

View File

@@ -1,24 +0,0 @@
wheel
beautifulsoup4
redis
siphashc
aiohttp[speedups]
python-dotenv
#manticoresearch
numpy
aioredis[hiredis]
#aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
#spacy
gensim
python-Levenshtein
orjson
uvloop
numba
elasticsearch[async]

View File

@@ -1,3 +0,0 @@
clusters:
- name: druid
guardDataCubes: true

View File

@@ -1,6 +1,6 @@
PORTAINER_GIT_DIR=.
PORTAINER_GIT_DIR=..
MODULES_ENABLED="dis"
DISCORD_TOKEN="xx"
DISCORD_TOKEN=
THRESHOLD_LISTENER_HOST=0.0.0.0
THRESHOLD_LISTENER_PORT=13867
THRESHOLD_LISTENER_SSL=1
@@ -13,16 +13,16 @@ THRESHOLD_RELAY_SSL=1
THRESHOLD_API_ENABLED=1
THRESHOLD_API_HOST=0.0.0.0
THRESHOLD_API_PORT=13869
PORTAINER_GIT_DIR=.
THRESHOLD_CONFIG_DIR=./legacy/conf/live/
THRESHOLD_CERT_DIR=./legacy/conf/cert/
THRESHOLD_CONFIG_DIR=../legacy/conf/live/
#THRESHOLD_TEMPLATE_DIR=../legacy/conf/templates/
THRESHOLD_CERT_DIR=../legacy/conf/cert/
# How many messages to ingest at once from Redis
MONOLITH_INGEST_CHUNK_SIZE=900
MONOLITH_INGEST_CHUNK_SIZE=70000
# Time to wait between polling Redis again
MONOLITH_INGEST_ITER_DELAY=0.5
MONOLITH_INGEST_ITER_DELAY=2
# Number of 4chan threads to request at once
MONOLITH_CH4_THREADS_CONCURRENT=1000
@@ -31,11 +31,20 @@ MONOLITH_CH4_THREADS_CONCURRENT=1000
MONOLITH_CH4_THREADS_DELAY=0.1
# Time to wait after finishing a crawl before starting again
MONOLITH_CH4_CRAWL_DELAY=30
MONOLITH_CH4_CRAWL_DELAY=60
# Semaphore value
MONOLITH_CH4_THREADS_SEMAPHORE=1000
# Threads to use for data processing
# Leave uncommented to use all available threads
# MONOLITH_PROCESS_THREADS=4
MONOLITH_PROCESS_THREADS=7
# Enable performance metrics after message processing
MONOLITH_PROCESS_PERFSTATS=0
# Elasticsearch
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=
ELASTICSEARCH_HOST=https://es01:9200
ELASTICSEARCH_TLS=1
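These new ELASTICSEARCH_* entries are read with the same getenv pattern shown in db.py/db_old_ref.py above; restated as a tiny standalone sketch (note how ELASTICSEARCH_TLS=1 becomes a boolean via the `trues` tuple):

    from os import getenv

    trues = ("true", "1", "t", True)

    ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
    ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
    ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
    ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues  # "1" -> True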

View File

@@ -1,17 +0,0 @@
repos:
- repo: https://github.com/psf/black
rev: 22.6.0
hooks:
- id: black
args:
- --line-length=120
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
hooks:
- id: flake8
args: [--max-line-length=120]

View File

@@ -1,41 +0,0 @@
version: "2"
services:
app:
image: pathogen/threshold:latest
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- .env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
volumes_from:
- tmp
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
networks:
default:
external:
name: pathogen

View File

@@ -1,38 +0,0 @@
version: "2"
services:
app:
image: pathogen/threshold:latest
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- ../stack.env
volumes_from:
- tmp
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
networks:
default:
external:
name: pathogen

View File

@@ -4,6 +4,7 @@ from os import urandom
from os.path import exists
from string import digits
import redis
from redis import StrictRedis
# List of errors ZNC can give us
@@ -121,7 +122,7 @@ def initConf():
def initMain():
global r, g
global r, g, x
initConf()
r = StrictRedis(
unix_socket_path=config["RedisSocket"], db=config["RedisDBEphemeral"] # noqa
@@ -129,3 +130,5 @@ def initMain():
g = StrictRedis(
unix_socket_path=config["RedisSocket"], db=config["RedisDBPersistent"]
) # noqa
# SSDB for communication with Monolith
x = redis.from_url("redis://ssdb:1289", db=0)

View File

@@ -67,7 +67,7 @@ def parsemeta(numName, c):
def queue_message(c):
message = json.dumps(c)
main.g.lpush("queue", message)
main.x.lpush("queue", message)
def event(
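The change above points the legacy queue at SSDB (main.x) rather than the local Redis (main.g); that "queue" list is the same one Monolith's ingest pops elsewhere in this diff. A minimal producer/consumer sketch of that handoff, with the SSDB URL from the legacy main.py hunk and toy payloads:

    import json

    import redis

    # SSDB speaks the Redis protocol; same URL as in legacy main.py above.
    x = redis.from_url("redis://ssdb:1289", db=0)


    def queue_event(event):
        """Producer (Threshold side): serialise and push onto the shared list."""
        x.lpush("queue", json.dumps(event))


    def drain(batch_size=10):
        """Consumer (Monolith ingest side): pop up to batch_size queued messages."""
        out = []
        for _ in range(batch_size):
            raw = x.rpop("queue")
            if raw is None:
                break
            out.append(json.loads(raw))
        return out


    # queue_event({"type": "msg", "msg": "hello", "ts": 1669071600})
    # print(drain())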

View File

@@ -1,9 +0,0 @@
wheel
pre-commit
twisted
pyOpenSSL
redis
pyYaML
service_identity
siphashc
Klein

View File

@@ -98,7 +98,6 @@ class IRCRelayFactory(ReconnectingClientFactory):
self.relayCommands, self.user, self.stage2 = relayCommands, user, stage2
def buildProtocol(self, addr):
entry = IRCRelay(self.num, self.relayCommands, self.user, self.stage2)
self.client = entry

View File

@@ -1,8 +1,10 @@
import asyncio
from os import getenv
from time import sleep
import uvloop
import db
import util
from sources.ch4 import Chan4
from sources.dis import DiscordClient
@@ -21,14 +23,28 @@ if not token:
async def main(loop):
client = DiscordClient()
loop.create_task(client.start(token))
if "ingest" in modules_enabled:
ingest = Ingest()
loop.create_task(ingest.run())
chan = Chan4()
loop.create_task(chan.run())
if "dis" in modules_enabled:
client = DiscordClient()
loop.create_task(client.start(token))
ingest = Ingest()
loop.create_task(ingest.run())
if "ch4" in modules_enabled:
chan = Chan4()
loop.create_task(chan.run())
created = False
while not created:
try:
db.create_index(db.api_client)
created = True
except Exception as e:
print(f"Error creating index: {e}")
sleep(1) # Block the thread, just wait for the DB
db.update_schema()
loop = asyncio.get_event_loop()
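The new startup block above simply retries db.create_index until it succeeds before continuing; the same wait-until-ready pattern as a small reusable helper (create_index below stands in for db.create_index(db.api_client)):

    from time import sleep


    def wait_until_ready(create_index, delay=1):
        """Retry create_index until it stops raising, blocking between attempts."""
        while True:
            try:
                create_index()
                return
            except Exception as e:  # the diff catches Exception broadly and retries
                print(f"Error creating index: {e}")
                sleep(delay)


    # Hypothetical usage mirroring monolith.py:
    # wait_until_ready(lambda: db.create_index(db.api_client))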

0
oom Normal file
View File

0
perf/__init__.py Normal file
View File

134
perf/throttle.py Normal file
View File

@@ -0,0 +1,134 @@
import asyncio
import time
import psutil
import util
class DynamicThrottle(object):
def __init__(self, **kwargs):
self.target_cpu_usage = kwargs.get("target_cpu_usage", 50)
self.sleep_interval = 0.0
self.sleep_increment = kwargs.get("sleep_increment", 0.01)
self.sleep_decrement = kwargs.get("sleep_decrement", 0.01)
self.sleep_max = kwargs.get("sleep_max", 0.1)
self.sleep_min = kwargs.get("sleep_min", 0.01)
self.psutil_interval = kwargs.get("psutil_interval", 0.1)
self.log = kwargs.get("log", util.get_logger(self.__class__.__name__))
self.consecutive_increments = 0
self.consecutive_decrements = 0
self.consecutive_divisor = kwargs.get("consecutive_divisor", 1)
self.last_was_increment = kwargs.get("start_increment", True)
if kwargs.get("use_async"):
self.wait = self.dynamic_throttle_async
else:
self.wait = self.dynamic_throttle
async def dynamic_throttle_async(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
await asyncio.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
def dynamic_throttle(self):
"""
Dynamically sleeps before a request if CPU usage is above our target.
"""
current_cpu_usage = psutil.cpu_percent(interval=self.psutil_interval)
if current_cpu_usage > self.target_cpu_usage:
if self.last_was_increment:
self.consecutive_increments += 1
# self.log.debug(f"High CPU consecutive increments: {self.consecutive_increments}")
else:
self.consecutive_increments = 0 # ?
self.consecutive_decrements = 0 # ?
# self.log.debug(f"High CPU alert reset.")
self.sleep_interval += self.sleep_increment * (
max(1, self.consecutive_increments) / self.consecutive_divisor
)
self.last_was_increment = True
if self.sleep_interval > self.sleep_max:
self.sleep_interval = self.sleep_max
# self.log.debug(f"High CPU, but not increasing above {self.sleep_max:.3f}s")
# self.log.debug(
# f"High CPU: {current_cpu_usage}% > {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
elif current_cpu_usage < self.target_cpu_usage:
if not self.last_was_increment:
self.consecutive_decrements += 1
# self.log.debug(f"Low CPU consecutive decrements: {self.consecutive_decrements}")
else:
self.consecutive_decrements = 0 # ?
self.consecutive_increments = 0 # ?
# self.log.debug(f"Low CPU alert reset.")
self.sleep_interval -= self.sleep_decrement * (
max(1, self.consecutive_decrements) / self.consecutive_divisor
)
self.last_was_increment = False
if self.sleep_interval < self.sleep_min:
self.sleep_interval = self.sleep_min
# self.log.debug(f"Low CPU, but not decreasing below {self.sleep_min:.3f}s")
# self.log.debug(
# f"Low CPU: {current_cpu_usage}% < {self.target_cpu_usage}%, "
# f"=> sleep {self.sleep_interval:.3f}s"
# )
if self.sleep_interval > 0:
time.sleep(self.sleep_interval)
return self.sleep_interval
return 0.0
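A usage sketch for the class above, mirroring how process.py wires it up later in this diff (blocking mode, wait() called every few messages inside the work loop); do_work is a placeholder:

    from perf.throttle import DynamicThrottle

    throttle = DynamicThrottle(
        target_cpu_usage=50,  # back off when CPU sits above 50%
        sleep_increment=0.02,
        sleep_decrement=0.01,
        sleep_max=0.5,
        sleep_min=0,
        use_async=False,      # throttle.wait is the blocking variant
    )


    def do_work(msg):
        pass  # placeholder for real per-message processing


    def process(messages):
        slept = 0.0
        for i, msg in enumerate(messages):
            do_work(msg)
            if i % 5 == 0:  # same cadence process.py uses below
                slept += throttle.wait()  # returns the seconds slept (0.0 if none)
        return slept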

1
processing/ohlc.py Normal file
View File

@@ -0,0 +1 @@
# Resample 1Min into 5Min, 15Min, 30Min, 1H, 4H, 1D, 1W, 1M, 1Y
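processing/ohlc.py currently carries only the note above; one conventional way to do that resampling is with pandas (pandas does not appear in the requirements shown in this diff, so this is only an illustration):

    import pandas as pd

    # Toy 1-minute OHLCV frame indexed by timestamp.
    df = pd.DataFrame(
        {"o": [1.0, 1.1], "h": [1.2, 1.3], "l": [0.9, 1.0], "c": [1.1, 1.2], "v": [10, 12]},
        index=pd.to_datetime(["2025-01-01 00:00", "2025-01-01 00:01"]),
    )


    def resample_ohlcv(frame, rule):
        """Aggregate 1-minute bars into a coarser interval, e.g. "5Min", "1H", "1D"."""
        agg = {"o": "first", "h": "max", "l": "min", "c": "last", "v": "sum"}
        return frame.resample(rule).agg(agg).dropna()


    five_minute = resample_ohlcv(df, "5Min")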

View File

@@ -47,6 +47,9 @@ from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import db
import util
# For throttling
from perf.throttle import DynamicThrottle
# 4chan schema
from schemas.ch4_s import ATTRMAP
@@ -57,6 +60,7 @@ KEYNAME = "queue"
MONOLITH_PROCESS_PERFSTATS = (
getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
)
TARGET_CPU_USAGE = float(os.getenv("MONOLITH_PROCESS_TARGET_CPU_USAGE", 50.0))
CUSTOM_FILTERS = [
lambda x: x.lower(),
@@ -88,6 +92,19 @@ CPU_THREADS = int(os.getenv("MONOLITH_PROCESS_THREADS", os.cpu_count()))
p = ProcessPoolExecutor(CPU_THREADS)
throttle = DynamicThrottle(
target_cpu_usage=TARGET_CPU_USAGE,
sleep_increment=0.02,
sleep_decrement=0.01,
sleep_max=0.5,
sleep_min=0,
psutil_interval=0.1,
consecutive_divisor=2,
log=log,
start_increment=True,
use_async=False,
)
def get_hash_key():
hash_key = db.r.get("hashing_key")
@@ -129,13 +146,15 @@ async def spawn_processing_threads(chunk, length):
# Join the results back from the split list
flat_list = [item for sublist in results for item in sublist]
log.debug(
total_messages = len(flat_list)
log.info(
(
f"[{chunk}/{index}] Results from processing of {length} messages in "
f"{cores} threads: {len(flat_list)}"
)
)
await db.store_batch(flat_list)
return total_messages
# log.debug(f"Finished processing {len_data} messages")
@@ -150,19 +169,38 @@ def process_data(chunk, index, chunk_size):
date_time = 0.0
nlp_time = 0.0
normalise_time = 0.0
hash_time = 0.0
normal2_time = 0.0
soup_time = 0.0
sleep_time = 0.0
total_time = 0.0
# Initialise sentiment analyser
analyzer = SentimentIntensityAnalyzer()
for msg_index in range(chunk_size):
# Log progress every 10 messages
if msg_index % 10 == 0:
percentage_done = (msg_index / chunk_size) * 100
log.debug(
f"[{chunk}/{index}] {percentage_done:.2f}% done ({msg_index}/{chunk_size})"
)
msg = db.r.rpop(KEYNAME)
if not msg:
return to_store  # queue drained early; hand back what we have so far
msg = orjson.loads(msg)
if msg["src"] == "4ch":
board = msg["net"]
thread = msg["channel"]
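# Deduplicate: skip posts we have already queued, keyed on
# board/thread/post number/reply target/post time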
redis_key = (
f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
)
key_content = db.r.get(redis_key)
if key_content is not None:
continue
db.r.set(redis_key, "1")
total_start = time.process_time()
# normalise fields
start = time.process_time()
@@ -188,27 +226,6 @@ def process_data(chunk, index, chunk_size):
board = msg["net"]
thread = msg["channel"]
# Calculate hash for post
start = time.process_time()
post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
hash = siphash(hash_key, post_normalised)
hash = str(hash)
redis_key = (
f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
)
key_content = db.r.get(redis_key)
if key_content is not None:
key_content = key_content.decode("ascii")
if key_content == hash:
# This deletes the message since the append at the end won't be hit
continue
# pass
else:
msg["type"] = "update"
db.r.set(redis_key, hash)
time_took = (time.process_time() - start) * 1000
hash_time += time_took
start = time.process_time()
for key2, value in list(msg.items()):
if key2 in ATTRMAP:
@@ -226,9 +243,10 @@ def process_data(chunk, index, chunk_size):
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
else:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# new_ts = old_ts.isoformat()
# iso_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp())
msg["ts"] = new_ts
# msg["iso"] = iso_ts
else:
raise Exception("No TS in msg")
time_took = (time.process_time() - start) * 1000
@@ -273,7 +291,7 @@ def process_data(chunk, index, chunk_size):
# Tokens
start = time.process_time()
tokens = preprocess_string(msg["msg"], CUSTOM_FILTERS)
msg["tokens"] = tokens
msg["tokens"] = str(tokens)
# n = nlp(msg["msg"])
# for tag in TAGS:
# tag_name = tag.lower()
@@ -286,20 +304,24 @@ def process_data(chunk, index, chunk_size):
to_store.append(msg)
total_time += (time.process_time() - total_start) * 1000
# Dynamic throttling to reduce CPU usage
if msg_index % 5 == 0:
sleep_time += throttle.wait()
if MONOLITH_PROCESS_PERFSTATS:
log.debug("=====================================")
log.debug(f"Chunk: {chunk}")
log.debug(f"Index: {index}")
log.debug(f"Sentiment: {sentiment_time}")
log.debug(f"Regex: {regex_time}")
log.debug(f"Polyglot: {polyglot_time}")
log.debug(f"Date: {date_time}")
log.debug(f"NLP: {nlp_time}")
log.debug(f"Normalise: {normalise_time}")
log.debug(f"Hash: {hash_time}")
log.debug(f"Normal2: {normal2_time}")
log.debug(f"Soup: {soup_time}")
log.debug(f"Total: {total_time}")
log.debug("=====================================")
log.info("=====================================")
log.info(f"Chunk: {chunk}")
log.info(f"Index: {index}")
log.info(f"Sentiment: {sentiment_time}")
log.info(f"Regex: {regex_time}")
log.info(f"Polyglot: {polyglot_time}")
log.info(f"Date: {date_time}")
log.info(f"NLP: {nlp_time}")
log.info(f"Normalise: {normalise_time}")
log.info(f"Normal2: {normal2_time}")
log.info(f"Soup: {soup_time}")
log.info(f"Total: {total_time}")
log.info(f"Throttling: {sleep_time}")
log.info("=====================================")
return to_store

View File

@@ -1,11 +1,10 @@
wheel
pre-commit
beautifulsoup4
redis
siphashc
aiohttp[speedups]
python-dotenv
#manticoresearch
manticoresearch
numpy
aioredis[hiredis]
#aiokafka
@@ -21,5 +20,10 @@ gensim
python-Levenshtein
orjson
uvloop
numba
elasticsearch[async]
msgpack
# flpc
psutil
pymexc
websockets
aiomysql

186
rts.py Normal file
View File

@@ -0,0 +1,186 @@
import asyncio
import logging
from os import getenv
import orjson
import websockets
import db
# Logger setup
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("RTS")
# Environment variables
MONOLITH_RTS_MEXC_API_ACCESS_KEY = getenv("MONOLITH_RTS_MEXC_API_ACCESS_KEY", None)
MONOLITH_RTS_MEXC_API_SECRET_KEY = getenv("MONOLITH_RTS_MEXC_API_SECRET_KEY", None)
# WebSocket endpoint
MEXC_WS_URL = "wss://wbs.mexc.com/ws"
# Example kline message from the MEXC WebSocket stream, kept for reference
EXAMPLE_KLINE_MESSAGE = {
    "d": {
        "e": "spot@public.kline.v3.api",
        "k": {
            "t": 1737901140,  # Window start time (s)
            "o": "684.4",  # Open
            "c": "684.5",  # Close
            "h": "684.5",  # High
            "l": "684.4",  # Low
            "v": "0.173",  # Volume of the base
            "a": "118.41",  # Volume of the quote (Quantity)
            "T": 1737901200,  # Window end time (s)
            "i": "Min1",  # Kline interval
        },
    },
    "c": "spot@public.kline.v3.api@BNBUSDT@Min1",  # Channel
    "t": 1737901159239,  # Event time (ms)
    "s": "BNBUSDT",  # Symbol
}
# Scan DB for last endtime (T)
# Request Kline data from last endtime (T) to now
# Check Server Time
# GET /api/v3/time
# Weight(IP): 1
# Parameters: NONE
# Response:
# {
#   "serverTime": 1645539742000
# }
# Kline/Candlestick Data
# GET /api/v3/klines
# Weight(IP): 1
# Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
# Parameters:
# Name       Type     Mandatory   Description
# symbol     string   YES
# interval   ENUM     YES         ENUM: Kline Interval
# startTime  long     NO
# endTime    long     NO
# limit      integer  NO          Default 500; max 1000.
# Response:
# [
#   [
#     1640804880000,
#     "47482.36",
#     "47482.36",
#     "47416.57",
#     "47436.1",
#     "3.550717",
#     1640804940000,
#     "168387.3"
#   ]
# ]
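# A minimal sketch of backfilling klines over REST with aiohttp (already in
# requirements); MEXC_REST_URL, fetch_klines and the interval string are
# illustrative assumptions, not existing code.
import aiohttp

MEXC_REST_URL = "https://api.mexc.com"


async def fetch_klines(symbol, interval, start_ms=None, end_ms=None, limit=1000):
    # Request up to `limit` bars between startTime and endTime (both optional)
    params = {"symbol": symbol, "interval": interval, "limit": str(limit)}
    if start_ms is not None:
        params["startTime"] = str(start_ms)
    if end_ms is not None:
        params["endTime"] = str(end_ms)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{MEXC_REST_URL}/api/v3/klines", params=params
        ) as resp:
            return await resp.json()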
# Scrub function:
# For each record, ensure there are no time gaps.
# When a 1Min window closes, the next kline's start time (t) equals the previous kline's end time (T).
# Check for gaps and request all klines between them so the DB stays complete, even across restarts.
# Idle jitter function: compare our clock with the server time.
# Compare the event ts with our time and log the jitter. Add a jitter warning to the log and the OHLC data.
# High jitter may prevent us from getting the correct data for trading.
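# Minimal sketches of the scrub and jitter checks described above; the
# (t, T) window tuples and both helper names are illustrative assumptions.
import time


def find_gaps(windows):
    # windows: list of (t, T) pairs sorted by start time; a complete series
    # has each kline starting exactly where the previous one ended.
    gaps = []
    for (_, prev_end), (next_start, _) in zip(windows, windows[1:]):
        if next_start != prev_end:
            gaps.append((prev_end, next_start))
    return gaps


def jitter_ms(server_time_ms):
    # Difference between our clock and the exchange clock in milliseconds;
    # large values are worth a warning since they skew the OHLC timestamps.
    return int(time.time() * 1000) - int(server_time_ms)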
async def mex_handle(data):
message = orjson.loads(data)
# print(orjson.dumps(message, option=orjson.OPT_INDENT_2).decode("utf-8"))
if "code" in message:
if message["code"] == 0:
log.info("Control message received")
return
symbol = message["s"]
open = message["d"]["k"]["o"]
close = message["d"]["k"]["c"]
high = message["d"]["k"]["h"]
low = message["d"]["k"]["l"]
volume_base = message["d"]["k"]["v"] # ERROR IN API DOCS
volume_quote = message["d"]["k"]["a"] # > a bigDecimal volume
interval = message["d"]["k"]["i"]
start_time = message["d"]["k"]["t"] # > t long stratTime
end_time = message["d"]["k"]["T"] # > T long endTime
event_time = message["t"] # t long eventTime
index = f"mex_ohlc_{symbol.lower()}"
reformatted = {
"s": symbol,
"o": float(open),
"c": float(close),
"h": float(high),
"l": float(low),
"v": float(volume_base),
"a": float(volume_quote),
"i": interval,
"t": int(start_time),
"t2": int(end_time),
"ts": int(event_time),
}
await db.rts_store_message(index, reformatted)
    log.debug(index)
    log.debug(orjson.dumps(reformatted, option=orjson.OPT_INDENT_2).decode("utf-8"))
# Kline WebSocket handler
async def mex_main():
await db.init_mysql_pool()
async with websockets.connect(MEXC_WS_URL) as websocket:
log.info("WebSocket connected")
# Define symbols and intervals
symbols = ["BTCUSDT"] # Add more symbols as needed
interval = "Min1" # Kline interval
# Prepare subscription requests for Kline streams
# Request: spot@public.kline.v3.api@<symbol>@<interval>
subscriptions = [
f"spot@public.kline.v3.api@{symbol}@{interval}" for symbol in symbols
]
# Send subscription requests
subscribe_request = {
"method": "SUBSCRIPTION",
"params": subscriptions,
# "id": 1,
}
await websocket.send(orjson.dumps(subscribe_request).decode("utf-8"))
log.info(f"Subscribed to: {subscriptions}")
# Listen for messages
while True:
try:
message = await websocket.recv()
await mex_handle(message)
except websockets.exceptions.ConnectionClosed as e:
log.error(f"WebSocket connection closed: {e}")
break
# Entry point
if __name__ == "__main__":
try:
asyncio.run(mex_main())
except KeyboardInterrupt:
log.info("RTS process terminated.")

View File

@@ -129,8 +129,19 @@ schema_main = {
"version_sentiment": "int",
# 1, 2
"version_tokens": "int",
# en, ru
"lang_code": "string indexed attribute",
"lang_name": "text",
"match_ts": "timestamp",
"batch_id": "bigint",
"rule_id": "bigint",
"index": "string indexed attribute",
"meta": "text",
# "iso": "string indexed attribute",
}
schema_rule_storage = schema_main
schema_meta = {
"id": "bigint",
# 393598265, #main, Rust Programmer's Club

View File

@@ -10,6 +10,7 @@ from numpy import array_split
import db
import util
from perf.throttle import DynamicThrottle
# CONFIGURATION #
@@ -25,6 +26,12 @@ CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5))
# Maximum number of concurrent thread fetches (asyncio semaphore)
THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000))
# Target CPU usage percentage
TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0))
# Boards to crawl
BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",")
# CONFIGURATION END #
@@ -37,6 +44,19 @@ class Chan4(object):
name = self.__class__.__name__
self.log = util.get_logger(name)
self.throttle = DynamicThrottle(
target_cpu_usage=TARGET_CPU_USAGE,
sleep_increment=0.01,
sleep_decrement=0.01,
sleep_max=0.1,
sleep_min=0,
psutil_interval=0.1,
log=self.log,
start_increment=False,
use_async=True,
)
self.wait = self.throttle.wait
self.api_endpoint = "https://a.4cdn.org"
# self.boards = ["out", "g", "a", "3", "pol"] #
self.boards = []
@@ -53,12 +73,14 @@ class Chan4(object):
self.log.debug(f"Created new hash key: {self.hash_key}")
db.r.set("hashing_key", self.hash_key)
else:
self.hash_key = self.hash_key.decode("ascii")
self.log.debug(f"Decoded hash key: {self.hash_key}")
async def run(self):
await self.get_board_list()
if "ALL" in BOARDS:
await self.get_board_list()
else:
self.boards = BOARDS
while True:
await self.get_thread_lists(self.boards)
await asyncio.sleep(CRAWL_DELAY)
@@ -71,6 +93,8 @@ class Chan4(object):
for board in response["boards"]:
self.boards.append(board["board"])
self.log.debug(f"Got boards: {self.boards}")
# await self.dynamic_throttle()
# TODO
async def get_thread_lists(self, boards):
# self.log.debug(f"Getting thread list for {boards}")
@@ -86,6 +110,8 @@ class Chan4(object):
for threads in page["threads"]:
no = threads["no"]
to_get.append((board, no))
# await self.dynamic_throttle()
# TODO
if not to_get:
return
@@ -95,6 +121,8 @@ class Chan4(object):
for index, thr in enumerate(split_threads):
self.log.debug(f"Series {index} - getting {len(thr)} threads")
await self.get_threads_content(thr)
# await self.dynamic_throttle()
# TODO
await asyncio.sleep(THREADS_DELAY)
def take_items(self, dict_list, n):
@@ -125,6 +153,8 @@ class Chan4(object):
continue
board, thread = mapped
all_posts[mapped] = response["posts"]
# await self.dynamic_throttle()
# TODO
if not all_posts:
return
@@ -142,6 +172,8 @@ class Chan4(object):
post["channel"] = thread
to_store.append(post)
# await self.dynamic_throttle()
# TODO
if to_store:
await db.queue_message_bulk(to_store)
@@ -156,6 +188,7 @@ class Chan4(object):
async def bound_fetch(self, sem, url, session, mapped):
# Getter function with semaphore.
async with sem:
await self.wait()
try:
return await self.fetch(url, session, mapped)
except: # noqa

View File

@@ -11,8 +11,16 @@ KEYNAME = "queue"
CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))
ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5"))
INGEST_INCREASE_BELOW = int(getenv("MONOLITH_INGEST_INCREASE_BELOW", "2500"))
INGEST_DECREASE_ABOVE = int(getenv("MONOLITH_INGEST_DECREASE_ABOVE", "10000"))
INGEST_INCREASE_BY = int(getenv("MONOLITH_INGEST_INCREASE_BY", "100"))
INGEST_DECREASE_BY = int(getenv("MONOLITH_INGEST_DECREASE_BY", "100"))
log = util.get_logger("ingest")
INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))
class Ingest(object):
def __init__(self):
@@ -34,9 +42,40 @@ class Ingest(object):
await asyncio.sleep(ITER_DELAY)
async def get_chunk(self):
global CHUNK_SIZE
length = await db.ar.llen(KEYNAME)
if length > CHUNK_SIZE:
length = CHUNK_SIZE
if not length:
return
await process.spawn_processing_threads(self.current_chunk, length)
ingested = await process.spawn_processing_threads(self.current_chunk, length)
if ingested < INGEST_INCREASE_BELOW:
if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
log.debug(
(
f"Increasing chunk size to "
f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
f"due to low ingestion ({ingested})"
)
)
CHUNK_SIZE += INGEST_INCREASE_BY
else:
log.debug(
f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}"
)
elif ingested > INGEST_DECREASE_ABOVE:
if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
log.debug(
(
f"Decreasing chunk size to "
f"{CHUNK_SIZE - INGEST_DECREASE_BY}"
f"due to high ingestion ({ingested})"
)
)
CHUNK_SIZE -= INGEST_DECREASE_BY
else:
log.debug(
f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}"
)

View File

@@ -43,7 +43,6 @@ class ColoredFormatter(logging.Formatter):
def get_logger(name):
# Define the logging format
FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s"
COLOR_FORMAT = formatter_message(FORMAT, True)