Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
12 changed files with 532 additions and 0 deletions
Showing only changes of commit 36de004ee5 - Show all commits

16
db.py Normal file
View File

@ -0,0 +1,16 @@
from redis import StrictRedis

import util

log = util.get_logger("db")

# Shared module-level connection. Other modules (e.g. sources/ch4.py) access
# `db.r` directly for cache reads/writes, so the client must live at module
# scope -- previously it was rebuilt (and leaked) on every store_message()
# call and `db.r` did not exist at all. StrictRedis connects lazily, so
# creating it at import time is cheap and safe.
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)


def store_message(msg):
    """
    Store a message into Manticore
    :param msg: dict
    """
    log.debug(f"store_message() {msg}")
    # TODO(review): persistence is not implemented yet -- the message is
    # only logged; nothing is written to the backing store.

31
docker-compose.yml Normal file
View File

@ -0,0 +1,31 @@
version: "2"
services:
  # Main ingest application (runs monolith.py from the bind-mounted repo)
  app:
    image: pathogen/monolith:latest
    build: ./docker
    volumes:
      - ${PORTAINER_GIT_DIR}:/code
    env_file:
      - .env
    # Mount the shared /var/run/redis socket directory owned by `tmp`
    volumes_from:
      - tmp
  # Helper container that owns the socket volume and opens its permissions
  # so both `app` and `redis` can use the unix socket.
  tmp:
    image: busybox
    command: chmod -R 777 /var/run/redis
    volumes:
      - /var/run/redis
  redis:
    image: redis
    # Config bind-mounted from the repo (unix-socket setup, see docker/redis.conf)
    command: redis-server /etc/redis.conf
    volumes:
      - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
    volumes_from:
      - tmp
networks:
  default:
    external:
      name: pathogen

23
docker/Dockerfile Normal file
View File

@ -0,0 +1,23 @@
# syntax=docker/dockerfile:1
# Pin the minor version: the patched discord package below is unpacked into a
# hard-coded python3.10 site-packages path, so an unpinned `python:3` image
# would silently break once the default tag moves past 3.10.
FROM python:3.10
# Unprivileged user owning both the code and the virtualenv
RUN useradd -d /code pathogen
RUN mkdir /code
RUN chown pathogen:pathogen /code
RUN mkdir /venv
RUN chown pathogen:pathogen /venv
USER pathogen
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY requirements.txt /code/
COPY discord-patched.tgz /code/
RUN python -m venv /venv
RUN . /venv/bin/activate && pip install -r requirements.txt
# Overlay the locally patched discord library into the venv
RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages
CMD . /venv/bin/activate && exec python monolith.py

BIN
docker/discord-patched.tgz Normal file

Binary file not shown.

View File

@ -0,0 +1,31 @@
version: "2"
services:
app:
image: pathogen/monolith:latest
build: ./docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
env_file:
- .env
volumes_from:
- tmp
tmp:
image: busybox
command: chmod -R 777 /var/run/redis
volumes:
- /var/run/redis
redis:
image: redis
command: redis-server /etc/redis.conf
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
networks:
default:
external:
name: pathogen

2
docker/redis.conf Normal file
View File

@ -0,0 +1,2 @@
# Serve Redis on a unix socket in the volume shared with the app container
unixsocket /var/run/redis/redis.sock
# World-accessible so the unprivileged app user can connect
unixsocketperm 777

7
docker/requirements.txt Normal file
View File

@ -0,0 +1,7 @@
wheel
treq
beautifulsoup4
redis
siphashc
aiohttp
python-dotenv

67
monolith.py Normal file
View File

@ -0,0 +1,67 @@
# Entry point: runs the Discord client (asyncio) and the 4chan crawler
# (Twisted) on a single shared asyncio event loop.
import asyncio
import signal
import sys
from os import getenv

from twisted.internet import asyncioreactor

import util
from sources.ch4 import Chan4
from sources.dis import DiscordClient

loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# asyncioreactor.install(asyncio.new_event_loop())
# Install the Twisted reactor on top of our loop. This MUST happen before
# `from twisted.internet import reactor` below, otherwise the default
# reactor would be installed instead.
asyncioreactor.install(loop)  # noqa
from twisted.internet import reactor, task  # noqa


# Doesn't quite work but better than nothing
def stop(*args):
    # Best-effort shutdown of both event systems on SIGINT.
    loop.stop()
    reactor.stop()
    sys.exit()


