Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
11 changed files with 203 additions and 338 deletions
Showing only changes of commit 143f2a0bf0 - Show all commits

198
db.py
View File

@ -1,28 +1,15 @@
import random
from math import ceil
import aioredis
import manticoresearch
import ujson
import orjson
# Kafka
from aiokafka import AIOKafkaProducer
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis
import util
# Manticore schema
from schemas import mc_s
# Manticore
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
# Kafka
from aiokafka import AIOKafkaProducer
KAFKA_TOPIC = "msg"
# KAFKA_TOPIC = "msg"
log = util.get_logger("db")
@ -51,103 +38,62 @@ KEYPREFIX = "queue."
async def store_kafka_batch(data):
print("STORING KAFKA BATCH")
log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
batch = producer.create_batch()
for msg in data:
if msg["type"] in TYPES_MAIN:
index = "main"
schema = mc_s.schema_main
# schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
# schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# schema = mc_s.schema_int
KAFKA_TOPIC = index
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string") or schema[key].startswith(
"text"
):
msg[key] = str(value)
message = ujson.dumps(msg)
body = str.encode(message)
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
body = orjson.dumps(msg)
# orjson returns bytes
# body = str.encode(message)
if "ts" not in msg:
# print("MSG WITHOUT TS", msg)
continue
raise Exception("No TS in msg")
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None:
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print(
"%d messages sent to partition %d" % (batch.record_count(), partition)
)
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
batch = producer.create_batch()
continue
partitions = await producer.partitions_for(KAFKA_TOPIC)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, KAFKA_TOPIC, partition=partition)
print("%d messages sent to partition %d" % (batch.record_count(), partition))
log.debug(f"{batch.record_count()} messages sent to partition {partition}")
await producer.stop()
# def store_message(msg):
# """
# Store a message into Manticore
# :param msg: dict
# """
# store_kafka(msg)
# # Duplicated to avoid extra function call
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith("text"):
# msg[key] = str(value)
# body = [{"insert": {"index": index, "doc": msg}}]
# body_post = ""
# for item in body:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# # print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
async def queue_message(msg):
"""
Queue a message on the Redis buffer.
"""
src = msg["src"]
message = ujson.dumps(msg)
message = orjson.dumps(msg)
key = f"{KEYPREFIX}{src}"
# log.debug(f"Queueing single message of string length {len(message)}")
await ar.sadd(key, message)
@ -155,102 +101,10 @@ async def queue_message_bulk(data):
"""
Queue multiple messages on the Redis buffer.
"""
# log.debug(f"Queueing message batch of length {len(data)}")
for msg in data:
src = msg["src"]
message = ujson.dumps(msg)
message = orjson.dumps(msg)
key = f"{KEYPREFIX}{src}"
await ar.sadd(key, message)
# For now, make a normal function until we go full async
def queue_message_bulk_sync(data):
"""
Queue multiple messages on the Redis buffer.
"""
for msg in data:
src = msg["src"]
message = ujson.dumps(msg)
key = "{KEYPREFIX}{src}"
r.sadd(key, message)
# def store_message_bulk(data):
# """
# Store a message into Manticore
# :param msg: dict
# """
# if not data:
# return
# for msg in data:
# store_kafka(msg)
# # 10000: maximum inserts we can submit to
# # Manticore as of Sept 2022
# split_posts = array_split(data, ceil(len(data) / 10000))
# for messages in split_posts:
# total = []
# for msg in messages:
# # Duplicated to avoid extra function call (see above)
# if msg["type"] in TYPES_MAIN:
# index = "main"
# schema = mc_s.schema_main
# elif msg["type"] in TYPES_META:
# index = "meta"
# schema = mc_s.schema_meta
# elif msg["type"] in TYPES_INT:
# index = "internal"
# schema = mc_s.schema_int
# # normalise fields
# for key, value in list(msg.items()):
# if value is None:
# del msg[key]
# if key in schema:
# if isinstance(value, int):
# if schema[key].startswith("string") or schema[key].startswith(
# "text"
# ):
# msg[key] = str(value)
# body = {"insert": {"index": index, "doc": msg}}
# total.append(body)
# body_post = ""
# for item in total:
# body_post += ujson.dumps(item)
# body_post += "\n"
# # print(body_post)
# try:
# # Bulk index operations
# print("FAKE POST")
# #api_response = api_instance.bulk(body_post) # , async_req=True
# #print(api_response)
# except ApiException as e:
# print("Exception when calling IndexApi->bulk: %s\n" % e)
# print("ATTEMPT", body_post)
# def update_schema():
# pass
# def create_index(api_client):
# util_instance = manticoresearch.UtilsApi(api_client)
# schemas = {
# "main": mc_s.schema_main,
# "meta": mc_s.schema_meta,
# "internal": mc_s.schema_int,
# }
# for name, schema in schemas.items():
# schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
# create_query = (
# f"create table if not exists {name}({schema_types}) engine='columnar'"
# )
# print("Schema types", create_query)
# util_instance.sql(create_query)
# create_index(api_client)
# update_schema()

