diff --git a/db.py b/db.py index cfa3fd9..547c5e5 100644 --- a/db.py +++ b/db.py @@ -19,8 +19,7 @@ def store_message(msg): Store a message into Manticore :param msg: dict """ - log.info(f"store_message() {msg}") - + print("DISCORD MSGLEN", len(msg["msg"])) # normalise fields for key, value in list(msg.items()): if value is None: @@ -46,8 +45,46 @@ def store_message(msg): #print(body_post) try: # Bulk index operations - api_response = api_instance.bulk(body_post) - pprint(api_response) + api_response = api_instance.bulk(body_post, async_req=True) + #print(api_response) + except ApiException as e: + print("Exception when calling IndexApi->bulk: %s\n" % e) + +async def store_message_bulk(messages): + """ + Store a message into Manticore + :param msg: dict + """ + print("BULK", len(messages)) + total = [] + for msg in messages: + # normalise fields + for key, value in list(msg.items()): + if value is None: + del msg[key] + if key in schema: + if isinstance(value, int): + if schema[key].startswith("string"): + msg[key] = str(value) + + body = { + "insert": { + "index": "main", + "doc": msg + } + } + total.append(body) + + body_post = "" + for item in total: + body_post += json.dumps(item) + body_post += "\n" + + #print(body_post) + try: + # Bulk index operations + api_response = api_instance.bulk(body_post, async_req=True) + #print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) diff --git a/docker-compose.yml b/docker-compose.yml index 748d9e4..8b53ec1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,8 +11,8 @@ services: volumes_from: - tmp depends_on: - - "db" - - "redis" + - db + db: image: manticoresearch/manticore @@ -33,6 +33,7 @@ services: volumes: - ./docker/data:/var/lib/manticore + tmp: image: busybox command: chmod -R 777 /var/run/redis @@ -40,12 +41,12 @@ services: - /var/run/redis redis: - image: redis - command: redis-server /etc/redis.conf - volumes: - - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf - volumes_from: - - tmp + image: redis + command: redis-server /etc/redis.conf + volumes: + - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf + volumes_from: + - tmp networks: default: diff --git a/monolith.py b/monolith.py index efe0e0d..46bc46b 100644 --- a/monolith.py +++ b/monolith.py @@ -25,14 +25,9 @@ async def main(loop): loop.create_task(client.start(token)) #client.run(token) - # log.info("Starting 4chan handler.") - # chan = Chan4() - # #running = chan.run() - # chan.run() - #deferred.addCallback(lambda: None) - #reactor.callLater(0.1, deferred.callback, None) - - + log.info("Starting 4chan handler.") + chan = Chan4() + await chan.run() loop = asyncio.get_event_loop() diff --git a/sources/ch4.py b/sources/ch4.py index bc677de..e42e597 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -12,6 +12,8 @@ from siphashc import siphash import db import util from schemas.ch4_s import ATTRMAP +import aiohttp +import asyncio class Chan4(object): """ @@ -42,173 +44,123 @@ class Chan4(object): self.hash_key = self.hash_key.decode("ascii") self.log.debug(f"Decoded hash key: {self.hash_key}") - @inlineCallbacks - def run(self): - yield self.get_board_list() - - def got_thread_lists(self, thread_lists): - print("GOT THREAD LIST", thread_lists) - # Instead of while True, do it again! - d = self.get_thread_lists() - d.addCallback(self.got_thread_lists) - # @inlineCallbacks - # def mainloop(self): - # while True: - # yield self.get_thread_lists() - # yield self.get_thread_contents() - - @inlineCallbacks - def get_board_list(self): - self.log.debug("Getting board list") - response = self.api_call("boards.json") - response.addCallback(self.got_board_list) - yield response - - @inlineCallbacks - def got_board_list(self, board_list): - if board_list["success"]: - for board in board_list["response"]["boards"]: - self.boards.append(board["board"]) - self.log.debug(f"Got boards: {self.boards}") - d = self.get_thread_lists() - d.addCallback(self.got_thread_lists) - yield d - - @inlineCallbacks - def get_thread_lists(self): - thread_deferreds = [] - for board in self.boards: - d = self.get_thread_list(board) - d.addCallback(self.got_thread_list, board) - thread_deferreds.append(d) - - yield defer.gatherResults(thread_deferreds) - - def get_thread_list(self, board): - self.log.debug(f"Getting thread list for {board}") - response = self.api_call(f"{board}/catalog.json") - return response - - def got_thread_list(self, thread_list, board): - if not thread_list: - self.log.error(f"Thread list invalid: {thread_list} {board}") - return - if thread_list["success"]: - #self.thread_list[board] = thread_list["response"] - for page in thread_list["response"]: + async def run(self): + await self.get_board_list() + + async def get_board_list(self): + # responses = await self.api_call({"_": "boards.json"}) + # for mapped, response in responses: + # if not response: + # continue + # for board in response["boards"]: + # self.boards.append(board["board"]) + # self.log.debug(f"Got boards: {self.boards}") + + await self.get_thread_lists(self.boards) + + async def get_thread_lists(self, boards): + self.log.debug(f"Getting thread list for {boards}") + board_urls = {board: f"{board}/catalog.json" for board in boards} + responses = await self.api_call(board_urls) + to_get = [] + for mapped, response in responses: + if not response: + continue + for page in response: for threads in page["threads"]: no = threads["no"] - d = self.get_thread_content(board, no) - d.addCallback(self.got_thread_content, board, no) - self.log.info(f"Got thread list for {board}: {len(thread_list)}") - - def get_thread_content(self, board, thread): - self.log.debug(f"Getting information for thread {thread} on board {board}") - response = self.api_call(f"{board}/thread/{thread}.json") - return response - - def got_thread_content(self, thread_content, board, thread): - if not thread_content: - self.log.error(f"Thread content invalid: {thread_content} {board} {thread}") - return - if thread_content["success"]: + to_get.append((mapped, no)) + + self.log.info(f"Got thread list for {mapped}: {len(response)}") + await self.get_threads_content(to_get) + + # Recurse + await self.get_thread_lists(self.boards) + + async def get_threads_content(self, thread_list): + thread_urls = {(board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list} + self.log.debug(f"Getting information for threads: {thread_urls}") + responses = await self.api_call(thread_urls) + self.log.debug(f"Got information for threads: {thread_urls}") + for mapped, response in responses: + if not response: + continue + board, thread = mapped self.log.debug(f"Got thread content for thread {thread} on board {board}") - for post in thread_content["response"]["posts"]: - # print(post) - self.handle_post(board, thread, post) - else: - self.log.error( - ( - f"Error fetching thread {thread} on board {board}: " - f"{thread_content['message']}" - ) - ) - - def handle_post(self, board, thread, post): - post["type"] = "msg" - - # Calculate hash for post - post_normalised = json.dumps(post, sort_keys=True) - hash = siphash(self.hash_key, post_normalised) - hash = str(hash) - redis_key = f"cache.{board}.{thread}.{post['no']}" - key_content = db.r.get(redis_key) - if key_content: - key_content = key_content.decode("ascii") - if key_content == hash: - return - else: - post["type"] = "update" - db.r.set(redis_key, hash) - # Check if hash exists - # Store the hash - for key, value in list(post.items()): - if key in ATTRMAP: - post[ATTRMAP[key]] = post[key] - del post[key] - if "ts" in post: - old_time = post["ts"] - # '08/30/22(Tue)02:25:37' - time_spl = old_time.split(":") - if len(time_spl) == 3: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") - else: - old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") - new_ts = old_ts.isoformat() - post["ts"] = new_ts - if "msg" in post: - soup = BeautifulSoup(post["msg"], "html.parser") - msg = soup.get_text(separator="\n") - post["msg"] = msg - - post["src"] = "4ch" - - # print({name_map[name]: val for name, val in post.items()}) - db.store_message(post) - - def dump(self, *args, **kwargs): - self.log.error(f"Error: {args} {kwargs}") - - @inlineCallbacks - def callback_api_call(self, response, result): - result["status"] = response.code - try: - text = yield response.content() - except: # noqa - self.log.error("Error with API call") - return False - #print("RESP TEXT", text) - try: - result["response"] = json.loads(text) - except json.decoder.JSONDecodeError: - result["success"] = "ERROR" - result["message"] = "Error parsing JSON." - return result - #print("RESP AFTER JSON", result) - result["status"] = response.code - if response.code == 200: - result["success"] = True - result["message"] = "OK" - else: - result["message"] = "API ERROR" - - return result - - def api_call(self, method: str): + await self.handle_posts(board, thread, response["posts"]) + + async def handle_posts(self, board, thread, posts): + for index, post in enumerate(posts): + posts[index]["type"] = "msg" + + # Calculate hash for post + post_normalised = json.dumps(post, sort_keys=True) + hash = siphash(self.hash_key, post_normalised) + hash = str(hash) + redis_key = f"cache.{board}.{thread}.{post['no']}" + key_content = db.r.get(redis_key) + if key_content: + key_content = key_content.decode("ascii") + if key_content == hash: + return + else: + posts[index]["type"] = "update" + db.r.set(redis_key, hash) + + for key, value in list(post.items()): + if key in ATTRMAP: + post[ATTRMAP[key]] = posts[index][key] + del posts[index][key] + if "ts" in post: + old_time = posts[index]["ts"] + # '08/30/22(Tue)02:25:37' + time_spl = old_time.split(":") + if len(time_spl) == 3: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S") + else: + old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M") + new_ts = old_ts.isoformat() + posts[index]["ts"] = new_ts + if "msg" in post: + soup = BeautifulSoup(posts[index]["msg"], "html.parser") + msg = soup.get_text(separator="\n") + posts[index]["msg"] = msg + + posts[index]["src"] = "4ch" + + # print({name_map[name]: val for name, val in post.items()}) + #print(f"Got posts: {len(posts)}") + await db.store_message_bulk(posts) + + async def fetch(self, url, session, mapped): + async with session.get(url) as response: + return (mapped, await response.json()) + + + async def bound_fetch(self, sem, url, session, mapped): + # Getter function with semaphore. + async with sem: + try: + return await self.fetch(url, session, mapped) + except: + return (mapped, None) + + async def api_call(self, methods={}): headers = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0" ) } - url = f"{self.api_endpoint}/{method}" - self.log.debug(f"GET {url}") - response = treq.get(url, headers=headers) - result: Dict[str, Any] = { - "success": False, - "message": "Call not successful", - "response": None, - "status": None, - } - response.addCallback(self.callback_api_call, result) - response.addErrback(self.dump, url=url) - return response + tasks = [] + sem = asyncio.Semaphore(100) + connector = aiohttp.TCPConnector(limit=None) + async with aiohttp.ClientSession(connector=connector) as session: + for mapped, method in methods.items(): + url = f"{self.api_endpoint}/{method}" + self.log.debug(f"GET {url}") + task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) + #task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) + tasks.append(task) + responses = await asyncio.gather(*tasks) + return responses + diff --git a/util.py b/util.py index 09f98da..045c95f 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import logging log = logging.getLogger("util") -debug = False +debug = True # Color definitions BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)