signal.signal(signal.SIGINT, stop)
# loop.add_signal_handler(signal.SIGINT, functools.partial(stop, loop))

# For development: fall back to a .env file when the token isn't in the
# environment (production supplies it via env vars).
if not getenv("DISCORD_TOKEN", None):
    from dotenv import load_dotenv

    load_dotenv()

log = util.get_logger("monolith")

modules_enabled = getenv("MODULES_ENABLED", False)
token = getenv("DISCORD_TOKEN", None)
if not token:
    raise Exception("No Discord token provided")


async def start():
    # Schedule the Discord client on the asyncio loop...
    log.info("Starting Discord handler.")
    client = DiscordClient(loop=loop)
    loop.create_task(client.start(token))
    # ...and kick off the Twisted-based 4chan crawler via the reactor.
    log.info("Starting 4chan handler.")
    chan = Chan4()
    running = chan.run()
    deferred = task.ensureDeferred(running)
    reactor.callLater(0.1, deferred.callback, "")


loop.create_task(start())
# reactor.run()
# reactor.run() drives the shared loop; the run_forever() below only takes
# over if the reactor returns.
reactor.run()
try:
    loop.run_forever()
except KeyboardInterrupt:
    log.info("Process terminating")
finally:
    loop.close()

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
wheel
pre-commit
treq
beautifulsoup4
redis
siphashc
aiohttp
python-dotenv

216
sources/ch4.py Normal file
View File