View File

@ -20,10 +20,14 @@ services:
volumes_from:
- tmp
depends_on:
- broker
- kafka
- tmp
- redis
broker:
condition: service_started
kafka:
condition: service_healthy
tmp:
condition: service_started
redis:
condition: service_healthy
# - db
threshold:
@ -46,8 +50,10 @@ services:
volumes_from:
- tmp
depends_on:
- tmp
- redis
tmp:
condition: service_started
redis:
condition: service_healthy
turnilo:
container_name: turnilo
@ -102,6 +108,17 @@ services:
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ALLOW_PLAINTEXT_LISTENER: yes
# healthcheck:
# test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic main --describe"]
# interval: 2s
# timeout: 2s
# retries: 15
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "kafka:9092"]
start_period: 15s
interval: 2s
timeout: 5s
retries: 30
coordinator:
image: apache/druid:0.23.0
@ -230,6 +247,11 @@ services:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
healthcheck:
test: "redis-cli -s /var/run/redis/redis.sock ping"
interval: 2s
timeout: 2s
retries: 15
networks:
default:

View File

@ -16,7 +16,7 @@ COPY requirements.txt /code/
COPY discord-patched.tgz /code/
RUN python -m venv /venv
RUN . /venv/bin/activate && pip install -r requirements.txt
RUN . /venv/bin/activate && pip install -r requirements.txt && python -m spacy download en_core_web_sm
RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages

View File

@ -4,8 +4,18 @@ redis
siphashc
aiohttp[speedups]
python-dotenv
manticoresearch
#manticoresearch
numpy
ujson
aioredis[hiredis]
aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson

0
event_log.txt Normal file
View File

View File

@ -1,19 +1,11 @@
import asyncio
from os import getenv
import db
import util
from sources.ch4 import Chan4
from sources.dis import DiscordClient
from sources.ingest import Ingest
# For development
# if not getenv("DISCORD_TOKEN", None):
# print("Could not get Discord token, attempting load from .env")
# from dotenv import load_dotenv
# load_dotenv()
log = util.get_logger("monolith")
modules_enabled = getenv("MODULES_ENABLED", False)

View File

