diff --git a/db.py b/db.py
index 58cb9a5..473a2c0 100644
--- a/db.py
+++ b/db.py
@@ -1,18 +1,15 @@
-from math import ceil
+import random
 
 import aioredis
-import manticoresearch
-import ujson
-from manticoresearch.rest import ApiException
-from numpy import array_split
+import orjson
+
+# Kafka
+from aiokafka import AIOKafkaProducer
 from redis import StrictRedis
 
 import util
-from schemas import mc_s
 
-configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
-api_client = manticoresearch.ApiClient(configuration)
-api_instance = manticoresearch.IndexApi(api_client)
+# KAFKA_TOPIC = "msg"
 
 log = util.get_logger("db")
 
@@ -37,120 +34,77 @@ TYPES_MAIN = [
 ]
 TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
+KEYPREFIX = "queue."
 
 
-def store_message(msg):
+async def store_kafka_batch(data):
+    log.debug(f"Storing Kafka batch of {len(data)} messages")
+    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
+    await producer.start()
+    batch = producer.create_batch()
+    for msg in data:
+        if msg["type"] in TYPES_MAIN:
+            index = "main"
+            # schema = mc_s.schema_main
+        elif msg["type"] in TYPES_META:
+            index = "meta"
+            # schema = mc_s.schema_meta
+        elif msg["type"] in TYPES_INT:
+            index = "internal"
+            # schema = mc_s.schema_int
+
+        KAFKA_TOPIC = index
+        # normalise fields
+        for key, value in list(msg.items()):
+            if value is None:
+                del msg[key]
+            # if key in schema:
+            #     if isinstance(value, int):
+            #         if schema[key].startswith("string") or schema[key].startswith(
+            #             "text"
+            #         ):
+            #             msg[key] = str(value)
+        body = orjson.dumps(msg)
+        # orjson returns bytes
+        # body = str.encode(message)
+        if "ts" not in msg:
+            raise Exception("No TS in msg")
+        metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
+        if metadata is None:
+            # Batch is full: flush it, then re-append this message to the new
+            # batch so it is not silently dropped
+            partitions = await producer.partitions_for(KAFKA_TOPIC)
+            partition = random.choice(tuple(partitions))
+            await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
+            log.debug(f"{batch.record_count()} messages sent to partition {partition}")
+            batch = producer.create_batch()
+            batch.append(key=None, value=body, timestamp=msg["ts"])
+            continue
+
+    partitions = await producer.partitions_for(KAFKA_TOPIC)
+    partition = random.choice(tuple(partitions))
+    await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
+    log.debug(f"{batch.record_count()} messages sent to partition {partition}")
+    await producer.stop()
+
+
+async def queue_message(msg):
     """
-    Store a message into Manticore
-    :param msg: dict
+    Queue a message on the Redis buffer.
""" - # Duplicated to avoid extra function call - if msg["type"] in TYPES_MAIN: - index = "main" - schema = mc_s.schema_main - elif msg["type"] in TYPES_META: - index = "meta" - schema = mc_s.schema_meta - elif msg["type"] in TYPES_INT: - index = "internal" - schema = mc_s.schema_int - # normalise fields - for key, value in list(msg.items()): - if value is None: - del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith("text"): - msg[key] = str(value) + src = msg["src"] + message = orjson.dumps(msg) - body = [{"insert": {"index": index, "doc": msg}}] - body_post = "" - for item in body: - body_post += ujson.dumps(item) - body_post += "\n" - - # print(body_post) - try: - # Bulk index operations - api_response = api_instance.bulk(body_post) # , async_req=True - # print(api_response) - except ApiException as e: - print("Exception when calling IndexApi->bulk: %s\n" % e) - print("ATTEMPT", body_post) + key = f"{KEYPREFIX}{src}" + # log.debug(f"Queueing single message of string length {len(message)}") + await ar.sadd(key, message) -def store_message_bulk(data): +async def queue_message_bulk(data): """ - Store a message into Manticore - :param msg: dict + Queue multiple messages on the Redis buffer. """ - if not data: - return - # 10000: maximum inserts we can submit to - # Manticore as of Sept 2022 - split_posts = array_split(data, ceil(len(data) / 10000)) - for messages in split_posts: - total = [] - for msg in messages: - # Duplicated to avoid extra function call (see above) - if msg["type"] in TYPES_MAIN: - index = "main" - schema = mc_s.schema_main - elif msg["type"] in TYPES_META: - index = "meta" - schema = mc_s.schema_meta - elif msg["type"] in TYPES_INT: - index = "internal" - schema = mc_s.schema_int - # normalise fields - for key, value in list(msg.items()): - if value is None: - del msg[key] - if key in schema: - if isinstance(value, int): - if schema[key].startswith("string") or schema[key].startswith( - "text" - ): - msg[key] = str(value) + # log.debug(f"Queueing message batch of length {len(data)}") + for msg in data: + src = msg["src"] + message = orjson.dumps(msg) - body = {"insert": {"index": index, "doc": msg}} - total.append(body) - - body_post = "" - for item in total: - body_post += ujson.dumps(item) - body_post += "\n" - - # print(body_post) - try: - # Bulk index operations - api_response = api_instance.bulk(body_post) # , async_req=True - print(api_response) - except ApiException as e: - print("Exception when calling IndexApi->bulk: %s\n" % e) - print("ATTEMPT", body_post) - - -def update_schema(): - pass - - -def create_index(api_client): - util_instance = manticoresearch.UtilsApi(api_client) - schemas = { - "main": mc_s.schema_main, - "meta": mc_s.schema_meta, - "internal": mc_s.schema_int, - } - for name, schema in schemas.items(): - schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) - - create_query = ( - f"create table if not exists {name}({schema_types}) engine='columnar'" - ) - print("Schema types", create_query) - util_instance.sql(create_query) - - -create_index(api_client) -update_schema() + key = f"{KEYPREFIX}{src}" + await ar.sadd(key, message) diff --git a/docker-compose.yml b/docker-compose.yml index 995bd64..27364a3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,14 @@ -version: "2" - +version: "2.2" + +volumes: + metadata_data: {} + middle_var: {} + historical_var: {} + broker_var: {} + coordinator_var: {} + router_var: {} + druid_shared: {} + services: 
app: image: pathogen/monolith:latest @@ -11,7 +20,15 @@ services: volumes_from: - tmp depends_on: - - db + broker: + condition: service_started + kafka: + condition: service_healthy + tmp: + condition: service_started + redis: + condition: service_healthy + # - db threshold: image: pathogen/threshold:latest @@ -33,34 +50,194 @@ services: volumes_from: - tmp depends_on: - - tmp - - redis + tmp: + condition: service_started + redis: + condition: service_healthy - db: + # db: #image: pathogen/manticore:kibana - image: manticoresearch/manticore:latest + # image: manticoresearch/manticore:latest #build: # context: ./docker/manticore # args: # DEV: 1 - restart: always + # restart: always + + + turnilo: + container_name: turnilo + image: uchhatre/turnilo:latest ports: - - 9308 - - 9312 - - 9306 - ulimits: - nproc: 65535 - nofile: - soft: 65535 - hard: 65535 - memlock: - soft: -1 - hard: -1 + - 9093:9090 environment: - - MCL=1 + - DRUID_BROKER_URL=http://broker:8082 + depends_on: + - broker + + metabase: + container_name: metabase + image: metabase/metabase:latest + ports: + - 3001:3000 + depends_on: + - broker + + postgres: + container_name: postgres + image: postgres:latest volumes: - - ./docker/data:/var/lib/manticore - - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf + - metadata_data:/var/lib/postgresql/data + environment: + - POSTGRES_PASSWORD=FoolishPassword + - POSTGRES_USER=druid + - POSTGRES_DB=druid + + # Need 3.5 or later for container nodes + zookeeper: + container_name: zookeeper + image: zookeeper:3.5 + ports: + - "2181:2181" + environment: + - ZOO_MY_ID=1 + + kafka: + image: bitnami/kafka + depends_on: + - zookeeper + - broker + ports: + - 29092:29092 + - 9092:9092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + #KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ALLOW_PLAINTEXT_LISTENER: yes + # healthcheck: + # test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"] + # interval: 2s + # timeout: 2s + # retries: 15 + healthcheck: + test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"] + start_period: 15s + interval: 2s + timeout: 5s + retries: 30 + + coordinator: + image: apache/druid:0.23.0 + container_name: coordinator + volumes: + - druid_shared:/opt/shared + - coordinator_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + ports: + - "8081:8081" + command: + - coordinator + env_file: + - environment + + broker: + image: apache/druid:0.23.0 + container_name: broker + volumes: + - broker_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8082:8082" + command: + - broker + env_file: + - environment + + historical: + image: apache/druid:0.23.0 + container_name: historical + volumes: + - druid_shared:/opt/shared + - historical_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8083:8083" + command: + - historical + env_file: + - environment + + middlemanager: + image: apache/druid:0.23.0 + container_name: middlemanager + volumes: + - druid_shared:/opt/shared + - middle_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8091:8091" + - "8100-8105:8100-8105" + command: + - middleManager + env_file: + - environment + + router: + image: apache/druid:0.23.0 + container_name: 
router + volumes: + - router_var:/opt/druid/var + depends_on: + - zookeeper + - postgres + - coordinator + ports: + - "8888:8888" + command: + - router + env_file: + - environment + + # db: + # #image: pathogen/manticore:kibana + # image: manticoresearch/manticore:dev + # #build: + # # context: ./docker/manticore + # # args: + # # DEV: 1 + # restart: always + # ports: + # - 9308 + # - 9312 + # - 9306 + # ulimits: + # nproc: 65535 + # nofile: + # soft: 65535 + # hard: 65535 + # memlock: + # soft: -1 + # hard: -1 + # environment: + # - MCL=1 + # volumes: + # - ./docker/data:/var/lib/manticore + # - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf tmp: image: busybox @@ -80,6 +257,11 @@ services: - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf volumes_from: - tmp + healthcheck: + test: "redis-cli -s /var/run/redis/redis.sock ping" + interval: 2s + timeout: 2s + retries: 15 networks: default: diff --git a/docker/Dockerfile b/docker/Dockerfile index c133ace..0da9448 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,7 @@ COPY requirements.txt /code/ COPY discord-patched.tgz /code/ RUN python -m venv /venv -RUN . /venv/bin/activate && pip install -r requirements.txt +RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages diff --git a/docker/requirements.txt b/docker/requirements.txt index 8ec92ae..35412d3 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -4,7 +4,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson aioredis[hiredis] +aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/environment b/environment new file mode 100644 index 0000000..8c4e756 --- /dev/null +++ b/environment @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# Java tuning +DRUID_XMX=1g +DRUID_XMS=1g +DRUID_MAXNEWSIZE=250m +DRUID_NEWSIZE=250m +DRUID_MAXDIRECTMEMORYSIZE=6172m + +druid_emitter_logging_logLevel=debug + +druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"] + +druid_zk_service_host=zookeeper + +druid_metadata_storage_host= +druid_metadata_storage_type=postgresql +druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid +druid_metadata_storage_connector_user=druid +druid_metadata_storage_connector_password=FoolishPassword + +druid_coordinator_balancer_strategy=cachingCost + +druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"] +druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB + +druid_storage_type=local +druid_storage_storageDirectory=/opt/shared/segments +druid_indexer_logs_type=file +druid_indexer_logs_directory=/opt/shared/indexing-logs + +druid_processing_numThreads=2 +druid_processing_numMergeBuffers=2 + +DRUID_LOG4J= diff --git a/event_log.txt b/event_log.txt new file mode 100644 index 0000000..e69de29 diff --git a/monolith.py b/monolith.py index fbead0a..1eb559b 100644 --- a/monolith.py +++ b/monolith.py @@ -6,13 +6,6 @@ from sources.ch4 import Chan4 from sources.dis import DiscordClient from sources.ingest import Ingest -# For development -# if not getenv("DISCORD_TOKEN", None): -# print("Could not get Discord token, attempting load from .env") -# from dotenv import load_dotenv - -# load_dotenv() - log = util.get_logger("monolith") modules_enabled = getenv("MODULES_ENABLED", False) @@ -26,7 +19,6 @@ async def main(loop): log.info("Starting Discord handler.") client = DiscordClient() loop.create_task(client.start(token)) - # client.run(token) log.info("Starting 4chan handler.") chan = Chan4() diff --git a/processing/__init__.py b/processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/processing/process.py b/processing/process.py new file mode 100644 index 0000000..4a6409c --- /dev/null +++ b/processing/process.py @@ -0,0 +1,202 @@ +import asyncio +import os +import random + +# For key generation +import string + +# Squash errors +import warnings +from concurrent.futures import ProcessPoolExecutor + +# For timestamp processing +from datetime import datetime +from math import ceil + +import orjson + +# Tokenisation +import spacy + +# For 4chan message parsing +from bs4 import BeautifulSoup +from numpy import array_split +from polyglot.detect.base import logger as polyglot_logger + +# For NLP +from polyglot.text import Text +from pycld2 import error as cld2_error +from siphashc import siphash + +# For sentiment +from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer + +import db +import util + +# 4chan schema +from schemas.ch4_s import ATTRMAP + +# For tokenisation +# from gensim.parsing.preprocessing import ( +# strip_tags, +# strip_punctuation, +# strip_numeric, +# stem_text, +# strip_multiple_whitespaces, +# strip_non_alphanum, +# remove_stopwords, +# strip_short, +# preprocess_string, +# ) + +# CUSTOM_FILTERS = [ +# lambda x: x.lower(), +# strip_tags, # +# strip_punctuation, # +# strip_multiple_whitespaces, +# strip_numeric, +# remove_stopwords, +# strip_short, +# #stem_text, +# strip_non_alphanum, # +# ] + +# Squash errors +polyglot_logger.setLevel("ERROR") 
+warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
+
+
+TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
+nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
+
+
+log = util.get_logger("process")
+
+# Maximum number of CPU threads to use for post processing
+CPU_THREADS = os.cpu_count()
+
+p = ProcessPoolExecutor(CPU_THREADS)
+
+
+def get_hash_key():
+    hash_key = db.r.get("hashing_key")
+    if not hash_key:
+        letters = string.ascii_lowercase
+        hash_key = "".join(random.choice(letters) for i in range(16))
+        log.debug(f"Created new hash key: {hash_key}")
+        db.r.set("hashing_key", hash_key)
+    else:
+        hash_key = hash_key.decode("ascii")
+        log.debug(f"Decoded hash key: {hash_key}")
+    return hash_key
+
+
+hash_key = get_hash_key()
+
+
+async def spawn_processing_threads(data):
+    len_data = len(data)
+    log.debug(f"Spawning processing threads for batch of {len_data} messages")
+
+    loop = asyncio.get_event_loop()
+    tasks = []
+
+    if len(data) < CPU_THREADS:
+        split_data = [data]
+    else:
+        msg_per_core = int(len(data) / CPU_THREADS)
+        split_data = array_split(data, ceil(len(data) / msg_per_core))
+    for index, split in enumerate(split_data):
+        log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
+        # Hand each worker its own slice of the batch, not the whole batch
+        task = loop.run_in_executor(p, process_data, split)
+        tasks.append(task)
+
+    results = [await task for task in tasks]
+    log.debug(f"Results from processing of {len_data} messages: {len(results)}")
+
+    # Join the results back from the split list
+    flat_list = [item for sublist in results for item in sublist]
+    await db.store_kafka_batch(flat_list)
+
+    log.debug(f"Finished processing {len_data} messages")
+
+
+def process_data(data):
+    to_store = []
+
+    # Initialise sentiment analyser
+    analyzer = SentimentIntensityAnalyzer()
+    for msg in data:
+        if msg["src"] == "4ch":
+            board = msg["net"]
+            thread = msg["channel"]
+
+            # Calculate hash for post
+            post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
+            hash = siphash(hash_key, post_normalised)
+            hash = str(hash)
+            redis_key = f"cache.{board}.{thread}.{msg['no']}"
+            key_content = db.r.get(redis_key)
+            if key_content:
+                key_content = key_content.decode("ascii")
+                if key_content == hash:
+                    # This deletes the message since the append at the end won't be hit
+                    continue
+                else:
+                    msg["type"] = "update"
+            db.r.set(redis_key, hash)
+            for key2, value in list(msg.items()):
+                if key2 in ATTRMAP:
+                    msg[ATTRMAP[key2]] = msg[key2]
+                    del msg[key2]
+            if "ts" in msg:
+                old_time = msg["ts"]
+                # '08/30/22(Tue)02:25:37'
+                time_spl = old_time.split(":")
+                if len(time_spl) == 3:
+                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
+                else:
+                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
+                # new_ts = old_ts.isoformat()
+                new_ts = int(old_ts.timestamp())
+                msg["ts"] = new_ts
+            else:
+                raise Exception("No TS in msg")
+            if "msg" in msg:
+                soup = BeautifulSoup(msg["msg"], "html.parser")
+                msg_str = soup.get_text(separator="\n")
+                msg["msg"] = msg_str
+        # Annotate sentiment/NLP
+        if "msg" in msg:
+            # Language
+            text = Text(msg["msg"])
+            try:
+                lang_code = text.language.code
+                lang_name = text.language.name
+                msg["lang_code"] = lang_code
+                msg["lang_name"] = lang_name
+            except cld2_error as e:
+                log.error(f"Error detecting language: {e}")
+                # So below block doesn't fail
+                lang_code = None
+
+            # Blatant discrimination
+            if lang_code == "en":
+
+                # Sentiment
+                vs = analyzer.polarity_scores(str(msg["msg"]))
+                addendum = vs["compound"]
+                msg["sentiment"] = addendum
+
+                # Tokens
+                n = nlp(msg["msg"])
+ for tag in TAGS: + tag_name = tag.lower() + tags_flag = [token.lemma_ for token in n if token.pos_ == tag] + msg[f"words_{tag_name}"] = tags_flag + + # Add the mutated message to the return buffer + to_store.append(msg) + return to_store diff --git a/requirements.txt b/requirements.txt index 860086f..d020596 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,18 @@ redis siphashc aiohttp[speedups] python-dotenv -manticoresearch +#manticoresearch numpy ujson aioredis[hiredis] +aiokafka +vaderSentiment +polyglot +pyicu +pycld2 +morfessor +six +nltk +spacy +python-Levenshtein +orjson diff --git a/sources/ch4.py b/sources/ch4.py index 54038ea..4ece35f 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -2,41 +2,30 @@ import asyncio import random import string -from concurrent.futures import ProcessPoolExecutor -from datetime import datetime from math import ceil import aiohttp -import ujson -from bs4 import BeautifulSoup from numpy import array_split -from siphashc import siphash import db import util -from schemas.ch4_s import ATTRMAP # CONFIGURATION # # Number of 4chan threads to request at once -THREADS_CONCURRENT = 100 +THREADS_CONCURRENT = 1000 # Seconds to wait between every THREADS_CONCURRENT requests -THREADS_DELAY = 0.8 +THREADS_DELAY = 0.1 # Seconds to wait between crawls CRAWL_DELAY = 5 # Semaphore value ? -THREADS_SEMAPHORE = 100 - -# Maximum number of CPU threads to use for post processing -CPU_THREADS = 1 +THREADS_SEMAPHORE = 1000 # CONFIGURATION END # -p = ProcessPoolExecutor(CPU_THREADS) - class Chan4(object): """ @@ -83,10 +72,12 @@ class Chan4(object): self.log.debug(f"Got boards: {self.boards}") async def get_thread_lists(self, boards): - self.log.debug(f"Getting thread list for {boards}") + # self.log.debug(f"Getting thread list for {boards}") board_urls = {board: f"{board}/catalog.json" for board in boards} responses = await self.api_call(board_urls) to_get = [] + flat_map = [board for board, thread in responses] + self.log.debug(f"Got thread list for {flat_map}: {len(responses)}") for mapped, response in responses: if not response: continue @@ -95,7 +86,6 @@ class Chan4(object): no = threads["no"] to_get.append((mapped, no)) - self.log.info(f"Got thread list for {mapped}: {len(response)}") if not to_get: return split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) @@ -122,96 +112,36 @@ class Chan4(object): (board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list } - self.log.debug(f"Getting information for threads: {thread_urls}") + # self.log.debug(f"Getting information for threads: {thread_urls}") responses = await self.api_call(thread_urls) - self.log.debug(f"Got information for threads: {thread_urls}") + self.log.debug(f"Got information for {len(responses)} threads") + all_posts = {} for mapped, response in responses: if not response: continue board, thread = mapped - self.log.debug(f"Got thread content for thread {thread} on board {board}") all_posts[mapped] = response["posts"] - # Split into 10,000 chunks if not all_posts: return - threads_per_core = int(len(all_posts) / CPU_THREADS) - for i in range(CPU_THREADS): - new_dict = {} - pulled_posts = self.take_items(all_posts, threads_per_core) - for k, v in pulled_posts: - if k in new_dict: - new_dict[k].append(v) - else: - new_dict[k] = [v] - await self.handle_posts_thread(new_dict) - # print("VAL", ceil(len(all_posts) / threads_per_core)) - # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) - # print("THREADS PER CORE SPLIT", 
len(split_posts)) - # # print("SPLIT CHUNK", len(split_posts)) - # for posts in split_posts: - # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") - # await self.handle_posts_thread(posts) + await self.handle_posts(all_posts) - # await self.handle_posts_thread(all_posts) - - @asyncio.coroutine - def handle_posts_thread(self, posts): - loop = asyncio.get_event_loop() - yield from loop.run_in_executor(p, self.handle_posts, posts) - - def handle_posts(self, posts): + async def handle_posts(self, posts): to_store = [] for key, post_list in posts.items(): board, thread = key for index, post in enumerate(post_list): posts[key][index]["type"] = "msg" - # Calculate hash for post - post_normalised = ujson.dumps(post, sort_keys=True) - hash = siphash(self.hash_key, post_normalised) - hash = str(hash) - redis_key = f"cache.{board}.{thread}.{post['no']}" - key_content = db.r.get(redis_key) - if key_content: - key_content = key_content.decode("ascii") - if key_content == hash: - continue - else: - posts[key][index]["type"] = "update" - db.r.set(redis_key, hash) - - for key2, value in list(post.items()): - if key2 in ATTRMAP: - post[ATTRMAP[key2]] = posts[key][index][key2] - del posts[key][index][key2] - if "ts" in post: - old_time = posts[key][index]["ts"] - # '08/30/22(Tue)02:25:37' - time_spl = old_time.split(":") - if len(time_spl) == 3: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") - else: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - # new_ts = old_ts.isoformat() - new_ts = int(old_ts.timestamp()) - posts[key][index]["ts"] = new_ts - if "msg" in post: - soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") - msg = soup.get_text(separator="\n") - posts[key][index]["msg"] = msg - posts[key][index]["src"] = "4ch" posts[key][index]["net"] = board posts[key][index]["channel"] = thread to_store.append(posts[key][index]) - # print({name_map[name]: val for name, val in post.items()}) - # print(f"Got posts: {len(posts)}") if to_store: - db.store_message_bulk(to_store) + await db.queue_message_bulk(to_store) async def fetch(self, url, session, mapped): async with session.get(url) as response: @@ -235,7 +165,7 @@ class Chan4(object): async with aiohttp.ClientSession(connector=connector) as session: for mapped, method in methods.items(): url = f"{self.api_endpoint}/{method}" - self.log.debug(f"GET {url}") + # self.log.debug(f"GET {url}") task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) # task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) tasks.append(task) diff --git a/sources/dis.py b/sources/dis.py index 0b8b2f6..c187cf7 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -41,4 +41,4 @@ class DiscordClient(discord.Client): a["type"] = "msg" a["src"] = "dis" - db.store_message(a) + await db.queue_message(a) diff --git a/sources/ingest.py b/sources/ingest.py index 083a085..017b8db 100644 --- a/sources/ingest.py +++ b/sources/ingest.py @@ -1,15 +1,20 @@ import asyncio -import ujson +import orjson import db import util +from processing import process -SOURCES = ["irc"] +SOURCES = ["4ch", "irc", "dis"] KEYPREFIX = "queue." 
-CHUNK_SIZE = 1000 + +# Chunk size per source (divide by len(SOURCES) for total) +CHUNK_SIZE = 9000 ITER_DELAY = 0.5 +log = util.get_logger("ingest") + class Ingest(object): def __init__(self): @@ -18,19 +23,18 @@ class Ingest(object): async def run(self): while True: - await self.process_chunk() + await self.get_chunk() await asyncio.sleep(ITER_DELAY) - async def process_chunk(self): + async def get_chunk(self): items = [] for source in SOURCES: key = f"{KEYPREFIX}{source}" chunk = await db.ar.spop(key, CHUNK_SIZE) if not chunk: continue - self.log.info(f"Got chunk: {chunk}") for item in chunk: - item = ujson.loads(item) - self.log.info(f"Got item: {item}") + item = orjson.loads(item) items.append(item) - db.store_message_bulk(items) + if items: + await process.spawn_processing_threads(items) diff --git a/util.py b/util.py index 09f98da..045c95f 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import logging log = logging.getLogger("util") -debug = False +debug = True # Color definitions BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)