monolith/sources/ingest.py

43 lines
1.1 KiB
Python
Raw Normal View History

2022-09-07 06:20:30 +00:00
import asyncio
from os import getenv
2022-09-07 06:20:30 +00:00
import db
import util
from processing import process
# Key whose length is polled (via db.ar.llen) to discover pending items.
KEYNAME = "queue"

# Maximum number of queued items handed to processing per iteration.
# Override with the MONOLITH_INGEST_CHUNK_SIZE environment variable.
CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))

# Seconds to sleep between ingest iterations.
# Override with the MONOLITH_INGEST_ITER_DELAY environment variable.
ITER_DELAY = float(getenv("MONOLITH_INGEST_ITER_DELAY", "0.5"))

# Module-level logger.
log = util.get_logger("ingest")
2022-09-07 06:20:30 +00:00
class Ingest:
    """Periodically check the queue and dispatch pending items for processing.

    Each loop iteration reads the length of ``KEYNAME``, clamps it to
    ``CHUNK_SIZE`` and, when non-zero, hands that count to
    ``process.spawn_processing_threads`` together with the current chunk
    number.
    """

    def __init__(self):
        """Set up the logger and the chunk counter, then log the settings."""
        name = self.__class__.__name__
        self.log = util.get_logger(name)
        # Iteration counter; also passed to the processing step as the
        # chunk identifier.
        self.current_chunk = 0
        self.log.info(
            (
                "Starting ingest handler for chunk size of "
                f"{CHUNK_SIZE} every {ITER_DELAY} seconds."
            )
        )

    async def run(self):
        """Run the ingest loop forever, pausing ``ITER_DELAY`` seconds per pass.

        The chunk counter advances on every pass, including passes where
        the queue was empty and nothing was dispatched.
        """
        while True:
            await self.get_chunk()
            self.log.debug(f"Ingest chunk {self.current_chunk} complete")
            self.current_chunk += 1
            await asyncio.sleep(ITER_DELAY)

    async def get_chunk(self):
        """Dispatch up to ``CHUNK_SIZE`` queued items for processing.

        Returns without dispatching when the clamped length is zero.
        """
        # Never hand more than CHUNK_SIZE items to a single processing pass.
        length = min(await db.ar.llen(KEYNAME), CHUNK_SIZE)
        if not length:
            return
        await process.spawn_processing_threads(self.current_chunk, length)