Implement running Discord and 4chan gathering simultaneously
This commit is contained in:
commit
734a2b7879
|
@ -0,0 +1,16 @@
|
||||||
|
from redis import StrictRedis
|
||||||
|
|
||||||
|
import util
|
||||||
|
|
||||||
|
log = util.get_logger("db")
|
||||||
|
|
||||||
|
|
||||||
|
def store_message(msg):
|
||||||
|
"""
|
||||||
|
Store a message into Manticore
|
||||||
|
:param msg: dict
|
||||||
|
"""
|
||||||
|
log.debug(f"store_message() {msg}")
|
||||||
|
|
||||||
|
|
||||||
|
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
|
|
@ -0,0 +1,31 @@
|
||||||
|
version: "2"
|
||||||
|
|
||||||
|
services:
|
||||||
|
app:
|
||||||
|
image: pathogen/monolith:latest
|
||||||
|
build: ./docker
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}:/code
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
volumes_from:
|
||||||
|
- tmp
|
||||||
|
|
||||||
|
tmp:
|
||||||
|
image: busybox
|
||||||
|
command: chmod -R 777 /var/run/redis
|
||||||
|
volumes:
|
||||||
|
- /var/run/redis
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis
|
||||||
|
command: redis-server /etc/redis.conf
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
|
||||||
|
volumes_from:
|
||||||
|
- tmp
|
||||||
|
|
||||||
|
networks:
|
||||||
|
default:
|
||||||
|
external:
|
||||||
|
name: pathogen
|
|
@ -0,0 +1,23 @@
|
||||||
|
# syntax=docker/dockerfile:1
|
||||||
|
FROM python:3
|
||||||
|
|
||||||
|
RUN useradd -d /code pathogen
|
||||||
|
RUN mkdir /code
|
||||||
|
RUN chown pathogen:pathogen /code
|
||||||
|
|
||||||
|
RUN mkdir /venv
|
||||||
|
RUN chown pathogen:pathogen /venv
|
||||||
|
|
||||||
|
USER pathogen
|
||||||
|
ENV PYTHONDONTWRITEBYTECODE=1
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
WORKDIR /code
|
||||||
|
COPY requirements.txt /code/
|
||||||
|
COPY discord-patched.tgz /code/
|
||||||
|
|
||||||
|
RUN python -m venv /venv
|
||||||
|
RUN . /venv/bin/activate && pip install -r requirements.txt
|
||||||
|
|
||||||
|
RUN tar xf /code/discord-patched.tgz -C /venv/lib/python3.10/site-packages
|
||||||
|
|
||||||
|
CMD . /venv/bin/activate && exec python monolith.py
|
Binary file not shown.
|
@ -0,0 +1,31 @@
|
||||||
|
version: "2"
|
||||||
|
|
||||||
|
services:
|
||||||
|
app:
|
||||||
|
image: pathogen/monolith:latest
|
||||||
|
build: ./docker
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}:/code
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
volumes_from:
|
||||||
|
- tmp
|
||||||
|
|
||||||
|
tmp:
|
||||||
|
image: busybox
|
||||||
|
command: chmod -R 777 /var/run/redis
|
||||||
|
volumes:
|
||||||
|
- /var/run/redis
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis
|
||||||
|
command: redis-server /etc/redis.conf
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
|
||||||
|
volumes_from:
|
||||||
|
- tmp
|
||||||
|
|
||||||
|
networks:
|
||||||
|
default:
|
||||||
|
external:
|
||||||
|
name: pathogen
|
|
@ -0,0 +1,2 @@
|
||||||
|
unixsocket /var/run/redis/redis.sock
|
||||||
|
unixsocketperm 777
|
|
@ -0,0 +1,7 @@
|
||||||
|
wheel
|
||||||
|
treq
|
||||||
|
beautifulsoup4
|
||||||
|
redis
|
||||||
|
siphashc
|
||||||
|
aiohttp
|
||||||
|
python-dotenv
|
|
@ -0,0 +1,67 @@
|
||||||
|
import asyncio
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
from os import getenv
|
||||||
|
|
||||||
|
from twisted.internet import asyncioreactor
|
||||||
|
|
||||||
|
import util
|
||||||
|
from sources.ch4 import Chan4
|
||||||
|
from sources.dis import DiscordClient
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
# asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
|
||||||
|
# asyncioreactor.install(asyncio.new_event_loop())
|
||||||
|
asyncioreactor.install(loop) # noqa
|
||||||
|
from twisted.internet import reactor, task # noqa
|
||||||
|
|
||||||
|
|
||||||
|
# Doesn't quite work but better than nothing
|
||||||
|
def stop(*args):
|
||||||
|
loop.stop()
|
||||||
|
reactor.stop()
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, stop)
|
||||||
|
# loop.add_signal_handler(signal.SIGINT, functools.partial(stop, loop))
|
||||||
|
# For development
|
||||||
|
if not getenv("DISCORD_TOKEN", None):
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
log = util.get_logger("monolith")
|
||||||
|
|
||||||
|
modules_enabled = getenv("MODULES_ENABLED", False)
|
||||||
|
|
||||||
|
token = getenv("DISCORD_TOKEN", None)
|
||||||
|
if not token:
|
||||||
|
raise Exception("No Discord token provided")
|
||||||
|
|
||||||
|
|
||||||
|
async def start():
|
||||||
|
log.info("Starting Discord handler.")
|
||||||
|
client = DiscordClient(loop=loop)
|
||||||
|
loop.create_task(client.start(token))
|
||||||
|
|
||||||
|
log.info("Starting 4chan handler.")
|
||||||
|
chan = Chan4()
|
||||||
|
running = chan.run()
|
||||||
|
deferred = task.ensureDeferred(running)
|
||||||
|
reactor.callLater(0.1, deferred.callback, "")
|
||||||
|
|
||||||
|
|
||||||
|
loop.create_task(start())
|
||||||
|
|
||||||
|
|
||||||
|
# reactor.run()
|
||||||
|
reactor.run()
|
||||||
|
try:
|
||||||
|
loop.run_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
log.info("Process terminating")
|
||||||
|
finally:
|
||||||
|
loop.close()
|
|
@ -0,0 +1,8 @@
|
||||||
|
wheel
|
||||||
|
pre-commit
|
||||||
|
treq
|
||||||
|
beautifulsoup4
|
||||||
|
redis
|
||||||
|
siphashc
|
||||||
|
aiohttp
|
||||||
|
python-dotenv
|
|
@ -0,0 +1,216 @@
|
||||||
|
# Python modules can't start with a number...
|
||||||
|
import json
|
||||||
|
import random
|
||||||
|
import string
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
import treq
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
from siphashc import siphash
|
||||||
|
from twisted.internet.defer import inlineCallbacks
|
||||||
|
|
||||||
|
import db
|
||||||
|
import util
|
||||||
|
|
||||||
|
|
||||||
|
class Chan4(object):
|
||||||
|
"""
|
||||||
|
4chan indexer, crawler and ingester.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
name = self.__class__.__name__
|
||||||
|
self.log = util.get_logger(name)
|
||||||
|
|
||||||
|
self.api_endpoint = "https://a.4cdn.org"
|
||||||
|
self.boards = []
|
||||||
|
self.thread_list = {}
|
||||||
|
|
||||||
|
self.thread_deferreds = []
|
||||||
|
|
||||||
|
self.log.info(f"Starting crawler bot to {self.api_endpoint}")
|
||||||
|
|
||||||
|
self.hash_key = db.r.get("hashing_key")
|
||||||
|
if not self.hash_key:
|
||||||
|
letters = string.ascii_lowercase
|
||||||
|
self.hash_key = "".join(random.choice(letters) for i in range(16))
|
||||||
|
self.log.debug(f"Created new hash key: {self.hash_key}")
|
||||||
|
db.r.set("hashing_key", self.hash_key)
|
||||||
|
else:
|
||||||
|
|
||||||
|
self.hash_key = self.hash_key.decode("ascii")
|
||||||
|
self.log.debug(f"Decoded hash key: {self.hash_key}")
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def run(self):
|
||||||
|
yield self.get_board_list()
|
||||||
|
yield self.get_thread_lists()
|
||||||
|
yield self.get_thread_contents()
|
||||||
|
|
||||||
|
def get_board_list(self):
|
||||||
|
self.log.info("Getting board list")
|
||||||
|
response = self.api_call("boards.json")
|
||||||
|
response.addCallback(self.got_board_list)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def got_board_list(self, board_list):
|
||||||
|
if board_list["success"]:
|
||||||
|
for board in board_list["response"]["boards"]:
|
||||||
|
self.boards.append(board["board"])
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def get_thread_lists(self):
|
||||||
|
for board in self.boards:
|
||||||
|
yield self.get_thread_list(board)
|
||||||
|
# self.thread_deferreds.append(d)
|
||||||
|
# yield defer.gatherResults(self.thread_deferreds)
|
||||||
|
# self.thread_deferreds = []
|
||||||
|
# self.log.info("Finished getting thread lists")
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def get_thread_contents(self):
|
||||||
|
for board in self.thread_list.keys():
|
||||||
|
for page in self.thread_list[board]:
|
||||||
|
for threads in page["threads"]:
|
||||||
|
no = threads["no"]
|
||||||
|
yield self.get_thread_content(board, no)
|
||||||
|
# self.content_deferreds.append(d)
|
||||||
|
# al = yield defer.gatherResults(self.content_deferreds)
|
||||||
|
# self.content_deferreds = []
|
||||||
|
# self.log.info("Finished getting content")
|
||||||
|
|
||||||
|
def get_thread_list(self, board):
|
||||||
|
self.log.info(f"Getting thread list for {board}")
|
||||||
|
response = self.api_call(f"{board}/catalog.json")
|
||||||
|
response.addCallback(self.got_thread_list, board)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def got_thread_list(self, thread_list, board):
|
||||||
|
if thread_list["success"]:
|
||||||
|
self.thread_list[board] = thread_list["response"]
|
||||||
|
self.log.info(f"Got thread list for {board}: {len(thread_list)}")
|
||||||
|
|
||||||
|
def get_thread_content(self, board, thread):
|
||||||
|
self.log.info(f"Getting information for thread {thread} on board {board}")
|
||||||
|
response = self.api_call(f"{board}/thread/{thread}.json")
|
||||||
|
response.addCallback(self.got_thread_content, board, thread)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def got_thread_content(self, thread_content, board, thread):
|
||||||
|
if thread_content["success"]:
|
||||||
|
self.log.info(f"Got thread content for thread {thread} on board {board}")
|
||||||
|
for post in thread_content["response"]["posts"]:
|
||||||
|
# print(post)
|
||||||
|
self.handle_post(board, thread, post)
|
||||||
|
else:
|
||||||
|
self.log.error(
|
||||||
|
(
|
||||||
|
f"Error fetching thread {thread} on board {board}: "
|
||||||
|
f"{thread_content['message']}"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle_post(self, board, thread, post):
|
||||||
|
name_map = {
|
||||||
|
"no": "msg_id",
|
||||||
|
"now": "ts",
|
||||||
|
"name": "user",
|
||||||
|
"trip": "nick",
|
||||||
|
"id": "nick_id",
|
||||||
|
"resto": "id_reply",
|
||||||
|
"com": "msg",
|
||||||
|
"ext": "file_ext",
|
||||||
|
"w": "file_w",
|
||||||
|
"h": "file_h",
|
||||||
|
"tn_w": "file_tn_w",
|
||||||
|
"tn_h": "file_tn_h",
|
||||||
|
"tim": "file_tim",
|
||||||
|
"fsize": "file_size",
|
||||||
|
"md5": "file_md5",
|
||||||
|
"filedeleted": "file_deleted",
|
||||||
|
"spoiler": "file_spoiler",
|
||||||
|
"custom_spoiler": "file_custom_spoiler",
|
||||||
|
"m_img": "file_m_img",
|
||||||
|
"time": "unix_time",
|
||||||
|
}
|
||||||
|
post["type"] = "msg"
|
||||||
|
|
||||||
|
# Calculate hash for post
|
||||||
|
post_normalised = json.dumps(post, sort_keys=True)
|
||||||
|
hash = siphash(self.hash_key, post_normalised)
|
||||||
|
hash = str(hash)
|
||||||
|
redis_key = f"cache.{board}.{thread}.{post['no']}"
|
||||||
|
key_content = db.r.get(redis_key)
|
||||||
|
if key_content:
|
||||||
|
key_content = key_content.decode("ascii")
|
||||||
|
if key_content == hash:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
post["type"] = "update"
|
||||||
|
db.r.set(redis_key, hash)
|
||||||
|
# Check if hash exists
|
||||||
|
# Store the hash
|
||||||
|
for key, value in list(post.items()):
|
||||||
|
if key in name_map:
|
||||||
|
post[name_map[key]] = post[key]
|
||||||
|
del post[key]
|
||||||
|
if "ts" in post:
|
||||||
|
old_time = post["ts"]
|
||||||
|
# '08/30/22(Tue)02:25:37'
|
||||||
|
time_spl = old_time.split(":")
|
||||||
|
if len(time_spl) == 3:
|
||||||
|
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
|
||||||
|
else:
|
||||||
|
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
|
||||||
|
new_ts = old_ts.isoformat()
|
||||||
|
post["ts"] = new_ts
|
||||||
|
if "msg" in post:
|
||||||
|
soup = BeautifulSoup(post["msg"], "html.parser")
|
||||||
|
msg = soup.get_text(separator="\n")
|
||||||
|
post["msg"] = msg
|
||||||
|
|
||||||
|
post["src"] = "4ch"
|
||||||
|
|
||||||
|
# print({name_map[name]: val for name, val in post.items()})
|
||||||
|
db.store_message(post)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def callback_api_call(self, response, result):
|
||||||
|
try:
|
||||||
|
text = yield response.content()
|
||||||
|
except: # noqa
|
||||||
|
self.log.error("Error with API call")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
result["response"] = json.loads(text)
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
result["success"] = "ERROR"
|
||||||
|
result["message"] = "Error parsing JSON."
|
||||||
|
return result
|
||||||
|
result["status"] = response.code
|
||||||
|
if response.code == 200:
|
||||||
|
result["success"] = True
|
||||||
|
result["message"] = "OK"
|
||||||
|
else:
|
||||||
|
result["message"] = "API ERROR"
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def api_call(self, method: str):
|
||||||
|
headers = {
|
||||||
|
"User-Agent": (
|
||||||
|
"Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
url = f"{self.api_endpoint}/{method}"
|
||||||
|
self.log.debug(f"GET {url}")
|
||||||
|
response = treq.get(url, headers=headers)
|
||||||
|
result: Dict[str, Any] = {
|
||||||
|
"success": False,
|
||||||
|
"message": "Invalid Method",
|
||||||
|
"response": None,
|
||||||
|
"status": None,
|
||||||
|
}
|
||||||
|
response.addCallback(self.callback_api_call, result)
|
||||||
|
return response
|
|
@ -0,0 +1,62 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
from operator import attrgetter
|
||||||
|
|
||||||
|
import discord
|
||||||
|
|
||||||
|
import db
|
||||||
|
import util
|
||||||
|
|
||||||
|
ATTRMAP = {
|
||||||
|
"msg": "content",
|
||||||
|
"msg_id": "id",
|
||||||
|
"nick": "author.name",
|
||||||
|
"host": "author.discriminator",
|
||||||
|
"ident": "author.nick",
|
||||||
|
"time": "created_at",
|
||||||
|
"channel": "channel.name",
|
||||||
|
"channel_nsfw": "channel.nsfw",
|
||||||
|
"bot": "author.bot",
|
||||||
|
"user_id": "author.id",
|
||||||
|
"channel_id": "channel.id",
|
||||||
|
"net": "author.guild.name",
|
||||||
|
"net_id": "author.guild.id",
|
||||||
|
"guild_member_count": "author.guild.member_count",
|
||||||
|
"channel_category": "channel.category.name",
|
||||||
|
"channel_category_id": "channel.category.id",
|
||||||
|
"channel_category_nsfw": "channel.category.nsfw",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class DiscordClient(discord.Client):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.logger = None
|
||||||
|
self.did_something = False
|
||||||
|
name = self.__class__.__name__
|
||||||
|
self.log = util.get_logger(name)
|
||||||
|
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
async def on_ready(self):
|
||||||
|
self.log.info("Discord connection established.")
|
||||||
|
|
||||||
|
def recurse_dict(self, obj):
|
||||||
|
to_return = {}
|
||||||
|
for key, mapped in ATTRMAP.items():
|
||||||
|
try:
|
||||||
|
to_return[key] = attrgetter(mapped)(obj)
|
||||||
|
except AttributeError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
async def on_message(self, message):
|
||||||
|
if not message.content:
|
||||||
|
return
|
||||||
|
|
||||||
|
a = self.recurse_dict(message)
|
||||||
|
a["ts"] = a["time"].isoformat()
|
||||||
|
del a["time"]
|
||||||
|
a["type"] = "msg"
|
||||||
|
a["src"] = "dis"
|
||||||
|
|
||||||
|
db.store_message(a)
|
|
@ -0,0 +1,69 @@
|
||||||
|
# Other library imports
|
||||||
|
import logging
|
||||||
|
|
||||||
|
log = logging.getLogger("util")
|
||||||
|
|
||||||
|
debug = True
|
||||||
|
|
||||||
|
# Color definitions
|
||||||
|
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
|
||||||
|
COLORS = {
|
||||||
|
"WARNING": YELLOW,
|
||||||
|
"INFO": WHITE,
|
||||||
|
"DEBUG": BLUE,
|
||||||
|
"CRITICAL": YELLOW,
|
||||||
|
"ERROR": RED,
|
||||||
|
}
|
||||||
|
RESET_SEQ = "\033[0m"
|
||||||
|
COLOR_SEQ = "\033[1;%dm"
|
||||||
|
BOLD_SEQ = "\033[1m"
|
||||||
|
|
||||||
|
|
||||||
|
def formatter_message(message, use_color=True):
|
||||||
|
if use_color:
|
||||||
|
message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ)
|
||||||
|
else:
|
||||||
|
message = message.replace("$RESET", "").replace("$BOLD", "")
|
||||||
|
return message
|
||||||
|
|
||||||
|
|
||||||
|
class ColoredFormatter(logging.Formatter):
|
||||||
|
def __init__(self, msg, use_color=True):
|
||||||
|
logging.Formatter.__init__(self, msg)
|
||||||
|
self.use_color = use_color
|
||||||
|
|
||||||
|
def format(self, record):
|
||||||
|
levelname = record.levelname
|
||||||
|
if self.use_color and levelname in COLORS:
|
||||||
|
levelname_color = (
|
||||||
|
COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ
|
||||||
|
)
|
||||||
|
record.levelname = levelname_color
|
||||||
|
return logging.Formatter.format(self, record)
|
||||||
|
|
||||||
|
|
||||||
|
def get_logger(name):
|
||||||
|
|
||||||
|
# Define the logging format
|
||||||
|
FORMAT = "%(asctime)s %(levelname)18s $BOLD%(name)13s$RESET - %(message)s"
|
||||||
|
COLOR_FORMAT = formatter_message(FORMAT, True)
|
||||||
|
color_formatter = ColoredFormatter(COLOR_FORMAT)
|
||||||
|
# formatter = logging.Formatter(
|
||||||
|
|
||||||
|
# Why is this so complicated?
|
||||||
|
ch = logging.StreamHandler()
|
||||||
|
ch.setLevel(logging.INFO)
|
||||||
|
# ch.setFormatter(formatter)
|
||||||
|
ch.setFormatter(color_formatter)
|
||||||
|
|
||||||
|
# Define the logger on the base class
|
||||||
|
log = logging.getLogger(name)
|
||||||
|
log.setLevel(logging.INFO)
|
||||||
|
if debug:
|
||||||
|
log.setLevel(logging.DEBUG)
|
||||||
|
ch.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
# Add the handler and stop it being silly and printing everything twice
|
||||||
|
log.addHandler(ch)
|
||||||
|
log.propagate = False
|
||||||
|
return log
|
Loading…
Reference in New Issue