diff --git a/docker/Dockerfile b/Dockerfile
similarity index 93%
rename from docker/Dockerfile
rename to Dockerfile
index 56e07eb..d9f28bc 100644
--- a/docker/Dockerfile
+++ b/Dockerfile
@@ -13,7 +13,7 @@ ENV PYTHONDONTWRITEBYTECODE=1
 ENV PYTHONUNBUFFERED=1
 WORKDIR /code
 COPY requirements.txt /code/
-COPY discord-patched.tgz /code/
+COPY docker/discord-patched.tgz /code/
 
 RUN python -m venv /venv
 RUN . /venv/bin/activate && pip install -r requirements.txt
diff --git a/Makefile b/Makefile
index 8e4e6de..4d87a98 100644
--- a/Makefile
+++ b/Makefile
@@ -1,20 +1,20 @@
 run:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env up -d
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env up -d
 
 build:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env build
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env build
 
 stop:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env down
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env down
 
 log:
-	docker-compose -f docker/docker-compose.prod.yml --env-file=stack.env logs -f
+	docker-compose -f docker-compose.prod.yml --env-file=stack.env logs -f
 
 run-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env up -d
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env up -d
 
 stop-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env down
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env down
 
 log-infra:
-	docker-compose -f docker/docker-compose.infra.yml --env-file=stack.env logs -f
\ No newline at end of file
+	docker-compose -f docker-compose.infra.yml --env-file=stack.env logs -f
\ No newline at end of file
diff --git a/db.py b/db.py
index 99d60a5..606f4da 100644
--- a/db.py
+++ b/db.py
@@ -1,30 +1,43 @@
-import asyncio
-from os import getenv
+from math import ceil
 
 import aioredis
-import msgpack
+import manticoresearch
 import orjson
-import redis
-
-# Elasticsearch
-from elasticsearch import AsyncElasticsearch
+from manticoresearch.rest import ApiException
+from numpy import array_split
+from redis import StrictRedis
+import msgpack
+import asyncio
 
 import util
+from schemas import mc_s
+from os import getenv
+from time import sleep
 
-trues = ("true", "1", "t", True)
-
-# INDEX = "msg"
+configuration = manticoresearch.Configuration(host="http://monolith_db:9308")
+api_client = manticoresearch.ApiClient(configuration)
+api_instance = manticoresearch.IndexApi(api_client)
 
 log = util.get_logger("db")
 
 # Redis (legacy)
-r = redis.from_url("redis://ssdb:1289", db=0)
-
+# r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
+r = StrictRedis(
+    host="ssdb_monolith",  # Replace with your Redis server's IP address
+    port=1289,  # Replace with your Redis server's port
+    db=0  # Database number
+)
 # AIORedis
-ar = aioredis.from_url("redis://ssdb:1289", db=0)
+# ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
+ar = aioredis.from_url(
+    "redis://ssdb_monolith:1289",
+    db=0
+)
+pr = aioredis.from_url("redis://redis_neptune:6379", db=10, password=getenv("REDIS_PASSWORD"))
+
+KEYNAME = "queue"
+MESSAGE_KEY = "messages"
 
-# Neptune redis for PubSub
-pr = aioredis.from_url("redis://redis_neptune:6379", db=10)
@@ -39,120 +52,146 @@ TYPES_MAIN = [
     "msg",
     "notice",
     "action",
     "part",
     "join",
     "kick",
     "quit",
     "nick",
     "mode",
     "topic",
     "update",
 ]
