Implement indexing into Apache Druid #1
2
db.py
2
db.py
|
@ -55,6 +55,8 @@ def store_message_bulk(data):
|
||||||
print("BULK", len(data))
|
print("BULK", len(data))
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
# 10000: maximum inserts we can submit to
|
||||||
|
# Manticore as of Sept 2022
|
||||||
split_posts = array_split(data, ceil(len(data) / 10000))
|
split_posts = array_split(data, ceil(len(data) / 10000))
|
||||||
for messages in split_posts:
|
for messages in split_posts:
|
||||||
print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
|
print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
|
||||||
|
|
|
@ -17,9 +17,29 @@ from schemas.ch4_s import ATTRMAP
|
||||||
from numpy import array_split
|
from numpy import array_split
|
||||||
from math import ceil
|
from math import ceil
|
||||||
|
|
||||||
p = ProcessPoolExecutor(10)
|
|
||||||
|
|
||||||
|
|
||||||
|
# CONFIGURATION #
|
||||||
|
|
||||||
|
# Number of 4chan threads to request at once
|
||||||
|
THREADS_CONCURRENT = 1000
|
||||||
|
|
||||||
|
# Seconds to wait between every THREADS_CONCURRENT requests
|
||||||
|
THREADS_DELAY = 0.1
|
||||||
|
|
||||||
|
# Seconds to wait between crawls
|
||||||
|
CRAWL_DELAY = 5
|
||||||
|
|
||||||
|
# Semaphore value ?
|
||||||
|
THREADS_SEMAPHORE = 100
|
||||||
|
|
||||||
|
# Maximum number of CPU threads to use for post processing
|
||||||
|
CPU_THREADS = 2
|
||||||
|
|
||||||
|
# CONFIGURATION END #
|
||||||
|
|
||||||
|
p = ProcessPoolExecutor(CPU_THREADS)
|
||||||
|
|
||||||
class Chan4(object):
|
class Chan4(object):
|
||||||
"""
|
"""
|
||||||
4chan indexer, crawler and ingester.
|
4chan indexer, crawler and ingester.
|
||||||
|
@ -32,7 +52,6 @@ class Chan4(object):
|
||||||
self.api_endpoint = "https://a.4cdn.org"
|
self.api_endpoint = "https://a.4cdn.org"
|
||||||
# self.boards = ["out", "g", "a", "3", "pol"] #
|
# self.boards = ["out", "g", "a", "3", "pol"] #
|
||||||
self.boards = []
|
self.boards = []
|
||||||
self.thread_list = {}
|
|
||||||
|
|
||||||
# self.thread_deferreds = []
|
# self.thread_deferreds = []
|
||||||
# self.content_deferreds = []
|
# self.content_deferreds = []
|
||||||
|
@ -52,6 +71,9 @@ class Chan4(object):
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
await self.get_board_list()
|
await self.get_board_list()
|
||||||
|
while True:
|
||||||
|
await self.get_thread_lists(self.boards)
|
||||||
|
await asyncio.sleep(CRAWL_DELAY)
|
||||||
|
|
||||||
async def get_board_list(self):
|
async def get_board_list(self):
|
||||||
responses = await self.api_call({"_": "boards.json"})
|
responses = await self.api_call({"_": "boards.json"})
|
||||||
|
@ -62,8 +84,6 @@ class Chan4(object):
|
||||||
self.boards.append(board["board"])
|
self.boards.append(board["board"])
|
||||||
self.log.debug(f"Got boards: {self.boards}")
|
self.log.debug(f"Got boards: {self.boards}")
|
||||||
|
|
||||||
await self.get_thread_lists(self.boards)
|
|
||||||
|
|
||||||
async def get_thread_lists(self, boards):
|
async def get_thread_lists(self, boards):
|
||||||
self.log.debug(f"Getting thread list for {boards}")
|
self.log.debug(f"Getting thread list for {boards}")
|
||||||
board_urls = {board: f"{board}/catalog.json" for board in boards}
|
board_urls = {board: f"{board}/catalog.json" for board in boards}
|
||||||
|
@ -80,17 +100,27 @@ class Chan4(object):
|
||||||
self.log.info(f"Got thread list for {mapped}: {len(response)}")
|
self.log.info(f"Got thread list for {mapped}: {len(response)}")
|
||||||
print("THREAD LIST FULL LEN", len(to_get))
|
print("THREAD LIST FULL LEN", len(to_get))
|
||||||
if not to_get:
|
if not to_get:
|
||||||
await self.get_thread_lists(self.boards)
|
|
||||||
return
|
return
|
||||||
split_threads = array_split(to_get, ceil(len(to_get) / 10000))
|
split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
|
||||||
print("SPLIT THREADS INTO", len(split_threads))
|
print("SPLIT THREADS INTO", len(split_threads))
|
||||||
for threads in split_threads:
|
for threads in split_threads:
|
||||||
print("SUBMITTED THREADS FOR", len(threads))
|
print("SUBMITTED THREADS FOR", len(threads))
|
||||||
await self.get_threads_content(threads)
|
await self.get_threads_content(threads)
|
||||||
|
await asyncio.sleep(THREADS_DELAY)
|
||||||
#await self.get_threads_content(to_get)
|
#await self.get_threads_content(to_get)
|
||||||
|
|
||||||
# Recurse
|
def take_items(self, dict_list, n):
|
||||||
await self.get_thread_lists(self.boards)
|
i = 0
|
||||||
|
try:
|
||||||
|
for x in list(dict_list.keys()):
|
||||||
|
for item in list(dict_list[x]):
|
||||||
|
yield (x, item)
|
||||||
|
dict_list[x].remove(item)
|
||||||
|
i += 1
|
||||||
|
if i == n:
|
||||||
|
raise StopIteration
|
||||||
|
except:
|
||||||
|
print("Take items took", i, "items")
|
||||||
|
|
||||||
async def get_threads_content(self, thread_list):
|
async def get_threads_content(self, thread_list):
|
||||||
thread_urls = {
|
thread_urls = {
|
||||||
|
@ -109,16 +139,31 @@ class Chan4(object):
|
||||||
all_posts[mapped] = response["posts"]
|
all_posts[mapped] = response["posts"]
|
||||||
|
|
||||||
# Split into 10,000 chunks
|
# Split into 10,000 chunks
|
||||||
# split_posts = array_split(all_posts, ceil(len(all_posts) / 10000))
|
if not all_posts:
|
||||||
# print("SPLIT CHUNK", len(split_posts))
|
print("NOTALL POSTS", all_posts)
|
||||||
|
return
|
||||||
|
threads_per_core = int(len(all_posts) / CPU_THREADS)
|
||||||
|
print("THREADS PER CORE", threads_per_core)
|
||||||
|
for i in range(CPU_THREADS):
|
||||||
|
print("SUBMITTING CORE", i)
|
||||||
|
new_dict = {}
|
||||||
|
pulled_posts = self.take_items(all_posts, threads_per_core)
|
||||||
|
for k, v in pulled_posts:
|
||||||
|
if k in new_dict:
|
||||||
|
new_dict[k].append(v)
|
||||||
|
else:
|
||||||
|
new_dict[k] = [v]
|
||||||
|
print("SUBMITTING", len(new_dict), "THREADS")
|
||||||
|
await self.handle_posts_thread(new_dict)
|
||||||
|
# print("VAL", ceil(len(all_posts) / threads_per_core))
|
||||||
|
# split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
|
||||||
|
# print("THREADS PER CORE SPLIT", len(split_posts))
|
||||||
|
# # print("SPLIT CHUNK", len(split_posts))
|
||||||
# for posts in split_posts:
|
# for posts in split_posts:
|
||||||
# with futures.ThreadPoolExecutor(max_workers=6) as executor:
|
# print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS")
|
||||||
# print("SUBMITTED THREAD FOR", len(posts))
|
# await self.handle_posts_thread(posts)
|
||||||
# executor.submit(self.handle_posts, board, thread, posts)
|
|
||||||
# await self.handle_posts(board, thread, response["posts"])
|
#await self.handle_posts_thread(all_posts)
|
||||||
# await asyncio.sleep(1)
|
|
||||||
await self.handle_posts_thread(all_posts)
|
|
||||||
# self.handle_posts(all_posts)
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def handle_posts_thread(self, posts):
|
def handle_posts_thread(self, posts):
|
||||||
|
@ -159,7 +204,8 @@ class Chan4(object):
|
||||||
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
|
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
|
||||||
else:
|
else:
|
||||||
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
|
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
|
||||||
new_ts = old_ts.isoformat()
|
# new_ts = old_ts.isoformat()
|
||||||
|
new_ts = int(old_ts.timestamp())
|
||||||
posts[key][index]["ts"] = new_ts
|
posts[key][index]["ts"] = new_ts
|
||||||
if "msg" in post:
|
if "msg" in post:
|
||||||
soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
|
soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
|
||||||
|
@ -171,7 +217,8 @@ class Chan4(object):
|
||||||
|
|
||||||
# print({name_map[name]: val for name, val in post.items()})
|
# print({name_map[name]: val for name, val in post.items()})
|
||||||
# print(f"Got posts: {len(posts)}")
|
# print(f"Got posts: {len(posts)}")
|
||||||
db.store_message_bulk(to_store)
|
if to_store:
|
||||||
|
db.store_message_bulk(to_store)
|
||||||
|
|
||||||
async def fetch(self, url, session, mapped):
|
async def fetch(self, url, session, mapped):
|
||||||
async with session.get(url) as response:
|
async with session.get(url) as response:
|
||||||
|
@ -192,7 +239,7 @@ class Chan4(object):
|
||||||
|
|
||||||
async def api_call(self, methods={}):
|
async def api_call(self, methods={}):
|
||||||
tasks = []
|
tasks = []
|
||||||
sem = asyncio.Semaphore(100)
|
sem = asyncio.Semaphore(THREADS_SEMAPHORE)
|
||||||
connector = aiohttp.TCPConnector(limit=None)
|
connector = aiohttp.TCPConnector(limit=None)
|
||||||
async with aiohttp.ClientSession(connector=connector) as session:
|
async with aiohttp.ClientSession(connector=connector) as session:
|
||||||
for mapped, method in methods.items():
|
for mapped, method in methods.items():
|
||||||
|
|
|
@ -35,7 +35,8 @@ class DiscordClient(discord.Client):
|
||||||
return
|
return
|
||||||
|
|
||||||
a = self.recurse_dict(message)
|
a = self.recurse_dict(message)
|
||||||
a["ts"] = a["time"].isoformat()
|
#a["ts"] = a["time"].isoformat()
|
||||||
|
a["ts"] = int(a["time"].timestamp())
|
||||||
del a["time"]
|
del a["time"]
|
||||||
a["type"] = "msg"
|
a["type"] = "msg"
|
||||||
a["src"] = "dis"
|
a["src"] = "dis"
|
||||||
|
|
Loading…
Reference in New Issue