Use only one Redis key for the queue to make chunk size more precise for thread allocation
parent 5992498493
commit 63081f68b7
db.py | 14 +++++++-------
db.py
@@ -40,7 +40,7 @@ MAIN_SRC_MAP = {
 
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
-KEYPREFIX = "queue."
+KEYNAME = "queue"
 
 
 async def store_kafka_batch(data):
@@ -113,12 +113,12 @@ async def queue_message(msg):
     """
     Queue a message on the Redis buffer.
     """
-    src = msg["src"]
+    # src = msg["src"]
     message = orjson.dumps(msg)
 
-    key = f"{KEYPREFIX}{src}"
+    # key = f"{KEYPREFIX}{src}"
     # log.debug(f"Queueing single message of string length {len(message)}")
-    await ar.sadd(key, message)
+    await ar.sadd(KEYNAME, message)
 
 
 async def queue_message_bulk(data):
@@ -127,8 +127,8 @@ async def queue_message_bulk(data):
     """
     # log.debug(f"Queueing message batch of length {len(data)}")
     for msg in data:
-        src = msg["src"]
+        # src = msg["src"]
         message = orjson.dumps(msg)
 
-        key = f"{KEYPREFIX}{src}"
-        await ar.sadd(key, message)
+        # key = f"{KEYPREFIX}{src}"
+        await ar.sadd(KEYNAME, message)
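For reference, a minimal sketch of the producer side after this change, assuming ar is a redis.asyncio client as in db.py (the client construction here is a stand-in, not the project's actual setup):

import asyncio

import orjson
import redis.asyncio as redis

KEYNAME = "queue"
ar = redis.Redis()  # assumption: default connection; db.py configures its own client


async def queue_message(msg):
    # Single shared set: the message source no longer selects the key.
    await ar.sadd(KEYNAME, orjson.dumps(msg))


async def queue_message_bulk(data):
    for msg in data:
        await ar.sadd(KEYNAME, orjson.dumps(msg))


asyncio.run(queue_message({"src": "irc", "msg": "hello"}))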
@@ -127,11 +127,11 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_MESSAGE_MAX_BYTES: 2000000
-      KAFKA_HEAP_OPTS: -Xmx2g
+      #KAFKA_HEAP_OPTS: -Xmx2g
     healthcheck:
       test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
       start_period: 15s
-      interval: 2s
+      interval: 30s
       timeout: 30s
       retries: 45
 
@@ -121,11 +121,11 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
       KAFKA_MESSAGE_MAX_BYTES: 2000000
-      KAFKA_HEAP_OPTS: -Xmx2g
+      #KAFKA_HEAP_OPTS: -Xmx2g
     healthcheck:
       test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
       start_period: 15s
-      interval: 2s
+      interval: 30s
       timeout: 30s
       retries: 45
 
@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 
 def queue_message(c):
     message = json.dumps(c)
-    main.g.sadd(main.config["Ingest"]["Key"], message)
+    main.g.sadd("queue", message)
 
 
 def event(
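The legacy client mirrors the same change with its synchronous Redis connection; a sketch assuming main.g is a plain redis.Redis instance (the module layout here is hypothetical):

import json

import redis

g = redis.Redis()  # assumption: stands in for main.g


def queue_message(c):
    # Hard-coded "queue" now matches the async ingest path instead of
    # reading the key name from main.config["Ingest"]["Key"].
    g.sadd("queue", json.dumps(c))


queue_message({"type": "msg", "msg": "hello"})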
@@ -1,5 +1,4 @@
 import asyncio
-import sys
 from os import getenv
 
 import uvloop
@@ -101,18 +101,17 @@ hash_key = get_hash_key()
-@asyncio.coroutine
 async def spawn_processing_threads(data):
     len_data = len(data)
     # log.debug(f"Spawning processing threads for batch of {len_data} messages")
 
     loop = asyncio.get_event_loop()
     tasks = []
 
-    if len(data) < CPU_THREADS:
+    if len(data) < CPU_THREADS * 100:
         split_data = [data]
     else:
         msg_per_core = int(len(data) / CPU_THREADS)
         split_data = array_split(data, ceil(len(data) / msg_per_core))
     for index, split in enumerate(split_data):
-        # log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
+        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
         task = loop.run_in_executor(p, process_data, split)
         tasks.append(task)
 
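With the new threshold, a batch is only split once it holds at least CPU_THREADS * 100 messages, and a full 900-message chunk now splits into near-equal slices. A standalone sketch of the arithmetic (CPU_THREADS = 8 is an assumed value for illustration):

from math import ceil

from numpy import array_split

CPU_THREADS = 8          # assumption for the example
data = list(range(900))  # one full chunk from the single queue key

if len(data) < CPU_THREADS * 100:  # 900 >= 800, so this batch gets split
    split_data = [data]
else:
    msg_per_core = int(len(data) / CPU_THREADS)                      # 112
    split_data = array_split(data, ceil(len(data) / msg_per_core))   # 9 slices

print([len(s) for s in split_data])  # nine slices of 100 messages each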
@@ -7,12 +7,13 @@ import db
 import util
 from processing import process
 
-SOURCES = ["4ch", "irc", "dis"]
+# SOURCES = ["4ch", "irc", "dis"]
 # DEBUG CODE REMOVE ME
 # SOURCES.remove("4ch")
 # SOURCES.remove("dis")
 # DEBUG CODE REMOVE ME
-KEYPREFIX = "queue."
+# KEYPREFIX = "queue."
+KEYNAME = "queue"
 
 # Chunk size per source (divide by len(SOURCES) for total)
 CHUNK_SIZE = int(getenv("MONOLITH_INGEST_CHUNK_SIZE", "900"))
@@ -39,13 +40,13 @@ class Ingest(object):
 
     async def get_chunk(self):
         items = []
-        for source in SOURCES:
-            key = f"{KEYPREFIX}{source}"
-            chunk = await db.ar.spop(key, CHUNK_SIZE)
-            if not chunk:
-                continue
-            for item in chunk:
-                item = orjson.loads(item)
-                items.append(item)
+        # for source in SOURCES:
+        # key = f"{KEYPREFIX}{source}"
+        chunk = await db.ar.spop(KEYNAME, CHUNK_SIZE)
+        if not chunk:
+            return
+        for item in chunk:
+            item = orjson.loads(item)
+            items.append(item)
         if items:
             await process.spawn_processing_threads(items)
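Putting the consumer side together, a sketch of the new get_chunk flow against the single key; the Redis client and the downstream spawn_processing_threads call are stand-ins for the real db and process modules:

import asyncio

import orjson
import redis.asyncio as redis

KEYNAME = "queue"
CHUNK_SIZE = 900


async def get_chunk(ar, spawn_processing_threads):
    # One SPOP with a count: at most CHUNK_SIZE items, all sources combined.
    chunk = await ar.spop(KEYNAME, CHUNK_SIZE)
    if not chunk:
        return
    items = [orjson.loads(item) for item in chunk]
    if items:
        await spawn_processing_threads(items)


async def main():
    ar = redis.Redis()  # assumption: default connection

    async def fake_spawn(items):  # stand-in for process.spawn_processing_threads
        print(f"would process {len(items)} items")

    await get_chunk(ar, fake_spawn)


asyncio.run(main())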