diff --git a/db.py b/db.py index 05148b0..cfa3fd9 100644 --- a/db.py +++ b/db.py @@ -1,16 +1,68 @@ +import json +from pprint import pprint + +import manticoresearch +from manticoresearch.rest import ApiException from redis import StrictRedis import util +from schemas.mc_s import schema +configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") +api_client = manticoresearch.ApiClient(configuration) +api_instance = manticoresearch.IndexApi(api_client) log = util.get_logger("db") - +r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) def store_message(msg): """ Store a message into Manticore :param msg: dict """ - log.debug(f"store_message() {msg}") + log.info(f"store_message() {msg}") + # normalise fields + for key, value in list(msg.items()): + if value is None: + del msg[key] + if key in schema: + if isinstance(value, int): + if schema[key].startswith("string"): + msg[key] = str(value) -r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) + body = [ + { + "insert": { + "index": "main", + "doc": msg + } + } + ] + body_post = "" + for item in body: + body_post += json.dumps(item) + body_post += "\n" + + #print(body_post) + try: + # Bulk index operations + api_response = api_instance.bulk(body_post) + pprint(api_response) + except ApiException as e: + print("Exception when calling IndexApi->bulk: %s\n" % e) + +def update_schema(): + pass + +def create_index(api_client): + util_instance = manticoresearch.UtilsApi(api_client) + schema_types = ", ".join([f"{k} {v}" for k,v in schema.items()]) + + create_query = f"create table if not exists main({schema_types}) engine='columnar'" + print("Schema types", create_query) + util_instance.sql(create_query) + + + +create_index(api_client) +update_schema() diff --git a/docker-compose.yml b/docker-compose.yml index 298fa86..748d9e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,29 @@ services: env_file: - .env volumes_from: - - tmp + - tmp + depends_on: + - "db" + - "redis" + + db: + image: manticoresearch/manticore + restart: always + expose: + - 9308 + - 9312 + ulimits: + nproc: 65535 + nofile: + soft: 65535 + hard: 65535 + memlock: + soft: -1 + hard: -1 + environment: + - MCL=1 + volumes: + - ./docker/data:/var/lib/manticore tmp: image: busybox diff --git a/docker/requirements.txt b/docker/requirements.txt index 32f13b7..41486b8 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -1,7 +1,7 @@ wheel -treq beautifulsoup4 redis siphashc -aiohttp +aiohttp[speedups] python-dotenv +manticoresearch diff --git a/monolith.py b/monolith.py index 498752d..efe0e0d 100644 --- a/monolith.py +++ b/monolith.py @@ -1,36 +1,13 @@ import asyncio -import signal -import sys from os import getenv -from twisted.internet import asyncioreactor - import util from sources.ch4 import Chan4 from sources.dis import DiscordClient -loop = asyncio.new_event_loop() -# asyncio.set_event_loop(loop) - - -# asyncioreactor.install(asyncio.new_event_loop()) -asyncioreactor.install(loop) # noqa -from twisted.internet import reactor, task # noqa - - -# Doesn't quite work but better than nothing -def stop(*args): - loop.stop() - reactor.stop() - sys.exit() - - -signal.signal(signal.SIGINT, stop) -# loop.add_signal_handler(signal.SIGINT, functools.partial(stop, loop)) # For development if not getenv("DISCORD_TOKEN", None): from dotenv import load_dotenv - load_dotenv() log = util.get_logger("monolith") @@ -42,23 +19,25 @@ if not token: raise Exception("No Discord token provided") -async def start(): +async def main(loop): log.info("Starting Discord handler.") - client = DiscordClient(loop=loop) + client = DiscordClient() loop.create_task(client.start(token)) + #client.run(token) - log.info("Starting 4chan handler.") - chan = Chan4() - running = chan.run() - deferred = task.ensureDeferred(running) - reactor.callLater(0.1, deferred.callback, "") + # log.info("Starting 4chan handler.") + # chan = Chan4() + # #running = chan.run() + # chan.run() + #deferred.addCallback(lambda: None) + #reactor.callLater(0.1, deferred.callback, None) -loop.create_task(start()) -# reactor.run() -reactor.run() +loop = asyncio.get_event_loop() +loop.create_task(main(loop)) +#reactor.run() try: loop.run_forever() except KeyboardInterrupt: diff --git a/requirements.txt b/requirements.txt index fec66d8..05d8a1b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ wheel pre-commit -treq beautifulsoup4 redis siphashc -aiohttp +aiohttp[speedups] python-dotenv +manticoresearch + diff --git a/schemas/ch4_s.py b/schemas/ch4_s.py new file mode 100644 index 0000000..a474458 --- /dev/null +++ b/schemas/ch4_s.py @@ -0,0 +1,22 @@ +ATTRMAP = { + "no": "msg_id", + "now": "ts", + "name": "user", + "trip": "nick", + "id": "nick_id", + "resto": "id_reply", + "com": "msg", + "ext": "file_ext", + "w": "file_w", + "h": "file_h", + "tn_w": "file_tn_w", + "tn_h": "file_tn_h", + "tim": "file_tim", + "fsize": "file_size", + "md5": "file_md5", + "filedeleted": "file_deleted", + "spoiler": "file_spoiler", + "custom_spoiler": "file_custom_spoiler", + "m_img": "file_m_img", + "time": "unix_time", +} \ No newline at end of file diff --git a/schemas/dis_s.py b/schemas/dis_s.py new file mode 100644 index 0000000..e5459e8 --- /dev/null +++ b/schemas/dis_s.py @@ -0,0 +1,19 @@ +ATTRMAP = { + "msg": "content", + "msg_id": "id", + "nick": "author.name", + "host": "author.discriminator", + "ident": "author.nick", + "time": "created_at", + "channel": "channel.name", + "channel_nsfw": "channel.nsfw", + "bot": "author.bot", + "nick_id": "author.id", + "channel_id": "channel.id", + "net": "author.guild.name", + "net_id": "author.guild.id", + "guild_member_count": "author.guild.member_count", + "channel_category": "channel.category.name", + "channel_category_id": "channel.category.id", + "channel_category_nsfw": "channel.category.nsfw", +} \ No newline at end of file diff --git a/schemas/mc_s.py b/schemas/mc_s.py new file mode 100644 index 0000000..781cf1d --- /dev/null +++ b/schemas/mc_s.py @@ -0,0 +1,199 @@ +schema = { + "id": "bigint", + + # 1 + "archived": "int", + + # 1662150538 + "archived_on": "string indexed attribute", + + # CF + "board_flag": "string indexed attribute", + + # true, false + "bot": "bool", + + # 0 + "bumplimit": "int", + + # mod + "capcode": "string indexed attribute", + + # 393598265, #main, Rust Programmer's Club + "channel": "text", + + # Miscellaneous + "channel_category": "text", + + # 360581491907887100 + "channel_category_id": "string indexed attribute", + + # true, false + "channel_category_nsfw": "bool", + + # 734229101216530600 + "channel_id": "string indexed attribute", + + # true, false + "channel_nsfw": "bool", + + # 1 + "closed": "int", + + # GB + "country": "string indexed attribute", + + # United Kingdom + "country_name": "text", + + # 5 + "file_custom_spoiler": "int", + + # 1 + "file_deleted": "int", + + # .jpg + "file_ext": "string indexed attribute", + + # 1024 + "file_h": "int", + + # 1 + "file_m_img": "int", + + # tlArbrZDj7kbheSKPyDU0w== + "file_md5": "string indexed attribute", + + # 88967 + "file_size": "int", + + # 1 + "file_spoiler": "int", + + # 1662149436322819 + "file_tim": "string indexed attribute", + + # 250 + "file_tn_h": "int", + + # 241 + "file_tn_w": "int", + + # 1080 + "file_w": "int", + + # 6E646BED-297E-4B4F-9082-31EDADC49472 + "filename": "text", + + # Confederate + "flag_name": "string indexed attribute", + + + "guild": "text", # LEGACY -> channel + "guild_id": "string indexed attribute", # LEGACY -> channel_id + + # 36180 + "guild_member_count": "int", # ? -> channel_member_count + + # 9f7b2e6a0e9b + "host": "text", + + # 2447746 + "id_reply": "string indexed attribute", # resto + + # "522, trans rights shill", myname + "ident": "text", + + # 0 + "imagelimit": "int", + + # 0 + "images": "int", + + # 0 + "mode": "string indexed attribute", + + # b0n3 + "modearg": "string indexed attribute", + + # The quick brown fox jumped over the lazy dog + "msg": "text", + + # 393605030 + "msg_id": "string indexed attribute", + + # pol + "net": "text", + + # 273534239310479360 + "net_id": "string indexed attribute", + + # André de Santa Cruz, santa + "nick": "text", + + # 773802568324350000 + "nick_id": "string indexed attribute", + + # 1, 2, 3, 4, 5, 6, ... + "num": "int", + + # 12 + "replies": "int", + + # redacted-hate-thread + "semantic_url": "string indexed attribute", + + # -1 -> 1 as float + "sentiment": "float", + + # 2022 + "since4pass": "int", + + # 4ch, irc, dis + "src": "string indexed attribute", + + # true, false + "status": "bool", + + # 1 + "sticky": "int", + + # 1000 + "sticky_cap": "int", + + # Redacted Hate Thread, Gorbachev is dead + "sub": "string indexed attribute", + + # Loop + "tag": "string indexed attribute", + + # 100 + "tail_size": "int", + + "time": "timestamp", # LEGACY -> ts + + "tokens": "text", # ??? + + # 2022-09-02T16:10:36 + "ts": "timestamp", + + # msg, notice, update, who + "type": "string indexed attribute", + + # 10 + "unique_ips": "int", + + # 1662149436 + "unix_time": "string indexed attribute", + + # Anonymous + "user": "text", + + "user_id": "string indexed attribute", # LEGACY -> nick_id + + # 1, 2 + "version_sentiment": "int", + + # 1, 2 + "version_tokens": "int", +} \ No newline at end of file diff --git a/sources/ch4.py b/sources/ch4.py index 2e3743f..bc677de 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -8,11 +8,10 @@ from typing import Any, Dict import treq from bs4 import BeautifulSoup from siphashc import siphash -from twisted.internet.defer import inlineCallbacks import db import util - +from schemas.ch4_s import ATTRMAP class Chan4(object): """ @@ -24,10 +23,11 @@ class Chan4(object): self.log = util.get_logger(name) self.api_endpoint = "https://a.4cdn.org" - self.boards = [] + self.boards = ["out"] self.thread_list = {} - self.thread_deferreds = [] + #self.thread_deferreds = [] + #self.content_deferreds = [] self.log.info(f"Starting crawler bot to {self.api_endpoint}") @@ -45,61 +45,74 @@ class Chan4(object): @inlineCallbacks def run(self): yield self.get_board_list() - yield self.get_thread_lists() - yield self.get_thread_contents() + def got_thread_lists(self, thread_lists): + print("GOT THREAD LIST", thread_lists) + # Instead of while True, do it again! + d = self.get_thread_lists() + d.addCallback(self.got_thread_lists) + # @inlineCallbacks + # def mainloop(self): + # while True: + # yield self.get_thread_lists() + # yield self.get_thread_contents() + + @inlineCallbacks def get_board_list(self): - self.log.info("Getting board list") + self.log.debug("Getting board list") response = self.api_call("boards.json") response.addCallback(self.got_board_list) - return response + yield response + @inlineCallbacks def got_board_list(self, board_list): if board_list["success"]: for board in board_list["response"]["boards"]: self.boards.append(board["board"]) + self.log.debug(f"Got boards: {self.boards}") + d = self.get_thread_lists() + d.addCallback(self.got_thread_lists) + yield d @inlineCallbacks def get_thread_lists(self): + thread_deferreds = [] for board in self.boards: - yield self.get_thread_list(board) - # self.thread_deferreds.append(d) - # yield defer.gatherResults(self.thread_deferreds) - # self.thread_deferreds = [] - # self.log.info("Finished getting thread lists") - - @inlineCallbacks - def get_thread_contents(self): - for board in self.thread_list.keys(): - for page in self.thread_list[board]: - for threads in page["threads"]: - no = threads["no"] - yield self.get_thread_content(board, no) - # self.content_deferreds.append(d) - # al = yield defer.gatherResults(self.content_deferreds) - # self.content_deferreds = [] - # self.log.info("Finished getting content") + d = self.get_thread_list(board) + d.addCallback(self.got_thread_list, board) + thread_deferreds.append(d) + + yield defer.gatherResults(thread_deferreds) def get_thread_list(self, board): - self.log.info(f"Getting thread list for {board}") + self.log.debug(f"Getting thread list for {board}") response = self.api_call(f"{board}/catalog.json") - response.addCallback(self.got_thread_list, board) return response def got_thread_list(self, thread_list, board): + if not thread_list: + self.log.error(f"Thread list invalid: {thread_list} {board}") + return if thread_list["success"]: - self.thread_list[board] = thread_list["response"] + #self.thread_list[board] = thread_list["response"] + for page in thread_list["response"]: + for threads in page["threads"]: + no = threads["no"] + d = self.get_thread_content(board, no) + d.addCallback(self.got_thread_content, board, no) self.log.info(f"Got thread list for {board}: {len(thread_list)}") def get_thread_content(self, board, thread): - self.log.info(f"Getting information for thread {thread} on board {board}") + self.log.debug(f"Getting information for thread {thread} on board {board}") response = self.api_call(f"{board}/thread/{thread}.json") - response.addCallback(self.got_thread_content, board, thread) return response def got_thread_content(self, thread_content, board, thread): + if not thread_content: + self.log.error(f"Thread content invalid: {thread_content} {board} {thread}") + return if thread_content["success"]: - self.log.info(f"Got thread content for thread {thread} on board {board}") + self.log.debug(f"Got thread content for thread {thread} on board {board}") for post in thread_content["response"]["posts"]: # print(post) self.handle_post(board, thread, post) @@ -112,28 +125,6 @@ class Chan4(object): ) def handle_post(self, board, thread, post): - name_map = { - "no": "msg_id", - "now": "ts", - "name": "user", - "trip": "nick", - "id": "nick_id", - "resto": "id_reply", - "com": "msg", - "ext": "file_ext", - "w": "file_w", - "h": "file_h", - "tn_w": "file_tn_w", - "tn_h": "file_tn_h", - "tim": "file_tim", - "fsize": "file_size", - "md5": "file_md5", - "filedeleted": "file_deleted", - "spoiler": "file_spoiler", - "custom_spoiler": "file_custom_spoiler", - "m_img": "file_m_img", - "time": "unix_time", - } post["type"] = "msg" # Calculate hash for post @@ -152,8 +143,8 @@ class Chan4(object): # Check if hash exists # Store the hash for key, value in list(post.items()): - if key in name_map: - post[name_map[key]] = post[key] + if key in ATTRMAP: + post[ATTRMAP[key]] = post[key] del post[key] if "ts" in post: old_time = post["ts"] @@ -175,19 +166,25 @@ class Chan4(object): # print({name_map[name]: val for name, val in post.items()}) db.store_message(post) + def dump(self, *args, **kwargs): + self.log.error(f"Error: {args} {kwargs}") + @inlineCallbacks def callback_api_call(self, response, result): + result["status"] = response.code try: text = yield response.content() except: # noqa self.log.error("Error with API call") - return + return False + #print("RESP TEXT", text) try: result["response"] = json.loads(text) except json.decoder.JSONDecodeError: result["success"] = "ERROR" result["message"] = "Error parsing JSON." return result + #print("RESP AFTER JSON", result) result["status"] = response.code if response.code == 200: result["success"] = True @@ -208,9 +205,10 @@ class Chan4(object): response = treq.get(url, headers=headers) result: Dict[str, Any] = { "success": False, - "message": "Invalid Method", + "message": "Call not successful", "response": None, "status": None, } response.addCallback(self.callback_api_call, result) + response.addErrback(self.dump, url=url) return response diff --git a/sources/dis.py b/sources/dis.py index 9991ccb..3c87b16 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -6,25 +6,7 @@ import discord import db import util -ATTRMAP = { - "msg": "content", - "msg_id": "id", - "nick": "author.name", - "host": "author.discriminator", - "ident": "author.nick", - "time": "created_at", - "channel": "channel.name", - "channel_nsfw": "channel.nsfw", - "bot": "author.bot", - "user_id": "author.id", - "channel_id": "channel.id", - "net": "author.guild.name", - "net_id": "author.guild.id", - "guild_member_count": "author.guild.member_count", - "channel_category": "channel.category.name", - "channel_category_id": "channel.category.id", - "channel_category_nsfw": "channel.category.nsfw", -} +from schemas.dis_s import ATTRMAP class DiscordClient(discord.Client): diff --git a/util.py b/util.py index 045c95f..09f98da 100644 --- a/util.py +++ b/util.py @@ -3,7 +3,7 @@ import logging log = logging.getLogger("util") -debug = True +debug = False # Color definitions BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)