diff --git a/db.py b/db.py
index 547c5e5..e58f0bf 100644
--- a/db.py
+++ b/db.py
@@ -1,12 +1,13 @@
-import json
-from pprint import pprint
-
 import manticoresearch
 from manticoresearch.rest import ApiException
 from redis import StrictRedis
 
 import util
 from schemas.mc_s import schema
+import ujson
+from numpy import array_split
+from math import ceil
+
 configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
 api_client = manticoresearch.ApiClient(configuration)
 api_instance = manticoresearch.IndexApi(api_client)
@@ -39,7 +40,7 @@ def store_message(msg):
     ]
     body_post = ""
     for item in body:
-        body_post += json.dumps(item)
+        body_post += ujson.dumps(item)
         body_post += "\n"
 
     #print(body_post)
@@ -50,43 +51,49 @@ def store_message(msg):
     except ApiException as e:
         print("Exception when calling IndexApi->bulk: %s\n" % e)
 
-async def store_message_bulk(messages):
+def store_message_bulk(data):
     """
     Store a message into Manticore
     :param msg: dict
     """
-    print("BULK", len(messages))
-    total = []
-    for msg in messages:
-        # normalise fields
-        for key, value in list(msg.items()):
-            if value is None:
-                del msg[key]
-            if key in schema:
-                if isinstance(value, int):
-                    if schema[key].startswith("string"):
-                        msg[key] = str(value)
+    print("BULK", len(data))
+    split_posts = array_split(data, ceil(len(data) / 10000))
+    for messages in split_posts:
+        print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
+        total = []
+        for msg in messages:
+            # normalise fields
+            for key, value in list(msg.items()):
+                if value is None:
+                    del msg[key]
+                if key in schema:
+                    if isinstance(value, int):
+                        if schema[key].startswith("string"):
+                            msg[key] = str(value)
 
-        body = {
-            "insert": {
-                "index": "main",
-                "doc": msg
+            body = {
+                "insert": {
+                    "index": "main",
+                    "doc": msg
+                }
             }
-        }
-        total.append(body)
+            total.append(body)
 
-    body_post = ""
-    for item in total:
-        body_post += json.dumps(item)
-        body_post += "\n"
+        body_post = ""
+        for item in total:
+            body_post += ujson.dumps(item)
+            body_post += "\n"
 
-    #print(body_post)
-    try:
-        # Bulk index operations
-        api_response = api_instance.bulk(body_post, async_req=True)
-        #print(api_response)
-    except ApiException as e:
-        print("Exception when calling IndexApi->bulk: %s\n" % e)
+        #print(body_post)
+        try:
+            # Bulk index operations
+            api_response = api_instance.bulk(body_post, async_req=True)
+            #print(api_response)
+        except ApiException as e:
+            print("Exception when calling IndexApi->bulk: %s\n" % e)
+        print("FINISHED PROCESSING SPLIT")
+
+    print("BULK FINISH")
 
 def update_schema():
     pass
diff --git a/docker-compose.yml b/docker-compose.yml
index 8b53ec1..1287c29 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -17,7 +17,7 @@ services:
   db:
     image: manticoresearch/manticore
     restart: always
-    expose:
+    ports:
       - 9308
       - 9312
     ulimits:
diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
index 298fa86..11f8bba 100644
--- a/docker/docker-compose.prod.yml
+++ b/docker/docker-compose.prod.yml
@@ -10,6 +10,27 @@ services:
       - .env
     volumes_from:
       - tmp
+    depends_on:
+      - db
+
+  db:
+    image: manticoresearch/manticore
+    restart: always
+    ports:
+      - 9308
+      - 9312
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+      memlock:
+        soft: -1
+        hard: -1
+    environment:
+      - MCL=1
+    volumes:
+      - ./docker/data:/var/lib/manticore
 
   tmp:
     image: busybox
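A note on the reworked store_message_bulk above: the batch body handed to IndexApi.bulk is newline-delimited JSON, and numpy's array_split is used only to cut the post list into batches of at most 10,000. Two caveats: array_split hands back numpy object arrays rather than plain lists, and with an empty input ceil(len(data) / 10000) is 0, which makes array_split raise ValueError. A pure-Python sketch of the same batching; the helper name chunked is hypothetical, not part of the codebase:

```python
def chunked(items, size=10000):
    """Yield consecutive slices of at most `size` items.

    Plain-list stand-in for numpy.array_split(data, ceil(len(data) / size)):
    it keeps dicts as dicts, and an empty input simply yields no batches
    instead of raising ValueError (array_split rejects zero sections).
    """
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Usage mirroring the loop in store_message_bulk:
# for messages in chunked(data):
#     ...  # build the NDJSON body and call api_instance.bulk(body_post)
```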
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 41486b8..5595557 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -5,3 +5,5 @@ siphashc
 aiohttp[speedups]
 python-dotenv
 manticoresearch
+numpy
+ujson
diff --git a/requirements.txt b/requirements.txt
index 05d8a1b..b9ebdc6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,5 @@ siphashc
 aiohttp[speedups]
 python-dotenv
 manticoresearch
-
+numpy
+ujson
diff --git a/sources/ch4.py b/sources/ch4.py
index e42e597..dd8839d 100644
--- a/sources/ch4.py
+++ b/sources/ch4.py
@@ -1,11 +1,9 @@
 # Python modules can't start with a number...
-import json
+import ujson
 import random
 import string
 from datetime import datetime
-from typing import Any, Dict
 
-import treq
 from bs4 import BeautifulSoup
 from siphashc import siphash
 
@@ -14,6 +12,11 @@ import util
 from schemas.ch4_s import ATTRMAP
 import aiohttp
 import asyncio
+from numpy import array_split
+from math import ceil
+
+from concurrent.futures import ProcessPoolExecutor
+p = ProcessPoolExecutor(10)
 
 class Chan4(object):
     """
@@ -25,7 +28,8 @@ class Chan4(object):
         self.log = util.get_logger(name)
 
         self.api_endpoint = "https://a.4cdn.org"
-        self.boards = ["out"]
+        #self.boards = ["out", "g", "a", "3", "pol"] #
+        self.boards = []
 
         self.thread_list = {}
         #self.thread_deferreds = []
@@ -48,13 +52,13 @@ class Chan4(object):
         await self.get_board_list()
 
     async def get_board_list(self):
-        # responses = await self.api_call({"_": "boards.json"})
-        # for mapped, response in responses:
-        #     if not response:
-        #         continue
-        #     for board in response["boards"]:
-        #         self.boards.append(board["board"])
-        # self.log.debug(f"Got boards: {self.boards}")
+        responses = await self.api_call({"_": "boards.json"})
+        for mapped, response in responses:
+            if not response:
+                continue
+            for board in response["boards"]:
+                self.boards.append(board["board"])
+        self.log.debug(f"Got boards: {self.boards}")
 
         await self.get_thread_lists(self.boards)
 
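With get_board_list re-enabled above, the scraper now discovers every board instead of using a hard-coded list. For reference, boards.json returns an object whose "boards" array holds the short name under "board" (plus metadata such as the title), which is what the loop above reads. A standalone sketch of the same call, assuming only aiohttp and none of the class plumbing:

```python
import asyncio

import aiohttp


async def fetch_boards():
    # Same endpoint get_board_list builds from self.api_endpoint.
    url = "https://a.4cdn.org/boards.json"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            payload = await response.json()
    # Keep only the short names, as get_board_list does.
    return [board["board"] for board in payload["boards"]]


# boards = asyncio.run(fetch_boards())
```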
@@ -82,59 +86,90 @@ class Chan4(object):
         self.log.debug(f"Getting information for threads: {thread_urls}")
         responses = await self.api_call(thread_urls)
         self.log.debug(f"Got information for threads: {thread_urls}")
+        all_posts = {}
         for mapped, response in responses:
             if not response:
                 continue
             board, thread = mapped
             self.log.debug(f"Got thread content for thread {thread} on board {board}")
-            await self.handle_posts(board, thread, response["posts"])
-
-    async def handle_posts(self, board, thread, posts):
-        for index, post in enumerate(posts):
-            posts[index]["type"] = "msg"
-
-            # Calculate hash for post
-            post_normalised = json.dumps(post, sort_keys=True)
-            hash = siphash(self.hash_key, post_normalised)
-            hash = str(hash)
-            redis_key = f"cache.{board}.{thread}.{post['no']}"
-            key_content = db.r.get(redis_key)
-            if key_content:
-                key_content = key_content.decode("ascii")
-                if key_content == hash:
-                    return
-                else:
-                    posts[index]["type"] = "update"
-            db.r.set(redis_key, hash)
-
-            for key, value in list(post.items()):
-                if key in ATTRMAP:
-                    post[ATTRMAP[key]] = posts[index][key]
-                    del posts[index][key]
-            if "ts" in post:
-                old_time = posts[index]["ts"]
-                # '08/30/22(Tue)02:25:37'
-                time_spl = old_time.split(":")
-                if len(time_spl) == 3:
-                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
-                else:
-                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-                new_ts = old_ts.isoformat()
-                posts[index]["ts"] = new_ts
-            if "msg" in post:
-                soup = BeautifulSoup(posts[index]["msg"], "html.parser")
-                msg = soup.get_text(separator="\n")
-                posts[index]["msg"] = msg
-
-            posts[index]["src"] = "4ch"
+            all_posts[mapped] = response["posts"]
+
+        # Split into 10,000 chunks
+        # split_posts = array_split(all_posts, ceil(len(all_posts) / 10000))
+        # print("SPLIT CHUNK", len(split_posts))
+        # for posts in split_posts:
+        #     with futures.ThreadPoolExecutor(max_workers=6) as executor:
+        #         print("SUBMITTED THREAD FOR", len(posts))
+        #         executor.submit(self.handle_posts, board, thread, posts)
+        #await self.handle_posts(board, thread, response["posts"])
+        #await asyncio.sleep(1)
+        await self.handle_posts_thread(all_posts)
+
+    @asyncio.coroutine
+    def handle_posts_thread(self, posts):
+        loop = asyncio.get_event_loop()
+        print("HANDLE POSTS THREAD", len(posts.keys()))
+        yield from loop.run_in_executor(p, self.handle_posts, posts)
+
+    def handle_posts(self, posts):
+        print("HANDLE POSTS START")
+        to_store = []
+        for key, post_list in posts.items():
+            board, thread = key
+            print("PROCESSING BOARD", board, "THREAD", thread)
+            print("POSTS HERE", len(post_list))
+            for index, post in enumerate(post_list):
+                posts[key][index]["type"] = "msg"
+
+                # Calculate hash for post
+                post_normalised = ujson.dumps(post, sort_keys=True)
+                hash = siphash(self.hash_key, post_normalised)
+                hash = str(hash)
+                redis_key = f"cache.{board}.{thread}.{post['no']}"
+                key_content = db.r.get(redis_key)
+                if key_content:
+                    key_content = key_content.decode("ascii")
+                    if key_content == hash:
+                        continue
+                    else:
+                        posts[key][index]["type"] = "update"
+                #db.r.set(redis_key, hash)
+
+                for key2, value in list(post.items()):
+                    if key2 in ATTRMAP:
+                        post[ATTRMAP[key2]] = posts[key][index][key2]
+                        del posts[key][index][key2]
+                if "ts" in post:
+                    old_time = posts[key][index]["ts"]
+                    # '08/30/22(Tue)02:25:37'
+                    time_spl = old_time.split(":")
+                    if len(time_spl) == 3:
+                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
+                    else:
+                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
+                    new_ts = old_ts.isoformat()
+                    posts[key][index]["ts"] = new_ts
+                if "msg" in post:
+                    soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
+                    msg = soup.get_text(separator="\n")
+                    posts[key][index]["msg"] = msg
+
+                posts[key][index]["src"] = "4ch"
+                to_store.append(posts[key][index])
 
             # print({name_map[name]: val for name, val in post.items()})
 
         #print(f"Got posts: {len(posts)}")
-        await db.store_message_bulk(posts)
+        print("HANDLE POSTS DONE")
+        db.store_message_bulk(to_store)
+        print("STORE DB DONE")
 
     async def fetch(self, url, session, mapped):
         async with session.get(url) as response:
-            return (mapped, await response.json())
+            try:
+                return (mapped, await response.json())
+            except:
+                print("FETCH ERROR")
+                return (mapped, None)
 
     async def bound_fetch(self, sem, url, session, mapped):
@@ -143,6 +178,7 @@
         # Getter function with semaphore.
         async with sem:
             try:
                 return await self.fetch(url, session, mapped)
             except:
+                print("BOUND ERROR")
                 return (mapped, None)
 
     async def api_call(self, methods={}):
diff --git a/util.py b/util.py
index 045c95f..09f98da 100644
--- a/util.py
+++ b/util.py
@@ -3,7 +3,7 @@ import logging
 
 log = logging.getLogger("util")
 
-debug = True
+debug = False
 
 # Color definitions
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
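One portability caveat on the new handle_posts_thread: generator-based coroutines (@asyncio.coroutine with yield from) were deprecated in Python 3.8 and removed in 3.11, so the version above only runs on older interpreters. A sketch of the equivalent on current Python, assuming the same module-level executor p:

```python
# Inside Chan4, replacing the decorated generator version:
async def handle_posts_thread(self, posts):
    # Hand the CPU-bound parsing to the process pool `p`; awaiting the
    # executor future keeps the event loop responsive while it runs.
    loop = asyncio.get_running_loop()
    print("HANDLE POSTS THREAD", len(posts.keys()))
    await loop.run_in_executor(p, self.handle_posts, posts)
```

Two related points: arguments sent to a ProcessPoolExecutor are pickled, so both the posts dict and self (the Chan4 instance) must be picklable; and the bare except: clauses added to fetch and bound_fetch also catch asyncio.CancelledError, which can mask task cancellation, so except Exception: is the safer spelling on Python 3.8+.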