Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
5 changed files with 155 additions and 170 deletions
Showing only changes of commit 22cef33342 - Show all commits

45
db.py
View File

@ -19,8 +19,7 @@ def store_message(msg):
Store a message into Manticore
:param msg: dict
"""
log.info(f"store_message() {msg}")
print("DISCORD MSGLEN", len(msg["msg"]))
# normalise fields
for key, value in list(msg.items()):
if value is None:
@ -46,8 +45,46 @@ def store_message(msg):
#print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post)
pprint(api_response)
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
async def store_message_bulk(messages):
"""
Store a message into Manticore
:param msg: dict
"""
print("BULK", len(messages))
total = []
for msg in messages:
# normalise fields
for key, value in list(msg.items()):
if value is None:
del msg[key]
if key in schema:
if isinstance(value, int):
if schema[key].startswith("string"):
msg[key] = str(value)
body = {
"insert": {
"index": "main",
"doc": msg
}
}
total.append(body)
body_post = ""
for item in total:
body_post += json.dumps(item)
body_post += "\n"
#print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)

View File

@ -11,8 +11,8 @@ services:
volumes_from:
- tmp
depends_on:
- "db"
- "redis"
- db
db:
image: manticoresearch/manticore
@ -33,6 +33,7 @@ services:
volumes:
- ./docker/data:/var/lib/manticore
tmp:
image: busybox
command: chmod -R 777 /var/run/redis

View File

@ -25,14 +25,9 @@ async def main(loop):
loop.create_task(client.start(token))
#client.run(token)
# log.info("Starting 4chan handler.")
# chan = Chan4()
# #running = chan.run()
# chan.run()
#deferred.addCallback(lambda: None)
#reactor.callLater(0.1, deferred.callback, None)
log.info("Starting 4chan handler.")
chan = Chan4()
await chan.run()
loop = asyncio.get_event_loop()

View File

@ -12,6 +12,8 @@ from siphashc import siphash
import db
import util
from schemas.ch4_s import ATTRMAP
import aiohttp
import asyncio
class Chan4(object):
"""
@ -42,90 +44,54 @@ class Chan4(object):
self.hash_key = self.hash_key.decode("ascii")
self.log.debug(f"Decoded hash key: {self.hash_key}")
@inlineCallbacks
def run(self):
yield self.get_board_list()
async def run(self):
await self.get_board_list()
def got_thread_lists(self, thread_lists):
print("GOT THREAD LIST", thread_lists)
# Instead of while True, do it again!
d = self.get_thread_lists()
d.addCallback(self.got_thread_lists)
# @inlineCallbacks
# def mainloop(self):
# while True:
# yield self.get_thread_lists()
# yield self.get_thread_contents()
async def get_board_list(self):
# responses = await self.api_call({"_": "boards.json"})
# for mapped, response in responses:
# if not response:
# continue
# for board in response["boards"]:
# self.boards.append(board["board"])
# self.log.debug(f"Got boards: {self.boards}")
@inlineCallbacks
def get_board_list(self):
self.log.debug("Getting board list")
response = self.api_call("boards.json")
response.addCallback(self.got_board_list)
yield response
await self.get_thread_lists(self.boards)
@inlineCallbacks
def got_board_list(self, board_list):
if board_list["success"]:
for board in board_list["response"]["boards"]:
self.boards.append(board["board"])
self.log.debug(f"Got boards: {self.boards}")
d = self.get_thread_lists()
d.addCallback(self.got_thread_lists)
yield d
@inlineCallbacks
def get_thread_lists(self):
thread_deferreds = []
for board in self.boards:
d = self.get_thread_list(board)
d.addCallback(self.got_thread_list, board)
thread_deferreds.append(d)
yield defer.gatherResults(thread_deferreds)
def get_thread_list(self, board):
self.log.debug(f"Getting thread list for {board}")
response = self.api_call(f"{board}/catalog.json")
return response
def got_thread_list(self, thread_list, board):
if not thread_list:
self.log.error(f"Thread list invalid: {thread_list} {board}")
return
if thread_list["success"]:
#self.thread_list[board] = thread_list["response"]
for page in thread_list["response"]:
async def get_thread_lists(self, boards):
self.log.debug(f"Getting thread list for {boards}")
board_urls = {board: f"{board}/catalog.json" for board in boards}
responses = await self.api_call(board_urls)
to_get = []
for mapped, response in responses:
if not response:
continue
for page in response:
for threads in page["threads"]:
no = threads["no"]
d = self.get_thread_content(board, no)
d.addCallback(self.got_thread_content, board, no)
self.log.info(f"Got thread list for {board}: {len(thread_list)}")
to_get.append((mapped, no))
def get_thread_content(self, board, thread):
self.log.debug(f"Getting information for thread {thread} on board {board}")
response = self.api_call(f"{board}/thread/{thread}.json")
return response
self.log.info(f"Got thread list for {mapped}: {len(response)}")
await self.get_threads_content(to_get)
def got_thread_content(self, thread_content, board, thread):
if not thread_content:
self.log.error(f"Thread content invalid: {thread_content} {board} {thread}")
return
if thread_content["success"]:
# Recurse
await self.get_thread_lists(self.boards)
async def get_threads_content(self, thread_list):
thread_urls = {(board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list}
self.log.debug(f"Getting information for threads: {thread_urls}")
responses = await self.api_call(thread_urls)
self.log.debug(f"Got information for threads: {thread_urls}")
for mapped, response in responses:
if not response:
continue
board, thread = mapped
self.log.debug(f"Got thread content for thread {thread} on board {board}")
for post in thread_content["response"]["posts"]:
# print(post)
self.handle_post(board, thread, post)
else:
self.log.error(
(
f"Error fetching thread {thread} on board {board}: "
f"{thread_content['message']}"
)
)
await self.handle_posts(board, thread, response["posts"])
def handle_post(self, board, thread, post):
post["type"] = "msg"
async def handle_posts(self, board, thread, posts):
for index, post in enumerate(posts):
posts[index]["type"] = "msg"
# Calculate hash for post
post_normalised = json.dumps(post, sort_keys=True)
@ -138,16 +104,15 @@ class Chan4(object):
if key_content == hash:
return
else:
post["type"] = "update"
posts[index]["type"] = "update"
db.r.set(redis_key, hash)
# Check if hash exists
# Store the hash
for key, value in list(post.items()):
if key in ATTRMAP:
post[ATTRMAP[key]] = post[key]
del post[key]
post[ATTRMAP[key]] = posts[index][key]
del posts[index][key]
if "ts" in post:
old_time = post["ts"]
old_time = posts[index]["ts"]
# '08/30/22(Tue)02:25:37'
time_spl = old_time.split(":")
if len(time_spl) == 3:
@ -155,60 +120,47 @@ class Chan4(object):
else:
old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
new_ts = old_ts.isoformat()
post["ts"] = new_ts
posts[index]["ts"] = new_ts
if "msg" in post:
soup = BeautifulSoup(post["msg"], "html.parser")
soup = BeautifulSoup(posts[index]["msg"], "html.parser")
msg = soup.get_text(separator="\n")
post["msg"] = msg
posts[index]["msg"] = msg
post["src"] = "4ch"
posts[index]["src"] = "4ch"
# print({name_map[name]: val for name, val in post.items()})
db.store_message(post)
#print(f"Got posts: {len(posts)}")
await db.store_message_bulk(posts)
def dump(self, *args, **kwargs):
self.log.error(f"Error: {args} {kwargs}")
async def fetch(self, url, session, mapped):
async with session.get(url) as response:
return (mapped, await response.json())
@inlineCallbacks
def callback_api_call(self, response, result):
result["status"] = response.code
async def bound_fetch(self, sem, url, session, mapped):
# Getter function with semaphore.
async with sem:
try:
text = yield response.content()
except: # noqa
self.log.error("Error with API call")
return False
#print("RESP TEXT", text)
try:
result["response"] = json.loads(text)
except json.decoder.JSONDecodeError:
result["success"] = "ERROR"
result["message"] = "Error parsing JSON."
return result
#print("RESP AFTER JSON", result)
result["status"] = response.code
if response.code == 200:
result["success"] = True
result["message"] = "OK"
else:
result["message"] = "API ERROR"
return await self.fetch(url, session, mapped)
except:
return (mapped, None)
return result
def api_call(self, method: str):
async def api_call(self, methods={}):
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0"
)
}
tasks = []
sem = asyncio.Semaphore(100)
connector = aiohttp.TCPConnector(limit=None)
async with aiohttp.ClientSession(connector=connector) as session:
for mapped, method in methods.items():
url = f"{self.api_endpoint}/{method}"
self.log.debug(f"GET {url}")
response = treq.get(url, headers=headers)
result: Dict[str, Any] = {
"success": False,
"message": "Call not successful",
"response": None,
"status": None,
}
response.addCallback(self.callback_api_call, result)
response.addErrback(self.dump, url=url)
return response
task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
#task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses

View File

@ -3,7 +3,7 @@ import logging
log = logging.getLogger("util")
debug = False
debug = True
# Color definitions
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)