commit 734a2b78798fce0f3074d37c0ddafed6d4f767ee Author: Mark Veidemanis Date: Fri Sep 2 22:30:45 2022 +0100 Implement running Discord and 4chan gathering simultaneously diff --git a/db.py b/db.py new file mode 100644 index 0000000..05148b0 --- /dev/null +++ b/db.py @@ -0,0 +1,16 @@ +from redis import StrictRedis + +import util + +log = util.get_logger("db") + + +def store_message(msg): + """ + Store a message into Manticore + :param msg: dict + """ + log.debug(f"store_message() {msg}") + + +r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..298fa86 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,31 @@ +version: "2" + +services: + app: + image: pathogen/monolith:latest + build: ./docker + volumes: + - ${PORTAINER_GIT_DIR}:/code + env_file: + - .env + volumes_from: + - tmp + + tmp: + image: busybox + command: chmod -R 777 /var/run/redis + volumes: + - /var/run/redis + + redis: + image: redis + command: redis-server /etc/redis.conf + volumes: + - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf + volumes_from: + - tmp + +networks: + default: + external: + name: pathogen \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..c133ace --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,23 @@ +# syntax=docker/dockerfile:1 +FROM python:3 + +RUN useradd -d /code pathogen +RUN mkdir /code +RUN chown pathogen:pathogen /code + +RUN mkdir /venv +RUN chown pathogen:pathogen /venv + +USER pathogen +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +WORKDIR /code +COPY requirements.txt /code/ +COPY discord-patched.tgz /code/ + +RUN python -m venv /venv +RUN . /venv/bin/activate && pip install -r requirements.txt + +RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages + +CMD . /venv/bin/activate && exec python monolith.py \ No newline at end of file diff --git a/docker/discord-patched.tgz b/docker/discord-patched.tgz new file mode 100644 index 0000000..bb94643 Binary files /dev/null and b/docker/discord-patched.tgz differ diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml new file mode 100644 index 0000000..298fa86 --- /dev/null +++ b/docker/docker-compose.prod.yml @@ -0,0 +1,31 @@ +version: "2" + +services: + app: + image: pathogen/monolith:latest + build: ./docker + volumes: + - ${PORTAINER_GIT_DIR}:/code + env_file: + - .env + volumes_from: + - tmp + + tmp: + image: busybox + command: chmod -R 777 /var/run/redis + volumes: + - /var/run/redis + + redis: + image: redis + command: redis-server /etc/redis.conf + volumes: + - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf + volumes_from: + - tmp + +networks: + default: + external: + name: pathogen \ No newline at end of file diff --git a/docker/redis.conf b/docker/redis.conf new file mode 100644 index 0000000..46366bf --- /dev/null +++ b/docker/redis.conf @@ -0,0 +1,2 @@ +unixsocket /var/run/redis/redis.sock +unixsocketperm 777 \ No newline at end of file diff --git a/docker/requirements.txt b/docker/requirements.txt new file mode 100644 index 0000000..32f13b7 --- /dev/null +++ b/docker/requirements.txt @@ -0,0 +1,7 @@ +wheel +treq +beautifulsoup4 +redis +siphashc +aiohttp +python-dotenv diff --git a/monolith.py b/monolith.py new file mode 100644 index 0000000..498752d --- /dev/null +++ b/monolith.py @@ -0,0 +1,67 @@ +import asyncio +import signal +import sys +from os import getenv + +from twisted.internet import asyncioreactor + +import util +from sources.ch4 import Chan4 +from sources.dis import DiscordClient + +loop = asyncio.new_event_loop() +# asyncio.set_event_loop(loop) + + +# asyncioreactor.install(asyncio.new_event_loop()) +asyncioreactor.install(loop) # noqa +from twisted.internet import reactor, task # noqa + + +# Doesn't quite work but better than nothing +def stop(*args): + loop.stop() + reactor.stop() + sys.exit() + + +signal.signal(signal.SIGINT, stop) +# loop.add_signal_handler(signal.SIGINT, functools.partial(stop, loop)) +# For development +if not getenv("DISCORD_TOKEN", None): + from dotenv import load_dotenv + + load_dotenv() + +log = util.get_logger("monolith") + +modules_enabled = getenv("MODULES_ENABLED", False) + +token = getenv("DISCORD_TOKEN", None) +if not token: + raise Exception("No Discord token provided") + + +async def start(): + log.info("Starting Discord handler.") + client = DiscordClient(loop=loop) + loop.create_task(client.start(token)) + + log.info("Starting 4chan handler.") + chan = Chan4() + running = chan.run() + deferred = task.ensureDeferred(running) + reactor.callLater(0.1, deferred.callback, "") + + +loop.create_task(start()) + + +# reactor.run() +reactor.run() +try: + loop.run_forever() +except KeyboardInterrupt: + log.info("Process terminating") +finally: + loop.close() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fec66d8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +wheel +pre-commit +treq +beautifulsoup4 +redis +siphashc +aiohttp +python-dotenv diff --git a/sources/ch4.py b/sources/ch4.py new file mode 100644 index 0000000..2e3743f --- /dev/null +++ b/sources/ch4.py @@ -0,0 +1,216 @@ +# Python modules can't start with a number... +import json +import random +import string +from datetime import datetime +from typing import Any, Dict + +import treq +from bs4 import BeautifulSoup +from siphashc import siphash +from twisted.internet.defer import inlineCallbacks + +import db +import util + + +class Chan4(object): + """ + 4chan indexer, crawler and ingester. + """ + + def __init__(self): + name = self.__class__.__name__ + self.log = util.get_logger(name) + + self.api_endpoint = "https://a.4cdn.org" + self.boards = [] + self.thread_list = {} + + self.thread_deferreds = [] + + self.log.info(f"Starting crawler bot to {self.api_endpoint}") + + self.hash_key = db.r.get("hashing_key") + if not self.hash_key: + letters = string.ascii_lowercase + self.hash_key = "".join(random.choice(letters) for i in range(16)) + self.log.debug(f"Created new hash key: {self.hash_key}") + db.r.set("hashing_key", self.hash_key) + else: + + self.hash_key = self.hash_key.decode("ascii") + self.log.debug(f"Decoded hash key: {self.hash_key}") + + @inlineCallbacks + def run(self): + yield self.get_board_list() + yield self.get_thread_lists() + yield self.get_thread_contents() + + def get_board_list(self): + self.log.info("Getting board list") + response = self.api_call("boards.json") + response.addCallback(self.got_board_list) + return response + + def got_board_list(self, board_list): + if board_list["success"]: + for board in board_list["response"]["boards"]: + self.boards.append(board["board"]) + + @inlineCallbacks + def get_thread_lists(self): + for board in self.boards: + yield self.get_thread_list(board) + # self.thread_deferreds.append(d) + # yield defer.gatherResults(self.thread_deferreds) + # self.thread_deferreds = [] + # self.log.info("Finished getting thread lists") + + @inlineCallbacks + def get_thread_contents(self): + for board in self.thread_list.keys(): + for page in self.thread_list[board]: + for threads in page["threads"]: + no = threads["no"] + yield self.get_thread_content(board, no) + # self.content_deferreds.append(d) + # al = yield defer.gatherResults(self.content_deferreds) + # self.content_deferreds = [] + # self.log.info("Finished getting content") + + def get_thread_list(self, board): + self.log.info(f"Getting thread list for {board}") + response = self.api_call(f"{board}/catalog.json") + response.addCallback(self.got_thread_list, board) + return response + + def got_thread_list(self, thread_list, board): + if thread_list["success"]: + self.thread_list[board] = thread_list["response"] + self.log.info(f"Got thread list for {board}: {len(thread_list)}") + + def get_thread_content(self, board, thread): + self.log.info(f"Getting information for thread {thread} on board {board}") + response = self.api_call(f"{board}/thread/{thread}.json") + response.addCallback(self.got_thread_content, board, thread) + return response + + def got_thread_content(self, thread_content, board, thread): + if thread_content["success"]: + self.log.info(f"Got thread content for thread {thread} on board {board}") + for post in thread_content["response"]["posts"]: + # print(post) + self.handle_post(board, thread, post) + else: + self.log.error( + ( + f"Error fetching thread {thread} on board {board}: " + f"{thread_content['message']}" + ) + ) + + def handle_post(self, board, thread, post): + name_map = { + "no": "msg_id", + "now": "ts", + "name": "user", + "trip": "nick", + "id": "nick_id", + "resto": "id_reply", + "com": "msg", + "ext": "file_ext", + "w": "file_w", + "h": "file_h", + "tn_w": "file_tn_w", + "tn_h": "file_tn_h", + "tim": "file_tim", + "fsize": "file_size", + "md5": "file_md5", + "filedeleted": "file_deleted", + "spoiler": "file_spoiler", + "custom_spoiler": "file_custom_spoiler", + "m_img": "file_m_img", + "time": "unix_time", + } + post["type"] = "msg" + + # Calculate hash for post + post_normalised = json.dumps(post, sort_keys=True) + hash = siphash(self.hash_key, post_normalised) + hash = str(hash) + redis_key = f"cache.{board}.{thread}.{post['no']}" + key_content = db.r.get(redis_key) + if key_content: + key_content = key_content.decode("ascii") + if key_content == hash: + return + else: + post["type"] = "update" + db.r.set(redis_key, hash) + # Check if hash exists + # Store the hash + for key, value in list(post.items()): + if key in name_map: + post[name_map[key]] = post[key] + del post[key] + if "ts" in post: + old_time = post["ts"] + # '08/30/22(Tue)02:25:37' + time_spl = old_time.split(":") + if len(time_spl) == 3: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") + else: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") + new_ts = old_ts.isoformat() + post["ts"] = new_ts + if "msg" in post: + soup = BeautifulSoup(post["msg"], "html.parser") + msg = soup.get_text(separator="\n") + post["msg"] = msg + + post["src"] = "4ch" + + # print({name_map[name]: val for name, val in post.items()}) + db.store_message(post) + + @inlineCallbacks + def callback_api_call(self, response, result): + try: + text = yield response.content() + except: # noqa + self.log.error("Error with API call") + return + try: + result["response"] = json.loads(text) + except json.decoder.JSONDecodeError: + result["success"] = "ERROR" + result["message"] = "Error parsing JSON." + return result + result["status"] = response.code + if response.code == 200: + result["success"] = True + result["message"] = "OK" + else: + result["message"] = "API ERROR" + + return result + + def api_call(self, method: str): + headers = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0" + ) + } + url = f"{self.api_endpoint}/{method}" + self.log.debug(f"GET {url}") + response = treq.get(url, headers=headers) + result: Dict[str, Any] = { + "success": False, + "message": "Invalid Method", + "response": None, + "status": None, + } + response.addCallback(self.callback_api_call, result) + return response diff --git a/sources/dis.py b/sources/dis.py new file mode 100644 index 0000000..9991ccb --- /dev/null +++ b/sources/dis.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +from operator import attrgetter + +import discord + +import db +import util + +ATTRMAP = { + "msg": "content", + "msg_id": "id", + "nick": "author.name", + "host": "author.discriminator", + "ident": "author.nick", + "time": "created_at", + "channel": "channel.name", + "channel_nsfw": "channel.nsfw", + "bot": "author.bot", + "user_id": "author.id", + "channel_id": "channel.id", + "net": "author.guild.name", + "net_id": "author.guild.id", + "guild_member_count": "author.guild.member_count", + "channel_category": "channel.category.name", + "channel_category_id": "channel.category.id", + "channel_category_nsfw": "channel.category.nsfw", +} + + +class DiscordClient(discord.Client): + def __init__(self, *args, **kwargs): + self.logger = None + self.did_something = False + name = self.__class__.__name__ + self.log = util.get_logger(name) + + super().__init__(*args, **kwargs) + + async def on_ready(self): + self.log.info("Discord connection established.") + + def recurse_dict(self, obj): + to_return = {} + for key, mapped in ATTRMAP.items(): + try: + to_return[key] = attrgetter(mapped)(obj) + except AttributeError: + continue + + return to_return + + async def on_message(self, message): + if not message.content: + return + + a = self.recurse_dict(message) + a["ts"] = a["time"].isoformat() + del a["time"] + a["type"] = "msg" + a["src"] = "dis" + + db.store_message(a) diff --git a/util.py b/util.py new file mode 100644 index 0000000..045c95f --- /dev/null +++ b/util.py @@ -0,0 +1,69 @@ +# Other library imports +import logging + +log = logging.getLogger("util") + +debug = True + +# Color definitions +BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) +COLORS = { + "WARNING": YELLOW, + "INFO": WHITE, + "DEBUG": BLUE, + "CRITICAL": YELLOW, + "ERROR": RED, +} +RESET_SEQ = "\033[0m" +COLOR_SEQ = "\033[1;%dm" +BOLD_SEQ = "\033[1m" + + +def formatter_message(message, use_color=True): + if use_color: + message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) + else: + message = message.replace("$RESET", "").replace("$BOLD", "") + return message + + +class ColoredFormatter(logging.Formatter): + def __init__(self, msg, use_color=True): + logging.Formatter.__init__(self, msg) + self.use_color = use_color + + def format(self, record): + levelname = record.levelname + if self.use_color and levelname in COLORS: + levelname_color = ( + COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ + ) + record.levelname = levelname_color + return logging.Formatter.format(self, record) + + +def get_logger(name): + + # Define the logging format + FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s" + COLOR_FORMAT = formatter_message(FORMAT, True) + color_formatter = ColoredFormatter(COLOR_FORMAT) + # formatter = logging.Formatter( + + # Why is this so complicated? + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + # ch.setFormatter(formatter) + ch.setFormatter(color_formatter) + + # Define the logger on the base class + log = logging.getLogger(name) + log.setLevel(logging.INFO) + if debug: + log.setLevel(logging.DEBUG) + ch.setLevel(logging.DEBUG) + + # Add the handler and stop it being silly and printing everything twice + log.addHandler(ch) + log.propagate = False + return log