@ -4,25 +4,73 @@ import random
# For key generation
import string
# Squash errors
import warnings
from concurrent.futures import ProcessPoolExecutor
# For timestamp processing
from datetime import datetime
from math import ceil
import ujson
import orjson
# Tokenisation
import spacy
# For 4chan message parsing
from bs4 import BeautifulSoup
from numpy import array_split
from polyglot.detect.base import logger as polyglot_logger
# For NLP
from polyglot.text import Text
from pycld2 import error as cld2_error
from siphashc import siphash
# For sentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import db
import util
# 4chan schema
from schemas.ch4_s import ATTRMAP
# For tokenisation
# from gensim.parsing.preprocessing import (
# strip_tags,
# strip_punctuation,
# strip_numeric,
# stem_text,
# strip_multiple_whitespaces,
# strip_non_alphanum,
# remove_stopwords,
# strip_short,
# preprocess_string,
# )
# CUSTOM_FILTERS = [
# lambda x: x.lower(),
# strip_tags, #
# strip_punctuation, #
# strip_multiple_whitespaces,
# strip_numeric,
# remove_stopwords,
# strip_short,
# #stem_text,
# strip_non_alphanum, #
# ]
# Squash errors
polyglot_logger.setLevel("ERROR")
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")
TAGS = ["NOUN", "ADJ", "VERB", "ADV"]
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
log = util.get_logger("process")
# Maximum number of CPU threads to use for post processing
@ -49,67 +97,44 @@ hash_key = get_hash_key()
@asyncio.coroutine
async def spawn_processing_threads(data):
len_data = len(data)
log.debug(f"Spawning processing threads for batch of {len_data} messages")
loop = asyncio.get_event_loop()
tasks = []
oldts = [x["now"] for x in data if "now" in x]
if len(data) < CPU_THREADS:
split_data = [data]
else:
msg_per_core = int(len(data) / CPU_THREADS)
print("MSG PER CORE", msg_per_core)
split_data = array_split(data, ceil(len(data) / msg_per_core))
for index, split in enumerate(split_data):
print("DELEGATING TO THREAD", len(split))
future = loop.run_in_executor(p, process_data, data)
# future = p.submit(process_data, split)
tasks.append(future)
# results = [x.result(timeout=50) for x in tasks]
results = await asyncio.gather(*tasks)
print("RESULTS", len(results))
log.debug(f"Delegating processing of {len(split)} messages to thread {index}")
task = loop.run_in_executor(p, process_data, data)
tasks.append(task)
results = [await task for task in tasks]
log.debug(f"Results from processing of {len_data} messages: {len(results)}")
# Join the results back from the split list
flat_list = [item for sublist in results for item in sublist]
print("LENFLAT", len(flat_list))
print("LENDATA", len(data))
newts = [x["ts"] for x in flat_list if "ts" in x]
print("lenoldts", len(oldts))
print("lennewts", len(newts))
allts = all(["ts" in x for x in flat_list])
print("ALLTS", allts)
alllen = [len(x) for x in flat_list]
print("ALLLEN", alllen)
await db.store_kafka_batch(flat_list)
# @asyncio.coroutine
# def process_data_thread(data):
# """
# Helper to spawn threads to process a list of data.
# """
# loop = asyncio.get_event_loop()
# if len(data) < CPU_THREADS:
# split_data = [data]
# else:
# msg_per_core = int(len(data) / CPU_THREADS)
# print("MSG PER CORE", msg_per_core)
# split_data = array_split(data, ceil(len(data) / msg_per_core))
# for index, split in enumerate(split_data):
# print("DELEGATING TO THREAD", len(split))
# #f = process_data_thread(split)
# yield loop.run_in_executor(p, process_data, data)
log.debug(f"Finished processing {len_data} messages")
def process_data(data):
print("PROCESS DATA START")
# to_store = []
for index, msg in enumerate(data):
# print("PROCESSING", msg)
to_store = []
# Initialise sentiment analyser
analyzer = SentimentIntensityAnalyzer()
for msg in data:
if msg["src"] == "4ch":
board = msg["net"]
thread = msg["channel"]
# Calculate hash for post
post_normalised = ujson.dumps(msg, sort_keys=True)
post_normalised = orjson.dumps(msg, option=orjson.OPT_SORT_KEYS)
hash = siphash(hash_key, post_normalised)
hash = str(hash)
redis_key = f"cache.{board}.{thread}.{msg['no']}"
@ -117,19 +142,17 @@ def process_data(data):
if key_content:
key_content = key_content.decode("ascii")
if key_content == hash:
del data[index]
# This deletes the message since the append at the end won't be hit
continue
else:
data[index]["type"] = "update"
msg["type"] = "update"
db.r.set(redis_key, hash)
if "now" not in data[index]:
print("NOW NOT IN INDEX", data[index])
for key2, value in list(data[index].items()):
for key2, value in list(msg.items()):
if key2 in ATTRMAP:
data[index][ATTRMAP[key2]] = data[index][key2]
del data[index][key2]
if "ts" in data[index]:
old_time = data[index]["ts"]
msg[ATTRMAP[key2]] = msg[key2]
del msg[key2]
if "ts" in msg:
old_time = msg["ts"]
# '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":")
if len(time_spl) == 3:
@ -138,14 +161,42 @@ def process_data(data):
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# new_ts = old_ts.isoformat()
new_ts = int(old_ts.timestamp())
data[index]["ts"] = new_ts
msg["ts"] = new_ts
else:
print("MSG WITHOUT TS PROCESS", data[index])
continue
raise Exception("No TS in msg")
if "msg" in msg:
soup = BeautifulSoup(data[index]["msg"], "html.parser")
msg = soup.get_text(separator="\n")
data[index]["msg"] = msg
# to_store.append(data[index])
print("FINISHED PROCESSING DATA")
return data
soup = BeautifulSoup(msg["msg"], "html.parser")
msg_str = soup.get_text(separator="\n")
msg["msg"] = msg_str
# Annotate sentiment/NLP
if "msg" in msg:
# Language
text = Text(msg["msg"])
try:
lang_code = text.language.code
lang_name = text.language.name
msg["lang_code"] = lang_code
msg["lang_name"] = lang_name
except cld2_error as e:
log.error(f"Error detecting language: {e}")
# So below block doesn't fail
lang_code = None
# Blatant discrimination
if lang_code == "en":
# Sentiment
vs = analyzer.polarity_scores(str(msg["msg"]))
addendum = vs["compound"]
msg["sentiment"] = addendum
# Tokens
n = nlp(msg["msg"])
for tag in TAGS:
tag_name = tag.lower()
tags_flag = [token.lemma_ for token in n if token.pos_ == tag]
msg[f"words_{tag_name}"] = tags_flag
# Add the mutated message to the return buffer
to_store.append(msg)
return to_store