-MAIN_SRC_MAP = {
-    "dis": "main",
-    "irc": "restricted",
-    "4ch": "main",
-}
-
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
-KEYNAME = "queue"
-MESSAGE_KEY = "messages"
-
-ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
-ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
-ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
-ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
-
-client = None
-
-# These are sometimes numeric, sometimes strings.
-# If they are seen to be numeric first, ES will erroneously
-# index them as "long" and then subsequently fail to index messages
-# with strings in the field.
-keyword_fields = ["nick_id", "user_id", "net_id"]
-
-mapping_int = {
-    "mappings": {
-        "properties": {
-            "ts": {"type": "date", "format": "epoch_second"},
-            "file_tim": {"type": "date", "format": "epoch_millis"},
-        }
-    }
-}
-mapping = dict(mapping_int)
-for field in keyword_fields:
-    mapping["mappings"]["properties"][field] = {"type": "text"}
-del mapping_int["mappings"]["properties"]["file_tim"]
+# def store_message(msg):
+#     """
+#     Store a message into Manticore
+#     :param msg: dict
+#     """
+#     # Duplicated to avoid extra function call
+#     if msg["type"] in TYPES_MAIN:
+#         index = "main"
+#         schema = mc_s.schema_main
+#     elif msg["type"] in TYPES_META:
+#         index = "meta"
+#         schema = mc_s.schema_meta
+#     elif msg["type"] in TYPES_INT:
+#         index = "internal"
+#         schema = mc_s.schema_int
+#     # normalise fields
+#     for key, value in list(msg.items()):
+#         if value is None:
+#             del msg[key]
+#         if key in schema:
+#             if isinstance(value, int):
+#                 if schema[key].startswith("string") or schema[key].startswith("text"):
+#                     msg[key] = str(value)
+#     body = [{"insert": {"index": index, "doc": msg}}]
+#     body_post = ""
+#     for item in body:
+#         body_post += orjson.dumps(item)
+#         body_post += "\n"
 
