monolith/sources/ch4.py

# Python modules can't start with a number...
import json
import random
import string
from datetime import datetime
from typing import Any, Dict

import treq
from bs4 import BeautifulSoup
from siphashc import siphash
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks

import db
import util
from schemas.ch4_s import ATTRMAP

class Chan4(object):
    """
    4chan indexer, crawler and ingester.
    """

    def __init__(self):
        name = self.__class__.__name__
        self.log = util.get_logger(name)
        self.api_endpoint = "https://a.4cdn.org"
        self.boards = ["out"]
        self.thread_list = {}
        # self.thread_deferreds = []
        # self.content_deferreds = []
        self.log.info(f"Starting crawler bot for {self.api_endpoint}")

        # Reuse the stored SipHash key so post hashes stay comparable between
        # runs, creating and persisting one if it does not exist yet.
        self.hash_key = db.r.get("hashing_key")
        if not self.hash_key:
            letters = string.ascii_lowercase
            self.hash_key = "".join(random.choice(letters) for i in range(16))
            self.log.debug(f"Created new hash key: {self.hash_key}")
            db.r.set("hashing_key", self.hash_key)
        else:
            self.hash_key = self.hash_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {self.hash_key}")

    @inlineCallbacks
    def run(self):
        yield self.get_board_list()

    def got_thread_lists(self, thread_lists):
        self.log.debug(f"Got thread lists: {thread_lists}")
        # Instead of while True, do it again!
        d = self.get_thread_lists()
        d.addCallback(self.got_thread_lists)

    # @inlineCallbacks
    # def mainloop(self):
    #     while True:
    #         yield self.get_thread_lists()
    #         yield self.get_thread_contents()

    @inlineCallbacks
    def get_board_list(self):
        self.log.debug("Getting board list")
        response = self.api_call("boards.json")
        response.addCallback(self.got_board_list)
        yield response

    @inlineCallbacks
    def got_board_list(self, board_list):
        if board_list["success"]:
            for board in board_list["response"]["boards"]:
                self.boards.append(board["board"])
        self.log.debug(f"Got boards: {self.boards}")
        d = self.get_thread_lists()
        d.addCallback(self.got_thread_lists)
        yield d

    @inlineCallbacks
    def get_thread_lists(self):
        thread_deferreds = []
        for board in self.boards:
            d = self.get_thread_list(board)
            d.addCallback(self.got_thread_list, board)
            thread_deferreds.append(d)
        yield defer.gatherResults(thread_deferreds)

    def get_thread_list(self, board):
        self.log.debug(f"Getting thread list for {board}")
        response = self.api_call(f"{board}/catalog.json")
        return response

    def got_thread_list(self, thread_list, board):
        if not thread_list:
            self.log.error(f"Thread list invalid: {thread_list} {board}")
            return
        if thread_list["success"]:
            # self.thread_list[board] = thread_list["response"]
            for page in thread_list["response"]:
                for threads in page["threads"]:
                    no = threads["no"]
                    d = self.get_thread_content(board, no)
                    d.addCallback(self.got_thread_content, board, no)
            self.log.info(f"Got thread list for {board}: {len(thread_list)}")

    def get_thread_content(self, board, thread):
        self.log.debug(f"Getting information for thread {thread} on board {board}")
        response = self.api_call(f"{board}/thread/{thread}.json")
        return response

    def got_thread_content(self, thread_content, board, thread):
        if not thread_content:
            self.log.error(f"Thread content invalid: {thread_content} {board} {thread}")
            return
        if thread_content["success"]:
            self.log.debug(f"Got thread content for thread {thread} on board {board}")
            for post in thread_content["response"]["posts"]:
                # print(post)
                self.handle_post(board, thread, post)
        else:
            self.log.error(
                (
                    f"Error fetching thread {thread} on board {board}: "
                    f"{thread_content['message']}"
                )
            )

    def handle_post(self, board, thread, post):
        post["type"] = "msg"

        # Calculate hash for post
        post_normalised = json.dumps(post, sort_keys=True)
        post_hash = str(siphash(self.hash_key, post_normalised))

        # Check if hash exists
        redis_key = f"cache.{board}.{thread}.{post['no']}"
        key_content = db.r.get(redis_key)
        if key_content:
            key_content = key_content.decode("ascii")
            if key_content == post_hash:
                # Post is unchanged since the last crawl, nothing to store.
                return
            else:
                post["type"] = "update"

        # Store the hash
        db.r.set(redis_key, post_hash)

        # Remap the 4chan API field names to the internal schema.
        for key, value in list(post.items()):
            if key in ATTRMAP:
                post[ATTRMAP[key]] = post[key]
                del post[key]

        if "ts" in post:
            old_time = post["ts"]
            # '08/30/22(Tue)02:25:37'
            time_spl = old_time.split(":")
            if len(time_spl) == 3:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
            else:
                old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
            new_ts = old_ts.isoformat()
            post["ts"] = new_ts

        if "msg" in post:
            # Strip the HTML markup from the post body.
            soup = BeautifulSoup(post["msg"], "html.parser")
            msg = soup.get_text(separator="\n")
            post["msg"] = msg

        post["src"] = "4ch"
        # print({name_map[name]: val for name, val in post.items()})
        db.store_message(post)

    def dump(self, *args, **kwargs):
        self.log.error(f"Error: {args} {kwargs}")

    @inlineCallbacks
    def callback_api_call(self, response, result):
        result["status"] = response.code
        try:
            text = yield response.content()
        except:  # noqa
            self.log.error("Error with API call")
            return False
        # print("RESP TEXT", text)
        try:
            result["response"] = json.loads(text)
        except json.decoder.JSONDecodeError:
            result["success"] = "ERROR"
            result["message"] = "Error parsing JSON."
            return result
        # print("RESP AFTER JSON", result)
        result["status"] = response.code
        if response.code == 200:
            result["success"] = True
            result["message"] = "OK"
        else:
            result["message"] = "API ERROR"
        return result

    def api_call(self, method: str):
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0"
            )
        }
        url = f"{self.api_endpoint}/{method}"
        self.log.debug(f"GET {url}")
        response = treq.get(url, headers=headers)
        result: Dict[str, Any] = {
            "success": False,
            "message": "Call not successful",
            "response": None,
            "status": None,
        }
        response.addCallback(self.callback_api_call, result)
        response.addErrback(self.dump, url=url)
        return response
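

# Minimal usage sketch (an assumption; the wider monolith app may start the
# crawler differently). treq only performs I/O while the Twisted reactor is
# running, so run() is scheduled and the reactor then drives the crawl.
if __name__ == "__main__":
    from twisted.internet import reactor

    crawler = Chan4()
    reactor.callWhenRunning(crawler.run)
    reactor.run()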