diff --git a/db.py b/db.py index 4c92555..473a2c0 100644 --- a/db.py +++ b/db.py @@ -1,28 +1,15 @@ import random -from math import ceil import aioredis -import manticoresearch -import ujson +import orjson + +# Kafka from aiokafka import AIOKafkaProducer -from manticoresearch.rest import ApiException -from numpy import array_split from redis import StrictRedis import util -# Manticore schema -from schemas import mc_s - -# Manticore -configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") -api_client = manticoresearch.ApiClient(configuration) -api_instance = manticoresearch.IndexApi(api_client) - -# Kafka -from aiokafka import AIOKafkaProducer - -KAFKA_TOPIC = "msg" +# KAFKA_TOPIC = "msg" log = util.get_logger("db") @@ -51,103 +38,62 @@ KEYPREFIX = "queue." async def store_kafka_batch(data): - print("STORING KAFKA BATCH") + log.debug(f"Storing Kafka batch of {len(data)} messages") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() batch = producer.create_batch() for msg in data: if msg["type"] in TYPES_MAIN: index = "main" - schema = mc_s.schema_main + # schema = mc_s.schema_main elif msg["type"] in TYPES_META: index = "meta" - schema = mc_s.schema_meta + # schema = mc_s.schema_meta elif msg["type"] in TYPES_INT: index = "internal" - schema = mc_s.schema_int + # schema = mc_s.schema_int + + KAFKA_TOPIC = index # normalise fields for key, value in list(msg.items()): if value is None: del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith( - "text" - ): - msg[key] = str(value) - message = ujson.dumps(msg) - body = str.encode(message) + # if key in schema: + # if isinstance(value, int): + # if schema[key].startswith("string") or schema[key].startswith( + # "text" + # ): + # msg[key] = str(value) + body = orjson.dumps(msg) + # orjson returns bytes + # body = str.encode(message) if "ts" not in msg: - # print("MSG WITHOUT TS", msg) - continue + raise Exception("No TS in msg") metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) if metadata is None: partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print( - "%d messages sent to partition %d" % (batch.record_count(), partition) - ) + log.debug(f"{batch.record_count()} messages sent to partition {partition}") batch = producer.create_batch() continue partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" % (batch.record_count(), partition)) + log.debug(f"{batch.record_count()} messages sent to partition {partition}") await producer.stop() -# def store_message(msg): -# """ -# Store a message into Manticore -# :param msg: dict -# """ -# store_kafka(msg) -# # Duplicated to avoid extra function call -# if msg["type"] in TYPES_MAIN: -# index = "main" -# schema = mc_s.schema_main -# elif msg["type"] in TYPES_META: -# index = "meta" -# schema = mc_s.schema_meta -# elif msg["type"] in TYPES_INT: -# index = "internal" -# schema = mc_s.schema_int -# # normalise fields -# for key, value in list(msg.items()): -# if value is None: -# del msg[key] -# if key in schema: -# if isinstance(value, int): -# if schema[key].startswith("string") or schema[key].startswith("text"): -# msg[key] = str(value) - -# body = [{"insert": {"index": index, "doc": msg}}] -# 
body_post = "" -# for item in body: -# body_post += ujson.dumps(item) -# body_post += "\n" - -# # print(body_post) -# try: -# # Bulk index operations -# print("FAKE POST") -# #api_response = api_instance.bulk(body_post) # , async_req=True -# # print(api_response) -# except ApiException as e: -# print("Exception when calling IndexApi->bulk: %s\n" % e) -# print("ATTEMPT", body_post) - - async def queue_message(msg): """ Queue a message on the Redis buffer. """ src = msg["src"] - message = ujson.dumps(msg) + message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" + # log.debug(f"Queueing single message of string length {len(message)}") await ar.sadd(key, message) @@ -155,102 +101,10 @@ async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. """ + # log.debug(f"Queueing message batch of length {len(data)}") for msg in data: src = msg["src"] - message = ujson.dumps(msg) + message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) - - -# For now, make a normal function until we go full async -def queue_message_bulk_sync(data): - """ - Queue multiple messages on the Redis buffer. - """ - for msg in data: - src = msg["src"] - message = ujson.dumps(msg) - - key = "{KEYPREFIX}{src}" - r.sadd(key, message) - - -# def store_message_bulk(data): -# """ -# Store a message into Manticore -# :param msg: dict -# """ -# if not data: -# return -# for msg in data: -# store_kafka(msg) -# # 10000: maximum inserts we can submit to -# # Manticore as of Sept 2022 -# split_posts = array_split(data, ceil(len(data) / 10000)) -# for messages in split_posts: -# total = [] -# for msg in messages: -# # Duplicated to avoid extra function call (see above) -# if msg["type"] in TYPES_MAIN: -# index = "main" -# schema = mc_s.schema_main -# elif msg["type"] in TYPES_META: -# index = "meta" -# schema = mc_s.schema_meta -# elif msg["type"] in TYPES_INT: -# index = "internal" -# schema = mc_s.schema_int -# # normalise fields -# for key, value in list(msg.items()): -# if value is None: -# del msg[key] -# if key in schema: -# if isinstance(value, int): -# if schema[key].startswith("string") or schema[key].startswith( -# "text" -# ): -# msg[key] = str(value) - -# body = {"insert": {"index": index, "doc": msg}} -# total.append(body) - -# body_post = "" -# for item in total: -# body_post += ujson.dumps(item) -# body_post += "\n" - -# # print(body_post) -# try: -# # Bulk index operations -# print("FAKE POST") -# #api_response = api_instance.bulk(body_post) # , async_req=True -# #print(api_response) -# except ApiException as e: -# print("Exception when calling IndexApi->bulk: %s\n" % e) -# print("ATTEMPT", body_post) - - -# def update_schema(): -# pass - - -# def create_index(api_client): -# util_instance = manticoresearch.UtilsApi(api_client) -# schemas = { -# "main": mc_s.schema_main, -# "meta": mc_s.schema_meta, -# "internal": mc_s.schema_int, -# } -# for name, schema in schemas.items(): -# schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) - -# create_query = ( -# f"create table if not exists {name}({schema_types}) engine='columnar'" -# ) -# print("Schema types", create_query) -# util_instance.sql(create_query) - - -# create_index(api_client) -# update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index f82b86b..d0d2f4c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,10 +20,14 @@ services: volumes_from: - tmp depends_on: - - broker - - kafka - - tmp - - redis + broker: + condition: service_started + kafka: + condition: service_healthy + 
tmp: + condition: service_started + redis: + condition: service_healthy # - db threshold: @@ -46,8 +50,10 @@ services: volumes_from: - tmp depends_on: - - tmp - - redis + tmp: + condition: service_started + redis: + condition: service_healthy turnilo: container_name: turnilo @@ -102,6 +108,17 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ALLOW_PLAINTEXT_LISTENER: yes + # healthcheck: + # test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"] + # interval: 2s + # timeout: 2s + # retries: 15 + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"] + start_period: 15s + interval: 2s + timeout: 5s + retries: 30 coordinator: image: apache/druid:0.23.0 @@ -230,6 +247,11 @@ services: - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf volumes_from: - tmp + healthcheck: + test: "redis-cli -s /var/run/redis/redis.sock ping" + interval: 2s + timeout: 2s + retries: 15 networks: default: diff --git a/docker/Dockerfile b/docker/Dockerfile index c133ace..0da9448 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,7 @@ COPY requirements.txt /code/ COPY discord-patched.tgz /code/ RUN python -m venv /venv -RUN . /venv/bin/activate && pip install -r requirements.txt +RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages diff --git a/docker/requirements.txt b/docker/requirements.txt index 137542a..35412d3 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -4,8 +4,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson aioredis[hiredis] aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/event_log.txt b/event_log.txt new file mode 100644 index 0000000..e69de29 diff --git a/monolith.py b/monolith.py index ff3b929..1eb559b 100644 --- a/monolith.py +++ b/monolith.py @@ -1,19 +1,11 @@ import asyncio from os import getenv -import db import util from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest -# For development -# if not getenv("DISCORD_TOKEN", None): -# print("Could not get Discord token, attempting load from .env") -# from dotenv import load_dotenv - -# load_dotenv() - log = util.get_logger("monolith") modules_enabled = getenv("MODULES_ENABLED", False) diff --git a/processing/process.py b/processing/process.py index b547845..4a6409c 100644 --- a/processing/process.py +++ b/processing/process.py @@ -4,25 +4,73 @@ import random # For key generation import string + +# Squash errors +import warnings from concurrent.futures import ProcessPoolExecutor # For timestamp processing from datetime import datetime from math import ceil -import ujson +import orjson + +# Tokenisation +import spacy # For 4chan message parsing from bs4 import BeautifulSoup from numpy import array_split +from polyglot.detect.base import logger as polyglot_logger + +# For NLP +from polyglot.text import Text +from pycld2 import error as cld2_error from siphashc import siphash +# For sentiment +from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer + import db import util # 4chan schema from schemas.ch4_s import ATTRMAP +# For tokenisation +# from gensim.parsing.preprocessing import ( +# strip_tags, +# strip_punctuation, +# strip_numeric, +# stem_text, +# 
strip_multiple_whitespaces, +# strip_non_alphanum, +# remove_stopwords, +# strip_short, +# preprocess_string, +# ) + +# CUSTOM_FILTERS = [ +# lambda x: x.lower(), +# strip_tags, # +# strip_punctuation, # +# strip_multiple_whitespaces, +# strip_numeric, +# remove_stopwords, +# strip_short, +# #stem_text, +# strip_non_alphanum, # +# ] + +# Squash errors +polyglot_logger.setLevel("ERROR") +warnings.filterwarnings("ignore", category=UserWarning, module="bs4") + + +TAGS = ["NOUN", "ADJ", "VERB", "ADV"] +nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"]) + + log = util.get_logger("process") # Maximum number of CPU threads to use for post processing @@ -49,67 +97,44 @@ hash_key = get_hash_key() @asyncio.coroutine async def spawn_processing_threads(data): + len_data = len(data) + log.debug(f"Spawning processing threads for batch of {len_data} messages") + loop = asyncio.get_event_loop() tasks = [] - oldts = [x["now"] for x in data if "now" in x] + if len(data) < CPU_THREADS: split_data = [data] else: msg_per_core = int(len(data) / CPU_THREADS) - print("MSG PER CORE", msg_per_core) split_data = array_split(data, ceil(len(data) / msg_per_core)) for index, split in enumerate(split_data): - print("DELEGATING TO THREAD", len(split)) - future = loop.run_in_executor(p, process_data, data) - # future = p.submit(process_data, split) - tasks.append(future) - # results = [x.result(timeout=50) for x in tasks] - results = await asyncio.gather(*tasks) - print("RESULTS", len(results)) + log.debug(f"Delegating processing of {len(split)} messages to thread {index}") + task = loop.run_in_executor(p, process_data, data) + tasks.append(task) + + results = [await task for task in tasks] + log.debug(f"Results from processing of {len_data} messages: {len(results)}") # Join the results back from the split list flat_list = [item for sublist in results for item in sublist] - print("LENFLAT", len(flat_list)) - print("LENDATA", len(data)) - - newts = [x["ts"] for x in flat_list if "ts" in x] - print("lenoldts", len(oldts)) - print("lennewts", len(newts)) - allts = all(["ts" in x for x in flat_list]) - print("ALLTS", allts) - alllen = [len(x) for x in flat_list] - print("ALLLEN", alllen) await db.store_kafka_batch(flat_list) - -# @asyncio.coroutine -# def process_data_thread(data): -# """ -# Helper to spawn threads to process a list of data. 
-# """ -# loop = asyncio.get_event_loop() -# if len(data) < CPU_THREADS: -# split_data = [data] -# else: -# msg_per_core = int(len(data) / CPU_THREADS) -# print("MSG PER CORE", msg_per_core) -# split_data = array_split(data, ceil(len(data) / msg_per_core)) -# for index, split in enumerate(split_data): -# print("DELEGATING TO THREAD", len(split)) -# #f = process_data_thread(split) -# yield loop.run_in_executor(p, process_data, data) + log.debug(f"Finished processing {len_data} messages") def process_data(data): - print("PROCESS DATA START") - # to_store = [] - for index, msg in enumerate(data): - # print("PROCESSING", msg) + to_store = [] + + # Initialise sentiment analyser + analyzer = SentimentIntensityAnalyzer() + for msg in data: if msg["src"] == "4ch": board = msg["net"] thread = msg["channel"] + # Calculate hash for post - post_normalised = ujson.dumps(msg, sort_keys=True) + post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS) hash = siphash(hash_key, post_normalised) hash = str(hash) redis_key = f"cache.{board}.{thread}.{msg['no']}" @@ -117,19 +142,17 @@ def process_data(data): if key_content: key_content = key_content.decode("ascii") if key_content == hash: - del data[index] + # This deletes the message since the append at the end won't be hit continue else: - data[index]["type"] = "update" + msg["type"] = "update" db.r.set(redis_key, hash) - if "now" not in data[index]: - print("NOW NOT IN INDEX", data[index]) - for key2, value in list(data[index].items()): + for key2, value in list(msg.items()): if key2 in ATTRMAP: - data[index][ATTRMAP[key2]] = data[index][key2] - del data[index][key2] - if "ts" in data[index]: - old_time = data[index]["ts"] + msg[ATTRMAP[key2]] = msg[key2] + del msg[key2] + if "ts" in msg: + old_time = msg["ts"] # '08/30/22(Tue)02:25:37' time_spl = old_time.split(":") if len(time_spl) == 3: @@ -138,14 +161,42 @@ def process_data(data): old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") # new_ts = old_ts.isoformat() new_ts = int(old_ts.timestamp()) - data[index]["ts"] = new_ts + msg["ts"] = new_ts else: - print("MSG WITHOUT TS PROCESS", data[index]) - continue + raise Exception("No TS in msg") if "msg" in msg: - soup = BeautifulSoup(data[index]["msg"], "html.parser") - msg = soup.get_text(separator="\n") - data[index]["msg"] = msg - # to_store.append(data[index]) - print("FINISHED PROCESSING DATA") - return data + soup = BeautifulSoup(msg["msg"], "html.parser") + msg_str = soup.get_text(separator="\n") + msg["msg"] = msg_str + # Annotate sentiment/NLP + if "msg" in msg: + # Language + text = Text(msg["msg"]) + try: + lang_code = text.language.code + lang_name = text.language.name + msg["lang_code"] = lang_code + msg["lang_name"] = lang_name + except cld2_error as e: + log.error(f"Error detecting language: {e}") + # So below block doesn't fail + lang_code = None + + # Blatant discrimination + if lang_code == "en": + + # Sentiment + vs = analyzer.polarity_scores(str(msg["msg"])) + addendum = vs["compound"] + msg["sentiment"] = addendum + + # Tokens + n = nlp(msg["msg"]) + for tag in TAGS: + tag_name = tag.lower() + tags_flag = [token.lemma_ for token in n if token.pos_ == tag] + msg[f"words_{tag_name}"] = tags_flag + + # Add the mutated message to the return buffer + to_store.append(msg) + return to_store diff --git a/requirements.txt b/requirements.txt index 10ad22d..d020596 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,8 +5,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson 
aioredis[hiredis] aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/sources/ch4.py b/sources/ch4.py index 7640c9b..4ece35f 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -2,19 +2,13 @@ import asyncio import random import string -from concurrent.futures import ProcessPoolExecutor -from datetime import datetime from math import ceil import aiohttp -import ujson -from bs4 import BeautifulSoup from numpy import array_split -from siphashc import siphash import db import util -from schemas.ch4_s import ATTRMAP # CONFIGURATION # @@ -30,13 +24,8 @@ CRAWL_DELAY = 5 # Semaphore value ? THREADS_SEMAPHORE = 1000 -# Maximum number of CPU threads to use for post processing -CPU_THREADS = 8 - # CONFIGURATION END # -p = ProcessPoolExecutor(CPU_THREADS) - class Chan4(object): """ @@ -83,10 +72,12 @@ class Chan4(object): self.log.debug(f"Got boards: {self.boards}") async def get_thread_lists(self, boards): - self.log.debug(f"Getting thread list for {boards}") + # self.log.debug(f"Getting thread list for {boards}") board_urls = {board: f"{board}/catalog.json" for board in boards} responses = await self.api_call(board_urls) to_get = [] + flat_map = [board for board, thread in responses] + self.log.debug(f"Got thread list for {flat_map}: {len(responses)}") for mapped, response in responses: if not response: continue @@ -95,7 +86,6 @@ class Chan4(object): no = threads["no"] to_get.append((mapped, no)) - self.log.debug(f"Got thread list for {mapped}: {len(response)}") if not to_get: return split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) @@ -122,46 +112,20 @@ class Chan4(object): (board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list } - self.log.debug(f"Getting information for threads: {thread_urls}") + # self.log.debug(f"Getting information for threads: {thread_urls}") responses = await self.api_call(thread_urls) - self.log.debug(f"Got information for threads: {thread_urls}") + self.log.debug(f"Got information for {len(responses)} threads") + all_posts = {} for mapped, response in responses: if not response: continue board, thread = mapped - self.log.debug(f"Got thread content for thread {thread} on board {board}") all_posts[mapped] = response["posts"] - # Split into 10,000 chunks if not all_posts: return await self.handle_posts(all_posts) - # threads_per_core = int(len(all_posts) / CPU_THREADS) - # for i in range(CPU_THREADS): - # new_dict = {} - # pulled_posts = self.take_items(all_posts, threads_per_core) - # for k, v in pulled_posts: - # if k in new_dict: - # new_dict[k].append(v) - # else: - # new_dict[k] = [v] - # await self.handle_posts_thread(new_dict) - - # print("VAL", ceil(len(all_posts) / threads_per_core)) - # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) - # print("THREADS PER CORE SPLIT", len(split_posts)) - # # print("SPLIT CHUNK", len(split_posts)) - # for posts in split_posts: - # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") - # await self.handle_posts_thread(posts) - - # await self.handle_posts_thread(all_posts) - - @asyncio.coroutine - def handle_posts_thread(self, posts): - loop = asyncio.get_event_loop() - yield from loop.run_in_executor(p, self.handle_posts, posts) async def handle_posts(self, posts): to_store = [] @@ -170,50 +134,13 @@ class Chan4(object): for index, post in enumerate(post_list): posts[key][index]["type"] = "msg" - # # Calculate hash for post - # post_normalised = ujson.dumps(post, sort_keys=True) - # 
hash = siphash(self.hash_key, post_normalised) - # hash = str(hash) - # redis_key = f"cache.{board}.{thread}.{post['no']}" - # key_content = db.r.get(redis_key) - # if key_content: - # key_content = key_content.decode("ascii") - # if key_content == hash: - # continue - # else: - # posts[key][index]["type"] = "update" - # #db.r.set(redis_key, hash) - - # for key2, value in list(post.items()): - # if key2 in ATTRMAP: - # post[ATTRMAP[key2]] = posts[key][index][key2] - # del posts[key][index][key2] - # if "ts" in post: - # old_time = posts[key][index]["ts"] - # # '08/30/22(Tue)02:25:37' - # time_spl = old_time.split(":") - # if len(time_spl) == 3: - # old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") - # else: - # old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - # # new_ts = old_ts.isoformat() - # new_ts = int(old_ts.timestamp()) - # posts[key][index]["ts"] = new_ts - # if "msg" in post: - # soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") - # msg = soup.get_text(separator="\n") - # posts[key][index]["msg"] = msg - posts[key][index]["src"] = "4ch" posts[key][index]["net"] = board posts[key][index]["channel"] = thread to_store.append(posts[key][index]) - # print({name_map[name]: val for name, val in post.items()}) - # print(f"Got posts: {len(posts)}") if to_store: - print("STORING", len(to_store)) await db.queue_message_bulk(to_store) async def fetch(self, url, session, mapped): @@ -238,7 +165,7 @@ class Chan4(object): async with aiohttp.ClientSession(connector=connector) as session: for mapped, method in methods.items(): url = f"{self.api_endpoint}/{method}" - self.log.debug(f"GET {url}") + # self.log.debug(f"GET {url}") task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) # task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) tasks.append(task) diff --git a/sources/ingest.py b/sources/ingest.py index be7d6ca..017b8db 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -1,6 +1,6 @@ import asyncio -import ujson +import orjson import db import util @@ -8,9 +8,13 @@ from processing import process SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue." -CHUNK_SIZE = 90000 + +# Chunk size per source (divide by len(SOURCES) for total) +CHUNK_SIZE = 9000 ITER_DELAY = 0.5 +log = util.get_logger("ingest") + class Ingest(object): def __init__(self): @@ -18,8 +22,6 @@ class Ingest(object): self.log = util.get_logger(name) async def run(self): - # items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}] - # await process.spawn_processing_threads(items) while True: await self.get_chunk() await asyncio.sleep(ITER_DELAY) @@ -31,11 +33,8 @@ class Ingest(object): chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - # self.log.info(f"Got chunk: {chunk}") for item in chunk: - item = ujson.loads(item) - # self.log.info(f"Got item: {item}") + item = orjson.loads(item) items.append(item) if items: - print("PROCESSING", len(items)) await process.spawn_processing_threads(items) diff --git a/util.py b/util.py index 09f98da..045c95f 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import logging log = logging.getLogger("util") -debug = False +debug = True # Color definitions BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
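Review note on the ujson -> orjson switch: orjson.dumps() returns bytes rather than str and takes option=orjson.OPT_SORT_KEYS instead of sort_keys=True, which is why the str.encode() step disappears from store_kafka_batch and why the siphash input in process_data is now bytes. A minimal standalone sketch of the round-trip (illustration only, not project code):

import orjson

msg = {"b": 2, "a": 1}

body = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)  # bytes, keys sorted
assert isinstance(body, bytes)
assert body == b'{"a":1,"b":2}'
assert orjson.loads(body) == msg  # loads() accepts bytes or str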
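Review note on processing/process.py: spawn_processing_threads iterates over split_data but still passes the full `data` list to every run_in_executor call, so each worker re-processes the whole batch. If the intent is one split per worker, the fan-out would presumably look like the sketch below (the helper name is hypothetical; it assumes `p` is the module-level ProcessPoolExecutor and `process_data` accepts a list of messages):

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def fan_out(p: ProcessPoolExecutor, split_data, process_data):
    # One split per worker; gather() preserves the order of the splits.
    loop = asyncio.get_event_loop()
    tasks = [
        loop.run_in_executor(p, process_data, list(split))  # list() copies the numpy slice
        for split in split_data
    ]
    return await asyncio.gather(*tasks)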
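Review note on the Redis buffer: queue_message/queue_message_bulk now sadd orjson bytes onto queue.<src>, and Ingest.get_chunk spops up to CHUNK_SIZE members and orjson.loads() each one (bytes in, dict out, no explicit decode needed). A self-contained sketch under the assumption of an aioredis 2.x-style client and a local TCP Redis at redis://localhost (the project itself connects over a unix socket):

import asyncio

import aioredis
import orjson

async def roundtrip():
    ar = aioredis.from_url("redis://localhost")  # assumption: demo connection, not the project socket
    msg = {"src": "4ch", "channel": "23554700", "ts": 1663027829}

    # Producer side (db.queue_message_bulk)
    await ar.sadd("queue.4ch", orjson.dumps(msg))

    # Consumer side (Ingest.get_chunk)
    chunk = await ar.spop("queue.4ch", 10)
    items = [orjson.loads(item) for item in chunk]
    assert items == [msg]

asyncio.run(roundtrip())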