Fix merge conflict

Mark Veidemanis 2022-09-16 17:45:24 +01:00
commit cb11ce9b12
14 changed files with 581 additions and 243 deletions

db.py

@@ -1,18 +1,15 @@
-from math import ceil
+import random
import aioredis
-import manticoresearch
+import orjson
-import ujson
-from manticoresearch.rest import ApiException
+# Kafka
-from numpy import array_split
+from aiokafka import AIOKafkaProducer
from redis import StrictRedis
import util
-from schemas import mc_s
-configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
+# KAFKA_TOPIC = "msg"
-api_client = manticoresearch.ApiClient(configuration)
-api_instance = manticoresearch.IndexApi(api_client)
log = util.get_logger("db")
@@ -37,120 +34,77 @@ TYPES_MAIN = [
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
+KEYPREFIX = "queue."
-def store_message(msg):
+async def store_kafka_batch(data):
log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
batch = producer.create_batch()
for msg in data:
if msg["type"] in TYPES_MAIN:
index = "main"
# schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
# schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
# schema = mc_s.schema_int
KAFKA_TOPIC = index
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
body = orjson.dumps(msg)
# orjson returns bytes
# body = str.encode(message)
if "ts" not in msg:
raise Exception("No TS in msg")
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None:
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
batch = producer.create_batch()
continue
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
await producer.stop()
async def queue_message(msg):
""" """
Store a message into Manticore Queue a message on the Redis buffer.
:param msg: dict
""" """
# Duplicated to avoid extra function call src = msg["src"]
if msg["type"] in TYPES_MAIN: message = orjson.dumps(msg)
index = "main"
schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith("text"):
msg[key] = str(value)
-body = [{"insert": {"index": index, "doc": msg}}]
+key = f"{KEYPREFIX}{src}"
-body_post = ""
+# log.debug(f"Queueing single message of string length {len(message)}")
-for item in body:
+await ar.sadd(key, message)
body_post += ujson.dumps(item)
body_post += "\n"
# print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("ATTEMPT", body_post)
-def store_message_bulk(data):
+async def queue_message_bulk(data):
"""
-Store a message into Manticore
+Queue multiple messages on the Redis buffer.
-:param msg: dict
"""
-if not data:
+# log.debug(f"Queueing message batch of length {len(data)}")
-return
+for msg in data:
-# 10000: maximum inserts we can submit to
+src = msg["src"]
-# Manticore as of Sept 2022
+message = orjson.dumps(msg)
split_posts = array_split(data, ceil(len(data) / 10000))
for messages in split_posts:
total = []
for msg in messages:
# Duplicated to avoid extra function call (see above)
if msg["type"] in TYPES_MAIN:
index = "main"
schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith(
"text"
):
msg[key] = str(value)
-body = {"insert": {"index": index, "doc": msg}}
+key = f"{KEYPREFIX}{src}"
-total.append(body)
+await ar.sadd(key, message)
body_post = ""
for item in total:
body_post += ujson.dumps(item)
body_post += "\n"
# print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("ATTEMPT", body_post)
def update_schema():
pass
def create_index(api_client):
util_instance = manticoresearch.UtilsApi(api_client)
schemas = {
"main": mc_s.schema_main,
"meta": mc_s.schema_meta,
"internal": mc_s.schema_int,
}
for name, schema in schemas.items():
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
create_query = (
f"create table if not exists {name}({schema_types}) engine='columnar'"
)
print("Schema types", create_query)
util_instance.sql(create_query)
create_index(api_client)
update_schema()
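
For context, a minimal self-contained sketch (not part of this commit) of the new Redis buffer round trip: a producer SADDs an orjson-serialised message under queue.<src>, as queue_message() does above, and a consumer SPOPs a chunk, as the ingest loop does further down. The redis://localhost URL and the sample message fields are illustrative assumptions; the project wires up its own connection and CHUNK_SIZE elsewhere.

import asyncio

import aioredis
import orjson

KEYPREFIX = "queue."


async def demo():
    # Assumed local Redis URL, for illustration only
    r = aioredis.from_url("redis://localhost")

    # Producer side: queue one message under its source key, as queue_message() does
    msg = {"src": "irc", "type": "msg", "msg": "hello", "ts": 1663344000}
    await r.sadd(f"{KEYPREFIX}{msg['src']}", orjson.dumps(msg))

    # Consumer side: drain up to 10 messages, as the ingest loop does with spop(key, CHUNK_SIZE)
    chunk = await r.spop(f"{KEYPREFIX}irc", 10)
    print([orjson.loads(item) for item in chunk])

    await r.close()


asyncio.run(demo())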


@@ -1,5 +1,14 @@
-version: "2"
+version: "2.2"
volumes:
metadata_data: {}
middle_var: {}
historical_var: {}
broker_var: {}
coordinator_var: {}
router_var: {}
druid_shared: {}
services:
app:
image: pathogen/monolith:latest
@@ -11,7 +20,15 @@ services:
volumes_from:
- tmp
depends_on:
-- db
+broker:
condition: service_started
kafka:
condition: service_healthy
tmp:
condition: service_started
redis:
condition: service_healthy
# - db
threshold:
image: pathogen/threshold:latest
@@ -33,34 +50,194 @@ services:
volumes_from:
- tmp
depends_on:
-- tmp
+tmp:
-- redis
+condition: service_started
+redis:
+condition: service_healthy
-db:
+# db:
#image: pathogen/manticore:kibana
-image: manticoresearch/manticore:latest
+# image: manticoresearch/manticore:latest
#build:
# context: ./docker/manticore
# args:
# DEV: 1
-restart: always
+# restart: always
turnilo:
container_name: turnilo
image: uchhatre/turnilo:latest
ports:
-- 9308
+- 9093:9090
- 9312
- 9306
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
memlock:
soft: -1
hard: -1
environment:
-- MCL=1
+- DRUID_BROKER_URL=http://broker:8082
depends_on:
- broker
metabase:
container_name: metabase
image: metabase/metabase:latest
ports:
- 3001:3000
depends_on:
- broker
postgres:
container_name: postgres
image: postgres:latest
volumes:
-- ./docker/data:/var/lib/manticore
+- metadata_data:/var/lib/postgresql/data
-- ./docker/manticore.conf:/etc/manticoresearch/manticore.conf
+environment:
- POSTGRES_PASSWORD=FoolishPassword
- POSTGRES_USER=druid
- POSTGRES_DB=druid
# Need 3.5 or later for container nodes
zookeeper:
container_name: zookeeper
image: zookeeper:3.5
ports:
- "2181:2181"
environment:
- ZOO_MY_ID=1
kafka:
image: bitnami/kafka
depends_on:
- zookeeper
- broker
ports:
- 29092:29092
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
#KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: yes
# healthcheck:
# test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"]
# interval: 2s
# timeout: 2s
# retries: 15
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
start_period: 15s
interval: 2s
timeout: 5s
retries: 30
coordinator:
image: apache/druid:0.23.0
container_name: coordinator
volumes:
- druid_shared:/opt/shared
- coordinator_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
ports:
- "8081:8081"
command:
- coordinator
env_file:
- environment
broker:
image: apache/druid:0.23.0
container_name: broker
volumes:
- broker_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8082:8082"
command:
- broker
env_file:
- environment
historical:
image: apache/druid:0.23.0
container_name: historical
volumes:
- druid_shared:/opt/shared
- historical_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8083:8083"
command:
- historical
env_file:
- environment
middlemanager:
image: apache/druid:0.23.0
container_name: middlemanager
volumes:
- druid_shared:/opt/shared
- middle_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8091:8091"
- "8100-8105:8100-8105"
command:
- middleManager
env_file:
- environment
router:
image: apache/druid:0.23.0
container_name: router
volumes:
- router_var:/opt/druid/var
depends_on:
- zookeeper
- postgres
- coordinator
ports:
- "8888:8888"
command:
- router
env_file:
- environment
# db:
# #image: pathogen/manticore:kibana
# image: manticoresearch/manticore:dev
# #build:
# # context: ./docker/manticore
# # args:
# # DEV: 1
# restart: always
# ports:
# - 9308
# - 9312
# - 9306
# ulimits:
# nproc: 65535
# nofile:
# soft: 65535
# hard: 65535
# memlock:
# soft: -1
# hard: -1
# environment:
# - MCL=1
# volumes:
# - ./docker/data:/var/lib/manticore
# - ./docker/manticore.conf:/etc/manticoresearch/manticore.conf
tmp:
image: busybox
@@ -80,6 +257,11 @@ services:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/redis/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks:
default:


@@ -16,7 +16,7 @@ COPY requirements.txt /code/
COPY discord-patched.tgz /code/
RUN python -m venv /venv
-RUN . /venv/bin/activate && pip install -r requirements.txt
+RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm
RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages


@@ -4,7 +4,18 @@ redis
siphashc
aiohttp[speedups]
python-dotenv
-manticoresearch
+#manticoresearch
numpy
ujson
aioredis[hiredis]
aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson

environment (new file)

@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Java tuning
DRUID_XMX=1g
DRUID_XMS=1g
DRUID_MAXNEWSIZE=250m
DRUID_NEWSIZE=250m
DRUID_MAXDIRECTMEMORYSIZE=6172m
druid_emitter_logging_logLevel=debug
druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage", "druid-kafka-indexing-service"]
druid_zk_service_host=zookeeper
druid_metadata_storage_host=
druid_metadata_storage_type=postgresql
druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
druid_metadata_storage_connector_user=druid
druid_metadata_storage_connector_password=FoolishPassword
druid_coordinator_balancer_strategy=cachingCost
druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
druid_storage_type=local
druid_storage_storageDirectory=/opt/shared/segments
druid_indexer_logs_type=file
druid_indexer_logs_directory=/opt/shared/indexing-logs
druid_processing_numThreads=2
druid_processing_numMergeBuffers=2
DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>

event_log.txt (new empty file)


@@ -6,13 +6,6 @@ from sources.ch4 import Chan4
from sources.dis import DiscordClient
from sources.ingest import Ingest
# For development
# if not getenv("DISCORD_TOKEN", None):
# print("Could not get Discord token, attempting load from .env")
# from dotenv import load_dotenv
# load_dotenv()
log = util.get_logger("monolith")
modules_enabled = getenv("MODULES_ENABLED", False)
@@ -26,7 +19,6 @@ async def main(loop):
log.info("Starting Discord handler.")
client = DiscordClient()
loop.create_task(client.start(token))
-# client.run(token)
log.info("Starting 4chan handler.")
chan = Chan4()

processing/__init__.py (new empty file)

processing/process.py (new file)

@@ -0,0 +1,202 @@
import asyncio
import os
import random
# For key generation
import string
# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor
# For timestamp processing
from datetime import datetime
from math import ceil
import orjson
# Tokenisation
import spacy
# For 4chan message parsing
from bs4 import BeautifulSoup
from numpy import array_split
from polyglot.detect.base import logger as polyglot_logger
# For NLP
from polyglot.text import Text
from pycld2 import error as cld2_error
from siphashc import siphash
# For sentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import db
import util
# 4chan schema
from schemas.ch4_s import ATTRMAP
# For tokenisation
# from gensim.parsing.preprocessing import (
# strip_tags,
# strip_punctuation,
# strip_numeric,
# stem_text,
# strip_multiple_whitespaces,
# strip_non_alphanum,
# remove_stopwords,
# strip_short,
# preprocess_string,
# )
# CUSTOM_FILTERS = [
# lambda x: x.lower(),
# strip_tags, #
# strip_punctuation, #
# strip_multiple_whitespaces,
# strip_numeric,
# remove_stopwords,
# strip_short,
# #stem_text,
# strip_non_alphanum, #
# ]
# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
log = util.get_logger("process")
# Maximum number of CPU threads to use for post processing
CPU_THREADS = os.cpu_count()
p = ProcessPoolExecutor(CPU_THREADS)
def get_hash_key():
hash_key = db.r.get("hashing_key")
if not hash_key:
letters = string.ascii_lowercase
hash_key = "".join(random.choice(letters) for i in range(16))
log.debug(f"Created new hash key: {hash_key}")
db.r.set("hashing_key", hash_key)
else:
hash_key = hash_key.decode("ascii")
log.debug(f"Decoded hash key: {hash_key}")
return hash_key
hash_key = get_hash_key()
@asyncio.coroutine
async def spawn_processing_threads(data):
len_data = len(data)
log.debug(f"Spawning processing threads for batch of {len_data} messages")
loop = asyncio.get_event_loop()
tasks = []
if len(data) < CPU_THREADS:
split_data = [data]
else:
msg_per_core = int(len(data) / CPU_THREADS)
split_data = array_split(data, ceil(len(data) / msg_per_core))
for index, split in enumerate(split_data):
log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
task = loop.run_in_executor(p, process_data, data)
tasks.append(task)
results = [await task for task in tasks]
log.debug(f"Results from processing of {len_data} messages: {len(results)}")
# Join the results back from the split list
flat_list = [item for sublist in results for item in sublist]
await db.store_kafka_batch(flat_list)
log.debug(f"Finished processing {len_data} messages")
def process_data(data):
to_store = []
# Initialise sentiment analyser
analyzer = SentimentIntensityAnalyzer()
for msg in data:
if msg["src"] == "4ch":
board = msg["net"]
thread = msg["channel"]
# Calculate hash for post
post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
hash = siphash(hash_key, post_normalised)
hash = str(hash)
redis_key = f"cache.{board}.{thread}.{msg['no']}"
key_content = db.r.get(redis_key)
if key_content:
key_content = key_content.decode("ascii")
if key_content == hash:
# This deletes the message since the append at the end won't be hit
continue
else:
msg["type"] = "update"
db.r.set(redis_key, hash)
for key2, value in list(msg.items()):
if key2 in ATTRMAP:
msg[ATTRMAP[key2]] = msg[key2]
del msg[key2]
if "ts" in msg:
old_time = msg["ts"]
# '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":")
if len(time_spl) == 3:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
else:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# new_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp())
msg["ts"] = new_ts
else:
raise Exception("No TS in msg")
if "msg" in msg:
soup = BeautifulSoup(msg["msg"], "html.parser")
msg_str = soup.get_text(separator="\n")
msg["msg"] = msg_str
# Annotate sentiment/NLP
if "msg" in msg:
# Language
text = Text(msg["msg"])
try:
lang_code = text.language.code
lang_name = text.language.name
msg["lang_code"] = lang_code
msg["lang_name"] = lang_name
except cld2_error as e:
log.error(f"Error detecting language: {e}")
# So below block doesn't fail
lang_code = None
# Blatant discrimination
if lang_code == "en":
# Sentiment
vs = analyzer.polarity_scores(str(msg["msg"]))
addendum = vs["compound"]
msg["sentiment"] = addendum
# Tokens
n = nlp(msg["msg"])
for tag in TAGS:
tag_name = tag.lower()
tags_flag = [token.lemma_ for token in n if token.pos_ == tag]
msg[f"words_{tag_name}"] = tags_flag
# Add the mutated message to the return buffer
to_store.append(msg)
return to_store
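
As a companion to spawn_processing_threads() above, a stripped-down sketch (not part of this commit) of fanning a CPU-bound batch out to a ProcessPoolExecutor from asyncio. It assumes each executor call should be handed its own chunk of the split data; work() is a placeholder for process_data() and must stay picklable at module level.

import asyncio
from concurrent.futures import ProcessPoolExecutor
from math import ceil

from numpy import array_split


def work(chunk):
    # Placeholder for process_data(): any CPU-bound, picklable function
    return [x * 2 for x in chunk]


async def fan_out(data, workers=4):
    loop = asyncio.get_running_loop()
    per_worker = max(1, len(data) // workers)
    chunks = array_split(data, ceil(len(data) / per_worker))
    with ProcessPoolExecutor(workers) as pool:
        tasks = [loop.run_in_executor(pool, work, chunk) for chunk in chunks]
        results = await asyncio.gather(*tasks)
    # Flatten the per-chunk results, as spawn_processing_threads() does
    return [item for sub in results for item in sub]


if __name__ == "__main__":
    print(asyncio.run(fan_out(list(range(10)))))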


@@ -5,7 +5,18 @@ redis
siphashc
aiohttp[speedups]
python-dotenv
-manticoresearch
+#manticoresearch
numpy
ujson
aioredis[hiredis]
aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson


@@ -2,41 +2,30 @@
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from math import ceil
import aiohttp
import ujson
from bs4 import BeautifulSoup
from numpy import array_split
from siphashc import siphash
import db
import util
from schemas.ch4_s import ATTRMAP
# CONFIGURATION #
# Number of 4chan threads to request at once
-THREADS_CONCURRENT = 100
+THREADS_CONCURRENT = 1000
# Seconds to wait between every THREADS_CONCURRENT requests
-THREADS_DELAY = 0.8
+THREADS_DELAY = 0.1
# Seconds to wait between crawls
CRAWL_DELAY = 5
# Semaphore value ?
-THREADS_SEMAPHORE = 100
+THREADS_SEMAPHORE = 1000
# Maximum number of CPU threads to use for post processing
CPU_THREADS = 1
# CONFIGURATION END #
p = ProcessPoolExecutor(CPU_THREADS)
class Chan4(object):
"""
@@ -83,10 +72,12 @@ class Chan4(object):
self.log.debug(f"Got boards: {self.boards}")
async def get_thread_lists(self, boards):
-self.log.debug(f"Getting thread list for {boards}")
+# self.log.debug(f"Getting thread list for {boards}")
board_urls = {board: f"{board}/catalog.json" for board in boards}
responses = await self.api_call(board_urls)
to_get = []
flat_map = [board for board, thread in responses]
self.log.debug(f"Got thread list for {flat_map}: {len(responses)}")
for mapped, response in responses:
if not response:
continue
@@ -95,7 +86,6 @@ class Chan4(object):
no = threads["no"]
to_get.append((mapped, no))
self.log.info(f"Got thread list for {mapped}: {len(response)}")
if not to_get:
return
split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
@@ -122,96 +112,36 @@ class Chan4(object):
(board, thread): f"{board}/thread/{thread}.json"
for board, thread in thread_list
}
-self.log.debug(f"Getting information for threads: {thread_urls}")
+# self.log.debug(f"Getting information for threads: {thread_urls}")
responses = await self.api_call(thread_urls)
-self.log.debug(f"Got information for threads: {thread_urls}")
+self.log.debug(f"Got information for {len(responses)} threads")
all_posts = {}
for mapped, response in responses:
if not response:
continue
board, thread = mapped
self.log.debug(f"Got thread content for thread {thread} on board {board}")
all_posts[mapped] = response["posts"]
-# Split into 10,000 chunks
if not all_posts:
return
-threads_per_core = int(len(all_posts) / CPU_THREADS)
+await self.handle_posts(all_posts)
for i in range(CPU_THREADS):
new_dict = {}
pulled_posts = self.take_items(all_posts, threads_per_core)
for k, v in pulled_posts:
if k in new_dict:
new_dict[k].append(v)
else:
new_dict[k] = [v]
await self.handle_posts_thread(new_dict)
# print("VAL", ceil(len(all_posts) / threads_per_core))
# split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
# print("THREADS PER CORE SPLIT", len(split_posts))
# # print("SPLIT CHUNK", len(split_posts))
# for posts in split_posts:
# print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS")
# await self.handle_posts_thread(posts)
-# await self.handle_posts_thread(all_posts)
+async def handle_posts(self, posts):
@asyncio.coroutine
def handle_posts_thread(self, posts):
loop = asyncio.get_event_loop()
yield from loop.run_in_executor(p, self.handle_posts, posts)
def handle_posts(self, posts):
to_store = []
for key, post_list in posts.items():
board, thread = key
for index, post in enumerate(post_list):
posts[key][index]["type"] = "msg"
# Calculate hash for post
post_normalised = ujson.dumps(post, sort_keys=True)
hash = siphash(self.hash_key, post_normalised)
hash = str(hash)
redis_key = f"cache.{board}.{thread}.{post['no']}"
key_content = db.r.get(redis_key)
if key_content:
key_content = key_content.decode("ascii")
if key_content == hash:
continue
else:
posts[key][index]["type"] = "update"
db.r.set(redis_key, hash)
for key2, value in list(post.items()):
if key2 in ATTRMAP:
post[ATTRMAP[key2]] = posts[key][index][key2]
del posts[key][index][key2]
if "ts" in post:
old_time = posts[key][index]["ts"]
# '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":")
if len(time_spl) == 3:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
else:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# new_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp())
posts[key][index]["ts"] = new_ts
if "msg" in post:
soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
msg = soup.get_text(separator="\n")
posts[key][index]["msg"] = msg
posts[key][index]["src"] = "4ch" posts[key][index]["src"] = "4ch"
posts[key][index]["net"] = board posts[key][index]["net"] = board
posts[key][index]["channel"] = thread posts[key][index]["channel"] = thread
to_store.append(posts[key][index]) to_store.append(posts[key][index])
# print({name_map[name]: val for name, val in post.items()})
# print(f"Got posts: {len(posts)}")
if to_store: if to_store:
db.store_message_bulk(to_store) await db.queue_message_bulk(to_store)
async def fetch(self, url, session, mapped): async def fetch(self, url, session, mapped):
async with session.get(url) as response: async with session.get(url) as response:
@@ -235,7 +165,7 @@ class Chan4(object):
async with aiohttp.ClientSession(connector=connector) as session:
for mapped, method in methods.items():
url = f"{self.api_endpoint}/{method}"
-self.log.debug(f"GET {url}")
+# self.log.debug(f"GET {url}")
task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
# task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
tasks.append(task)
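
For reference, a minimal sketch (not part of this commit) of the semaphore-bounded fetch pattern that api_call()/bound_fetch() use above. The https://a.4cdn.org endpoint and the example catalog path are assumptions for illustration; the class keeps its own api_endpoint and THREADS_SEMAPHORE.

import asyncio

import aiohttp

CONCURRENCY = 1000  # mirrors THREADS_SEMAPHORE above


async def fetch(session, url, mapped):
    async with session.get(url) as response:
        try:
            return mapped, await response.json()
        except aiohttp.ContentTypeError:
            return mapped, None


async def bound_fetch(sem, session, url, mapped):
    # The semaphore caps how many requests are in flight at once
    async with sem:
        return await fetch(session, url, mapped)


async def api_call(methods, endpoint="https://a.4cdn.org"):
    # methods maps an identifier (e.g. (board, thread)) to a relative path
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(bound_fetch(sem, session, f"{endpoint}/{path}", mapped))
            for mapped, path in methods.items()
        ]
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    results = asyncio.run(api_call({"g": "g/catalog.json"}))
    print(f"Fetched {len(results)} responses")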


@@ -41,4 +41,4 @@ class DiscordClient(discord.Client):
a["type"] = "msg"
a["src"] = "dis"
-db.store_message(a)
+await db.queue_message(a)


@@ -1,15 +1,20 @@
import asyncio
-import ujson
+import orjson
import db
import util
+from processing import process
-SOURCES = ["irc"]
+SOURCES = ["4ch", "irc", "dis"]
KEYPREFIX = "queue."
CHUNK_SIZE = 1000
# Chunk size per source (divide by len(SOURCES) for total)
CHUNK_SIZE = 9000
ITER_DELAY = 0.5
+log = util.get_logger("ingest")
class Ingest(object):
def __init__(self):
@@ -18,19 +23,18 @@ class Ingest(object):
async def run(self):
while True:
-await self.process_chunk()
+await self.get_chunk()
await asyncio.sleep(ITER_DELAY)
-async def process_chunk(self):
+async def get_chunk(self):
items = []
for source in SOURCES:
key = f"{KEYPREFIX}{source}"
chunk = await db.ar.spop(key, CHUNK_SIZE)
if not chunk:
continue
self.log.info(f"Got chunk: {chunk}")
for item in chunk:
-item = ujson.loads(item)
+item = orjson.loads(item)
self.log.info(f"Got item: {item}")
items.append(item)
-db.store_message_bulk(items)
+if items:
await process.spawn_processing_threads(items)


@@ -3,7 +3,7 @@ import logging
log = logging.getLogger("util")
-debug = False
+debug = True
# Color definitions
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)