-async def initialise_elasticsearch():
-    """
-    Initialise the Elasticsearch client.
- """ - auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD) - client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False) - for index in ("main", "meta", "restricted", "internal"): - if index == "internal": - map_dict = mapping_int - else: - map_dict = mapping - if await client.indices.exists(index=index): - # update index with mapping - await client.indices.put_mapping( - index=index, properties=map_dict["mappings"]["properties"] - ) - else: - await client.indices.create(index=index, mappings=map_dict["mappings"]) - return client +# # print(body_post) +# try: +# # Bulk index operations +# api_response = api_instance.bulk(body_post) # , async_req=True +# # print(api_response) +# except ApiException as e: +# print("Exception when calling IndexApi->bulk: %s\n" % e) +# print("ATTEMPT", body_post) async def store_batch(data): - global client - if not client: - client = await initialise_elasticsearch() - indexmap = {} - for msg in data: - if msg["type"] in TYPES_MAIN: - # index = "main" - index = MAIN_SRC_MAP[msg["src"]] - # schema = mc_s.schema_main - elif msg["type"] in TYPES_META: - index = "meta" - # schema = mc_s.schema_meta - elif msg["type"] in TYPES_INT: - index = "internal" - # schema = mc_s.schema_int + """ + Store a message into Manticore + :param msg: dict + """ + if not data: + return + # 10000: maximum inserts we can submit to + # Manticore as of Sept 2022 + split_posts = array_split(data, ceil(len(data) / 10000)) + for messages in split_posts: + total = [] + indexmap = {} + for msg in messages: + if msg["type"] in TYPES_MAIN: + index = "main" + schema = mc_s.schema_main + elif msg["type"] in TYPES_META: + index = "meta" + schema = mc_s.schema_meta + elif msg["type"] in TYPES_INT: + index = "internal" + schema = mc_s.schema_int + # normalise fields + for key, value in list(msg.items()): + if value is None: + del msg[key] + if key in schema: + if isinstance(value, int): + if schema[key].startswith("string") or schema[key].startswith( + "text" + ): + msg[key] = str(value) - INDEX = index + body = {"insert": {"index": index, "doc": msg}} + total.append(body) + if "ts" not in msg: + raise Exception("No TS in msg") + if index not in indexmap: + indexmap[index] = [msg] + else: + indexmap[index].append(msg) + # END MSG IN MESSAGES + + # Pack the indexmap with msgpack and publish it to Neptune + packed_index = msgpack.packb(indexmap, use_bin_type=True) + completed_publish = False + for i in range(10): + if completed_publish: + break + try: + await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = True + except aioredis.exceptions.ConnectionError as e: + raise e + await asyncio.sleep(0.1) + if not completed_publish: + log.error("Failed to publish to Neptune") + + body_post = "" + for item in total: + print("ITEM", item) + body_post += orjson.dumps(item).decode("utf-8") + body_post += "\n" + + # print(body_post) - # if key in schema: - # if isinstance(value, int): - # if schema[key].startswith("string") or schema[key].startswith( - # "text" - # ): - # msg[key] = str(value) - # body = orjson.dumps(msg) - if "ts" not in msg: - raise Exception("No TS in msg") - if INDEX not in indexmap: - indexmap[INDEX] = [msg] - else: - indexmap[INDEX].append(msg) - # Pack the indexmap with msgpack and publish it to Neptune - packed_index = msgpack.packb(indexmap, use_bin_type=True) - completed_publish = False - for i in range(10): - if completed_publish: - break try: - await pr.publish(MESSAGE_KEY, packed_index) - completed_publish = True - except 
aioredis.exceptions.ConnectionError: - await asyncio.sleep(0.1) - if not completed_publish: - log.error("Failed to publish to Neptune") + # Bulk index operations + api_response = api_instance.bulk(body_post) # , async_req=True + except ApiException as e: + print("Exception when calling IndexApi->bulk: %s\n" % e) + print(f"Completed ingest to MC of length {len(total)}") + # END MESSAGES IN SPLIT - for index, index_messages in indexmap.items(): - for message in index_messages: - result = await client.index(index=index, body=message) - if not result["result"] == "created": - log.error(f"Indexing failed: {result}") - log.debug(f"Indexed {len(data)} messages in ES") + +def update_schema(): + pass + + +def create_index(api_client): + util_instance = manticoresearch.UtilsApi(api_client) + schemas = { + "main": mc_s.schema_main, + "meta": mc_s.schema_meta, + "internal": mc_s.schema_int, + } + for name, schema in schemas.items(): + schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) + + create_query = ( + f"create table if not exists {name}({schema_types}) engine='columnar'" + ) + print("Schema types", create_query) + util_instance.sql(create_query) async def queue_message(msg): @@ -172,3 +211,14 @@ async def queue_message_bulk(data): # TODO: msgpack message = orjson.dumps(msg) await ar.lpush(KEYNAME, message) + + +created = False +while not created: + try: + create_index(api_client) + created = True + except Exception as e: + print(f"Error creating index: {e}") + sleep(1) # Block the thread, just wait for the DB +update_schema() diff --git a/db_old_ref.py b/db_old_ref.py new file mode 100644 index 0000000..2be57a2 --- /dev/null +++ b/db_old_ref.py @@ -0,0 +1,174 @@ +import asyncio +from os import getenv + +import aioredis +import msgpack +import orjson +import redis + +# Elasticsearch +from elasticsearch import AsyncElasticsearch + +import util + +trues = ("true", "1", "t", True) + +# INDEX = "msg" + +log = util.get_logger("db") + +# Redis (legacy) +# r = redis.from_url("redis://ssdb:1289", db=0) + +# AIORedis +ar = aioredis.from_url("redis://ssdb:1289", db=0) + +# Neptune redis for PubSub +pr = aioredis.from_url("redis://redis_neptune:6379", db=10) + +TYPES_MAIN = [ + "msg", + "notice", + "action", + "part", + "join", + "kick", + "quit", + "nick", + "mode", + "topic", + "update", +] +MAIN_SRC_MAP = { + "dis": "main", + "irc": "restricted", + "4ch": "main", +} + +TYPES_META = ["who"] +TYPES_INT = ["conn", "highlight", "znc", "query", "self"] +KEYNAME = "queue" +MESSAGE_KEY = "messages" + +ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic") +ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme") +ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost") +ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues + +client = None + +# These are sometimes numeric, sometimes strings. +# If they are seen to be numeric first, ES will erroneously +# index them as "long" and then subsequently fail to index messages +# with strings in the field. +keyword_fields = ["nick_id", "user_id", "net_id"] + +mapping_int = { + "mappings": { + "properties": { + "ts": {"type": "date", "format": "epoch_second"}, + "file_tim": {"type": "date", "format": "epoch_millis"}, + } + } +} +mapping = dict(mapping_int) +for field in keyword_fields: + mapping["mappings"]["properties"][field] = {"type": "text"} + + +del mapping_int["mappings"]["properties"]["file_tim"] + + +async def initialise_elasticsearch(): + """ + Initialise the Elasticsearch client. 
+ """ + auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD) + client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False) + for index in ("main", "meta", "restricted", "internal"): + if index == "internal": + map_dict = mapping_int + else: + map_dict = mapping + if await client.indices.exists(index=index): + # update index with mapping + await client.indices.put_mapping( + index=index, properties=map_dict["mappings"]["properties"] + ) + else: + await client.indices.create(index=index, mappings=map_dict["mappings"]) + return client + + +async def store_batch(data): + global client + if not client: + client = await initialise_elasticsearch() + indexmap = {} + for msg in data: + if msg["type"] in TYPES_MAIN: + # index = "main" + index = MAIN_SRC_MAP[msg["src"]] + # schema = mc_s.schema_main + elif msg["type"] in TYPES_META: + index = "meta" + # schema = mc_s.schema_meta + elif msg["type"] in TYPES_INT: + index = "internal" + # schema = mc_s.schema_int + + INDEX = index + + # if key in schema: + # if isinstance(value, int): + # if schema[key].startswith("string") or schema[key].startswith( + # "text" + # ): + # msg[key] = str(value) + # body = orjson.dumps(msg) + if "ts" not in msg: + raise Exception("No TS in msg") + if INDEX not in indexmap: + indexmap[INDEX] = [msg] + else: + indexmap[INDEX].append(msg) + + # Pack the indexmap with msgpack and publish it to Neptune + packed_index = msgpack.packb(indexmap, use_bin_type=True) + completed_publish = False + for i in range(10): + if completed_publish: + break + try: + await pr.publish(MESSAGE_KEY, packed_index) + completed_publish = True + except aioredis.exceptions.ConnectionError: + await asyncio.sleep(0.1) + if not completed_publish: + log.error("Failed to publish to Neptune") + + for index, index_messages in indexmap.items(): + for message in index_messages: + result = await client.index(index=index, body=message) + if not result["result"] == "created": + log.error(f"Indexing failed: {result}") + log.debug(f"Indexed {len(data)} messages in ES") + + +async def queue_message(msg): + """ + Queue a message on the Redis buffer. + """ + # TODO: msgpack + message = orjson.dumps(msg) + await ar.lpush(KEYNAME, message) + + +async def queue_message_bulk(data): + """ + Queue multiple messages on the Redis buffer. + """ + for msg in data: + # TODO: msgpack + message = orjson.dumps(msg) + await ar.lpush(KEYNAME, message) diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml new file mode 100644 index 0000000..a3f218b --- /dev/null +++ b/docker-compose.prod.yml @@ -0,0 +1,195 @@ +version: "2.2" + +services: + app: + image: pathogen/monolith:latest + container_name: monolith + build: . 
+    volumes:
+      - ${PORTAINER_GIT_DIR}:/code
+    # env_file:
+    #   - stack.env
+    networks:
+      - default
+      - xf
+      - db
+    environment:
+      PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+      MODULES_ENABLED: "${MODULES_ENABLED}"
+      DISCORD_TOKEN: "${DISCORD_TOKEN}"
+      THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
+      THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
+      THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
+      THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
+      THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
+      THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
+      THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
+      THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
+      THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
+      THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
+      THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
+      #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
+      THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
+      # How many messages to ingest at once from Redis
+      MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
+      # Time to wait between polling Redis again
+      MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
+      # Number of 4chan threads to request at once
+      MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
+      # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
+      MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
+      # Time to wait after finishing a crawl before starting again
+      MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
+      # Semaphore value
+      MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
+      # Threads to use for data processing
+      # Leave uncommented to use all available threads
+      MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
+      # Enable performance metrics after message processing
+      MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+      MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
+      REDIS_PASSWORD: "${REDIS_PASSWORD}"
+
+  db:
+    #image: pathogen/manticore:kibana
+    image: manticoresearch/manticore:dev
+    container_name: monolith_db
+    #build:
+    #  context: ./docker/manticore
+    #  args:
+    #    DEV: 1
+    restart: always
+    ports:
+      - 9308
+      - 9312
+      - 9306
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+      memlock:
+        soft: -1
+        hard: -1
+    environment:
+      # - MCL=1
+      - EXTRA=1
+    networks:
+      - default
+      - xf
+      - db
+    volumes:
+      - ./docker/data:/var/lib/manticore
+      # - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf
+
+  # threshold:
+  #   image: pathogen/threshold:latest
+  #   container_name: threshold
+  #   build: legacy/docker
+  #   volumes:
+  #     - ${PORTAINER_GIT_DIR}:/code
+  #     - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
+  #     #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+  #     - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+  #   volumes_from:
+  #     - tmp
+  #   ports:
+  #     - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
+  #     - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
+  #     - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
+  #   environment:
+  #     PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}"
+  #     MODULES_ENABLED: "${MODULES_ENABLED}"
+  #     DISCORD_TOKEN: "${DISCORD_TOKEN}"
+  #     THRESHOLD_LISTENER_HOST: "${THRESHOLD_LISTENER_HOST}"
+  #     THRESHOLD_LISTENER_PORT: "${THRESHOLD_LISTENER_PORT}"
+  #     THRESHOLD_LISTENER_SSL: "${THRESHOLD_LISTENER_SSL}"
+  #     THRESHOLD_RELAY_ENABLED: "${THRESHOLD_RELAY_ENABLED}"
+  #     THRESHOLD_RELAY_HOST: "${THRESHOLD_RELAY_HOST}"
+  #     THRESHOLD_RELAY_PORT: "${THRESHOLD_RELAY_PORT}"
+  #     THRESHOLD_RELAY_SSL: "${THRESHOLD_RELAY_SSL}"
+  #     THRESHOLD_API_ENABLED: "${THRESHOLD_API_ENABLED}"
+  #     THRESHOLD_API_HOST: "${THRESHOLD_API_HOST}"
+  #     THRESHOLD_API_PORT: "${THRESHOLD_API_PORT}"
+  #     THRESHOLD_CONFIG_DIR: "${THRESHOLD_CONFIG_DIR}"
+  #     #THRESHOLD_TEMPLATE_DIR: "${#THRESHOLD_TEMPLATE_DIR}"
+  #     THRESHOLD_CERT_DIR: "${THRESHOLD_CERT_DIR}"
+  #     # How many messages to ingest at once from Redis
+  #     MONOLITH_INGEST_CHUNK_SIZE: "${MONOLITH_INGEST_CHUNK_SIZE}"
+  #     # Time to wait between polling Redis again
+  #     MONOLITH_INGEST_ITER_DELAY: "${MONOLITH_INGEST_ITER_DELAY}"
+  #     # Number of 4chan threads to request at once
+  #     MONOLITH_CH4_THREADS_CONCURRENT: "${MONOLITH_CH4_THREADS_CONCURRENT}"
+  #     # Time to wait between every MONOLITH_CH4_THREADS_CONCURRENT threads
+  #     MONOLITH_CH4_THREADS_DELAY: "${MONOLITH_CH4_THREADS_DELAY}"
+  #     # Time to wait after finishing a crawl before starting again
+  #     MONOLITH_CH4_CRAWL_DELAY: "${MONOLITH_CH4_CRAWL_DELAY}"
+  #     # Semaphore value
+  #     MONOLITH_CH4_THREADS_SEMAPHORE: "${MONOLITH_CH4_THREADS_SEMAPHORE}"
+  #     # Threads to use for data processing
+  #     # Leave uncommented to use all available threads
+  #     MONOLITH_PROCESS_THREADS: "${MONOLITH_PROCESS_THREADS}"
+  #     # Enable performance metrics after message processing
+  #     MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}"
+  #     MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}"
+  #     REDIS_PASSWORD: "${REDIS_PASSWORD}"
+  #   # for development
+  #   extra_hosts:
+  #     - "host.docker.internal:host-gateway"
+  #   networks:
+  #     - default
+  #     - xf
+  #     - db
+
+  ssdb:
+    image: tsl0922/ssdb
+    container_name: ssdb_monolith
+    ports:
+      - "1289:1289"
+    environment:
+      - SSDB_PORT=1289
+    networks:
+      - default
+      - db
+
+  # tmp:
+  #   image: busybox
+  #   container_name: tmp_monolith
+  #   command: chmod -R 777 /var/run/socks
+  #   volumes:
+  #     - /var/run/socks
+
+  redis:
+    image: redis
+    container_name: redis_monolith
+    command: redis-server /etc/redis.conf
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+    volumes:
+      - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
+      - monolith_redis_data:/data
+    # volumes_from:
+    #   - tmp
+    healthcheck:
+      test: "redis-cli ping"
+      interval: 2s
+      timeout: 2s
+      retries: 15
+    networks:
+      - default
+      - xf
+      - db
+
+networks:
+  default:
+    driver: bridge
+  xf:
+    external: true
+  db:
+    external: true
+
+volumes:
+  monolith_redis_data:
diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
deleted file mode 100644
index d3b262a..0000000
--- a/docker/docker-compose.prod.yml
+++ /dev/null
@@ -1,87 +0,0 @@
-version: "2.2"
-
-services:
-  app:
-    image: pathogen/monolith:latest
-    container_name: monolith
-    build: ${PORTAINER_GIT_DIR}/docker
-    volumes:
-      - ${PORTAINER_GIT_DIR}:/code
-    env_file:
-      - ../stack.env
-    networks:
-      - default
-      - pathogen
-      - elastic
-
-  threshold:
-    image: pathogen/threshold:latest
-    container_name: threshold
-    build: ../legacy/docker
-    volumes:
-      - ${PORTAINER_GIT_DIR}:/code
-      - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
-      - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
-    volumes_from:
-      - tmp
-    ports:
-      - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
-      - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
-      - "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
-    env_file:
-      - ../stack.env
-    # for development
-    extra_hosts:
-      - "host.docker.internal:host-gateway"
-    networks:
-      - default
-      - pathogen
-
-  ssdb:
-    image: tsl0922/ssdb
-    container_name: ssdb_monolith
-    ports:
-      - "1289:1289"
-    environment:
-      - SSDB_PORT=1289
-    networks:
-      - default
-
-  tmp:
-    image: busybox
-    container_name: tmp_monolith
-    command: chmod -R 777 /var/run/socks
-    volumes:
-      - /var/run/socks
-
-  redis:
-    image: redis
-    container_name: redis_monolith
-    command: redis-server /etc/redis.conf
-    ulimits:
-      nproc: 65535
-      nofile:
-        soft: 65535
-        hard: 65535
-    volumes:
-      - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
-      - redis_data:/data
-    volumes_from:
-      - tmp
-    healthcheck:
-      test: "redis-cli -s /var/run/socks/redis.sock ping"
-      interval: 2s
-      timeout: 2s
-      retries: 15
-
-networks:
-  default:
-    driver: bridge
-  pathogen:
-    external: true
-  elastic:
-    external: true
-
-volumes:
-  redis_data:
diff --git a/docker/redis.conf b/docker/redis.conf
index 424a612..b4acb37 100644
--- a/docker/redis.conf
+++ b/docker/redis.conf
@@ -1,2 +1,4 @@
-unixsocket /var/run/socks/redis.sock
-unixsocketperm 777
\ No newline at end of file
+# unixsocket /var/run/socks/redis.sock
+# unixsocketperm 777
+port 6379
+requirepass changeme
\ No newline at end of file
diff --git a/docker/requirements.txt b/docker/requirements.txt
deleted file mode 100644
index 6c8a352..0000000
--- a/docker/requirements.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-wheel
-beautifulsoup4
-redis
-siphashc
-aiohttp[speedups]
-python-dotenv
-#manticoresearch
-numpy
-aioredis[hiredis]
-#aiokafka
-vaderSentiment
-polyglot
-pyicu
-pycld2
-morfessor
-six
-nltk
-#spacy
-gensim
-python-Levenshtein
-orjson
-uvloop
-elasticsearch[async]
-msgpack
diff --git a/requirements.txt b/requirements.txt
index 22ecb0f..d622a8d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,10 @@
 wheel
-pre-commit
 beautifulsoup4
 redis
 siphashc
 aiohttp[speedups]
 python-dotenv
-#manticoresearch
+manticoresearch
 numpy
 aioredis[hiredis]
 #aiokafka
diff --git a/schemas/mc_s.py b/schemas/mc_s.py
index 3784140..14d7950 100644
--- a/schemas/mc_s.py
+++ b/schemas/mc_s.py
@@ -129,6 +129,15 @@ schema_main = {
     "version_sentiment": "int",
     # 1, 2
     "version_tokens": "int",
+    # en, ru
+    "lang_code": "string indexed attribute",
+    "lang_name": "text",
+    "match_ts": "timestamp",
+    "batch_id": "bigint",
+    "rule_id": "bigint",
+    "index": "string indexed attribute",
+    "meta": "text",
+
 }
 
 schema_meta = {
diff --git a/sources/ch4.py b/sources/ch4.py
index b6f5d5d..7659387 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -60,7 +60,7 @@ class Chan4(object):
         self.log.debug(f"Decoded hash key: {self.hash_key}")
 
     async def run(self):
-        if not BOARDS:
+        if "ALL" in BOARDS:
             await self.get_board_list()
         else:
             self.boards = BOARDS