# Python modules can't start with a number... import asyncio import random import string from math import ceil from os import getenv import aiohttp from numpy import array_split import psutil import db import util # CONFIGURATION # # Number of 4chan threads to request at once THREADS_CONCURRENT = int(getenv("MONOLITH_CH4_THREADS_CONCURRENT", 1000)) # Seconds to wait between every THREADS_CONCURRENT requests THREADS_DELAY = float(getenv("MONOLITH_CH4_THREADS_DELAY", 0.1)) # Seconds to wait between crawls CRAWL_DELAY = int(getenv("MONOLITH_CH4_CRAWL_DELAY", 5)) # Semaphore value ? THREADS_SEMAPHORE = int(getenv("MONOLITH_CH4_THREADS_SEMAPHORE", 1000)) # Target CPU usage percentage TARGET_CPU_USAGE = float(getenv("MONOLITH_CH4_TARGET_CPU_USAGE", 50.0)) # Boards to crawl BOARDS = getenv("MONOLITH_CH4_BOARDS", "").split(",") # CONFIGURATION END # class Chan4(object): """ 4chan indexer, crawler and ingester. """ def __init__(self): name = self.__class__.__name__ self.log = util.get_logger(name) self.sleep_interval = 0.0 self.api_endpoint = "https://a.4cdn.org" # self.boards = ["out", "g", "a", "3", "pol"] # self.boards = [] # self.thread_deferreds = [] # self.content_deferreds = [] self.log.info(f"Starting crawler bot to {self.api_endpoint}") self.hash_key = db.r.get("hashing_key") if not self.hash_key: letters = string.ascii_lowercase self.hash_key = "".join(random.choice(letters) for i in range(16)) self.log.debug(f"Created new hash key: {self.hash_key}") db.r.set("hashing_key", self.hash_key) else: self.hash_key = self.hash_key.decode("ascii") self.log.debug(f"Decoded hash key: {self.hash_key}") async def dynamic_throttle(self): """ Dynamically sleeps before a request if CPU usage is above our target. Also, if CPU usage is far below the target, reduce the sleep time. Caps the sleep interval at 0.2s. Prints CPU usage and sleep interval like process.py. """ current_cpu_usage = psutil.cpu_percent(interval=0.2) if current_cpu_usage > TARGET_CPU_USAGE: self.sleep_interval += 0.01 if self.sleep_interval > 0.1: self.sleep_interval = 0.1 self.log.info( f"CPU {current_cpu_usage}% > {TARGET_CPU_USAGE}%, " f"=> sleep {self.sleep_interval:.3f}s" ) elif current_cpu_usage < TARGET_CPU_USAGE and self.sleep_interval > 0.01: self.sleep_interval -= 0.01 self.log.info( f"CPU {current_cpu_usage}% < {TARGET_CPU_USAGE}%, " f"=> sleep {self.sleep_interval:.3f}s" ) if self.sleep_interval > 0: await asyncio.sleep(self.sleep_interval) async def run(self): if "ALL" in BOARDS: await self.get_board_list() else: self.boards = BOARDS while True: await self.get_thread_lists(self.boards) await asyncio.sleep(CRAWL_DELAY) async def get_board_list(self): responses = await self.api_call({"_": "boards.json"}) for mapped, response in responses: if not response: continue for board in response["boards"]: self.boards.append(board["board"]) self.log.debug(f"Got boards: {self.boards}") # await self.dynamic_throttle() # TODO async def get_thread_lists(self, boards): # self.log.debug(f"Getting thread list for {boards}") board_urls = {board: f"{board}/threads.json" for board in boards} responses = await self.api_call(board_urls) to_get = [] flat_map = [board for board, thread in responses] self.log.debug(f"Got thread list for {len(responses)} boards: {flat_map}") for board, response in responses: if not response: continue for page in response: for threads in page["threads"]: no = threads["no"] to_get.append((board, no)) # await self.dynamic_throttle() # TODO if not to_get: return self.log.debug(f"Got {len(to_get)} threads to fetch") split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) self.log.debug(f"Split threads into {len(split_threads)} series") for index, thr in enumerate(split_threads): self.log.debug(f"Series {index} - getting {len(thr)} threads") await self.get_threads_content(thr) # await self.dynamic_throttle() # TODO await asyncio.sleep(THREADS_DELAY) def take_items(self, dict_list, n): i = 0 try: for x in list(dict_list.keys()): for item in list(dict_list[x]): yield (x, item) dict_list[x].remove(item) i += 1 if i == n: raise StopIteration except StopIteration: print("Take items took", i, "items") async def get_threads_content(self, thread_list): thread_urls = { (board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list } # self.log.debug(f"Getting information for threads: {thread_urls}") responses = await self.api_call(thread_urls) self.log.debug(f"Got information for {len(responses)} threads") all_posts = {} for mapped, response in responses: if not response: continue board, thread = mapped all_posts[mapped] = response["posts"] # await self.dynamic_throttle() # TODO if not all_posts: return await self.handle_posts(all_posts) async def handle_posts(self, posts): to_store = [] for key, post_list in posts.items(): board, thread = key for post in post_list: post["type"] = "msg" post["src"] = "4ch" post["net"] = board post["channel"] = thread to_store.append(post) # await self.dynamic_throttle() # TODO if to_store: await db.queue_message_bulk(to_store) async def fetch(self, url, session, mapped): async with session.get(url) as response: try: return (mapped, await response.json()) except: # noqa return (mapped, None) async def bound_fetch(self, sem, url, session, mapped): # Getter function with semaphore. async with sem: await self.dynamic_throttle() try: return await self.fetch(url, session, mapped) except: # noqa return (mapped, None) async def api_call(self, methods={}): tasks = [] sem = asyncio.Semaphore(THREADS_SEMAPHORE) connector = aiohttp.TCPConnector(limit=None) async with aiohttp.ClientSession(connector=connector) as session: for mapped, method in methods.items(): url = f"{self.api_endpoint}/{method}" # self.log.debug(f"GET {url}") task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) # task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) tasks.append(task) responses = await asyncio.gather(*tasks) return responses