diff --git a/db.py b/db.py index e58f0bf..97c615c 100644 --- a/db.py +++ b/db.py @@ -1,12 +1,13 @@ +from math import ceil + import manticoresearch +import ujson from manticoresearch.rest import ApiException +from numpy import array_split from redis import StrictRedis import util from schemas.mc_s import schema -import ujson -from numpy import array_split -from math import ceil configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308") api_client = manticoresearch.ApiClient(configuration) @@ -15,6 +16,7 @@ api_instance = manticoresearch.IndexApi(api_client) log = util.get_logger("db") r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0) + def store_message(msg): """ Store a message into Manticore @@ -30,27 +32,21 @@ def store_message(msg): if schema[key].startswith("string"): msg[key] = str(value) - body = [ - { - "insert": { - "index": "main", - "doc": msg - } - } - ] + body = [{"insert": {"index": "main", "doc": msg}}] body_post = "" for item in body: body_post += ujson.dumps(item) body_post += "\n" - #print(body_post) + # print(body_post) try: # Bulk index operations - api_response = api_instance.bulk(body_post, async_req=True) - #print(api_response) + api_instance.bulk(body_post, async_req=True) + # print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) + def store_message_bulk(data): """ Store a message into Manticore @@ -71,42 +67,38 @@ def store_message_bulk(data): if schema[key].startswith("string"): msg[key] = str(value) - body = { - "insert": { - "index": "main", - "doc": msg - } - } + body = {"insert": {"index": "main", "doc": msg}} total.append(body) - + body_post = "" for item in total: body_post += ujson.dumps(item) body_post += "\n" - #print(body_post) + # print(body_post) try: # Bulk index operations - api_response = api_instance.bulk(body_post, async_req=True) - #print(api_response) + api_instance.bulk(body_post, async_req=True) + # print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) print("FINISHED PROCESSING SPLIT") print("BULK FINISH") + def update_schema(): pass + def create_index(api_client): util_instance = manticoresearch.UtilsApi(api_client) - schema_types = ", ".join([f"{k} {v}" for k,v in schema.items()]) - + schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()]) + create_query = f"create table if not exists main({schema_types}) engine='columnar'" print("Schema types", create_query) util_instance.sql(create_query) - create_index(api_client) update_schema() diff --git a/monolith.py b/monolith.py index 46bc46b..29c6001 100644 --- a/monolith.py +++ b/monolith.py @@ -8,6 +8,7 @@ from sources.dis import DiscordClient # For development if not getenv("DISCORD_TOKEN", None): from dotenv import load_dotenv + load_dotenv() log = util.get_logger("monolith") @@ -23,7 +24,7 @@ async def main(loop): log.info("Starting Discord handler.") client = DiscordClient() loop.create_task(client.start(token)) - #client.run(token) + # client.run(token) log.info("Starting 4chan handler.") chan = Chan4() @@ -32,7 +33,7 @@ async def main(loop): loop = asyncio.get_event_loop() loop.create_task(main(loop)) -#reactor.run() +# reactor.run() try: loop.run_forever() except KeyboardInterrupt: diff --git a/schemas/ch4_s.py b/schemas/ch4_s.py index a474458..b4b1673 100644 --- a/schemas/ch4_s.py +++ b/schemas/ch4_s.py @@ -19,4 +19,4 @@ ATTRMAP = { "custom_spoiler": "file_custom_spoiler", "m_img": "file_m_img", "time": "unix_time", -} \ No newline at end of file +} diff --git a/schemas/dis_s.py b/schemas/dis_s.py index e5459e8..dc843c4 100644 --- a/schemas/dis_s.py +++ b/schemas/dis_s.py @@ -16,4 +16,4 @@ ATTRMAP = { "channel_category": "channel.category.name", "channel_category_id": "channel.category.id", "channel_category_nsfw": "channel.category.nsfw", -} \ No newline at end of file +} diff --git a/schemas/mc_s.py b/schemas/mc_s.py index 781cf1d..b1e469a 100644 --- a/schemas/mc_s.py +++ b/schemas/mc_s.py @@ -1,199 +1,132 @@ schema = { "id": "bigint", - # 1 "archived": "int", - # 1662150538 "archived_on": "string indexed attribute", - # CF "board_flag": "string indexed attribute", - # true, false "bot": "bool", - # 0 "bumplimit": "int", - # mod "capcode": "string indexed attribute", - # 393598265, #main, Rust Programmer's Club "channel": "text", - # Miscellaneous "channel_category": "text", - # 360581491907887100 "channel_category_id": "string indexed attribute", - # true, false "channel_category_nsfw": "bool", - # 734229101216530600 "channel_id": "string indexed attribute", - # true, false "channel_nsfw": "bool", - # 1 "closed": "int", - # GB "country": "string indexed attribute", - # United Kingdom "country_name": "text", - # 5 "file_custom_spoiler": "int", - # 1 "file_deleted": "int", - # .jpg "file_ext": "string indexed attribute", - # 1024 "file_h": "int", - # 1 "file_m_img": "int", - # tlArbrZDj7kbheSKPyDU0w== "file_md5": "string indexed attribute", - # 88967 "file_size": "int", - # 1 "file_spoiler": "int", - # 1662149436322819 "file_tim": "string indexed attribute", - # 250 "file_tn_h": "int", - # 241 "file_tn_w": "int", - # 1080 "file_w": "int", - # 6E646BED-297E-4B4F-9082-31EDADC49472 "filename": "text", - # Confederate "flag_name": "string indexed attribute", - - - "guild": "text", # LEGACY -> channel - "guild_id": "string indexed attribute", # LEGACY -> channel_id - + "guild": "text", # LEGACY -> channel + "guild_id": "string indexed attribute", # LEGACY -> channel_id # 36180 - "guild_member_count": "int", # ? -> channel_member_count - + "guild_member_count": "int", # ? -> channel_member_count # 9f7b2e6a0e9b "host": "text", - # 2447746 - "id_reply": "string indexed attribute", # resto - + "id_reply": "string indexed attribute", # resto # "522, trans rights shill", myname "ident": "text", - # 0 "imagelimit": "int", - # 0 "images": "int", - # 0 "mode": "string indexed attribute", - # b0n3 "modearg": "string indexed attribute", - # The quick brown fox jumped over the lazy dog "msg": "text", - # 393605030 "msg_id": "string indexed attribute", - # pol "net": "text", - # 273534239310479360 "net_id": "string indexed attribute", - # André de Santa Cruz, santa "nick": "text", - # 773802568324350000 "nick_id": "string indexed attribute", - # 1, 2, 3, 4, 5, 6, ... "num": "int", - # 12 "replies": "int", - # redacted-hate-thread "semantic_url": "string indexed attribute", - # -1 -> 1 as float "sentiment": "float", - # 2022 "since4pass": "int", - # 4ch, irc, dis "src": "string indexed attribute", - # true, false "status": "bool", - # 1 "sticky": "int", - # 1000 "sticky_cap": "int", - # Redacted Hate Thread, Gorbachev is dead "sub": "string indexed attribute", - # Loop "tag": "string indexed attribute", - # 100 "tail_size": "int", - - "time": "timestamp", # LEGACY -> ts - - "tokens": "text", # ??? - + "time": "timestamp", # LEGACY -> ts + "tokens": "text", # ??? # 2022-09-02T16:10:36 "ts": "timestamp", - # msg, notice, update, who "type": "string indexed attribute", - # 10 "unique_ips": "int", - # 1662149436 "unix_time": "string indexed attribute", - # Anonymous "user": "text", - - "user_id": "string indexed attribute", # LEGACY -> nick_id - + "user_id": "string indexed attribute", # LEGACY -> nick_id # 1, 2 "version_sentiment": "int", - # 1, 2 "version_tokens": "int", -} \ No newline at end of file +} diff --git a/sources/ch4.py b/sources/ch4.py index 3434377..324e8ea 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -1,23 +1,22 @@ # Python modules can't start with a number... -import ujson +import asyncio import random import string +from concurrent.futures import ProcessPoolExecutor from datetime import datetime +import aiohttp +import ujson from bs4 import BeautifulSoup from siphashc import siphash import db import util from schemas.ch4_s import ATTRMAP -import aiohttp -import asyncio -from numpy import array_split -from math import ceil -from concurrent.futures import ProcessPoolExecutor p = ProcessPoolExecutor(10) + class Chan4(object): """ 4chan indexer, crawler and ingester. @@ -28,12 +27,12 @@ class Chan4(object): self.log = util.get_logger(name) self.api_endpoint = "https://a.4cdn.org" - #self.boards = ["out", "g", "a", "3", "pol"] # + # self.boards = ["out", "g", "a", "3", "pol"] # self.boards = [] self.thread_list = {} - #self.thread_deferreds = [] - #self.content_deferreds = [] + # self.thread_deferreds = [] + # self.content_deferreds = [] self.log.info(f"Starting crawler bot to {self.api_endpoint}") @@ -82,7 +81,10 @@ class Chan4(object): await self.get_thread_lists(self.boards) async def get_threads_content(self, thread_list): - thread_urls = {(board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list} + thread_urls = { + (board, thread): f"{board}/thread/{thread}.json" + for board, thread in thread_list + } self.log.debug(f"Getting information for threads: {thread_urls}") responses = await self.api_call(thread_urls) self.log.debug(f"Got information for threads: {thread_urls}") @@ -101,8 +103,8 @@ class Chan4(object): # with futures.ThreadPoolExecutor(max_workers=6) as executor: # print("SUBMITTED THREAD FOR", len(posts)) # executor.submit(self.handle_posts, board, thread, posts) - #await self.handle_posts(board, thread, response["posts"]) - #await asyncio.sleep(1) + # await self.handle_posts(board, thread, response["posts"]) + # await asyncio.sleep(1) await self.handle_posts_thread(all_posts) @asyncio.coroutine @@ -158,7 +160,7 @@ class Chan4(object): to_store.append(posts[key][index]) # print({name_map[name]: val for name, val in post.items()}) - #print(f"Got posts: {len(posts)}") + # print(f"Got posts: {len(posts)}") print("HANDLE POSTS DONE") db.store_message_bulk(to_store) print("STORE DB DONE") @@ -167,26 +169,20 @@ class Chan4(object): async with session.get(url) as response: try: return (mapped, await response.json()) - except: + except: # noqa print("FETCH ERROR") return (mapped, None) - async def bound_fetch(self, sem, url, session, mapped): # Getter function with semaphore. async with sem: try: return await self.fetch(url, session, mapped) - except: + except: # noqa print("BOUND ERROR") return (mapped, None) async def api_call(self, methods={}): - headers = { - "User-Agent": ( - "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0" - ) - } tasks = [] sem = asyncio.Semaphore(100) connector = aiohttp.TCPConnector(limit=None) @@ -195,8 +191,7 @@ class Chan4(object): url = f"{self.api_endpoint}/{method}" self.log.debug(f"GET {url}") task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped)) - #task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) + # task = asyncio.ensure_future(self.bound_fetch(sem, url, session)) tasks.append(task) responses = await asyncio.gather(*tasks) return responses - diff --git a/sources/dis.py b/sources/dis.py index 3c87b16..000e786 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -5,7 +5,6 @@ import discord import db import util - from schemas.dis_s import ATTRMAP