From 79a430be04345a04bd818964b3190bf1f0516539 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Thu, 8 Sep 2022 07:20:30 +0100 Subject: [PATCH 1/5] Begin implementing Apache Druid --- db.py | 12 ++-- docker-compose.yml | 167 +++++++++++++++++++++++++++++++++++++-------- environment | 52 ++++++++++++++ 3 files changed, 198 insertions(+), 33 deletions(-) create mode 100644 environment diff --git a/db.py b/db.py index 58cb9a5..710e7d3 100644 --- a/db.py +++ b/db.py @@ -72,7 +72,8 @@ def store_message(msg): # print(body_post) try: # Bulk index operations - api_response = api_instance.bulk(body_post) # , async_req=True + print("FAKE POST") + #api_response = api_instance.bulk(body_post) # , async_req=True # print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) @@ -124,8 +125,9 @@ def store_message_bulk(data): # print(body_post) try: # Bulk index operations - api_response = api_instance.bulk(body_post) # , async_req=True - print(api_response) + print("FAKE POST") + #api_response = api_instance.bulk(body_post) # , async_req=True + #print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) print("ATTEMPT", body_post) @@ -152,5 +154,5 @@ def create_index(api_client): util_instance.sql(create_query) -create_index(api_client) -update_schema() +#create_index(api_client) +#update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index d3b25f7..e73c970 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,14 @@ -version: "2" - +version: "2.2" + +volumes: + metadata_data: {} + middle_var: {} + historical_var: {} + broker_var: {} + coordinator_var: {} + router_var: {} + druid_shared: {} + services: app: image: pathogen/monolith:latest @@ -10,8 +19,8 @@ services: - .env volumes_from: - tmp - depends_on: - - db + # depends_on: + # - db threshold: image: pathogen/threshold:latest @@ -36,31 +45,133 @@ services: - tmp - redis - db: - #image: pathogen/manticore:kibana - image: manticoresearch/manticore:dev - #build: - # context: ./docker/manticore - # args: - # DEV: 1 - restart: always - ports: - - 9308 - - 9312 - - 9306 - ulimits: - nproc: 65535 - nofile: - soft: 65535 - hard: 65535 - memlock: - soft: -1 - hard: -1 - environment: - - MCL=1 + postgres: + container_name: postgres + image: postgres:latest volumes: - - ./docker/data:/var/lib/manticore - - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf + - metadata_data:/var/lib/postgresql/data + environment: + - POSTGRES_PASSWORD=FoolishPassword + - POSTGRES_USER=druid + - POSTGRES_DB=druid + + # Need 3.5 or later for container nodes + zookeeper: + container_name: zookeeper + image: zookeeper:3.5 + ports: + - "2181:2181" + environment: + - ZOO_MY_ID=1 + + coordinator: + image: apache/druid:0.23.0 + container_name: coordinator + volumes: + - druid_shared:/opt/shared + - coordinator_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + ports: + - "8081:8081" + command: + - coordinator + env_file: + - environment + + broker: + image: apache/druid:0.23.0 + container_name: broker + volumes: + - broker_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8082:8082" + command: + - broker + env_file: + - environment + + historical: + image: apache/druid:0.23.0 + container_name: historical + volumes: + - druid_shared:/opt/shared + - historical_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8083:8083" + command: + - historical + env_file: + - environment 
+ + middlemanager: + image: apache/druid:0.23.0 + container_name: middlemanager + volumes: + - druid_shared:/opt/shared + - middle_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8091:8091" + - "8100-8105:8100-8105" + command: + - middleManager + env_file: + - environment + + router: + image: apache/druid:0.23.0 + container_name: router + volumes: + - router_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8888:8888" + command: + - router + env_file: + - environment + + # db: + # #image: pathogen/manticore:kibana + # image: manticoresearch/manticore:dev + # #build: + # # context: ./docker/manticore + # # args: + # # DEV: 1 + # restart: always + # ports: + # - 9308 + # - 9312 + # - 9306 + # ulimits: + # nproc: 65535 + # nofile: + # soft: 65535 + # hard: 65535 + # memlock: + # soft: -1 + # hard: -1 + # environment: + # - MCL=1 + # volumes: + # - ./docker/data:/var/lib/manticore + # - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf tmp: image: busybox diff --git a/environment b/environment new file mode 100644 index 0000000..7bb9214 --- /dev/null +++ b/environment @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# Java tuning +DRUID_XMX=1g +DRUID_XMS=1g +DRUID_MAXNEWSIZE=250m +DRUID_NEWSIZE=250m +DRUID_MAXDIRECTMEMORYSIZE=6172m + +druid_emitter_logging_logLevel=debug + +druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage"] + +druid_zk_service_host=zookeeper + +druid_metadata_storage_host= +druid_metadata_storage_type=postgresql +druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid +druid_metadata_storage_connector_user=druid +druid_metadata_storage_connector_password=FoolishPassword + +druid_coordinator_balancer_strategy=cachingCost + +druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"] +druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB + +druid_storage_type=local +druid_storage_storageDirectory=/opt/shared/segments +druid_indexer_logs_type=file +druid_indexer_logs_directory=/opt/shared/indexing-logs + +druid_processing_numThreads=2 +druid_processing_numMergeBuffers=2 + +DRUID_LOG4J= From 3c2adfc16e0307b9585ca82756ae35e583783a14 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 13 Sep 2022 22:17:32 +0100 Subject: [PATCH 2/5] Implement Apache Druid/Kafka and Metabase --- docker-compose.yml | 30 ++++++++++++++++++++++++++++++ docker/requirements.txt | 1 + environment | 2 +- requirements.txt | 1 + 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index e73c970..8d15443 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,6 +45,20 @@ services: - tmp - redis + turnilo: + container_name: turnilo + image: uchhatre/turnilo:latest + ports: + - 9093:9090 + environment: + - DRUID_BROKER_URL=http://broker:8082 + + metabase: + container_name: metabase + image: metabase/metabase:latest + ports: + - 3001:3000 + postgres: container_name: postgres image: postgres:latest @@ -64,6 +78,22 @@ services: environment: - ZOO_MY_ID=1 + kafka: + image: bitnami/kafka + depends_on: + - zookeeper + ports: + - 29092:29092 + - 9092:9092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ALLOW_PLAINTEXT_LISTENER: yes + coordinator: image: apache/druid:0.23.0 container_name: coordinator diff --git a/docker/requirements.txt b/docker/requirements.txt index 8ec92ae..137542a 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -8,3 +8,4 @@ manticoresearch numpy ujson aioredis[hiredis] +aiokafka diff --git a/environment b/environment index 7bb9214..8c4e756 100644 --- a/environment +++ b/environment @@ -26,7 +26,7 @@ DRUID_MAXDIRECTMEMORYSIZE=6172m druid_emitter_logging_logLevel=debug -druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage"] +druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"] druid_zk_service_host=zookeeper diff --git a/requirements.txt b/requirements.txt index 860086f..10ad22d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ manticoresearch numpy ujson aioredis[hiredis] +aiokafka From 
fec0d379a6fe3103941789d9d21b4e79e7f1b24a Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 13 Sep 2022 22:17:46 +0100 Subject: [PATCH 3/5] Ingest into Kafka and queue messages better --- db.py | 291 +++++++++++++++++++++++++++-------------- monolith.py | 1 + processing/__init__.py | 0 processing/process.py | 106 +++++++++++++++ sources/ch4.py | 102 ++++++++------- sources/dis.py | 2 +- sources/ingest.py | 21 ++- 7 files changed, 366 insertions(+), 157 deletions(-) create mode 100644 processing/__init__.py create mode 100644 processing/process.py diff --git a/db.py b/db.py index 710e7d3..88676ee 100644 --- a/db.py +++ b/db.py @@ -8,12 +8,21 @@ from numpy import array_split from redis import StrictRedis import util +import random +from aiokafka import AIOKafkaProducer + +# Manticore schema from schemas import mc_s +# Manticore configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") api_client = manticoresearch.ApiClient(configuration) api_instance = manticoresearch.IndexApi(api_client) +# Kafka +from aiokafka import AIOKafkaProducer +KAFKA_TOPIC = "msg" + log = util.get_logger("db") # Redis (legacy) @@ -37,121 +46,201 @@ TYPES_MAIN = [ ] TYPES_META = ["who"] TYPES_INT = ["conn", "highlight", "znc", "query", "self"] +KEYPREFIX = "queue." -def store_message(msg): +async def store_kafka_batch(data): + print("STORING KAFKA BATCH") + producer = AIOKafkaProducer(bootstrap_servers='kafka:9092') + await producer.start() + batch = producer.create_batch() + for msg in data: + if msg["type"] in TYPES_MAIN: + index = "main" + schema = mc_s.schema_main + elif msg["type"] in TYPES_META: + index = "meta" + schema = mc_s.schema_meta + elif msg["type"] in TYPES_INT: + index = "internal" + schema = mc_s.schema_int + # normalise fields + for key, value in list(msg.items()): + if value is None: + del msg[key] + if key in schema: + if isinstance(value, int): + if schema[key].startswith("string") or schema[key].startswith("text"): + msg[key] = str(value) + message = ujson.dumps(msg) + body = str.encode(message) + metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) + if metadata is None: + partitions = await producer.partitions_for(KAFKA_TOPIC) + partition = random.choice(tuple(partitions)) + await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) + print("%d messages sent to partition %d" + % (batch.record_count(), partition)) + batch = producer.create_batch() + continue + + partitions = await producer.partitions_for(KAFKA_TOPIC) + partition = random.choice(tuple(partitions)) + await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) + print("%d messages sent to partition %d" + % (batch.record_count(), partition)) + await producer.stop() + +# def store_message(msg): +# """ +# Store a message into Manticore +# :param msg: dict +# """ +# store_kafka(msg) + # # Duplicated to avoid extra function call + # if msg["type"] in TYPES_MAIN: + # index = "main" + # schema = mc_s.schema_main + # elif msg["type"] in TYPES_META: + # index = "meta" + # schema = mc_s.schema_meta + # elif msg["type"] in TYPES_INT: + # index = "internal" + # schema = mc_s.schema_int + # # normalise fields + # for key, value in list(msg.items()): + # if value is None: + # del msg[key] + # if key in schema: + # if isinstance(value, int): + # if schema[key].startswith("string") or schema[key].startswith("text"): + # msg[key] = str(value) + + # body = [{"insert": {"index": index, "doc": msg}}] + # body_post = "" + # for item in body: + # body_post += ujson.dumps(item) + # body_post += 
"\n" + + # # print(body_post) + # try: + # # Bulk index operations + # print("FAKE POST") + # #api_response = api_instance.bulk(body_post) # , async_req=True + # # print(api_response) + # except ApiException as e: + # print("Exception when calling IndexApi->bulk: %s\n" % e) + # print("ATTEMPT", body_post) + +async def queue_message(msg): """ - Store a message into Manticore - :param msg: dict + Queue a message on the Redis buffer. """ - # Duplicated to avoid extra function call - if msg["type"] in TYPES_MAIN: - index = "main" - schema = mc_s.schema_main - elif msg["type"] in TYPES_META: - index = "meta" - schema = mc_s.schema_meta - elif msg["type"] in TYPES_INT: - index = "internal" - schema = mc_s.schema_int - # normalise fields - for key, value in list(msg.items()): - if value is None: - del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith("text"): - msg[key] = str(value) + src = msg["src"] + message = ujson.dumps(msg) - body = [{"insert": {"index": index, "doc": msg}}] - body_post = "" - for item in body: - body_post += ujson.dumps(item) - body_post += "\n" + key = "{KEYPREFIX}{src}" + await ar.sadd(key, message) - # print(body_post) - try: - # Bulk index operations - print("FAKE POST") - #api_response = api_instance.bulk(body_post) # , async_req=True - # print(api_response) - except ApiException as e: - print("Exception when calling IndexApi->bulk: %s\n" % e) - print("ATTEMPT", body_post) - - -def store_message_bulk(data): +async def queue_message_bulk(data): """ - Store a message into Manticore - :param msg: dict + Queue multiple messages on the Redis buffer. """ - if not data: - return - # 10000: maximum inserts we can submit to - # Manticore as of Sept 2022 - split_posts = array_split(data, ceil(len(data) / 10000)) - for messages in split_posts: - total = [] - for msg in messages: - # Duplicated to avoid extra function call (see above) - if msg["type"] in TYPES_MAIN: - index = "main" - schema = mc_s.schema_main - elif msg["type"] in TYPES_META: - index = "meta" - schema = mc_s.schema_meta - elif msg["type"] in TYPES_INT: - index = "internal" - schema = mc_s.schema_int - # normalise fields - for key, value in list(msg.items()): - if value is None: - del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith( - "text" - ): - msg[key] = str(value) + for msg in data: + src = msg["src"] + message = ujson.dumps(msg) - body = {"insert": {"index": index, "doc": msg}} - total.append(body) - - body_post = "" - for item in total: - body_post += ujson.dumps(item) - body_post += "\n" - - # print(body_post) - try: - # Bulk index operations - print("FAKE POST") - #api_response = api_instance.bulk(body_post) # , async_req=True - #print(api_response) - except ApiException as e: - print("Exception when calling IndexApi->bulk: %s\n" % e) - print("ATTEMPT", body_post) + key = "{KEYPREFIX}{src}" + await ar.sadd(key, message) -def update_schema(): - pass +# For now, make a normal function until we go full async +def queue_message_bulk_sync(data): + """ + Queue multiple messages on the Redis buffer. 
+ """ + for msg in data: + src = msg["src"] + message = ujson.dumps(msg) + + key = "{KEYPREFIX}{src}" + r.sadd(key, message) -def create_index(api_client): - util_instance = manticoresearch.UtilsApi(api_client) - schemas = { - "main": mc_s.schema_main, - "meta": mc_s.schema_meta, - "internal": mc_s.schema_int, - } - for name, schema in schemas.items(): - schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) +# def store_message_bulk(data): +# """ +# Store a message into Manticore +# :param msg: dict +# """ +# if not data: +# return +# for msg in data: +# store_kafka(msg) + # # 10000: maximum inserts we can submit to + # # Manticore as of Sept 2022 + # split_posts = array_split(data, ceil(len(data) / 10000)) + # for messages in split_posts: + # total = [] + # for msg in messages: + # # Duplicated to avoid extra function call (see above) + # if msg["type"] in TYPES_MAIN: + # index = "main" + # schema = mc_s.schema_main + # elif msg["type"] in TYPES_META: + # index = "meta" + # schema = mc_s.schema_meta + # elif msg["type"] in TYPES_INT: + # index = "internal" + # schema = mc_s.schema_int + # # normalise fields + # for key, value in list(msg.items()): + # if value is None: + # del msg[key] + # if key in schema: + # if isinstance(value, int): + # if schema[key].startswith("string") or schema[key].startswith( + # "text" + # ): + # msg[key] = str(value) - create_query = ( - f"create table if not exists {name}({schema_types}) engine='columnar'" - ) - print("Schema types", create_query) - util_instance.sql(create_query) + # body = {"insert": {"index": index, "doc": msg}} + # total.append(body) + + # body_post = "" + # for item in total: + # body_post += ujson.dumps(item) + # body_post += "\n" + + # # print(body_post) + # try: + # # Bulk index operations + # print("FAKE POST") + # #api_response = api_instance.bulk(body_post) # , async_req=True + # #print(api_response) + # except ApiException as e: + # print("Exception when calling IndexApi->bulk: %s\n" % e) + # print("ATTEMPT", body_post) + + +# def update_schema(): +# pass + + +# def create_index(api_client): +# util_instance = manticoresearch.UtilsApi(api_client) +# schemas = { +# "main": mc_s.schema_main, +# "meta": mc_s.schema_meta, +# "internal": mc_s.schema_int, +# } +# for name, schema in schemas.items(): +# schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) + +# create_query = ( +# f"create table if not exists {name}({schema_types}) engine='columnar'" +# ) +# print("Schema types", create_query) +# util_instance.sql(create_query) #create_index(api_client) diff --git a/monolith.py b/monolith.py index fbead0a..ce4c692 100644 --- a/monolith.py +++ b/monolith.py @@ -5,6 +5,7 @@ import util from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest +import db # For development # if not getenv("DISCORD_TOKEN", None): diff --git a/processing/__init__.py b/processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/processing/process.py b/processing/process.py new file mode 100644 index 0000000..ee45ebc --- /dev/null +++ b/processing/process.py @@ -0,0 +1,106 @@ +from concurrent.futures import ProcessPoolExecutor +import asyncio +import os +import ujson +from siphashc import siphash + +import db +import util + +# 4chan schema +from schemas.ch4_s import ATTRMAP + +# For key generation +import string +import random + +# For timestamp processing +import datetime + +# For 4chan message parsing +from bs4 import BeautifulSoup + +from numpy import array_split +from 
math import ceil + +log = util.get_logger("process") + +# Maximum number of CPU threads to use for post processing +CPU_THREADS = os.cpu_count() + +p = ProcessPoolExecutor(CPU_THREADS) + +def get_hash_key(): + hash_key = db.r.get("hashing_key") + if not hash_key: + letters = string.ascii_lowercase + hash_key = "".join(random.choice(letters) for i in range(16)) + log.debug(f"Created new hash key: {hash_key}") + db.r.set("hashing_key", hash_key) + else: + hash_key = hash_key.decode("ascii") + log.debug(f"Decoded hash key: {hash_key}") + return hash_key + +hash_key = get_hash_key() + +async def spawn_processing_threads(data): + print("SPAWN", data) + if len(data) < CPU_THREADS: + split_data = [data] + else: + msg_per_core = int(len(data) / CPU_THREADS) + print("MSG PER CORE", msg_per_core) + split_data = array_split(data, ceil(len(data) / msg_per_core)) + print("SPLIT DATA", split_data) + for split in split_data: + print("DELEGATING TO THREAD", len(split)) + await process_data_thread(split) + +@asyncio.coroutine +def process_data_thread(data): + """ + Helper to spawn threads to process a list of data. + """ + loop = asyncio.get_event_loop() + yield from loop.run_in_executor(p, process_data, data) + +def process_data(data): + print("PROCESSING DATA", data) + for index, msg in enumerate(data): + #print("PROCESSING", msg) + if msg["src"] == "4ch": + board = msg["net"] + thread = msg["channel"] + # Calculate hash for post + post_normalised = ujson.dumps(msg, sort_keys=True) + hash = siphash(hash_key, post_normalised) + hash = str(hash) + redis_key = f"cache.{board}.{thread}.{msg['no']}" + key_content = db.r.get(redis_key) + if key_content: + key_content = key_content.decode("ascii") + if key_content == hash: + continue + else: + data[index][index]["type"] = "update" + db.r.set(redis_key, hash) + for key2, value in list(msg.items()): + if key2 in ATTRMAP: + msg[ATTRMAP[key2]] = data[index][key2] + del data[index][key2] + if "ts" in msg: + old_time = data[index]["ts"] + # '08/30/22(Tue)02:25:37' + time_spl = old_time.split(":") + if len(time_spl) == 3: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") + else: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") + # new_ts = old_ts.isoformat() + new_ts = int(old_ts.timestamp()) + data[index]["ts"] = new_ts + if "msg" in msg: + soup = BeautifulSoup(data[index]["msg"], "html.parser") + msg = soup.get_text(separator="\n") + data[index]["msg"] = msg \ No newline at end of file diff --git a/sources/ch4.py b/sources/ch4.py index 54038ea..7fad264 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -19,19 +19,19 @@ from schemas.ch4_s import ATTRMAP # CONFIGURATION # # Number of 4chan threads to request at once -THREADS_CONCURRENT = 100 +THREADS_CONCURRENT = 1000 # Seconds to wait between every THREADS_CONCURRENT requests -THREADS_DELAY = 0.8 +THREADS_DELAY = 0.1 # Seconds to wait between crawls CRAWL_DELAY = 5 # Semaphore value ? 
-THREADS_SEMAPHORE = 100 +THREADS_SEMAPHORE = 1000 # Maximum number of CPU threads to use for post processing -CPU_THREADS = 1 +CPU_THREADS = 8 # CONFIGURATION END # @@ -95,7 +95,7 @@ class Chan4(object): no = threads["no"] to_get.append((mapped, no)) - self.log.info(f"Got thread list for {mapped}: {len(response)}") + self.log.debug(f"Got thread list for {mapped}: {len(response)}") if not to_get: return split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) @@ -136,16 +136,19 @@ class Chan4(object): # Split into 10,000 chunks if not all_posts: return - threads_per_core = int(len(all_posts) / CPU_THREADS) - for i in range(CPU_THREADS): - new_dict = {} - pulled_posts = self.take_items(all_posts, threads_per_core) - for k, v in pulled_posts: - if k in new_dict: - new_dict[k].append(v) - else: - new_dict[k] = [v] - await self.handle_posts_thread(new_dict) + self.handle_posts(all_posts) + # threads_per_core = int(len(all_posts) / CPU_THREADS) + # for i in range(CPU_THREADS): + # new_dict = {} + # pulled_posts = self.take_items(all_posts, threads_per_core) + # for k, v in pulled_posts: + # if k in new_dict: + # new_dict[k].append(v) + # else: + # new_dict[k] = [v] + #await self.handle_posts_thread(new_dict) + + # print("VAL", ceil(len(all_posts) / threads_per_core)) # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) # print("THREADS PER CORE SPLIT", len(split_posts)) @@ -161,46 +164,46 @@ class Chan4(object): loop = asyncio.get_event_loop() yield from loop.run_in_executor(p, self.handle_posts, posts) - def handle_posts(self, posts): + async def handle_posts(self, posts): to_store = [] for key, post_list in posts.items(): board, thread = key for index, post in enumerate(post_list): posts[key][index]["type"] = "msg" - # Calculate hash for post - post_normalised = ujson.dumps(post, sort_keys=True) - hash = siphash(self.hash_key, post_normalised) - hash = str(hash) - redis_key = f"cache.{board}.{thread}.{post['no']}" - key_content = db.r.get(redis_key) - if key_content: - key_content = key_content.decode("ascii") - if key_content == hash: - continue - else: - posts[key][index]["type"] = "update" - db.r.set(redis_key, hash) + # # Calculate hash for post + # post_normalised = ujson.dumps(post, sort_keys=True) + # hash = siphash(self.hash_key, post_normalised) + # hash = str(hash) + # redis_key = f"cache.{board}.{thread}.{post['no']}" + # key_content = db.r.get(redis_key) + # if key_content: + # key_content = key_content.decode("ascii") + # if key_content == hash: + # continue + # else: + # posts[key][index]["type"] = "update" + # #db.r.set(redis_key, hash) - for key2, value in list(post.items()): - if key2 in ATTRMAP: - post[ATTRMAP[key2]] = posts[key][index][key2] - del posts[key][index][key2] - if "ts" in post: - old_time = posts[key][index]["ts"] - # '08/30/22(Tue)02:25:37' - time_spl = old_time.split(":") - if len(time_spl) == 3: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") - else: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - # new_ts = old_ts.isoformat() - new_ts = int(old_ts.timestamp()) - posts[key][index]["ts"] = new_ts - if "msg" in post: - soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") - msg = soup.get_text(separator="\n") - posts[key][index]["msg"] = msg + # for key2, value in list(post.items()): + # if key2 in ATTRMAP: + # post[ATTRMAP[key2]] = posts[key][index][key2] + # del posts[key][index][key2] + # if "ts" in post: + # old_time = posts[key][index]["ts"] + # # '08/30/22(Tue)02:25:37' + # 
time_spl = old_time.split(":") + # if len(time_spl) == 3: + # old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") + # else: + # old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") + # # new_ts = old_ts.isoformat() + # new_ts = int(old_ts.timestamp()) + # posts[key][index]["ts"] = new_ts + # if "msg" in post: + # soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") + # msg = soup.get_text(separator="\n") + # posts[key][index]["msg"] = msg posts[key][index]["src"] = "4ch" posts[key][index]["net"] = board @@ -211,7 +214,8 @@ class Chan4(object): # print({name_map[name]: val for name, val in post.items()}) # print(f"Got posts: {len(posts)}") if to_store: - db.store_message_bulk(to_store) + print("STORING", len(to_store)) + await db.queue_message_bulk(to_store) async def fetch(self, url, session, mapped): async with session.get(url) as response: diff --git a/sources/dis.py b/sources/dis.py index 0b8b2f6..c187cf7 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -41,4 +41,4 @@ class DiscordClient(discord.Client): a["type"] = "msg" a["src"] = "dis" - db.store_message(a) + await db.queue_message(a) diff --git a/sources/ingest.py b/sources/ingest.py index 083a085..3132797 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -5,12 +5,17 @@ import ujson import db import util -SOURCES = ["irc"] +from processing import process + +SOURCES = ["irc", "dis", "4ch"] KEYPREFIX = "queue." CHUNK_SIZE = 1000 ITER_DELAY = 0.5 + + + class Ingest(object): def __init__(self): name = self.__class__.__name__ @@ -18,19 +23,23 @@ class Ingest(object): async def run(self): while True: - await self.process_chunk() + await self.get_chunk() await asyncio.sleep(ITER_DELAY) - async def process_chunk(self): + async def get_chunk(self): items = [] for source in SOURCES: key = f"{KEYPREFIX}{source}" chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - self.log.info(f"Got chunk: {chunk}") + #self.log.info(f"Got chunk: {chunk}") for item in chunk: item = ujson.loads(item) - self.log.info(f"Got item: {item}") + #self.log.info(f"Got item: {item}") items.append(item) - db.store_message_bulk(items) + if items: + print("PROCESSING", len(items)) + await process.spawn_processing_threads(items) + print("DONE WITH PROCESSING", len(items)) + await db.store_kafka_batch(items) From 4ea77ac543eb531216f6cf626e681037b3e132fd Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Wed, 14 Sep 2022 18:32:32 +0100 Subject: [PATCH 4/5] Properly process Redis buffered messages and ingest into Kafka --- db.py | 179 ++++++++++++++++++++++-------------------- docker-compose.yml | 11 ++- monolith.py | 3 +- processing/process.py | 109 +++++++++++++++++-------- sources/ch4.py | 5 +- sources/ingest.py | 16 ++-- 6 files changed, 190 insertions(+), 133 deletions(-) diff --git a/db.py b/db.py index 88676ee..4c92555 100644 --- a/db.py +++ b/db.py @@ -1,15 +1,15 @@ +import random from math import ceil import aioredis import manticoresearch import ujson +from aiokafka import AIOKafkaProducer from manticoresearch.rest import ApiException from numpy import array_split from redis import StrictRedis import util -import random -from aiokafka import AIOKafkaProducer # Manticore schema from schemas import mc_s @@ -21,6 +21,7 @@ api_instance = manticoresearch.IndexApi(api_client) # Kafka from aiokafka import AIOKafkaProducer + KAFKA_TOPIC = "msg" log = util.get_logger("db") @@ -51,7 +52,7 @@ KEYPREFIX = "queue." 
async def store_kafka_batch(data): print("STORING KAFKA BATCH") - producer = AIOKafkaProducer(bootstrap_servers='kafka:9092') + producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() batch = producer.create_batch() for msg in data: @@ -70,67 +71,74 @@ async def store_kafka_batch(data): del msg[key] if key in schema: if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith("text"): + if schema[key].startswith("string") or schema[key].startswith( + "text" + ): msg[key] = str(value) message = ujson.dumps(msg) body = str.encode(message) + if "ts" not in msg: + # print("MSG WITHOUT TS", msg) + continue metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) if metadata is None: partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" - % (batch.record_count(), partition)) + print( + "%d messages sent to partition %d" % (batch.record_count(), partition) + ) batch = producer.create_batch() continue partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" - % (batch.record_count(), partition)) + print("%d messages sent to partition %d" % (batch.record_count(), partition)) await producer.stop() + # def store_message(msg): # """ # Store a message into Manticore # :param msg: dict # """ # store_kafka(msg) - # # Duplicated to avoid extra function call - # if msg["type"] in TYPES_MAIN: - # index = "main" - # schema = mc_s.schema_main - # elif msg["type"] in TYPES_META: - # index = "meta" - # schema = mc_s.schema_meta - # elif msg["type"] in TYPES_INT: - # index = "internal" - # schema = mc_s.schema_int - # # normalise fields - # for key, value in list(msg.items()): - # if value is None: - # del msg[key] - # if key in schema: - # if isinstance(value, int): - # if schema[key].startswith("string") or schema[key].startswith("text"): - # msg[key] = str(value) +# # Duplicated to avoid extra function call +# if msg["type"] in TYPES_MAIN: +# index = "main" +# schema = mc_s.schema_main +# elif msg["type"] in TYPES_META: +# index = "meta" +# schema = mc_s.schema_meta +# elif msg["type"] in TYPES_INT: +# index = "internal" +# schema = mc_s.schema_int +# # normalise fields +# for key, value in list(msg.items()): +# if value is None: +# del msg[key] +# if key in schema: +# if isinstance(value, int): +# if schema[key].startswith("string") or schema[key].startswith("text"): +# msg[key] = str(value) - # body = [{"insert": {"index": index, "doc": msg}}] - # body_post = "" - # for item in body: - # body_post += ujson.dumps(item) - # body_post += "\n" +# body = [{"insert": {"index": index, "doc": msg}}] +# body_post = "" +# for item in body: +# body_post += ujson.dumps(item) +# body_post += "\n" + +# # print(body_post) +# try: +# # Bulk index operations +# print("FAKE POST") +# #api_response = api_instance.bulk(body_post) # , async_req=True +# # print(api_response) +# except ApiException as e: +# print("Exception when calling IndexApi->bulk: %s\n" % e) +# print("ATTEMPT", body_post) - # # print(body_post) - # try: - # # Bulk index operations - # print("FAKE POST") - # #api_response = api_instance.bulk(body_post) # , async_req=True - # # print(api_response) - # except ApiException as e: - # print("Exception when calling IndexApi->bulk: %s\n" % e) - 
# print("ATTEMPT", body_post) async def queue_message(msg): """ @@ -139,9 +147,10 @@ async def queue_message(msg): src = msg["src"] message = ujson.dumps(msg) - key = "{KEYPREFIX}{src}" + key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) + async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. @@ -150,7 +159,7 @@ async def queue_message_bulk(data): src = msg["src"] message = ujson.dumps(msg) - key = "{KEYPREFIX}{src}" + key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) @@ -176,50 +185,50 @@ def queue_message_bulk_sync(data): # return # for msg in data: # store_kafka(msg) - # # 10000: maximum inserts we can submit to - # # Manticore as of Sept 2022 - # split_posts = array_split(data, ceil(len(data) / 10000)) - # for messages in split_posts: - # total = [] - # for msg in messages: - # # Duplicated to avoid extra function call (see above) - # if msg["type"] in TYPES_MAIN: - # index = "main" - # schema = mc_s.schema_main - # elif msg["type"] in TYPES_META: - # index = "meta" - # schema = mc_s.schema_meta - # elif msg["type"] in TYPES_INT: - # index = "internal" - # schema = mc_s.schema_int - # # normalise fields - # for key, value in list(msg.items()): - # if value is None: - # del msg[key] - # if key in schema: - # if isinstance(value, int): - # if schema[key].startswith("string") or schema[key].startswith( - # "text" - # ): - # msg[key] = str(value) +# # 10000: maximum inserts we can submit to +# # Manticore as of Sept 2022 +# split_posts = array_split(data, ceil(len(data) / 10000)) +# for messages in split_posts: +# total = [] +# for msg in messages: +# # Duplicated to avoid extra function call (see above) +# if msg["type"] in TYPES_MAIN: +# index = "main" +# schema = mc_s.schema_main +# elif msg["type"] in TYPES_META: +# index = "meta" +# schema = mc_s.schema_meta +# elif msg["type"] in TYPES_INT: +# index = "internal" +# schema = mc_s.schema_int +# # normalise fields +# for key, value in list(msg.items()): +# if value is None: +# del msg[key] +# if key in schema: +# if isinstance(value, int): +# if schema[key].startswith("string") or schema[key].startswith( +# "text" +# ): +# msg[key] = str(value) - # body = {"insert": {"index": index, "doc": msg}} - # total.append(body) +# body = {"insert": {"index": index, "doc": msg}} +# total.append(body) - # body_post = "" - # for item in total: - # body_post += ujson.dumps(item) - # body_post += "\n" +# body_post = "" +# for item in total: +# body_post += ujson.dumps(item) +# body_post += "\n" - # # print(body_post) - # try: - # # Bulk index operations - # print("FAKE POST") - # #api_response = api_instance.bulk(body_post) # , async_req=True - # #print(api_response) - # except ApiException as e: - # print("Exception when calling IndexApi->bulk: %s\n" % e) - # print("ATTEMPT", body_post) +# # print(body_post) +# try: +# # Bulk index operations +# print("FAKE POST") +# #api_response = api_instance.bulk(body_post) # , async_req=True +# #print(api_response) +# except ApiException as e: +# print("Exception when calling IndexApi->bulk: %s\n" % e) +# print("ATTEMPT", body_post) # def update_schema(): @@ -243,5 +252,5 @@ def queue_message_bulk_sync(data): # util_instance.sql(create_query) -#create_index(api_client) -#update_schema() +# create_index(api_client) +# update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index 8d15443..f82b86b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,11 @@ services: - .env volumes_from: - tmp - # depends_on: + depends_on: + - broker + - kafka + - 
tmp + - redis # - db threshold: @@ -52,12 +56,16 @@ services: - 9093:9090 environment: - DRUID_BROKER_URL=http://broker:8082 + depends_on: + - broker metabase: container_name: metabase image: metabase/metabase:latest ports: - 3001:3000 + depends_on: + - broker postgres: container_name: postgres @@ -82,6 +90,7 @@ services: image: bitnami/kafka depends_on: - zookeeper + - broker ports: - 29092:29092 - 9092:9092 diff --git a/monolith.py b/monolith.py index ce4c692..ff3b929 100644 --- a/monolith.py +++ b/monolith.py @@ -1,11 +1,11 @@ import asyncio from os import getenv +import db import util from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest -import db # For development # if not getenv("DISCORD_TOKEN", None): @@ -27,7 +27,6 @@ async def main(loop): log.info("Starting Discord handler.") client = DiscordClient() loop.create_task(client.start(token)) - # client.run(token) log.info("Starting 4chan handler.") chan = Chan4() diff --git a/processing/process.py b/processing/process.py index ee45ebc..b547845 100644 --- a/processing/process.py +++ b/processing/process.py @@ -1,7 +1,20 @@ -from concurrent.futures import ProcessPoolExecutor import asyncio import os +import random + +# For key generation +import string +from concurrent.futures import ProcessPoolExecutor + +# For timestamp processing +from datetime import datetime +from math import ceil + import ujson + +# For 4chan message parsing +from bs4 import BeautifulSoup +from numpy import array_split from siphashc import siphash import db @@ -10,19 +23,6 @@ import util # 4chan schema from schemas.ch4_s import ATTRMAP -# For key generation -import string -import random - -# For timestamp processing -import datetime - -# For 4chan message parsing -from bs4 import BeautifulSoup - -from numpy import array_split -from math import ceil - log = util.get_logger("process") # Maximum number of CPU threads to use for post processing @@ -30,6 +30,7 @@ CPU_THREADS = os.cpu_count() p = ProcessPoolExecutor(CPU_THREADS) + def get_hash_key(): hash_key = db.r.get("hashing_key") if not hash_key: @@ -42,33 +43,68 @@ def get_hash_key(): log.debug(f"Decoded hash key: {hash_key}") return hash_key + hash_key = get_hash_key() + +@asyncio.coroutine async def spawn_processing_threads(data): - print("SPAWN", data) + loop = asyncio.get_event_loop() + tasks = [] + oldts = [x["now"] for x in data if "now" in x] if len(data) < CPU_THREADS: split_data = [data] else: msg_per_core = int(len(data) / CPU_THREADS) print("MSG PER CORE", msg_per_core) split_data = array_split(data, ceil(len(data) / msg_per_core)) - print("SPLIT DATA", split_data) - for split in split_data: + for index, split in enumerate(split_data): print("DELEGATING TO THREAD", len(split)) - await process_data_thread(split) + future = loop.run_in_executor(p, process_data, data) + # future = p.submit(process_data, split) + tasks.append(future) + # results = [x.result(timeout=50) for x in tasks] + results = await asyncio.gather(*tasks) + print("RESULTS", len(results)) + + # Join the results back from the split list + flat_list = [item for sublist in results for item in sublist] + print("LENFLAT", len(flat_list)) + print("LENDATA", len(data)) + + newts = [x["ts"] for x in flat_list if "ts" in x] + print("lenoldts", len(oldts)) + print("lennewts", len(newts)) + allts = all(["ts" in x for x in flat_list]) + print("ALLTS", allts) + alllen = [len(x) for x in flat_list] + print("ALLLEN", alllen) + await db.store_kafka_batch(flat_list) + + +# @asyncio.coroutine +# def 
process_data_thread(data): +# """ +# Helper to spawn threads to process a list of data. +# """ +# loop = asyncio.get_event_loop() +# if len(data) < CPU_THREADS: +# split_data = [data] +# else: +# msg_per_core = int(len(data) / CPU_THREADS) +# print("MSG PER CORE", msg_per_core) +# split_data = array_split(data, ceil(len(data) / msg_per_core)) +# for index, split in enumerate(split_data): +# print("DELEGATING TO THREAD", len(split)) +# #f = process_data_thread(split) +# yield loop.run_in_executor(p, process_data, data) -@asyncio.coroutine -def process_data_thread(data): - """ - Helper to spawn threads to process a list of data. - """ - loop = asyncio.get_event_loop() - yield from loop.run_in_executor(p, process_data, data) def process_data(data): - print("PROCESSING DATA", data) + print("PROCESS DATA START") + # to_store = [] for index, msg in enumerate(data): - #print("PROCESSING", msg) + # print("PROCESSING", msg) if msg["src"] == "4ch": board = msg["net"] thread = msg["channel"] @@ -81,15 +117,18 @@ def process_data(data): if key_content: key_content = key_content.decode("ascii") if key_content == hash: + del data[index] continue else: - data[index][index]["type"] = "update" + data[index]["type"] = "update" db.r.set(redis_key, hash) - for key2, value in list(msg.items()): + if "now" not in data[index]: + print("NOW NOT IN INDEX", data[index]) + for key2, value in list(data[index].items()): if key2 in ATTRMAP: - msg[ATTRMAP[key2]] = data[index][key2] + data[index][ATTRMAP[key2]] = data[index][key2] del data[index][key2] - if "ts" in msg: + if "ts" in data[index]: old_time = data[index]["ts"] # '08/30/22(Tue)02:25:37' time_spl = old_time.split(":") @@ -100,7 +139,13 @@ def process_data(data): # new_ts = old_ts.isoformat() new_ts = int(old_ts.timestamp()) data[index]["ts"] = new_ts + else: + print("MSG WITHOUT TS PROCESS", data[index]) + continue if "msg" in msg: soup = BeautifulSoup(data[index]["msg"], "html.parser") msg = soup.get_text(separator="\n") - data[index]["msg"] = msg \ No newline at end of file + data[index]["msg"] = msg + # to_store.append(data[index]) + print("FINISHED PROCESSING DATA") + return data diff --git a/sources/ch4.py b/sources/ch4.py index 7fad264..7640c9b 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -136,7 +136,7 @@ class Chan4(object): # Split into 10,000 chunks if not all_posts: return - self.handle_posts(all_posts) + await self.handle_posts(all_posts) # threads_per_core = int(len(all_posts) / CPU_THREADS) # for i in range(CPU_THREADS): # new_dict = {} @@ -146,8 +146,7 @@ class Chan4(object): # new_dict[k].append(v) # else: # new_dict[k] = [v] - #await self.handle_posts_thread(new_dict) - + # await self.handle_posts_thread(new_dict) # print("VAL", ceil(len(all_posts) / threads_per_core)) # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) diff --git a/sources/ingest.py b/sources/ingest.py index 3132797..be7d6ca 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -4,24 +4,22 @@ import ujson import db import util - from processing import process -SOURCES = ["irc", "dis", "4ch"] +SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue." 
-CHUNK_SIZE = 1000 +CHUNK_SIZE = 90000 ITER_DELAY = 0.5 - - - class Ingest(object): def __init__(self): name = self.__class__.__name__ self.log = util.get_logger(name) async def run(self): + # items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}] + # await process.spawn_processing_threads(items) while True: await self.get_chunk() await asyncio.sleep(ITER_DELAY) @@ -33,13 +31,11 @@ class Ingest(object): chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - #self.log.info(f"Got chunk: {chunk}") + # self.log.info(f"Got chunk: {chunk}") for item in chunk: item = ujson.loads(item) - #self.log.info(f"Got item: {item}") + # self.log.info(f"Got item: {item}") items.append(item) if items: print("PROCESSING", len(items)) await process.spawn_processing_threads(items) - print("DONE WITH PROCESSING", len(items)) - await db.store_kafka_batch(items) From 143f2a0bf0a0e101184095696bb2f27d517177e7 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Fri, 16 Sep 2022 17:09:49 +0100 Subject: [PATCH 5/5] Implement sentiment/NLP annotation and optimise processing --- db.py | 198 ++++++---------------------------------- docker-compose.yml | 34 +++++-- docker/Dockerfile | 2 +- docker/requirements.txt | 12 ++- event_log.txt | 0 monolith.py | 8 -- processing/process.py | 171 ++++++++++++++++++++++------------ requirements.txt | 12 ++- sources/ch4.py | 87 ++---------------- sources/ingest.py | 15 ++- util.py | 2 +- 11 files changed, 203 insertions(+), 338 deletions(-) create mode 100644 event_log.txt diff --git a/db.py b/db.py index 4c92555..473a2c0 100644 --- a/db.py +++ b/db.py @@ -1,28 +1,15 @@ import random -from math import ceil import aioredis -import manticoresearch -import ujson +import orjson + +# Kafka from aiokafka import AIOKafkaProducer -from manticoresearch.rest import ApiException -from numpy import array_split from redis import StrictRedis import util -# Manticore schema -from schemas import mc_s - -# Manticore -configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") -api_client = manticoresearch.ApiClient(configuration) -api_instance = manticoresearch.IndexApi(api_client) - -# Kafka -from aiokafka import AIOKafkaProducer - -KAFKA_TOPIC = "msg" +# KAFKA_TOPIC = "msg" log = util.get_logger("db") @@ -51,103 +38,62 @@ KEYPREFIX = "queue." 
async def store_kafka_batch(data): - print("STORING KAFKA BATCH") + log.debug(f"Storing Kafka batch of {len(data)} messages") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") await producer.start() batch = producer.create_batch() for msg in data: if msg["type"] in TYPES_MAIN: index = "main" - schema = mc_s.schema_main + # schema = mc_s.schema_main elif msg["type"] in TYPES_META: index = "meta" - schema = mc_s.schema_meta + # schema = mc_s.schema_meta elif msg["type"] in TYPES_INT: index = "internal" - schema = mc_s.schema_int + # schema = mc_s.schema_int + + KAFKA_TOPIC = index # normalise fields for key, value in list(msg.items()): if value is None: del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith( - "text" - ): - msg[key] = str(value) - message = ujson.dumps(msg) - body = str.encode(message) + # if key in schema: + # if isinstance(value, int): + # if schema[key].startswith("string") or schema[key].startswith( + # "text" + # ): + # msg[key] = str(value) + body = orjson.dumps(msg) + # orjson returns bytes + # body = str.encode(message) if "ts" not in msg: - # print("MSG WITHOUT TS", msg) - continue + raise Exception("No TS in msg") metadata = batch.append(key=None, value=body, timestamp=msg["ts"]) if metadata is None: partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print( - "%d messages sent to partition %d" % (batch.record_count(), partition) - ) + log.debug(f"{batch.record_count()} messages sent to partition {partition}") batch = producer.create_batch() continue partitions = await producer.partitions_for(KAFKA_TOPIC) partition = random.choice(tuple(partitions)) await producer.send_batch(batch, KAFKA_TOPIC, partition=partition) - print("%d messages sent to partition %d" % (batch.record_count(), partition)) + log.debug(f"{batch.record_count()} messages sent to partition {partition}") await producer.stop() -# def store_message(msg): -# """ -# Store a message into Manticore -# :param msg: dict -# """ -# store_kafka(msg) -# # Duplicated to avoid extra function call -# if msg["type"] in TYPES_MAIN: -# index = "main" -# schema = mc_s.schema_main -# elif msg["type"] in TYPES_META: -# index = "meta" -# schema = mc_s.schema_meta -# elif msg["type"] in TYPES_INT: -# index = "internal" -# schema = mc_s.schema_int -# # normalise fields -# for key, value in list(msg.items()): -# if value is None: -# del msg[key] -# if key in schema: -# if isinstance(value, int): -# if schema[key].startswith("string") or schema[key].startswith("text"): -# msg[key] = str(value) - -# body = [{"insert": {"index": index, "doc": msg}}] -# body_post = "" -# for item in body: -# body_post += ujson.dumps(item) -# body_post += "\n" - -# # print(body_post) -# try: -# # Bulk index operations -# print("FAKE POST") -# #api_response = api_instance.bulk(body_post) # , async_req=True -# # print(api_response) -# except ApiException as e: -# print("Exception when calling IndexApi->bulk: %s\n" % e) -# print("ATTEMPT", body_post) - - async def queue_message(msg): """ Queue a message on the Redis buffer. """ src = msg["src"] - message = ujson.dumps(msg) + message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" + # log.debug(f"Queueing single message of string length {len(message)}") await ar.sadd(key, message) @@ -155,102 +101,10 @@ async def queue_message_bulk(data): """ Queue multiple messages on the Redis buffer. 
""" + # log.debug(f"Queueing message batch of length {len(data)}") for msg in data: src = msg["src"] - message = ujson.dumps(msg) + message = orjson.dumps(msg) key = f"{KEYPREFIX}{src}" await ar.sadd(key, message) - - -# For now, make a normal function until we go full async -def queue_message_bulk_sync(data): - """ - Queue multiple messages on the Redis buffer. - """ - for msg in data: - src = msg["src"] - message = ujson.dumps(msg) - - key = "{KEYPREFIX}{src}" - r.sadd(key, message) - - -# def store_message_bulk(data): -# """ -# Store a message into Manticore -# :param msg: dict -# """ -# if not data: -# return -# for msg in data: -# store_kafka(msg) -# # 10000: maximum inserts we can submit to -# # Manticore as of Sept 2022 -# split_posts = array_split(data, ceil(len(data) / 10000)) -# for messages in split_posts: -# total = [] -# for msg in messages: -# # Duplicated to avoid extra function call (see above) -# if msg["type"] in TYPES_MAIN: -# index = "main" -# schema = mc_s.schema_main -# elif msg["type"] in TYPES_META: -# index = "meta" -# schema = mc_s.schema_meta -# elif msg["type"] in TYPES_INT: -# index = "internal" -# schema = mc_s.schema_int -# # normalise fields -# for key, value in list(msg.items()): -# if value is None: -# del msg[key] -# if key in schema: -# if isinstance(value, int): -# if schema[key].startswith("string") or schema[key].startswith( -# "text" -# ): -# msg[key] = str(value) - -# body = {"insert": {"index": index, "doc": msg}} -# total.append(body) - -# body_post = "" -# for item in total: -# body_post += ujson.dumps(item) -# body_post += "\n" - -# # print(body_post) -# try: -# # Bulk index operations -# print("FAKE POST") -# #api_response = api_instance.bulk(body_post) # , async_req=True -# #print(api_response) -# except ApiException as e: -# print("Exception when calling IndexApi->bulk: %s\n" % e) -# print("ATTEMPT", body_post) - - -# def update_schema(): -# pass - - -# def create_index(api_client): -# util_instance = manticoresearch.UtilsApi(api_client) -# schemas = { -# "main": mc_s.schema_main, -# "meta": mc_s.schema_meta, -# "internal": mc_s.schema_int, -# } -# for name, schema in schemas.items(): -# schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) - -# create_query = ( -# f"create table if not exists {name}({schema_types}) engine='columnar'" -# ) -# print("Schema types", create_query) -# util_instance.sql(create_query) - - -# create_index(api_client) -# update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index f82b86b..d0d2f4c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,10 +20,14 @@ services: volumes_from: - tmp depends_on: - - broker - - kafka - - tmp - - redis + broker: + condition: service_started + kafka: + condition: service_healthy + tmp: + condition: service_started + redis: + condition: service_healthy # - db threshold: @@ -46,8 +50,10 @@ services: volumes_from: - tmp depends_on: - - tmp - - redis + tmp: + condition: service_started + redis: + condition: service_healthy turnilo: container_name: turnilo @@ -102,6 +108,17 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ALLOW_PLAINTEXT_LISTENER: yes + # healthcheck: + # test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"] + # interval: 2s + # timeout: 2s + # retries: 15 + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"] + start_period: 15s + interval: 2s + timeout: 5s + retries: 30 coordinator: image: 
apache/druid:0.23.0 @@ -230,6 +247,11 @@ services: - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf volumes_from: - tmp + healthcheck: + test: "redis-cli -s /var/run/redis/redis.sock ping" + interval: 2s + timeout: 2s + retries: 15 networks: default: diff --git a/docker/Dockerfile b/docker/Dockerfile index c133ace..0da9448 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,7 @@ COPY requirements.txt /code/ COPY discord-patched.tgz /code/ RUN python -m venv /venv -RUN . /venv/bin/activate && pip install -r requirements.txt +RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages diff --git a/docker/requirements.txt b/docker/requirements.txt index 137542a..35412d3 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -4,8 +4,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson aioredis[hiredis] aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/event_log.txt b/event_log.txt new file mode 100644 index 0000000..e69de29 diff --git a/monolith.py b/monolith.py index ff3b929..1eb559b 100644 --- a/monolith.py +++ b/monolith.py @@ -1,19 +1,11 @@ import asyncio from os import getenv -import db import util from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest -# For development -# if not getenv("DISCORD_TOKEN", None): -# print("Could not get Discord token, attempting load from .env") -# from dotenv import load_dotenv - -# load_dotenv() - log = util.get_logger("monolith") modules_enabled = getenv("MODULES_ENABLED", False) diff --git a/processing/process.py b/processing/process.py index b547845..4a6409c 100644 --- a/processing/process.py +++ b/processing/process.py @@ -4,25 +4,73 @@ import random # For key generation import string + +# Squash errors +import warnings from concurrent.futures import ProcessPoolExecutor # For timestamp processing from datetime import datetime from math import ceil -import ujson +import orjson + +# Tokenisation +import spacy # For 4chan message parsing from bs4 import BeautifulSoup from numpy import array_split +from polyglot.detect.base import logger as polyglot_logger + +# For NLP +from polyglot.text import Text +from pycld2 import error as cld2_error from siphashc import siphash +# For sentiment +from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer + import db import util # 4chan schema from schemas.ch4_s import ATTRMAP +# For tokenisation +# from gensim.parsing.preprocessing import ( +# strip_tags, +# strip_punctuation, +# strip_numeric, +# stem_text, +# strip_multiple_whitespaces, +# strip_non_alphanum, +# remove_stopwords, +# strip_short, +# preprocess_string, +# ) + +# CUSTOM_FILTERS = [ +# lambda x: x.lower(), +# strip_tags, # +# strip_punctuation, # +# strip_multiple_whitespaces, +# strip_numeric, +# remove_stopwords, +# strip_short, +# #stem_text, +# strip_non_alphanum, # +# ] + +# Squash errors +polyglot_logger.setLevel("ERROR") +warnings.filterwarnings("ignore", category=UserWarning, module="bs4") + + +TAGS = ["NOUN", "ADJ", "VERB", "ADV"] +nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"]) + + log = util.get_logger("process") # Maximum number of CPU threads to use for post processing @@ -49,67 +97,44 @@ hash_key = get_hash_key() @asyncio.coroutine async def spawn_processing_threads(data): + len_data = len(data) + 
log.debug(f"Spawning processing threads for batch of {len_data} messages") + loop = asyncio.get_event_loop() tasks = [] - oldts = [x["now"] for x in data if "now" in x] + if len(data) < CPU_THREADS: split_data = [data] else: msg_per_core = int(len(data) / CPU_THREADS) - print("MSG PER CORE", msg_per_core) split_data = array_split(data, ceil(len(data) / msg_per_core)) for index, split in enumerate(split_data): - print("DELEGATING TO THREAD", len(split)) - future = loop.run_in_executor(p, process_data, data) - # future = p.submit(process_data, split) - tasks.append(future) - # results = [x.result(timeout=50) for x in tasks] - results = await asyncio.gather(*tasks) - print("RESULTS", len(results)) + log.debug(f"Delegating processing of {len(split)} messages to thread {index}") + task = loop.run_in_executor(p, process_data, data) + tasks.append(task) + + results = [await task for task in tasks] + log.debug(f"Results from processing of {len_data} messages: {len(results)}") # Join the results back from the split list flat_list = [item for sublist in results for item in sublist] - print("LENFLAT", len(flat_list)) - print("LENDATA", len(data)) - - newts = [x["ts"] for x in flat_list if "ts" in x] - print("lenoldts", len(oldts)) - print("lennewts", len(newts)) - allts = all(["ts" in x for x in flat_list]) - print("ALLTS", allts) - alllen = [len(x) for x in flat_list] - print("ALLLEN", alllen) await db.store_kafka_batch(flat_list) - -# @asyncio.coroutine -# def process_data_thread(data): -# """ -# Helper to spawn threads to process a list of data. -# """ -# loop = asyncio.get_event_loop() -# if len(data) < CPU_THREADS: -# split_data = [data] -# else: -# msg_per_core = int(len(data) / CPU_THREADS) -# print("MSG PER CORE", msg_per_core) -# split_data = array_split(data, ceil(len(data) / msg_per_core)) -# for index, split in enumerate(split_data): -# print("DELEGATING TO THREAD", len(split)) -# #f = process_data_thread(split) -# yield loop.run_in_executor(p, process_data, data) + log.debug(f"Finished processing {len_data} messages") def process_data(data): - print("PROCESS DATA START") - # to_store = [] - for index, msg in enumerate(data): - # print("PROCESSING", msg) + to_store = [] + + # Initialise sentiment analyser + analyzer = SentimentIntensityAnalyzer() + for msg in data: if msg["src"] == "4ch": board = msg["net"] thread = msg["channel"] + # Calculate hash for post - post_normalised = ujson.dumps(msg, sort_keys=True) + post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS) hash = siphash(hash_key, post_normalised) hash = str(hash) redis_key = f"cache.{board}.{thread}.{msg['no']}" @@ -117,19 +142,17 @@ def process_data(data): if key_content: key_content = key_content.decode("ascii") if key_content == hash: - del data[index] + # This deletes the message since the append at the end won't be hit continue else: - data[index]["type"] = "update" + msg["type"] = "update" db.r.set(redis_key, hash) - if "now" not in data[index]: - print("NOW NOT IN INDEX", data[index]) - for key2, value in list(data[index].items()): + for key2, value in list(msg.items()): if key2 in ATTRMAP: - data[index][ATTRMAP[key2]] = data[index][key2] - del data[index][key2] - if "ts" in data[index]: - old_time = data[index]["ts"] + msg[ATTRMAP[key2]] = msg[key2] + del msg[key2] + if "ts" in msg: + old_time = msg["ts"] # '08/30/22(Tue)02:25:37' time_spl = old_time.split(":") if len(time_spl) == 3: @@ -138,14 +161,42 @@ def process_data(data): old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") # new_ts = 
old_ts.isoformat() new_ts = int(old_ts.timestamp()) - data[index]["ts"] = new_ts + msg["ts"] = new_ts else: - print("MSG WITHOUT TS PROCESS", data[index]) - continue + raise Exception("No TS in msg") if "msg" in msg: - soup = BeautifulSoup(data[index]["msg"], "html.parser") - msg = soup.get_text(separator="\n") - data[index]["msg"] = msg - # to_store.append(data[index]) - print("FINISHED PROCESSING DATA") - return data + soup = BeautifulSoup(msg["msg"], "html.parser") + msg_str = soup.get_text(separator="\n") + msg["msg"] = msg_str + # Annotate sentiment/NLP + if "msg" in msg: + # Language + text = Text(msg["msg"]) + try: + lang_code = text.language.code + lang_name = text.language.name + msg["lang_code"] = lang_code + msg["lang_name"] = lang_name + except cld2_error as e: + log.error(f"Error detecting language: {e}") + # So below block doesn't fail + lang_code = None + + # Blatant discrimination + if lang_code == "en": + + # Sentiment + vs = analyzer.polarity_scores(str(msg["msg"])) + addendum = vs["compound"] + msg["sentiment"] = addendum + + # Tokens + n = nlp(msg["msg"]) + for tag in TAGS: + tag_name = tag.lower() + tags_flag = [token.lemma_ for token in n if token.pos_ == tag] + msg[f"words_{tag_name}"] = tags_flag + + # Add the mutated message to the return buffer + to_store.append(msg) + return to_store diff --git a/requirements.txt b/requirements.txt index 10ad22d..d020596 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,8 +5,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson aioredis[hiredis] aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/sources/ch4.py b/sources/ch4.py index 7640c9b..4ece35f 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -2,19 +2,13 @@ import asyncio import random import string -from concurrent.futures import ProcessPoolExecutor -from datetime import datetime from math import ceil import aiohttp -import ujson -from bs4 import BeautifulSoup from numpy import array_split -from siphashc import siphash import db import util -from schemas.ch4_s import ATTRMAP # CONFIGURATION # @@ -30,13 +24,8 @@ CRAWL_DELAY = 5 # Semaphore value ? 
THREADS_SEMAPHORE = 1000 -# Maximum number of CPU threads to use for post processing -CPU_THREADS = 8 - # CONFIGURATION END # -p = ProcessPoolExecutor(CPU_THREADS) - class Chan4(object): """ @@ -83,10 +72,12 @@ class Chan4(object): self.log.debug(f"Got boards: {self.boards}") async def get_thread_lists(self, boards): - self.log.debug(f"Getting thread list for {boards}") + # self.log.debug(f"Getting thread list for {boards}") board_urls = {board: f"{board}/catalog.json" for board in boards} responses = await self.api_call(board_urls) to_get = [] + flat_map = [board for board, thread in responses] + self.log.debug(f"Got thread list for {flat_map}: {len(responses)}") for mapped, response in responses: if not response: continue @@ -95,7 +86,6 @@ class Chan4(object): no = threads["no"] to_get.append((mapped, no)) - self.log.debug(f"Got thread list for {mapped}: {len(response)}") if not to_get: return split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) @@ -122,46 +112,20 @@ class Chan4(object): (board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list } - self.log.debug(f"Getting information for threads: {thread_urls}") + # self.log.debug(f"Getting information for threads: {thread_urls}") responses = await self.api_call(thread_urls) - self.log.debug(f"Got information for threads: {thread_urls}") + self.log.debug(f"Got information for {len(responses)} threads") + all_posts = {} for mapped, response in responses: if not response: continue board, thread = mapped - self.log.debug(f"Got thread content for thread {thread} on board {board}") all_posts[mapped] = response["posts"] - # Split into 10,000 chunks if not all_posts: return await self.handle_posts(all_posts) - # threads_per_core = int(len(all_posts) / CPU_THREADS) - # for i in range(CPU_THREADS): - # new_dict = {} - # pulled_posts = self.take_items(all_posts, threads_per_core) - # for k, v in pulled_posts: - # if k in new_dict: - # new_dict[k].append(v) - # else: - # new_dict[k] = [v] - # await self.handle_posts_thread(new_dict) - - # print("VAL", ceil(len(all_posts) / threads_per_core)) - # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) - # print("THREADS PER CORE SPLIT", len(split_posts)) - # # print("SPLIT CHUNK", len(split_posts)) - # for posts in split_posts: - # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") - # await self.handle_posts_thread(posts) - - # await self.handle_posts_thread(all_posts) - - @asyncio.coroutine - def handle_posts_thread(self, posts): - loop = asyncio.get_event_loop() - yield from loop.run_in_executor(p, self.handle_posts, posts) async def handle_posts(self, posts): to_store = [] @@ -170,50 +134,13 @@ class Chan4(object): for index, post in enumerate(post_list): posts[key][index]["type"] = "msg" - # # Calculate hash for post - # post_normalised = ujson.dumps(post, sort_keys=True) - # hash = siphash(self.hash_key, post_normalised) - # hash = str(hash) - # redis_key = f"cache.{board}.{thread}.{post['no']}" - # key_content = db.r.get(redis_key) - # if key_content: - # key_content = key_content.decode("ascii") - # if key_content == hash: - # continue - # else: - # posts[key][index]["type"] = "update" - # #db.r.set(redis_key, hash) - - # for key2, value in list(post.items()): - # if key2 in ATTRMAP: - # post[ATTRMAP[key2]] = posts[key][index][key2] - # del posts[key][index][key2] - # if "ts" in post: - # old_time = posts[key][index]["ts"] - # # '08/30/22(Tue)02:25:37' - # time_spl = old_time.split(":") - # if len(time_spl) == 3: - # 
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") - # else: - # old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - # # new_ts = old_ts.isoformat() - # new_ts = int(old_ts.timestamp()) - # posts[key][index]["ts"] = new_ts - # if "msg" in post: - # soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") - # msg = soup.get_text(separator="\n") - # posts[key][index]["msg"] = msg - posts[key][index]["src"] = "4ch" posts[key][index]["net"] = board posts[key][index]["channel"] = thread to_store.append(posts[key][index]) - # print({name_map[name]: val for name, val in post.items()}) - # print(f"Got posts: {len(posts)}") if to_store: - print("STORING", len(to_store)) await db.queue_message_bulk(to_store) async def fetch(self, url, session, mapped): @@ -238,7 +165,7 @@ class Chan4(object): async with aiohttp.ClientSession(connector=connector) as session: for mapped, method in methods.items(): url = f"{self.api_endpoint}/{method}" - self.log.debug(f"GET {url}") + # self.log.debug(f"GET {url}") task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) # task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) tasks.append(task) diff --git a/sources/ingest.py b/sources/ingest.py index be7d6ca..017b8db 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -1,6 +1,6 @@ import asyncio -import ujson +import orjson import db import util @@ -8,9 +8,13 @@ from processing import process SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue." -CHUNK_SIZE = 90000 + +# Chunk size per source (divide by len(SOURCES) for total) +CHUNK_SIZE = 9000 ITER_DELAY = 0.5 +log = util.get_logger("ingest") + class Ingest(object): def __init__(self): @@ -18,8 +22,6 @@ class Ingest(object): self.log = util.get_logger(name) async def run(self): - # items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}] - # await process.spawn_processing_threads(items) while True: await self.get_chunk() await asyncio.sleep(ITER_DELAY) @@ -31,11 +33,8 @@ class Ingest(object): chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - # self.log.info(f"Got chunk: {chunk}") for item in chunk: - item = ujson.loads(item) - # self.log.info(f"Got item: {item}") + item = orjson.loads(item) items.append(item) if items: - print("PROCESSING", len(items)) await process.spawn_processing_threads(items) diff --git a/util.py b/util.py index 09f98da..045c95f 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import logging log = logging.getLogger("util") -debug = False +debug = True # Color definitions BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
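
Note on the new processing path: the hunks above strip the commented-out post handling from sources/ch4.py and add per-message annotation in processing/process.py — polyglot language detection, a VADER compound sentiment score for English text, and spaCy POS-filtered lemma lists stored under words_noun / words_adj / words_verb / words_adv. Below is a minimal standalone sketch of that annotation step, assuming the en_core_web_sm model pulled in by the Dockerfile change is installed; the annotate() helper name and the sample message are illustrative only and not part of the commit.

    # Sketch of the annotation added in process_data(); not part of the patch itself.
    import spacy
    from polyglot.text import Text                      # needs pyicu + pycld2 (see requirements)
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

    TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
    # parser/ner disabled, as in the patch, to keep per-message tagging cheap
    nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
    analyzer = SentimentIntensityAnalyzer()

    def annotate(msg):
        # Language detection (the patch wraps this in a pycld2 error handler)
        lang = Text(msg["msg"]).language
        msg["lang_code"] = lang.code
        msg["lang_name"] = lang.name
        if msg["lang_code"] == "en":
            # VADER compound score in [-1, 1]
            msg["sentiment"] = analyzer.polarity_scores(msg["msg"])["compound"]
            doc = nlp(msg["msg"])
            for tag in TAGS:
                msg[f"words_{tag.lower()}"] = [t.lemma_ for t in doc if t.pos_ == tag]
        return msg

    print(annotate({"msg": "The quick brown fox jumps over the lazy dog"}))

Loading the spaCy pipeline once per worker with the parser and NER components disabled, as the patch does, keeps the tagging step lightweight inside the ProcessPoolExecutor workers while still yielding lemmas and coarse POS tags.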