View File

@ -5,8 +5,18 @@ redis
siphashc
aiohttp[speedups]
python-dotenv
manticoresearch
#manticoresearch
numpy
ujson
aioredis[hiredis]
aiokafka
vaderSentiment
polyglot
pyicu
pycld2
morfessor
six
nltk
spacy
python-Levenshtein
orjson

View File

@ -2,19 +2,13 @@
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from math import ceil
import aiohttp
import ujson
from bs4 import BeautifulSoup
from numpy import array_split
from siphashc import siphash
import db
import util
from schemas.ch4_s import ATTRMAP
# CONFIGURATION #
@ -30,13 +24,8 @@ CRAWL_DELAY = 5
# Semaphore value ?
THREADS_SEMAPHORE = 1000
# Maximum number of CPU threads to use for post processing
CPU_THREADS = 8
# CONFIGURATION END #
p = ProcessPoolExecutor(CPU_THREADS)
class Chan4(object):
"""
@ -83,10 +72,12 @@ class Chan4(object):
self.log.debug(f"Got boards: {self.boards}")
async def get_thread_lists(self, boards):
self.log.debug(f"Getting thread list for {boards}")
# self.log.debug(f"Getting thread list for {boards}")
board_urls = {board: f"{board}/catalog.json" for board in boards}
responses = await self.api_call(board_urls)
to_get = []
flat_map = [board for board, thread in responses]
self.log.debug(f"Got thread list for {flat_map}: {len(responses)}")
for mapped, response in responses:
if not response:
continue
@ -95,7 +86,6 @@ class Chan4(object):
no = threads["no"]
to_get.append((mapped, no))
self.log.debug(f"Got thread list for {mapped}: {len(response)}")
if not to_get:
return
split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
@ -122,46 +112,20 @@ class Chan4(object):
(board, thread): f"{board}/thread/{thread}.json"
for board, thread in thread_list
}
self.log.debug(f"Getting information for threads: {thread_urls}")
# self.log.debug(f"Getting information for threads: {thread_urls}")
responses = await self.api_call(thread_urls)
self.log.debug(f"Got information for threads: {thread_urls}")
self.log.debug(f"Got information for {len(responses)} threads")
all_posts = {}
for mapped, response in responses:
if not response:
continue
board, thread = mapped
self.log.debug(f"Got thread content for thread {thread} on board {board}")
all_posts[mapped] = response["posts"]
# Split into 10,000 chunks
if not all_posts:
return
await self.handle_posts(all_posts)
# threads_per_core = int(len(all_posts) / CPU_THREADS)
# for i in range(CPU_THREADS):
# new_dict = {}
# pulled_posts = self.take_items(all_posts, threads_per_core)
# for k, v in pulled_posts:
# if k in new_dict:
# new_dict[k].append(v)
# else:
# new_dict[k] = [v]
# await self.handle_posts_thread(new_dict)
# print("VAL", ceil(len(all_posts) / threads_per_core))
# split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
# print("THREADS PER CORE SPLIT", len(split_posts))
# # print("SPLIT CHUNK", len(split_posts))
# for posts in split_posts:
# print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS")
# await self.handle_posts_thread(posts)
# await self.handle_posts_thread(all_posts)
@asyncio.coroutine
def handle_posts_thread(self, posts):
loop = asyncio.get_event_loop()
yield from loop.run_in_executor(p, self.handle_posts, posts)
async def handle_posts(self, posts):
to_store = []
@ -170,50 +134,13 @@ class Chan4(object):
for index, post in enumerate(post_list):
posts[key][index]["type"] = "msg"
# # Calculate hash for post
# post_normalised = ujson.dumps(post, sort_keys=True)
# hash = siphash(self.hash_key, post_normalised)
# hash = str(hash)
# redis_key = f"cache.{board}.{thread}.{post['no']}"
# key_content = db.r.get(redis_key)
# if key_content:
# key_content = key_content.decode("ascii")
# if key_content == hash:
# continue
# else:
# posts[key][index]["type"] = "update"
# #db.r.set(redis_key, hash)
# for key2, value in list(post.items()):
# if key2 in ATTRMAP:
# post[ATTRMAP[key2]] = posts[key][index][key2]
# del posts[key][index][key2]
# if "ts" in post:
# old_time = posts[key][index]["ts"]
# # '08/30/22(Tue)02:25:37'
# time_spl = old_time.split(":")
# if len(time_spl) == 3:
# old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
# else:
# old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
# # new_ts = old_ts.isoformat()
# new_ts = int(old_ts.timestamp())
# posts[key][index]["ts"] = new_ts
# if "msg" in post:
# soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
# msg = soup.get_text(separator="\n")
# posts[key][index]["msg"] = msg
posts[key][index]["src"] = "4ch"
posts[key][index]["net"] = board
posts[key][index]["channel"] = thread
to_store.append(posts[key][index])
# print({name_map[name]: val for name, val in post.items()})
# print(f"Got posts: {len(posts)}")
if to_store:
print("STORING", len(to_store))
await db.queue_message_bulk(to_store)
async def fetch(self, url, session, mapped):
@ -238,7 +165,7 @@ class Chan4(object):
async with aiohttp.ClientSession(connector=connector) as session:
for mapped, method in methods.items():
url = f"{self.api_endpoint}/{method}"
self.log.debug(f"GET {url}")
# self.log.debug(f"GET {url}")
task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
# task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
tasks.append(task)

View File

@ -1,6 +1,6 @@
import asyncio
import ujson
import orjson
import db
import util
@ -8,9 +8,13 @@ from processing import process
SOURCES = ["4ch", "irc", "dis"]
KEYPREFIX = "queue."
CHUNK_SIZE = 90000
# Chunk size per source (divide by len(SOURCES) for total)
CHUNK_SIZE = 9000
ITER_DELAY = 0.5
log = util.get_logger("ingest")
class Ingest(object):
def __init__(self):
@ -18,8 +22,6 @@ class Ingest(object):
self.log = util.get_logger(name)
async def run(self):
# items = [{'no': 23567753, 'now': '09/12/22(Mon)20:10:29', 'name': 'Anonysmous', 'filename': '1644986767568', 'ext': '.webm', 'w': 1280, 'h': 720, 'tn_w': 125, 'tn_h': 70, 'tim': 1663027829301457, 'time': 1663027829, 'md5': 'zeElr1VR05XpZ2XuAPhmPA==', 'fsize': 3843621, 'resto': 23554700, 'type': 'msg', 'src': '4ch', 'net': 'gif', 'channel': '23554700'}]
# await process.spawn_processing_threads(items)
while True:
await self.get_chunk()
await asyncio.sleep(ITER_DELAY)
@ -31,11 +33,8 @@ class Ingest(object):
chunk = await db.ar.spop(key, CHUNK_SIZE)
if not chunk:
continue
# self.log.info(f"Got chunk: {chunk}")
for item in chunk:
item = ujson.loads(item)
# self.log.info(f"Got item: {item}")
item = orjson.loads(item)
items.append(item)
if items:
print("PROCESSING", len(items))
await process.spawn_processing_threads(items)

View File

@ -3,7 +3,7 @@ import logging
log = logging.getLogger("util")
debug = False
debug = True
# Color definitions
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)