Improve memory usage and fix 4chan crawler

Mark Veidemanis 2022-10-21 07:20:30 +01:00
parent 2d7b6268dd
commit 51a9b2af79
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
6 changed files with 87 additions and 48 deletions

View File

@@ -19,6 +19,8 @@ services:
       - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
       #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
       - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
+    volumes_from:
+      - tmp
     ports:
       - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
       - "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
@@ -37,8 +39,37 @@ services:
     environment:
       - SSDB_PORT=1289
 
+  tmp:
+    image: busybox
+    container_name: tmp_monolith
+    command: chmod -R 777 /var/run/socks
+    volumes:
+      - /var/run/socks
+
+  redis:
+    image: redis
+    container_name: redis_monolith
+    command: redis-server /etc/redis.conf
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+    volumes:
+      - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
+      - redis_data:/data
+    volumes_from:
+      - tmp
+    healthcheck:
+      test: "redis-cli -s /var/run/socks/redis.sock ping"
+      interval: 2s
+      timeout: 2s
+      retries: 15
+
 networks:
   default:
     external:
       name: pathogen
+
+volumes:
+  redis_data:
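
The added tmp service exists only to own an anonymous volume at /var/run/socks with wide-open permissions; volumes_from: tmp then mounts that same directory into the application and Redis containers, so redis-server can publish its unix socket where every other service can reach it without TCP. A minimal client-side check, assuming redis-py is installed in the consuming container:

import redis  # redis-py, assumed available inside the client container

# The socket lives in the volume shared via the busybox "tmp" container.
r = redis.Redis(unix_socket_path="/var/run/socks/redis.sock")
print(r.ping())  # True once the volume is mounted and redis-server is up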

View File

@@ -1,2 +1,2 @@
-unixsocket /var/run/redis/redis.sock
+unixsocket /var/run/socks/redis.sock
 unixsocketperm 777

View File

@@ -17,7 +17,7 @@
   },
   "Key": "key.pem",
   "Certificate": "cert.pem",
-  "RedisSocket": "/var/run/redis/redis.sock",
+  "RedisSocket": "/var/run/socks/redis.sock",
   "RedisDBEphemeral": 1,
   "RedisDBPersistent": 0,
   "UsePassword": false,

View File

@@ -14,7 +14,6 @@ from concurrent.futures import ProcessPoolExecutor
 # For timestamp processing
 from datetime import datetime
-from math import ceil
 from os import getenv
 
 import orjson
@@ -35,7 +34,6 @@ from gensim.parsing.preprocessing import ( # stem_text,
     strip_short,
     strip_tags,
 )
-from numpy import array_split
 from polyglot.detect.base import logger as polyglot_logger
 
 # For NLP
@@ -54,6 +52,8 @@ from schemas.ch4_s import ATTRMAP
 trues = ("true", "1", "t", True)
 
+KEYNAME = "queue"
+
 MONOLITH_PROCESS_PERFSTATS = (
     getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
 )
@@ -106,20 +106,23 @@ hash_key = get_hash_key()
 
 @asyncio.coroutine
-async def spawn_processing_threads(data):
-    len_data = len(data)
+async def spawn_processing_threads(chunk, length):
+    log.debug(f"Spawning processing threads for chunk {chunk} of length {length}")
     loop = asyncio.get_event_loop()
     tasks = []
 
-    if len(data) < CPU_THREADS * 100:
-        split_data = [data]
+    if length < CPU_THREADS * 100:
+        cores = 1
+        chunk_size = length
     else:
-        msg_per_core = int(len(data) / CPU_THREADS)
-        split_data = array_split(data, ceil(len(data) / msg_per_core))
-    for index, split in enumerate(split_data):
-        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
-        task = loop.run_in_executor(p, process_data, split)
+        cores = CPU_THREADS
+        chunk_size = int(length / cores)
+
+    for index in range(cores):
+        log.debug(
+            f"[{chunk}/{index}] Delegating {chunk_size} messages to thread {index}"
+        )
+        task = loop.run_in_executor(p, process_data, chunk, index, chunk_size)
         tasks.append(task)
 
     results = [await task for task in tasks]
@@ -128,8 +131,8 @@ async def spawn_processing_threads(data):
     flat_list = [item for sublist in results for item in sublist]
     log.debug(
         (
-            f"Results from processing of {len_data} messages in "
-            f"{len(split_data)} threads: {len(flat_list)}"
+            f"[{chunk}/{index}] Results from processing of {length} messages in "
+            f"{cores} threads: {len(flat_list)}"
         )
     )
     await db.store_kafka_batch(flat_list)
@@ -137,7 +140,8 @@ async def spawn_processing_threads(data):
     # log.debug(f"Finished processing {len_data} messages")
 
 
-def process_data(data):
+def process_data(chunk, index, chunk_size):
+    log.debug(f"[{chunk}/{index}] Processing {chunk_size} messages")
     to_store = []
 
     sentiment_time = 0.0
@@ -154,7 +158,11 @@ def process_data(data):
     # Initialise sentiment analyser
     analyzer = SentimentIntensityAnalyzer()
-    for msg in data:
+    for msg_index in range(chunk_size):
+        msg = db.r.rpop(KEYNAME)
+        if not msg:
+            return
+        msg = orjson.loads(msg)
         total_start = time.process_time()
         # normalise fields
         start = time.process_time()
@@ -185,13 +193,16 @@ def process_data(data):
         post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
         hash = siphash(hash_key, post_normalised)
         hash = str(hash)
-        redis_key = f"cache.{board}.{thread}.{msg['no']}"
+        redis_key = (
+            f"cache.{board}.{thread}.{msg['no']}.{msg['resto']}.{msg['now']}"
+        )
         key_content = db.r.get(redis_key)
-        if key_content:
+        if key_content is not None:
             key_content = key_content.decode("ascii")
             if key_content == hash:
                 # This deletes the message since the append at the end won't be hit
                 continue
+                # pass
         else:
             msg["type"] = "update"
         db.r.set(redis_key, hash)
@@ -243,7 +254,7 @@ def process_data(data):
             msg["lang_code"] = lang_code
             msg["lang_name"] = lang_name
         except cld2_error as e:
-            log.error(f"Error detecting language: {e}")
+            log.error(f"[{chunk}/{index}] Error detecting language: {e}")
             # So below block doesn't fail
             lang_code = None
         time_took = (time.process_time() - start) * 1000
@@ -277,6 +288,8 @@ def process_data(data):
 
     if MONOLITH_PROCESS_PERFSTATS:
         log.debug("=====================================")
+        log.debug(f"Chunk: {chunk}")
+        log.debug(f"Index: {index}")
         log.debug(f"Sentiment: {sentiment_time}")
         log.debug(f"Regex: {regex_time}")
         log.debug(f"Polyglot: {polyglot_time}")

View File

@@ -74,26 +74,28 @@ class Chan4(object):
 
     async def get_thread_lists(self, boards):
         # self.log.debug(f"Getting thread list for {boards}")
-        board_urls = {board: f"{board}/catalog.json" for board in boards}
+        board_urls = {board: f"{board}/threads.json" for board in boards}
         responses = await self.api_call(board_urls)
         to_get = []
         flat_map = [board for board, thread in responses]
-        self.log.debug(f"Got thread list for {flat_map}: {len(responses)}")
-        for mapped, response in responses:
+        self.log.debug(f"Got thread list for {len(responses)} boards: {flat_map}")
+        for board, response in responses:
             if not response:
                 continue
             for page in response:
                 for threads in page["threads"]:
                     no = threads["no"]
-                    to_get.append((mapped, no))
+                    to_get.append((board, no))
 
         if not to_get:
             return
+        self.log.debug(f"Got {len(to_get)} threads to fetch")
         split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
-        for threads in split_threads:
-            await self.get_threads_content(threads)
+        self.log.debug(f"Split threads into {len(split_threads)} series")
+        for index, thr in enumerate(split_threads):
+            self.log.debug(f"Series {index} - getting {len(thr)} threads")
+            await self.get_threads_content(thr)
             await asyncio.sleep(THREADS_DELAY)
-        # await self.get_threads_content(to_get)
@@ -132,14 +134,14 @@ class Chan4(object):
         to_store = []
         for key, post_list in posts.items():
             board, thread = key
-            for index, post in enumerate(post_list):
-                posts[key][index]["type"] = "msg"
-                posts[key][index]["src"] = "4ch"
-                posts[key][index]["net"] = board
-                posts[key][index]["channel"] = thread
-                to_store.append(posts[key][index])
+            for post in post_list:
+                post["type"] = "msg"
+                post["src"] = "4ch"
+                post["net"] = board
+                post["channel"] = thread
+                to_store.append(post)
 
         if to_store:
             await db.queue_message_bulk(to_store)
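
The crawler now pulls each board's threads.json, which lists only page and thread numbers, instead of catalog.json, which carries full OP bodies and reply previews, and then fetches the resulting thread ids in throttled batches. A stripped-down sketch of that batching step, assuming numpy's array_split and the module's THREADS_CONCURRENT/THREADS_DELAY settings (values below are illustrative):

import asyncio
from math import ceil

from numpy import array_split

THREADS_CONCURRENT = 1000  # illustrative; the real values are configuration-driven
THREADS_DELAY = 0.1


async def fetch_in_series(to_get, get_threads_content):
    # Split the (board, thread_no) pairs into series and pause between them,
    # so a large backlog does not become one burst of API requests.
    if not to_get:
        return
    split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
    for index, series in enumerate(split_threads):
        print(f"Series {index} - getting {len(series)} threads")
        await get_threads_content(series)
        await asyncio.sleep(THREADS_DELAY)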

View File

@@ -1,8 +1,6 @@
 import asyncio
 from os import getenv
 
-import orjson
-
 import db
 import util
 from processing import process
@@ -20,6 +18,7 @@ class Ingest(object):
     def __init__(self):
         name = self.__class__.__name__
         self.log = util.get_logger(name)
+        self.current_chunk = 0
        self.log.info(
             (
                 "Starting ingest handler for chunk size of "
@@ -30,20 +29,14 @@ class Ingest(object):
     async def run(self):
         while True:
             await self.get_chunk()
+            self.log.debug(f"Ingest chunk {self.current_chunk} complete")
+            self.current_chunk += 1
             await asyncio.sleep(ITER_DELAY)
 
     async def get_chunk(self):
-        items = []
-        # for source in SOURCES:
-        # key = f"{KEYPREFIX}{source}"
         length = await db.ar.llen(KEYNAME)
-        start_num = length - CHUNK_SIZE
-        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
-        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
-        if not chunk:
+        if length > CHUNK_SIZE:
+            length = CHUNK_SIZE
+        if not length:
             return
-        for item in chunk:
-            item = orjson.loads(item)
-            items.append(item)
-        if items:
-            await process.spawn_processing_threads(items)
+        await process.spawn_processing_threads(self.current_chunk, length)
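
With the orjson decoding and the lrange copy removed, the ingest loop never materialises the chunk at all: it reads the list length, caps it at CHUNK_SIZE, and hands the count (not the data) to the processing pool, which pops the messages itself. A small worked example of that accounting, using illustrative values for the two environment-driven constants:

CHUNK_SIZE = 90000  # illustrative; set from the environment in the real module
CPU_THREADS = 8     # illustrative


def plan_chunk(queue_length):
    # Mirrors get_chunk + spawn_processing_threads: how many workers run,
    # and how many messages each one will RPOP from the queue.
    length = min(queue_length, CHUNK_SIZE)
    if not length:
        return 0, 0
    cores = 1 if length < CPU_THREADS * 100 else CPU_THREADS
    return cores, length // cores


print(plan_chunk(250_000))  # (8, 11250): eight workers pop 11250 messages each
print(plan_chunk(500))      # (1, 500): a small backlog stays on one worker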