From dcd648e1d242845a6d36948ce830ca0429a94dbb Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Mon, 5 Sep 2022 07:20:30 +0100 Subject: [PATCH] Make crawler more efficient and implement configurable parameters --- db.py | 2 ++ sources/ch4.py | 87 ++++++++++++++++++++++++++++++++++++++------------ sources/dis.py | 3 +- 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/db.py b/db.py index 45ae7c7..04730d6 100644 --- a/db.py +++ b/db.py @@ -55,6 +55,8 @@ def store_message_bulk(data): print("BULK", len(data)) if not data: return + # 10000: maximum inserts we can submit to + # Manticore as of Sept 2022 split_posts = array_split(data, ceil(len(data) / 10000)) for messages in split_posts: print("PROCESSING SPLIT OF", len(messages), "MESSAGES") diff --git a/sources/ch4.py b/sources/ch4.py index c2aeaaa..79083c4 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -17,9 +17,29 @@ from schemas.ch4_s import ATTRMAP from numpy import array_split from math import ceil -p = ProcessPoolExecutor(10) +# CONFIGURATION # + +# Number of 4chan threads to request at once +THREADS_CONCURRENT = 1000 + +# Seconds to wait between every THREADS_CONCURRENT requests +THREADS_DELAY = 0.1 + +# Seconds to wait between crawls +CRAWL_DELAY = 5 + +# Semaphore value ? +THREADS_SEMAPHORE = 100 + +# Maximum number of CPU threads to use for post processing +CPU_THREADS = 2 + +# CONFIGURATION END # + +p = ProcessPoolExecutor(CPU_THREADS) + class Chan4(object): """ 4chan indexer, crawler and ingester. @@ -32,7 +52,6 @@ class Chan4(object): self.api_endpoint = "https://a.4cdn.org" # self.boards = ["out", "g", "a", "3", "pol"] # self.boards = [] - self.thread_list = {} # self.thread_deferreds = [] # self.content_deferreds = [] @@ -52,6 +71,9 @@ class Chan4(object): async def run(self): await self.get_board_list() + while True: + await self.get_thread_lists(self.boards) + await asyncio.sleep(CRAWL_DELAY) async def get_board_list(self): responses = await self.api_call({"_": "boards.json"}) @@ -62,8 +84,6 @@ class Chan4(object): self.boards.append(board["board"]) self.log.debug(f"Got boards: {self.boards}") - await self.get_thread_lists(self.boards) - async def get_thread_lists(self, boards): self.log.debug(f"Getting thread list for {boards}") board_urls = {board: f"{board}/catalog.json" for board in boards} @@ -80,17 +100,27 @@ class Chan4(object): self.log.info(f"Got thread list for {mapped}: {len(response)}") print("THREAD LIST FULL LEN", len(to_get)) if not to_get: - await self.get_thread_lists(self.boards) return - split_threads = array_split(to_get, ceil(len(to_get) / 10000)) + split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)) print("SPLIT THREADS INTO", len(split_threads)) for threads in split_threads: print("SUBMITTED THREADS FOR", len(threads)) await self.get_threads_content(threads) + await asyncio.sleep(THREADS_DELAY) #await self.get_threads_content(to_get) - # Recurse - await self.get_thread_lists(self.boards) + def take_items(self, dict_list, n): + i = 0 + try: + for x in list(dict_list.keys()): + for item in list(dict_list[x]): + yield (x, item) + dict_list[x].remove(item) + i += 1 + if i == n: + raise StopIteration + except: + print("Take items took", i, "items") async def get_threads_content(self, thread_list): thread_urls = { @@ -109,16 +139,31 @@ class Chan4(object): all_posts[mapped] = response["posts"] # Split into 10,000 chunks - # split_posts = array_split(all_posts, ceil(len(all_posts) / 10000)) - # print("SPLIT CHUNK", len(split_posts)) + if not all_posts: + print("NOTALL POSTS", all_posts) + return + threads_per_core = int(len(all_posts) / CPU_THREADS) + print("THREADS PER CORE", threads_per_core) + for i in range(CPU_THREADS): + print("SUBMITTING CORE", i) + new_dict = {} + pulled_posts = self.take_items(all_posts, threads_per_core) + for k, v in pulled_posts: + if k in new_dict: + new_dict[k].append(v) + else: + new_dict[k] = [v] + print("SUBMITTING", len(new_dict), "THREADS") + await self.handle_posts_thread(new_dict) + # print("VAL", ceil(len(all_posts) / threads_per_core)) + # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core)) + # print("THREADS PER CORE SPLIT", len(split_posts)) + # # print("SPLIT CHUNK", len(split_posts)) # for posts in split_posts: - # with futures.ThreadPoolExecutor(max_workers=6) as executor: - # print("SUBMITTED THREAD FOR", len(posts)) - # executor.submit(self.handle_posts, board, thread, posts) - # await self.handle_posts(board, thread, response["posts"]) - # await asyncio.sleep(1) - await self.handle_posts_thread(all_posts) - # self.handle_posts(all_posts) + # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") + # await self.handle_posts_thread(posts) + + #await self.handle_posts_thread(all_posts) @asyncio.coroutine def handle_posts_thread(self, posts): @@ -159,7 +204,8 @@ class Chan4(object): old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") else: old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - new_ts = old_ts.isoformat() + # new_ts = old_ts.isoformat() + new_ts = int(old_ts.timestamp()) posts[key][index]["ts"] = new_ts if "msg" in post: soup = BeautifulSoup(posts[key][index]["msg"], "html.parser") @@ -171,7 +217,8 @@ class Chan4(object): # print({name_map[name]: val for name, val in post.items()}) # print(f"Got posts: {len(posts)}") - db.store_message_bulk(to_store) + if to_store: + db.store_message_bulk(to_store) async def fetch(self, url, session, mapped): async with session.get(url) as response: @@ -192,7 +239,7 @@ class Chan4(object): async def api_call(self, methods={}): tasks = [] - sem = asyncio.Semaphore(100) + sem = asyncio.Semaphore(THREADS_SEMAPHORE) connector = aiohttp.TCPConnector(limit=None) async with aiohttp.ClientSession(connector=connector) as session: for mapped, method in methods.items(): diff --git a/sources/dis.py b/sources/dis.py index 000e786..a3104b5 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -35,7 +35,8 @@ class DiscordClient(discord.Client): return a = self.recurse_dict(message) - a["ts"] = a["time"].isoformat() + #a["ts"] = a["time"].isoformat() + a["ts"] = int(a["time"].timestamp()) del a["time"] a["type"] = "msg" a["src"] = "dis"