diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index a3f218b..b2f5576 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -2,17 +2,14 @@ version: "2.2" services: app: - image: pathogen/monolith:latest + image: xf/monolith:latest container_name: monolith build: . volumes: - ${PORTAINER_GIT_DIR}:/code - # env_file: - # - stack.env - networks: - - default - - xf - - db + - type: bind + source: /code/run + target: /var/run environment: PORTAINER_GIT_DIR: "${PORTAINER_GIT_DIR}" MODULES_ENABLED: "${MODULES_ENABLED}" @@ -49,41 +46,22 @@ services: MONOLITH_PROCESS_PERFSTATS: "${MONOLITH_PROCESS_PERFSTATS}" MONOLITH_CH4_BOARDS: "${MONOLITH_CH4_BOARDS}" REDIS_PASSWORD: "${REDIS_PASSWORD}" + MONOLITH_INGEST_INCREASE_BELOW: "${MONOLITH_INGEST_INCREASE_BELOW}" + MONOLITH_INGEST_INCREASE_BY: "${MONOLITH_INGEST_INCREASE_BY}" + MONOLITH_INGEST_DECREASE_ABOVE: "${MONOLITH_INGEST_DECREASE_ABOVE}" + MONOLITH_INGEST_DECREASE_BY: "${MONOLITH_INGEST_DECREASE_BY}" + MONOLITH_INGEST_MAX: "${MONOLITH_INGEST_MAX}" + MONOLITH_INGEST_MIN: "${MONOLITH_INGEST_MIN}" + deploy: + resources: + limits: + cpus: '0.5' + memory: 1.0G + network_mode: host - db: - #image: pathogen/manticore:kibana - image: manticoresearch/manticore:dev - container_name: monolith_db - #build: - # context: ./docker/manticore - # args: - # DEV: 1 - restart: always - ports: - - 9308 - - 9312 - - 9306 - ulimits: - nproc: 65535 - nofile: - soft: 65535 - hard: 65535 - memlock: - soft: -1 - hard: -1 - environment: - # - MCL=1 - - EXTRA=1 - networks: - - default - - xf - - db - volumes: - - ./docker/data:/var/lib/manticore - # - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf # threshold: - # image: pathogen/threshold:latest + # image: xf/threshold:latest # container_name: threshold # build: legacy/docker # volumes: @@ -148,16 +126,17 @@ services: - "1289:1289" environment: - SSDB_PORT=1289 - networks: - - default - - db - - # tmp: - # image: busybox - # container_name: tmp_monolith - # command: chmod -R 777 /var/run/socks - # volumes: - # - /var/run/socks + volumes: + - monolith_ssdb_data:/var/lib/ssdb + # networks: + # - default + # - db + deploy: + resources: + limits: + cpus: '0.5' + memory: 1.0G + network_mode: host redis: image: redis @@ -171,6 +150,9 @@ services: volumes: - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf - monolith_redis_data:/data + - type: bind + source: /code/run + target: /var/run # volumes_from: # - tmp healthcheck: @@ -178,18 +160,25 @@ services: interval: 2s timeout: 2s retries: 15 - networks: - - default - - xf - - db + # networks: + # - default + # - xf + # - db + deploy: + resources: + limits: + cpus: '0.5' + memory: 1.0G + network_mode: host -networks: - default: - driver: bridge - xf: - external: true - db: - external: true +# networks: +# default: +# driver: bridge +# xf: +# external: true +# db: +# external: true volumes: monolith_redis_data: + monolith_ssdb_data: \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d622a8d..c53b7ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,3 +22,4 @@ orjson uvloop elasticsearch[async] msgpack +# flpc diff --git a/sources/ingest.py b/sources/ingest.py index a04c1f8..603fdf0 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -11,8 +11,15 @@ KEYNAME = "queue" CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900")) ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5")) +INGEST_INCREASE_BELOW = int(getenv("MONOLITH_INGEST_INCREASE_BELOW", "2500")) +INGEST_DECREASE_ABOVE = int(getenv("MONOLITH_INGEST_DECREASE_ABOVE", "10000")) + +INGEST_INCREASE_BY = int(getenv("MONOLITH_INGEST_INCREASE_BY", "100")) +INGEST_DECREASE_BY = int(getenv("MONOLITH_INGEST_DECREASE_BY", "100")) log = util.get_logger("ingest") +INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000")) +INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100")) class Ingest(object): def __init__(self): @@ -34,9 +41,36 @@ class Ingest(object): await asyncio.sleep(ITER_DELAY) async def get_chunk(self): + global CHUNK_SIZE length = await db.ar.llen(KEYNAME) if length > CHUNK_SIZE: length = CHUNK_SIZE if not length: return - await process.spawn_processing_threads(self.current_chunk, length) + ingested = await process.spawn_processing_threads(self.current_chunk, length) + + if ingested < INGEST_INCREASE_BELOW: + if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX: + self.log.info( + ( + f"Increasing chunk size to " + f"{CHUNK_SIZE + INGEST_INCREASE_BY} " + f"due to low ingestion ({ingested})" + ) + ) + CHUNK_SIZE += INGEST_INCREASE_BY + else: + log.info(f"Chunk size ({CHUNK_SIZE}) at maximum, not increasing above: {INGEST_MAX}") + + elif ingested > INGEST_DECREASE_ABOVE: + if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN: + self.log.info( + ( + f"Decreasing chunk size to " + f"{CHUNK_SIZE - INGEST_DECREASE_BY}" + f"due to high ingestion ({ingested})" + ) + ) + CHUNK_SIZE -= INGEST_DECREASE_BY + else: + log.info(f"Chunk size ({CHUNK_SIZE}) at minimum, not decreasing below: {INGEST_MIN}")