Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
7 changed files with 152 additions and 85 deletions
Showing only changes of commit db46fea550

db.py
View File

@@ -1,12 +1,13 @@
-import json
-from pprint import pprint
 import manticoresearch
 from manticoresearch.rest import ApiException
 from redis import StrictRedis
 import util
 from schemas.mc_s import schema
+import ujson
+from numpy import array_split
+from math import ceil
 
 configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
 api_client = manticoresearch.ApiClient(configuration)
 api_instance = manticoresearch.IndexApi(api_client)
@@ -39,7 +40,7 @@ def store_message(msg):
     ]
     body_post = ""
     for item in body:
-        body_post += json.dumps(item)
+        body_post += ujson.dumps(item)
         body_post += "\n"
 
     #print(body_post)
@@ -50,43 +51,49 @@ def store_message(msg):
     except ApiException as e:
         print("Exception when calling IndexApi->bulk: %s\n" % e)
 
-async def store_message_bulk(messages):
+def store_message_bulk(data):
     """
-    Store a message into Manticore
-    :param msg: dict
+    Store a batch of messages into Manticore
+    :param data: list of message dicts
     """
-    print("BULK", len(messages))
-    total = []
-    for msg in messages:
-        # normalise fields
-        for key, value in list(msg.items()):
-            if value is None:
-                del msg[key]
-            if key in schema:
-                if isinstance(value, int):
-                    if schema[key].startswith("string"):
-                        msg[key] = str(value)
+    print("BULK", len(data))
+    split_posts = array_split(data, ceil(len(data) / 10000))
+    for messages in split_posts:
+        print("PROCESSING SPLIT OF", len(messages), "MESSAGES")
+        total = []
+        for msg in messages:
+            # normalise fields
+            for key, value in list(msg.items()):
+                if value is None:
+                    del msg[key]
+                if key in schema:
+                    if isinstance(value, int):
+                        if schema[key].startswith("string"):
+                            msg[key] = str(value)
 
-        body = {
-            "insert": {
-                "index": "main",
-                "doc": msg
-            }
-        }
-        total.append(body)
+            body = {
+                "insert": {
+                    "index": "main",
+                    "doc": msg
+                }
+            }
+            total.append(body)
 
-    body_post = ""
-    for item in total:
-        body_post += json.dumps(item)
-        body_post += "\n"
+        body_post = ""
+        for item in total:
+            body_post += ujson.dumps(item)
+            body_post += "\n"
 
-    #print(body_post)
-    try:
-        # Bulk index operations
-        api_response = api_instance.bulk(body_post, async_req=True)
-        #print(api_response)
-    except ApiException as e:
-        print("Exception when calling IndexApi->bulk: %s\n" % e)
+        #print(body_post)
+        try:
+            # Bulk index operations
+            api_response = api_instance.bulk(body_post, async_req=True)
+            #print(api_response)
+        except ApiException as e:
+            print("Exception when calling IndexApi->bulk: %s\n" % e)
+        print("FINISHED PROCESSING SPLIT")
+    print("BULK FINISH")
 
 def update_schema():
     pass
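
A note on the pattern above: store_message_bulk now builds one NDJSON body per chunk — one {"insert": ...} operation per line — with array_split capping each chunk at 10,000 documents. A minimal standalone sketch of the same batching, assuming the input is a list of dicts (iter_bulk_bodies is an illustrative name, not part of this commit):

from math import ceil
from numpy import array_split
import ujson

def iter_bulk_bodies(docs, chunk_size=10000, index="main"):
    """Yield NDJSON bulk bodies, one per chunk of at most chunk_size docs."""
    if not docs:
        return  # array_split(docs, 0) would raise ValueError
    for chunk in array_split(docs, ceil(len(docs) / chunk_size)):
        # One JSON object per line, as the bulk endpoint expects
        yield "".join(
            ujson.dumps({"insert": {"index": index, "doc": doc}}) + "\n"
            for doc in chunk
        )

Separately: if the generated client follows the usual OpenAPI pattern, api_instance.bulk(..., async_req=True) returns an async result immediately, so an ApiException raised by the request only surfaces on .get() — the except block above may never fire.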

View File

@@ -17,7 +17,7 @@ services:
   db:
     image: manticoresearch/manticore
     restart: always
-    expose:
+    ports:
       - 9308
       - 9312
     ulimits:

View File

@@ -10,6 +10,27 @@ services:
       - .env
     volumes_from:
       - tmp
+    depends_on:
+      - db
+
+  db:
+    image: manticoresearch/manticore
+    restart: always
+    ports:
+      - 9308
+      - 9312
+    ulimits:
+      nproc: 65535
+      nofile:
+        soft: 65535
+        hard: 65535
+      memlock:
+        soft: -1
+        hard: -1
+    environment:
+      - MCL=1
+    volumes:
+      - ./docker/data:/var/lib/manticore
 
   tmp:
     image: busybox

View File

@@ -5,3 +5,5 @@ siphashc
 aiohttp[speedups]
 python-dotenv
 manticoresearch
+numpy
+ujson

View File

@@ -6,4 +6,5 @@ siphashc
 aiohttp[speedups]
 python-dotenv
 manticoresearch
+numpy
+ujson

View File

@ -1,11 +1,9 @@
# Python modules can't start with a number... # Python modules can't start with a number...
import json import ujson
import random import random
import string import string
from datetime import datetime from datetime import datetime
from typing import Any, Dict
import treq
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from siphashc import siphash from siphashc import siphash
@@ -14,6 +12,11 @@ import util
 from schemas.ch4_s import ATTRMAP
 import aiohttp
 import asyncio
+from numpy import array_split
+from math import ceil
+from concurrent.futures import ProcessPoolExecutor
+
+p = ProcessPoolExecutor(10)
 
 class Chan4(object):
     """
@@ -25,7 +28,8 @@ class Chan4(object):
         self.log = util.get_logger(name)
         self.api_endpoint = "https://a.4cdn.org"
-        self.boards = ["out"]
+        #self.boards = ["out", "g", "a", "3", "pol"] #
+        self.boards = []
         self.thread_list = {}
 
         #self.thread_deferreds = []
@@ -48,13 +52,13 @@ class Chan4(object):
         await self.get_board_list()
 
     async def get_board_list(self):
-        # responses = await self.api_call({"_": "boards.json"})
-        # for mapped, response in responses:
-        #     if not response:
-        #         continue
-        #     for board in response["boards"]:
-        #         self.boards.append(board["board"])
-        # self.log.debug(f"Got boards: {self.boards}")
+        responses = await self.api_call({"_": "boards.json"})
+        for mapped, response in responses:
+            if not response:
+                continue
+            for board in response["boards"]:
+                self.boards.append(board["board"])
+        self.log.debug(f"Got boards: {self.boards}")
 
         await self.get_thread_lists(self.boards)
@@ -82,59 +86,90 @@ class Chan4(object):
         self.log.debug(f"Getting information for threads: {thread_urls}")
         responses = await self.api_call(thread_urls)
         self.log.debug(f"Got information for threads: {thread_urls}")
+        all_posts = {}
         for mapped, response in responses:
             if not response:
                 continue
             board, thread = mapped
             self.log.debug(f"Got thread content for thread {thread} on board {board}")
-            await self.handle_posts(board, thread, response["posts"])
+            all_posts[mapped] = response["posts"]
 
-    async def handle_posts(self, board, thread, posts):
-        for index, post in enumerate(posts):
-            posts[index]["type"] = "msg"
+        # Split into 10,000 chunks
+        # split_posts = array_split(all_posts, ceil(len(all_posts) / 10000))
+        # print("SPLIT CHUNK", len(split_posts))
+        # for posts in split_posts:
+        #     with futures.ThreadPoolExecutor(max_workers=6) as executor:
+        #         print("SUBMITTED THREAD FOR", len(posts))
+        #         executor.submit(self.handle_posts, board, thread, posts)
+        #await self.handle_posts(board, thread, response["posts"])
+        #await asyncio.sleep(1)
+        await self.handle_posts_thread(all_posts)
 
-            # Calculate hash for post
-            post_normalised = json.dumps(post, sort_keys=True)
-            hash = siphash(self.hash_key, post_normalised)
-            hash = str(hash)
-            redis_key = f"cache.{board}.{thread}.{post['no']}"
-            key_content = db.r.get(redis_key)
-            if key_content:
-                key_content = key_content.decode("ascii")
-                if key_content == hash:
-                    return
-                else:
-                    posts[index]["type"] = "update"
-            db.r.set(redis_key, hash)
+    @asyncio.coroutine
+    def handle_posts_thread(self, posts):
+        loop = asyncio.get_event_loop()
+        print("HANDLE POSTS THREAD", len(posts.keys()))
+        yield from loop.run_in_executor(p, self.handle_posts, posts)
 
-            for key, value in list(post.items()):
-                if key in ATTRMAP:
-                    post[ATTRMAP[key]] = posts[index][key]
-                    del posts[index][key]
-            if "ts" in post:
-                old_time = posts[index]["ts"]
-                # '08/30/22(Tue)02:25:37'
-                time_spl = old_time.split(":")
-                if len(time_spl) == 3:
-                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
-                else:
-                    old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
-                new_ts = old_ts.isoformat()
-                posts[index]["ts"] = new_ts
-            if "msg" in post:
-                soup = BeautifulSoup(posts[index]["msg"], "html.parser")
-                msg = soup.get_text(separator="\n")
-                posts[index]["msg"] = msg
-            posts[index]["src"] = "4ch"
+    def handle_posts(self, posts):
+        print("HANDLE POSTS START")
+        to_store = []
+        for key, post_list in posts.items():
+            board, thread = key
+            print("PROCESSING BOARD", board, "THREAD", thread)
+            print("POSTS HERE", len(post_list))
+            for index, post in enumerate(post_list):
+                posts[key][index]["type"] = "msg"
+
+                # Calculate hash for post
+                post_normalised = ujson.dumps(post, sort_keys=True)
+                hash = siphash(self.hash_key, post_normalised)
+                hash = str(hash)
+                redis_key = f"cache.{board}.{thread}.{post['no']}"
+                key_content = db.r.get(redis_key)
+                if key_content:
+                    key_content = key_content.decode("ascii")
+                    if key_content == hash:
+                        continue
+                    else:
+                        posts[key][index]["type"] = "update"
+                #db.r.set(redis_key, hash)
+
+                for key2, value in list(post.items()):
+                    if key2 in ATTRMAP:
+                        post[ATTRMAP[key2]] = posts[key][index][key2]
+                        del posts[key][index][key2]
+                if "ts" in post:
+                    old_time = posts[key][index]["ts"]
+                    # '08/30/22(Tue)02:25:37'
+                    time_spl = old_time.split(":")
+                    if len(time_spl) == 3:
+                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M:%S")
+                    else:
+                        old_ts = datetime.strptime(old_time, "%m/%d/%y(%a)%H:%M")
+                    new_ts = old_ts.isoformat()
+                    posts[key][index]["ts"] = new_ts
+                if "msg" in post:
+                    soup = BeautifulSoup(posts[key][index]["msg"], "html.parser")
+                    msg = soup.get_text(separator="\n")
+                    posts[key][index]["msg"] = msg
+                posts[key][index]["src"] = "4ch"
+                to_store.append(posts[key][index])
 
             # print({name_map[name]: val for name, val in post.items()})
         #print(f"Got posts: {len(posts)}")
-        await db.store_message_bulk(posts)
+        print("HANDLE POSTS DONE")
+        db.store_message_bulk(to_store)
+        print("STORE DB DONE")
 
     async def fetch(self, url, session, mapped):
         async with session.get(url) as response:
-            return (mapped, await response.json())
+            try:
+                return (mapped, await response.json())
+            except:
+                print("FETCH ERROR")
+                return (mapped, None)
 
     async def bound_fetch(self, sem, url, session, mapped):
@@ -143,6 +178,7 @@ class Chan4(object):
         try:
             return await self.fetch(url, session, mapped)
         except:
+            print("BOUND ERROR")
             return (mapped, None)
 
     async def api_call(self, methods={}):
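
Two notes on the handle_posts rewrite above. First, changing return to continue when a cached hash matches is a real fix: previously the first unchanged post aborted processing of every remaining post in the thread. The dedup check in isolation (post_status is an illustrative name; note that with db.r.set commented out in this commit, no hash is ever cached, so every post reindexes on every pass):

import ujson
from siphashc import siphash

def post_status(redis_conn, redis_key, post, hash_key):
    """Classify a post as "msg" (new), "update" (changed) or "skip" (unchanged)."""
    digest = str(siphash(hash_key, ujson.dumps(post, sort_keys=True)))
    cached = redis_conn.get(redis_key)
    if cached is not None:
        if cached.decode("ascii") == digest:
            return "skip"  # same hash as last time: nothing to index
        redis_conn.set(redis_key, digest)
        return "update"    # seen before, content changed
    redis_conn.set(redis_key, digest)
    return "msg"           # first sighting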
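
Second, handle_posts_thread bridges the event loop to the ProcessPoolExecutor so that CPU-bound normalisation does not stall the fetches. The @asyncio.coroutine / yield from style it uses was deprecated in Python 3.8 and removed in 3.11; a sketch of the async def equivalent (method names as in the diff):

import asyncio
from concurrent.futures import ProcessPoolExecutor

p = ProcessPoolExecutor(10)

class Chan4:
    async def handle_posts_thread(self, posts):
        loop = asyncio.get_running_loop()
        # Runs handle_posts in a worker process; the await frees the loop.
        return await loop.run_in_executor(p, self.handle_posts, posts)

    def handle_posts(self, posts):
        ...  # CPU-bound normalisation, as in the diff

Bear in mind that everything handed to a ProcessPoolExecutor must pickle: a bound method drags the whole Chan4 instance through pickle and can fail at runtime if it holds an open aiohttp session. A ThreadPoolExecutor sidesteps that, at the cost of the GIL.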

View File

@@ -3,7 +3,7 @@ import logging
 log = logging.getLogger("util")
 
-debug = True
+debug = False
 
 # Color definitions
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
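
The debug = False flip presumably quietens whatever level get_logger sets; get_logger's body is not part of this commit, so the following is only an assumed sketch of that pattern:

import logging

debug = False

def get_logger(name):
    # Assumption: the module-level flag picks the log level.
    log = logging.getLogger(name)
    log.setLevel(logging.DEBUG if debug else logging.INFO)
    return log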