# Python modules can't start with a number...
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from math import ceil

import aiohttp
import ujson
from bs4 import BeautifulSoup
from numpy import array_split
from siphashc import siphash

import db
import util
from schemas.ch4_s import ATTRMAP

# CONFIGURATION #

# Number of 4chan threads to request at once
THREADS_CONCURRENT = 1000

# Seconds to wait between every THREADS_CONCURRENT requests
THREADS_DELAY = 0.1

# Seconds to wait between crawls
CRAWL_DELAY = 5

# Maximum number of simultaneous in-flight HTTP requests
THREADS_SEMAPHORE = 100

# Maximum number of worker processes to use for post processing
CPU_THREADS = 2

# CONFIGURATION END #

# Process pool used to run CPU-bound post processing off the event loop.
p = ProcessPoolExecutor(CPU_THREADS)


class Chan4(object):
    """
    4chan indexer, crawler and ingester.

    Crawl loop: fetch the board list once, then repeatedly fetch every
    board's thread catalog, download thread contents in batches, and hand
    the posts to a process pool for normalisation and storage.
    """

    def __init__(self):
        name = self.__class__.__name__
        self.log = util.get_logger(name)

        self.api_endpoint = "https://a.4cdn.org"
        # Populated by get_board_list(); a hard-coded subset (e.g.
        # ["g", "pol"]) can be assigned here instead to restrict the crawl.
        self.boards = []

        self.log.info(f"Starting crawler bot to {self.api_endpoint}")

        # Reuse a persistent SipHash key so post hashes stay comparable
        # across restarts; generate and store one on first run.
        self.hash_key = db.r.get("hashing_key")
        if not self.hash_key:
            letters = string.ascii_lowercase
            self.hash_key = "".join(random.choice(letters) for _ in range(16))
            self.log.debug(f"Created new hash key: {self.hash_key}")
            db.r.set("hashing_key", self.hash_key)
        else:
            self.hash_key = self.hash_key.decode("ascii")
            self.log.debug(f"Decoded hash key: {self.hash_key}")

    async def run(self):
        """Main crawl loop: index boards once, then crawl forever."""
        await self.get_board_list()
        while True:
            await self.get_thread_lists(self.boards)
            await asyncio.sleep(CRAWL_DELAY)

    async def get_board_list(self):
        """Fetch the global board index and record every board name."""
        responses = await self.api_call({"_": "boards.json"})
        for mapped, response in responses:
            if not response:
                continue
            for board in response["boards"]:
                self.boards.append(board["board"])
        self.log.debug(f"Got boards: {self.boards}")

    async def get_thread_lists(self, boards):
        """
        Fetch each board's catalog and crawl every listed thread.

        Thread fetches are issued in batches of THREADS_CONCURRENT with a
        THREADS_DELAY pause between batches to avoid hammering the API.
        """
        self.log.debug(f"Getting thread list for {boards}")
        board_urls = {board: f"{board}/catalog.json" for board in boards}
        responses = await self.api_call(board_urls)

        to_get = []
        for mapped, response in responses:
            if not response:
                continue
            for page in response:
                for threads in page["threads"]:
                    to_get.append((mapped, threads["no"]))
            self.log.info(f"Got thread list for {mapped}: {len(response)}")

        self.log.debug(f"Thread list full length: {len(to_get)}")
        if not to_get:
            return

        split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
        self.log.debug(f"Split threads into {len(split_threads)} batches")
        for threads in split_threads:
            await self.get_threads_content(threads)
            await asyncio.sleep(THREADS_DELAY)

    def take_items(self, dict_list, n):
        """
        Destructively yield up to *n* (key, item) pairs from a dict of lists.

        Items are removed from *dict_list* as they are yielded, so repeated
        calls drain it. Note: `return` (not `raise StopIteration`) ends the
        generator — PEP 479 turns an explicit StopIteration inside a
        generator into a RuntimeError on Python 3.7+.
        """
        taken = 0
        for key in list(dict_list.keys()):
            for item in list(dict_list[key]):
                yield (key, item)
                dict_list[key].remove(item)
                taken += 1
                if taken == n:
                    self.log.debug(f"take_items took {taken} items")
                    return
        self.log.debug(f"take_items took {taken} items")

    async def get_threads_content(self, thread_list):
        """
        Download the posts of every (board, thread) pair and process them
        in chunks on the worker pool.
        """
        thread_urls = {
            (board, thread): f"{board}/thread/{thread}.json"
            for board, thread in thread_list
        }
        self.log.debug(f"Getting information for threads: {thread_urls}")
        responses = await self.api_call(thread_urls)
        self.log.debug(f"Got information for threads: {thread_urls}")

        all_posts = {}
        for mapped, response in responses:
            if not response:
                continue
            board, thread = mapped
            self.log.debug(f"Got thread content for thread {thread} on board {board}")
            all_posts[mapped] = response["posts"]

        if not all_posts:
            return

        # Chunk size per worker submission; max(1, ...) avoids a zero-sized
        # chunk when there are fewer threads than CPU_THREADS.
        chunk_size = max(1, len(all_posts) // CPU_THREADS)
        self.log.debug(f"Chunk size per core: {chunk_size}")
        # Drain all_posts completely; the previous fixed range(CPU_THREADS)
        # loop silently dropped any remainder beyond CPU_THREADS * chunk_size.
        while True:
            new_dict = {}
            for k, v in self.take_items(all_posts, chunk_size):
                new_dict.setdefault(k, []).append(v)
            if not new_dict:
                break
            self.log.debug(f"Submitting {len(new_dict)} threads for processing")
            await self.handle_posts_thread(new_dict)

    async def handle_posts_thread(self, posts):
        """Run handle_posts in the process pool without blocking the loop."""
        # @asyncio.coroutine / yield from were removed in Python 3.11;
        # native coroutine + get_running_loop() is the supported form.
        loop = asyncio.get_running_loop()
        self.log.debug(f"Handling {len(posts.keys())} threads in executor")
        await loop.run_in_executor(p, self.handle_posts, posts)

    def handle_posts(self, posts):
        """
        Normalise, deduplicate and store a batch of posts.

        Runs inside a worker process. Posts are deduplicated via a SipHash
        of their canonicalised JSON cached in Redis: an unchanged hash is
        skipped, a changed hash is re-ingested as type "update".
        """
        to_store = []
        for key, post_list in posts.items():
            board, thread = key
            for index, post in enumerate(post_list):
                # NOTE: post is the same object as posts[key][index];
                # mutations below update the batch in place.
                post["type"] = "msg"

                # Hash the post as received (before field renaming) so the
                # cache key content stays stable across code versions.
                post_normalised = ujson.dumps(post, sort_keys=True)
                post_hash = str(siphash(self.hash_key, post_normalised))

                redis_key = f"cache.{board}.{thread}.{post['no']}"
                key_content = db.r.get(redis_key)
                if key_content is not None:
                    key_content = key_content.decode("ascii")
                    if key_content == post_hash:
                        continue
                    post["type"] = "update"
                db.r.set(redis_key, post_hash)

                # Rename 4chan API fields to internal schema names.
                for field in list(post.keys()):
                    if field in ATTRMAP:
                        post[ATTRMAP[field]] = post.pop(field)

                if "ts" in post:
                    # e.g. '08/30/22(Tue)02:25:37'; seconds are sometimes
                    # absent. %a is locale-dependent — assumes a C/English
                    # locale (TODO confirm).
                    old_time = post["ts"]
                    if len(old_time.split(":")) == 3:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
                    else:
                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
                    post["ts"] = int(old_ts.timestamp())

                if "msg" in post:
                    # Strip the HTML markup 4chan embeds in message bodies.
                    soup = BeautifulSoup(post["msg"], "html.parser")
                    post["msg"] = soup.get_text(separator="\n")

                post["src"] = "4ch"
                post["net"] = board
                post["channel"] = thread
                to_store.append(post)

        if to_store:
            db.store_message_bulk(to_store)

    async def fetch(self, url, session, mapped):
        """GET *url*, returning (mapped, parsed JSON) or (mapped, None)."""
        async with session.get(url) as response:
            try:
                return (mapped, await response.json())
            except Exception:  # best-effort: a bad response yields None
                self.log.error(f"Fetch error for {url}")
                return (mapped, None)

    async def bound_fetch(self, sem, url, session, mapped):
        """Fetch gated by the semaphore to bound request concurrency."""
        async with sem:
            try:
                return await self.fetch(url, session, mapped)
            except Exception:  # best-effort: never let one URL kill the batch
                self.log.error(f"Bound fetch error for {url}")
                return (mapped, None)

    async def api_call(self, methods=None):
        """
        Issue all API requests concurrently and gather the results.

        :param methods: mapping of caller context -> API path; each result
                        is returned as (context, parsed JSON or None).
        """
        # None default instead of a mutable {} default shared across calls.
        if methods is None:
            methods = {}
        tasks = []
        sem = asyncio.Semaphore(THREADS_SEMAPHORE)
        connector = aiohttp.TCPConnector(limit=None)
        async with aiohttp.ClientSession(connector=connector) as session:
            for mapped, method in methods.items():
                url = f"{self.api_endpoint}/{method}"
                self.log.debug(f"GET {url}")
                tasks.append(
                    asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
                )
            return await asyncio.gather(*tasks)