monolith/sources/ch4.py

# Python modules can't start with a number...
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from math import ceil

import aiohttp
import ujson
from bs4 import BeautifulSoup
from numpy import array_split
from siphashc import siphash

import db
import util
from schemas.ch4_s import ATTRMAP

# Post handling (JSON normalising, hashing, HTML stripping) is CPU-bound,
# so it is pushed off the event loop into a pool of worker processes.
p = ProcessPoolExecutor(10)


class Chan4(object):
    """
    4chan indexer, crawler and ingester.
    """

    def __init__(self):
        name = self.__class__.__name__
        self.log = util.get_logger(name)
        self.api_endpoint = "https://a.4cdn.org"
        # self.boards = ["out", "g", "a", "3", "pol"]
        self.boards = []
        self.thread_list = {}
        # self.thread_deferreds = []
        # self.content_deferreds = []
        self.log.info(f"Starting crawler bot to {self.api_endpoint}")

        # Reuse one SipHash key across runs so post hashes stay comparable
        # between crawls; siphashc expects a 16-character key.
        self.hash_key = db.r.get("hashing_key")
        if not self.hash_key:
            letters = string.ascii_lowercase
            self.hash_key = "".join(random.choice(letters) for _ in range(16))
            self.log.debug(f"Created new hash key: {self.hash_key}")
            db.r.set("hashing_key", self.hash_key)
        else:
            self.hash_key = self.hash_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {self.hash_key}")
    async def run(self):
        await self.get_board_list()
        # Crawl in a loop rather than by tail recursion so the chain of
        # awaiting coroutines does not grow without bound.
        while True:
            await self.get_thread_lists(self.boards)

    async def get_board_list(self):
        # boards.json is shaped roughly like
        # {"boards": [{"board": "g", "title": "Technology", ...}, ...]};
        # only the short board names are kept.
        responses = await self.api_call({"_": "boards.json"})
        for mapped, response in responses:
            if not response:
                continue
            for board in response["boards"]:
                self.boards.append(board["board"])
        self.log.debug(f"Got boards: {self.boards}")
    async def get_thread_lists(self, boards):
        self.log.debug(f"Getting thread list for {boards}")
        board_urls = {board: f"{board}/catalog.json" for board in boards}
        responses = await self.api_call(board_urls)
        to_get = []
        for mapped, response in responses:
            if not response:
                continue
            # catalog.json is a list of pages, each with a "threads" list.
            for page in response:
                for threads in page["threads"]:
                    no = threads["no"]
                    to_get.append((mapped, no))
            self.log.info(f"Got thread list for {mapped}: {len(response)}")
        self.log.debug(f"Got {len(to_get)} threads in total")
        if not to_get:
            return
        # Fetch thread content in chunks of at most 10,000 threads at a time.
        split_threads = array_split(to_get, ceil(len(to_get) / 10000))
        self.log.debug(f"Split threads into {len(split_threads)} chunks")
        for threads in split_threads:
            self.log.debug(f"Fetching content for {len(threads)} threads")
            await self.get_threads_content(threads)
        # await self.get_threads_content(to_get)
    async def get_threads_content(self, thread_list):
        thread_urls = {
            (board, thread): f"{board}/thread/{thread}.json"
            for board, thread in thread_list
        }
        self.log.debug(f"Getting information for threads: {thread_urls}")
        responses = await self.api_call(thread_urls)
        self.log.debug(f"Got information for threads: {thread_urls}")
        all_posts = {}
        for mapped, response in responses:
            if not response:
                continue
            board, thread = mapped
            self.log.debug(f"Got thread content for thread {thread} on board {board}")
            all_posts[mapped] = response["posts"]
        # Split into 10,000 chunks
        # split_posts = array_split(all_posts, ceil(len(all_posts) / 10000))
        # print("SPLIT CHUNK", len(split_posts))
        # for posts in split_posts:
        #     with futures.ThreadPoolExecutor(max_workers=6) as executor:
        #         print("SUBMITTED THREAD FOR", len(posts))
        #         executor.submit(self.handle_posts, board, thread, posts)
        # await self.handle_posts(board, thread, response["posts"])
        # await asyncio.sleep(1)
        await self.handle_posts_thread(all_posts)
        # self.handle_posts(all_posts)
    async def handle_posts_thread(self, posts):
        # @asyncio.coroutine / yield from was removed in Python 3.11; use a
        # native coroutine and hand the CPU-bound work to the process pool.
        loop = asyncio.get_running_loop()
        self.log.debug(f"Handling posts for {len(posts.keys())} threads")
        await loop.run_in_executor(p, self.handle_posts, posts)
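
    # Note: run_in_executor with a ProcessPoolExecutor pickles the bound
    # method, so `self` and `posts` are serialised into the worker process.
    # This assumes, as the original code does, that the project's `db` and
    # `util` modules import cleanly in the worker and the logger pickles.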
    def handle_posts(self, posts):
        to_store = []
        for key, post_list in posts.items():
            board, thread = key
            for index, post in enumerate(post_list):
                posts[key][index]["type"] = "msg"

                # Calculate hash for post; serialising with sorted keys
                # makes the hash stable for identical posts.
                post_normalised = ujson.dumps(post, sort_keys=True)
                post_hash = str(siphash(self.hash_key, post_normalised))
                redis_key = f"cache.{board}.{thread}.{post['no']}"
                key_content = db.r.get(redis_key)
                if key_content:
                    key_content = key_content.decode("ascii")
                    if key_content == post_hash:
                        # Post unchanged since the last crawl; skip it.
                        continue
                    posts[key][index]["type"] = "update"
                db.r.set(redis_key, post_hash)

                # Rename fields according to ATTRMAP.
                for key2, value in list(post.items()):
                    if key2 in ATTRMAP:
                        post[ATTRMAP[key2]] = posts[key][index][key2]
                        del posts[key][index][key2]
                if "ts" in post:
                    old_time = posts[key][index]["ts"]
                    # '08/30/22(Tue)02:25:37'
                    time_spl = old_time.split(":")
                    if len(time_spl) == 3:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
                    else:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
                    posts[key][index]["ts"] = old_ts.isoformat()
                if "msg" in post:
                    # Strip HTML markup from the post body, keeping line breaks.
                    soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
                    msg = soup.get_text(separator="\n")
                    posts[key][index]["msg"] = msg
                posts[key][index]["src"] = "4ch"
                to_store.append(posts[key][index])
        # print({name_map[name]: val for name, val in post.items()})
        # print(f"Got posts: {len(posts)}")
        db.store_message_bulk(to_store)
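
    # Dedup sketch with hypothetical values, illustrating the cache scheme
    # handle_posts() uses above:
    #
    #     post = {"no": 123, "com": "hi"}
    #     normalised = ujson.dumps(post, sort_keys=True)  # '{"com":"hi","no":123}'
    #     digest = str(siphash(self.hash_key, normalised))
    #     # digest is compared against the Redis key cache.<board>.<thread>.123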
    async def fetch(self, url, session, mapped):
        async with session.get(url) as response:
            try:
                return (mapped, await response.json())
            except Exception:  # noqa
                self.log.error(f"Could not decode JSON from {url}")
                return (mapped, None)
    async def bound_fetch(self, sem, url, session, mapped):
        # Getter function with semaphore.
        async with sem:
            try:
                return await self.fetch(url, session, mapped)
            except Exception:  # noqa
                self.log.error(f"Could not fetch {url}")
                return (mapped, None)
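
    # The semaphore created in api_call() caps in-flight requests at 100:
    # every URL gets its own task, but each task must acquire the semaphore
    # before opening a connection, so at most 100 requests run at once.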
    async def api_call(self, methods=None):
        # A mutable default argument ({}) is shared across calls; use None.
        if methods is None:
            methods = {}
        tasks = []
        sem = asyncio.Semaphore(100)
        connector = aiohttp.TCPConnector(limit=None)
        async with aiohttp.ClientSession(connector=connector) as session:
            for mapped, method in methods.items():
                url = f"{self.api_endpoint}/{method}"
                self.log.debug(f"GET {url}")
                task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
                # task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
        return responses
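

# Minimal entry point, assuming the module is meant to be run directly; in
# the wider project the crawler may be launched elsewhere, so treat this as
# a sketch rather than the project's actual bootstrap.
if __name__ == "__main__":
    asyncio.run(Chan4().run())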