@ -0,0 +1,216 @@
# Python modules can't start with a number...
import json
import random
import string
from datetime import datetime
from typing import Any, Dict
import treq
from bs4 import BeautifulSoup
from siphashc import siphash
from twisted.internet.defer import inlineCallbacks
import db
import util
class Chan4(object):
    """
    4chan indexer, crawler and ingester.

    Crawls the public JSON API at a.4cdn.org: board list -> per-board
    catalogs -> per-thread post lists. Each post is normalised and handed
    to db.store_message(); a siphash of every post is cached in Redis so
    unchanged posts are skipped on the next pass.
    """

    def __init__(self):
        name = self.__class__.__name__
        self.log = util.get_logger(name)
        self.api_endpoint = "https://a.4cdn.org"
        self.boards = []
        # board -> catalog pages; each page carries a "threads" list
        self.thread_list = {}
        self.thread_deferreds = []
        self.log.info(f"Starting crawler bot to {self.api_endpoint}")
        # Persist the siphash key across restarts so cached post hashes stay
        # comparable. `random` (not `secrets`) is acceptable: the key only
        # deduplicates our own cache and is not security sensitive.
        self.hash_key = db.r.get("hashing_key")
        if not self.hash_key:
            letters = string.ascii_lowercase
            self.hash_key = "".join(random.choice(letters) for _ in range(16))
            self.log.debug(f"Created new hash key: {self.hash_key}")
            db.r.set("hashing_key", self.hash_key)
        else:
            self.hash_key = self.hash_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {self.hash_key}")

    @inlineCallbacks
    def run(self):
        """Crawl boards, then catalogs, then thread contents, in order."""
        yield self.get_board_list()
        yield self.get_thread_lists()
        yield self.get_thread_contents()

    def get_board_list(self):
        """Fetch the board list; self.boards is filled by the callback."""
        self.log.info("Getting board list")
        response = self.api_call("boards.json")
        response.addCallback(self.got_board_list)
        return response

    def got_board_list(self, board_list):
        # Silently skip failed fetches; the envelope carries the reason.
        if board_list["success"]:
            for board in board_list["response"]["boards"]:
                self.boards.append(board["board"])

    @inlineCallbacks
    def get_thread_lists(self):
        """Fetch the catalog of every known board, sequentially."""
        for board in self.boards:
            yield self.get_thread_list(board)

    @inlineCallbacks
    def get_thread_contents(self):
        """Fetch the posts of every thread found in the catalogs."""
        for board in self.thread_list.keys():
            for page in self.thread_list[board]:
                for threads in page["threads"]:
                    no = threads["no"]
                    yield self.get_thread_content(board, no)

    def get_thread_list(self, board):
        """Fetch one board's catalog; stored by the callback."""
        self.log.info(f"Getting thread list for {board}")
        response = self.api_call(f"{board}/catalog.json")
        response.addCallback(self.got_thread_list, board)
        return response

    def got_thread_list(self, thread_list, board):
        if thread_list["success"]:
            self.thread_list[board] = thread_list["response"]
            # Log the catalog page count, not len() of the 4-key envelope
            # dict (which was always the same number).
            self.log.info(
                f"Got thread list for {board}: {len(thread_list['response'])}"
            )

    def get_thread_content(self, board, thread):
        """Fetch all posts of a single thread; handled by the callback."""
        self.log.info(f"Getting information for thread {thread} on board {board}")
        response = self.api_call(f"{board}/thread/{thread}.json")
        response.addCallback(self.got_thread_content, board, thread)
        return response

    def got_thread_content(self, thread_content, board, thread):
        if thread_content["success"]:
            self.log.info(f"Got thread content for thread {thread} on board {board}")
            for post in thread_content["response"]["posts"]:
                self.handle_post(board, thread, post)
        else:
            self.log.error(
                (
                    f"Error fetching thread {thread} on board {board}: "
                    f"{thread_content['message']}"
                )
            )

    def handle_post(self, board, thread, post):
        """Normalise a raw post dict, dedupe it via Redis, and store it."""
        # 4chan field name -> our canonical field name
        name_map = {
            "no": "msg_id",
            "now": "ts",
            "name": "user",
            "trip": "nick",
            "id": "nick_id",
            "resto": "id_reply",
            "com": "msg",
            "ext": "file_ext",
            "w": "file_w",
            "h": "file_h",
            "tn_w": "file_tn_w",
            "tn_h": "file_tn_h",
            "tim": "file_tim",
            "fsize": "file_size",
            "md5": "file_md5",
            "filedeleted": "file_deleted",
            "spoiler": "file_spoiler",
            "custom_spoiler": "file_custom_spoiler",
            "m_img": "file_m_img",
            "time": "unix_time",
        }
        post["type"] = "msg"
        # Hash the canonical JSON form of the post so any field change is
        # detected. (Renamed from `hash` to avoid shadowing the builtin.)
        post_normalised = json.dumps(post, sort_keys=True)
        post_hash = str(siphash(self.hash_key, post_normalised))
        redis_key = f"cache.{board}.{thread}.{post['no']}"
        key_content = db.r.get(redis_key)
        if key_content:
            key_content = key_content.decode("ascii")
            if key_content == post_hash:
                # Unchanged since the last crawl -- skip it entirely.
                return
            else:
                # Seen before but edited/augmented -- mark as an update.
                post["type"] = "update"
        db.r.set(redis_key, post_hash)
        # Rename the fields we know about; iterate over a snapshot since we
        # mutate the dict while walking it.
        for key, value in list(post.items()):
            if key in name_map:
                post[name_map[key]] = post[key]
                del post[key]
        if "ts" in post:
            old_time = post["ts"]
            # e.g. '08/30/22(Tue)02:25:37' -- seconds are sometimes absent
            time_spl = old_time.split(":")
            if len(time_spl) == 3:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
            else:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
            post["ts"] = old_ts.isoformat()
        if "msg" in post:
            # Strip the HTML markup, preserving line breaks as newlines.
            soup = BeautifulSoup(post["msg"], "html.parser")
            post["msg"] = soup.get_text(separator="\n")
        post["src"] = "4ch"
        db.store_message(post)

    @inlineCallbacks
    def callback_api_call(self, response, result):
        """Read the HTTP body into the result envelope and return it.

        Always returns the envelope (never None) so the got_* callbacks can
        safely check result["success"].
        """
        try:
            text = yield response.content()
        except Exception:  # narrow enough for treq body-read failures
            self.log.error("Error with API call")
            result["message"] = "Error reading response body."
            return result
        try:
            result["response"] = json.loads(text)
        except json.decoder.JSONDecodeError:
            # success must remain False here -- the old string "ERROR" was
            # truthy and made callers treat a parse failure as success.
            result["message"] = "Error parsing JSON."
            return result
        result["status"] = response.code
        if response.code == 200:
            result["success"] = True
            result["message"] = "OK"
        else:
            result["message"] = "API ERROR"
        return result

    def api_call(self, method: str):
        """GET {endpoint}/{method}; returns a Deferred yielding an envelope
        dict with keys success/message/response/status."""
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0"
            )
        }
        url = f"{self.api_endpoint}/{method}"
        self.log.debug(f"GET {url}")
        response = treq.get(url, headers=headers)
        result: Dict[str, Any] = {
            "success": False,
            "message": "Invalid Method",
            "response": None,
            "status": None,
        }
        response.addCallback(self.callback_api_call, result)
        return response

