You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
7.3 KiB
Python

# Python modules can't start with a number...
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import aiohttp
import ujson
from bs4 import BeautifulSoup
from siphashc import siphash
import db
import util
from schemas.ch4_s import ATTRMAP
# Module-wide pool of 10 worker processes; Chan4.handle_posts_thread hands
# post processing to it through loop.run_in_executor.
p = ProcessPoolExecutor(max_workers=10)
class Chan4(object):
    """
    4chan indexer, crawler and ingester.

    Fetches the board list, then repeatedly fetches every board's catalog
    and every thread listed in it, normalises the posts and passes them to
    ``db.store_message_bulk``. A per-post hash kept in Redis (``db.r``) is
    used to skip posts that have not changed since they were last seen.
    """

    def __init__(self):
        """Set up logging, crawl state and the persistent hashing key."""
        name = self.__class__.__name__
        self.log = util.get_logger(name)

        self.api_endpoint = "https://a.4cdn.org"
        self.boards = []
        self.thread_list = {}

        self.log.info(f"Starting crawler bot to {self.api_endpoint}")

        # The hashing key is kept in Redis under "hashing_key" so the same
        # key is reused whenever one is already stored there.
        self.hash_key = db.r.get("hashing_key")
        if not self.hash_key:
            letters = string.ascii_lowercase
            self.hash_key = "".join(random.choice(letters) for _ in range(16))
            self.log.debug(f"Created new hash key: {self.hash_key}")
            db.r.set("hashing_key", self.hash_key)
        else:
            # The stored value comes back as bytes.
            self.hash_key = self.hash_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {self.hash_key}")

    async def run(self):
        """Entry point: start crawling from the board list."""
        await self.get_board_list()

    async def get_board_list(self):
        """Fetch ``boards.json``, record the board names and start crawling."""
        responses = await self.api_call({"_": "boards.json"})
        for mapped, response in responses:
            if not response:
                continue
            for board in response["boards"]:
                self.boards.append(board["board"])
        self.log.debug(f"Got boards: {self.boards}")
        await self.get_thread_lists(self.boards)

    async def get_thread_lists(self, boards):
        """
        Crawl the catalogs of ``boards`` and ingest every listed thread.

        This coroutine does not return: after each pass it starts another
        one over ``self.boards``.

        :param boards: iterable of board names for the first pass
        """
        # Loop instead of awaiting ourselves: unbounded self-recursion nests
        # one coroutine frame per pass until RecursionError is raised.
        while True:
            self.log.debug(f"Getting thread list for {boards}")
            board_urls = {board: f"{board}/catalog.json" for board in boards}
            responses = await self.api_call(board_urls)

            to_get = []
            for mapped, response in responses:
                if not response:
                    continue
                for page in response:
                    for thread in page["threads"]:
                        to_get.append((mapped, thread["no"]))
                self.log.info(f"Got thread list for {mapped}: {len(response)}")

            await self.get_threads_content(to_get)

            # Every pass after the first covers the full board list.
            boards = self.boards

    async def get_threads_content(self, thread_list):
        """
        Fetch the given threads and hand all their posts over for processing.

        :param thread_list: iterable of ``(board, thread_number)`` pairs
        """
        thread_urls = {
            (board, thread): f"{board}/thread/{thread}.json"
            for board, thread in thread_list
        }
        self.log.debug(f"Getting information for threads: {thread_urls}")
        responses = await self.api_call(thread_urls)
        self.log.debug(f"Got information for threads: {thread_urls}")

        all_posts = {}
        for mapped, response in responses:
            if not response:
                continue
            board, thread = mapped
            self.log.debug(f"Got thread content for thread {thread} on board {board}")
            all_posts[mapped] = response["posts"]

        await self.handle_posts_thread(all_posts)

    async def handle_posts_thread(self, posts):
        """
        Run :meth:`handle_posts` in the module-level process pool ``p``.

        :param posts: mapping of ``(board, thread)`` to a list of post dicts
        """
        loop = asyncio.get_running_loop()
        print("HANDLE POSTS THREAD", len(posts.keys()))
        await loop.run_in_executor(p, self.handle_posts, posts)

    def handle_posts(self, posts):
        """
        Normalise posts, skip unchanged ones and store the rest in bulk.

        For each post this tags it as ``"msg"`` (or ``"update"`` when a
        different hash is already cached), renames keys found in ``ATTRMAP``,
        converts ``ts`` to an ISO 8601 string, strips HTML from ``msg`` and
        sets ``src`` to ``"4ch"``. The post dicts are modified in place.

        :param posts: mapping of ``(board, thread)`` to a list of post dicts
        """
        print("HANDLE POSTS START")
        to_store = []
        for (board, thread), post_list in posts.items():
            print("PROCESSING BOARD", board, "THREAD", thread)
            print("POSTS HERE", len(post_list))
            for post in post_list:
                post["type"] = "msg"

                # Hash the post (with its "msg" type tag) to detect changes.
                post_normalised = ujson.dumps(post, sort_keys=True)
                post_hash = str(siphash(self.hash_key, post_normalised))
                redis_key = f"cache.{board}.{thread}.{post['no']}"
                cached = db.r.get(redis_key)
                if cached:
                    if cached.decode("ascii") == post_hash:
                        # Identical to what was seen before; nothing to store.
                        continue
                    post["type"] = "update"
                db.r.set(redis_key, post_hash)

                # Rename API field names to the schema's names. Iterate over
                # a snapshot of the keys because the dict is mutated.
                for old_name in list(post):
                    if old_name in ATTRMAP:
                        post[ATTRMAP[old_name]] = post[old_name]
                        del post[old_name]

                if "ts" in post:
                    old_time = post["ts"]
                    # Example: '08/30/22(Tue)02:25:37'; seconds may be absent.
                    if len(old_time.split(":")) == 3:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
                    else:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
                    post["ts"] = old_ts.isoformat()

                if "msg" in post:
                    soup = BeautifulSoup(post["msg"], "html.parser")
                    post["msg"] = soup.get_text(separator="\n")

                post["src"] = "4ch"
                to_store.append(post)

        print("HANDLE POSTS DONE")
        db.store_message_bulk(to_store)
        print("STORE DB DONE")

    async def fetch(self, url, session, mapped):
        """
        GET ``url`` and decode the body as JSON.

        :param url: absolute URL to request
        :param session: client session used to issue the request
        :param mapped: caller-supplied key returned alongside the result
        :return: ``(mapped, decoded_json)``, or ``(mapped, None)`` when the
            body cannot be decoded
        """
        async with session.get(url) as response:
            try:
                return (mapped, await response.json())
            except Exception:
                # Best effort: a bad body must not abort the whole batch.
                # Cancellation is deliberately not caught here.
                print("FETCH ERROR")
                return (mapped, None)

    async def bound_fetch(self, sem, url, session, mapped):
        """
        Call :meth:`fetch` while holding ``sem``.

        :return: the result of :meth:`fetch`, or ``(mapped, None)`` when the
            request itself fails
        """
        async with sem:
            try:
                return await self.fetch(url, session, mapped)
            except Exception:
                # Best effort, as in fetch(); cancellation still propagates.
                print("BOUND ERROR")
                return (mapped, None)

    async def api_call(self, methods=None):
        """
        Request several API paths concurrently.

        :param methods: mapping of an arbitrary key to a path relative to
            ``self.api_endpoint``; ``None`` is treated as an empty mapping
        :return: list of ``(key, decoded_json_or_None)`` tuples, in the
            iteration order of ``methods``
        """
        if methods is None:
            methods = {}

        # At most 100 requests are in flight at any time.
        sem = asyncio.Semaphore(100)
        connector = aiohttp.TCPConnector(limit=None)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = []
            for mapped, method in methods.items():
                url = f"{self.api_endpoint}/{method}"
                self.log.debug(f"GET {url}")
                task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
        return responses