Implement indexing into Apache Druid #1
db.py
@@ -22,7 +22,6 @@ def store_message(msg):
     Store a message into Manticore
     :param msg: dict
     """
-    print("DISCORD MSGLEN", len(msg["msg"]))
     # normalise fields
     for key, value in list(msg.items()):
         if value is None:
@@ -52,14 +51,12 @@ def store_message_bulk(data):
     Store a message into Manticore
     :param msg: dict
     """
-    print("BULK", len(data))
     if not data:
         return
     # 10000: maximum inserts we can submit to
     # Manticore as of Sept 2022
     split_posts = array_split(data, ceil(len(data) / 10000))
     for messages in split_posts:
-        print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
         total = []
         for msg in messages:
             # normalise fields
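Note: the two-line comment in this hunk explains the `array_split` call: Manticore capped bulk submissions at 10,000 inserts as of Sept 2022. A minimal standalone sketch of that batching, assuming `array_split` comes from numpy (its import sits outside the hunk):

```python
from math import ceil

from numpy import array_split

data = [{"msg": str(i)} for i in range(25000)]  # stand-in messages
batches = array_split(data, ceil(len(data) / 10000))
for batch in batches:
    # ceil() guarantees every batch stays within the 10,000-row limit
    assert len(batch) <= 10000
```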
@@ -86,9 +83,6 @@ def store_message_bulk(data):
             print(api_response)
         except ApiException as e:
             print("Exception when calling IndexApi->bulk: %s\n" % e)
-        print("FINISHED PROCESSING SPLIT")
-
-    print("BULK FINISH")


 def update_schema():
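Note: `IndexApi->bulk` and `ApiException` here come from the manticoresearch Python client. A hedged sketch of the surrounding call, with the host URL and the index name `messages` as assumptions:

```python
import manticoresearch
from manticoresearch.rest import ApiException

config = manticoresearch.Configuration(host="http://127.0.0.1:9308")
with manticoresearch.ApiClient(config) as api_client:
    api_instance = manticoresearch.IndexApi(api_client)
    # bulk() takes newline-delimited JSON, one operation per line
    docs = "\n".join(
        '{"insert": {"index": "messages", "doc": {"msg": "hello"}}}'
        for _ in range(2)
    )
    try:
        api_response = api_instance.bulk(docs)
        print(api_response)
    except ApiException as e:
        print("Exception when calling IndexApi->bulk: %s\n" % e)
```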
@@ -96,13 +96,10 @@ class Chan4(object):
                     to_get.append((mapped, no))

             self.log.info(f"Got thread list for {mapped}: {len(response)}")
-        print("THREAD LIST FULL LEN", len(to_get))
         if not to_get:
             return
         split_threads = array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT))
-        print("SPLIT THREADS INTO", len(split_threads))
         for threads in split_threads:
-            print("SUBMITTED THREADS FOR", len(threads))
             await self.get_threads_content(threads)
             await asyncio.sleep(THREADS_DELAY)
         # await self.get_threads_content(to_get)
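Note: this hunk's loop fetches threads in bursts of `THREADS_CONCURRENT` with a pause between bursts. A self-contained sketch of the pattern; the constant values and `fetch_one` are illustrative stand-ins:

```python
import asyncio
from math import ceil

from numpy import array_split

THREADS_CONCURRENT = 100  # assumed config value
THREADS_DELAY = 0.1  # assumed config value, in seconds


async def fetch_one(item):
    await asyncio.sleep(0)  # stand-in for the real HTTP request


async def fetch_all(to_get):
    if not to_get:
        return
    for batch in array_split(to_get, ceil(len(to_get) / THREADS_CONCURRENT)):
        # one burst of at most THREADS_CONCURRENT concurrent requests
        await asyncio.gather(*(fetch_one(item) for item in batch))
        await asyncio.sleep(THREADS_DELAY)  # back off between bursts
```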
@@ -138,12 +135,9 @@ class Chan4(object):

         # Split into 10,000 chunks
         if not all_posts:
-            print("NOTALL POSTS", all_posts)
             return
         threads_per_core = int(len(all_posts) / CPU_THREADS)
-        print("THREADS PER CORE", threads_per_core)
         for i in range(CPU_THREADS):
-            print("SUBMITTING CORE", i)
             new_dict = {}
             pulled_posts = self.take_items(all_posts, threads_per_core)
             for k, v in pulled_posts:
@@ -151,7 +145,6 @@ class Chan4(object):
                     new_dict[k].append(v)
                 else:
                     new_dict[k] = [v]
-            print("SUBMITTING", len(new_dict), "THREADS")
             await self.handle_posts_thread(new_dict)
         # print("VAL", ceil(len(all_posts) / threads_per_core))
         # split_posts = array_split(all_posts, ceil(len(all_posts) / threads_per_core))
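Note: the append-or-create branch kept in this hunk is the hand-rolled form of `collections.defaultdict`. An equivalent sketch with illustrative data:

```python
from collections import defaultdict

pulled_posts = [("thread1", 1), ("thread1", 2), ("thread2", 3)]

new_dict = defaultdict(list)
for k, v in pulled_posts:
    new_dict[k].append(v)  # no key-existence check needed

assert new_dict == {"thread1": [1, 2], "thread2": [3]}
```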
@@ -166,7 +159,6 @@ class Chan4(object):
     @asyncio.coroutine
     def handle_posts_thread(self, posts):
         loop = asyncio.get_event_loop()
-        print("HANDLE POSTS THREAD", len(posts.keys()))
         yield from loop.run_in_executor(p, self.handle_posts, posts)

     def handle_posts(self, posts):
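Note: `@asyncio.coroutine` with `yield from` has been deprecated since Python 3.8 and was removed in 3.11. A sketch of the modern equivalent of the method above, assuming `p` mirrors the executor this module creates elsewhere:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

p = ProcessPoolExecutor()  # assumption: stands in for the module-level pool


async def handle_posts_thread(handler, posts):
    loop = asyncio.get_event_loop()
    # run the blocking handler in the pool without stalling the event loop
    await loop.run_in_executor(p, handler, posts)
```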
@@ -226,7 +218,6 @@ class Chan4(object):
         try:
             return (mapped, await response.json())
         except:  # noqa
-            print("FETCH ERROR")
             return (mapped, None)

     async def bound_fetch(self, sem, url, session, mapped):
@@ -235,7 +226,6 @@ class Chan4(object):
         try:
             return await self.fetch(url, session, mapped)
         except:  # noqa
-            print("BOUND ERROR")
             return (mapped, None)

     async def api_call(self, methods={}):
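Note: the last two hunks form a semaphore-bounded fetch; the request itself sits outside the diff context, so this is a hedged reassembly of the pattern using aiohttp (the `session.get` call is an assumption):

```python
import aiohttp


async def fetch(url, session, mapped):
    async with session.get(url) as response:
        try:
            return (mapped, await response.json())
        except Exception:  # the original uses a bare except
            return (mapped, None)


async def bound_fetch(sem, url, session, mapped):
    async with sem:  # the semaphore caps concurrent fetches
        try:
            return await fetch(url, session, mapped)
        except Exception:
            return (mapped, None)
```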