"""Adaptive ingest loop: drains a queue in chunks, resizing the chunk to match throughput."""
import asyncio
from os import getenv

import db
import util
from processing import process

# Redis (or similar) list key holding the pending work items.
KEYNAME = "queue"

# Chunk size — mutated at runtime by Ingest.get_chunk (via `global`) to adapt
# to observed ingestion throughput.
CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))
# Seconds to sleep between ingest iterations.
ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5"))
# Grow the chunk when a pass ingests fewer items than this...
INGEST_INCREASE_BELOW = int(getenv("MONOLITH_INGEST_INCREASE_BELOW", "2500"))
# ...and shrink it when a pass ingests more than this.
INGEST_DECREASE_ABOVE = int(getenv("MONOLITH_INGEST_DECREASE_ABOVE", "10000"))
# Step sizes for growing/shrinking the chunk.
INGEST_INCREASE_BY = int(getenv("MONOLITH_INGEST_INCREASE_BY", "100"))
INGEST_DECREASE_BY = int(getenv("MONOLITH_INGEST_DECREASE_BY", "100"))
# Hard bounds on the adaptive chunk size.
INGEST_MAX = int(getenv("MONOLITH_INGEST_MAX", "1000000"))
INGEST_MIN = int(getenv("MONOLITH_INGEST_MIN", "100"))

# Module-level logger (kept for external importers; methods below log via self.log).
log = util.get_logger("ingest")


class Ingest(object):
    """Periodically pulls up to CHUNK_SIZE items off the queue and hands them to
    the processing layer, adapting CHUNK_SIZE to the observed ingestion rate."""

    def __init__(self):
        name = self.__class__.__name__
        self.log = util.get_logger(name)
        # Monotonically increasing chunk counter, passed to the processing layer.
        self.current_chunk = 0
        self.log.info(
            (
                "Starting ingest handler for chunk size of "
                f"{CHUNK_SIZE} every {ITER_DELAY} seconds."
            )
        )

    async def run(self):
        """Main loop: ingest one chunk, then sleep ITER_DELAY seconds, forever."""
        while True:
            await self.get_chunk()
            self.log.debug(f"Ingest chunk {self.current_chunk} complete")
            self.current_chunk += 1
            await asyncio.sleep(ITER_DELAY)

    async def get_chunk(self):
        """Ingest up to CHUNK_SIZE queued items, then adapt CHUNK_SIZE.

        If the pass ingested fewer than INGEST_INCREASE_BELOW items, grow
        CHUNK_SIZE by INGEST_INCREASE_BY (capped below INGEST_MAX); if it
        ingested more than INGEST_DECREASE_ABOVE, shrink it by
        INGEST_DECREASE_BY (floored above INGEST_MIN). No-op when the queue
        is empty.
        """
        global CHUNK_SIZE
        length = await db.ar.llen(KEYNAME)
        # Never take more than the current chunk size in one pass.
        if length > CHUNK_SIZE:
            length = CHUNK_SIZE
        if not length:
            # Queue empty — nothing to do this iteration.
            return
        ingested = await process.spawn_processing_threads(self.current_chunk, length)
        if ingested < INGEST_INCREASE_BELOW:
            if CHUNK_SIZE + INGEST_INCREASE_BY < INGEST_MAX:
                self.log.info(
                    (
                        f"Increasing chunk size to "
                        f"{CHUNK_SIZE + INGEST_INCREASE_BY} "
                        f"due to low ingestion ({ingested})"
                    )
                )
                CHUNK_SIZE += INGEST_INCREASE_BY
            else:
                self.log.info(
                    f"Chunk size ({CHUNK_SIZE}) at maximum, "
                    f"not increasing above: {INGEST_MAX}"
                )
        elif ingested > INGEST_DECREASE_ABOVE:
            if CHUNK_SIZE - INGEST_DECREASE_BY > INGEST_MIN:
                self.log.info(
                    (
                        f"Decreasing chunk size to "
                        f"{CHUNK_SIZE - INGEST_DECREASE_BY} "
                        f"due to high ingestion ({ingested})"
                    )
                )
                CHUNK_SIZE -= INGEST_DECREASE_BY
            else:
                self.log.info(
                    f"Chunk size ({CHUNK_SIZE}) at minimum, "
                    f"not decreasing below: {INGEST_MIN}"
                )