62
sources/dis.py Normal file
View File

@ -0,0 +1,62 @@
#!/usr/bin/env python
from operator import attrgetter
import discord
import db
import util
ATTRMAP = {
"msg": "content",
"msg_id": "id",
"nick": "author.name",
"host": "author.discriminator",
"ident": "author.nick",
"time": "created_at",
"channel": "channel.name",
"channel_nsfw": "channel.nsfw",
"bot": "author.bot",
"user_id": "author.id",
"channel_id": "channel.id",
"net": "author.guild.name",
"net_id": "author.guild.id",
"guild_member_count": "author.guild.member_count",
"channel_category": "channel.category.name",
"channel_category_id": "channel.category.id",
"channel_category_nsfw": "channel.category.nsfw",
}
class DiscordClient(discord.Client):
    """Discord ingest client.

    Flattens every incoming message into a plain dict (via ATTRMAP) and
    hands it to db.store_message().
    """

    def __init__(self, *args, **kwargs):
        self.logger = None
        self.did_something = False
        self.log = util.get_logger(self.__class__.__name__)
        super().__init__(*args, **kwargs)

    async def on_ready(self):
        self.log.info("Discord connection established.")

    def recurse_dict(self, obj):
        """Extract every ATTRMAP attribute path present on obj into a dict."""
        extracted = {}
        for field, path in ATTRMAP.items():
            try:
                extracted[field] = attrgetter(path)(obj)
            except AttributeError:
                # Not every message carries every attribute -- skip it.
                pass
        return extracted

    async def on_message(self, message):
        # Ignore messages with no text content (embeds, attachments only).
        if not message.content:
            return
        payload = self.recurse_dict(message)
        # Replace the datetime under "time" with an ISO string under "ts".
        payload["ts"] = payload["time"].isoformat()
        del payload["time"]
        payload["type"] = "msg"
        payload["src"] = "dis"
        db.store_message(payload)

69
util.py Normal file
View File

@ -0,0 +1,69 @@
# Colored console logging helpers shared by every module.
import logging

log = logging.getLogger("util")

# Global switch: when True, loggers from get_logger() emit DEBUG and up.
debug = True

# Color definitions (ANSI SGR color numbers; foreground = 30 + value)
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
COLORS = {
    "WARNING": YELLOW,
    "INFO": WHITE,
    "DEBUG": BLUE,
    "CRITICAL": YELLOW,
    "ERROR": RED,
}
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"


def formatter_message(message, use_color=True):
    """Expand $RESET/$BOLD placeholders in a logging format string.

    :param message: format string containing $RESET / $BOLD markers
    :param use_color: substitute ANSI sequences if True, strip markers if False
    :return: the expanded format string
    """
    if use_color:
        message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ)
    else:
        message = message.replace("$RESET", "").replace("$BOLD", "")
    return message


class ColoredFormatter(logging.Formatter):
    """logging.Formatter that colors the levelname according to COLORS."""

    def __init__(self, msg, use_color=True):
        logging.Formatter.__init__(self, msg)
        self.use_color = use_color

    def format(self, record):
        levelname = record.levelname
        if self.use_color and levelname in COLORS:
            # Swap in the colored levelname only for the duration of this
            # format() call, then restore it: the record instance is shared
            # with any other handler/formatter, and mutating it permanently
            # would leak escape codes into their output.
            record.levelname = (
                COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ
            )
            try:
                return logging.Formatter.format(self, record)
            finally:
                record.levelname = levelname
        return logging.Formatter.format(self, record)


def get_logger(name):
    """Return a logger with a colored console handler.

    Safe to call repeatedly with the same name: logging.getLogger() returns
    one logger per name, and the stream handler is only attached the first
    time, so messages are not printed once per call site.

    :param name: logger name
    :return: configured logging.Logger
    """
    # Define the logging format
    FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s"
    COLOR_FORMAT = formatter_message(FORMAT, True)
    color_formatter = ColoredFormatter(COLOR_FORMAT)

    logger = logging.getLogger(name)
    level = logging.DEBUG if debug else logging.INFO
    logger.setLevel(level)
    if not logger.handlers:
        # First request for this name: attach the (single) console handler.
        ch = logging.StreamHandler()
        ch.setLevel(level)
        ch.setFormatter(color_formatter)
        logger.addHandler(ch)
    # Stop propagation to the root logger so messages aren't printed twice.
    logger.propagate = False
    return logger