Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
7 changed files with 50 additions and 130 deletions
Showing only changes of commit 20e22ae7ca - Show all commits

46
db.py
View File

@ -1,12 +1,13 @@
from math import ceil
import manticoresearch
import ujson
from manticoresearch.rest import ApiException
from numpy import array_split
from redis import StrictRedis
import util
from schemas.mc_s import schema
import ujson
from numpy import array_split
from math import ceil
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
@ -15,6 +16,7 @@ api_instance = manticoresearch.IndexApi(api_client)
log = util.get_logger("db")
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
def store_message(msg):
"""
Store a message into Manticore
@ -30,27 +32,21 @@ def store_message(msg):
if schema[key].startswith("string"):
msg[key] = str(value)
body = [
{
"insert": {
"index": "main",
"doc": msg
}
}
]
body = [{"insert": {"index": "main", "doc": msg}}]
body_post = ""
for item in body:
body_post += ujson.dumps(item)
body_post += "\n"
#print(body_post)
# print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
api_instance.bulk(body_post, async_req=True)
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
def store_message_bulk(data):
"""
Store a message into Manticore
@ -71,42 +67,38 @@ def store_message_bulk(data):
if schema[key].startswith("string"):
msg[key] = str(value)
body = {
"insert": {
"index": "main",
"doc": msg
}
}
body = {"insert": {"index": "main", "doc": msg}}
total.append(body)
body_post = ""
for item in total:
body_post += ujson.dumps(item)
body_post += "\n"
#print(body_post)
# print(body_post)
try:
# Bulk index operations
api_response = api_instance.bulk(body_post, async_req=True)
#print(api_response)
api_instance.bulk(body_post, async_req=True)
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("FINISHED PROCESSING SPLIT")
print("BULK FINISH")
def update_schema():
    """
    Placeholder hook; currently a no-op that returns ``None``.
    """
def create_index(api_client):
    """
    Create the ``main`` Manticore table if it does not already exist.

    The column list is generated from the ``schema`` mapping
    (column name -> type string) and sent as a single SQL statement
    through ``manticoresearch.UtilsApi``.

    :param api_client: API client handed to ``manticoresearch.UtilsApi``
    """
    util_instance = manticoresearch.UtilsApi(api_client)
    # Names and types come from the imported schema mapping, not from
    # request data, so they are interpolated into the statement directly.
    schema_types = ", ".join(f"{k} {v}" for k, v in schema.items())
    create_query = f"create table if not exists main({schema_types}) engine='columnar'"
    print("Schema types", create_query)
    util_instance.sql(create_query)
# NOTE(review): these look like module-level calls, which would mean the
# table is created (if missing) as soon as this module is imported — confirm.
create_index(api_client)
update_schema()

View File

@ -8,6 +8,7 @@ from sources.dis import DiscordClient
# For development
if not getenv("DISCORD_TOKEN", None):
from dotenv import load_dotenv
load_dotenv()
log = util.get_logger("monolith")
@ -23,7 +24,7 @@ async def main(loop):
log.info("Starting Discord handler.")
client = DiscordClient()
loop.create_task(client.start(token))
#client.run(token)
# client.run(token)
log.info("Starting 4chan handler.")
chan = Chan4()
@ -32,7 +33,7 @@ async def main(loop):
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
#reactor.run()
# reactor.run()
try:
loop.run_forever()
except KeyboardInterrupt:

View File

@ -19,4 +19,4 @@ ATTRMAP = {
"custom_spoiler": "file_custom_spoiler",
"m_img": "file_m_img",
"time": "unix_time",
}
}

View File

@ -16,4 +16,4 @@ ATTRMAP = {
"channel_category": "channel.category.name",
"channel_category_id": "channel.category.id",
"channel_category_nsfw": "channel.category.nsfw",
}
}

View File

@ -1,199 +1,132 @@
# Column definitions for the index: maps column name -> type string.
# The comment above an entry, where present, shows example value(s).
schema = {
    "id": "bigint",
    # 1
    "archived": "int",
    # 1662150538
    "archived_on": "string indexed attribute",
    # CF
    "board_flag": "string indexed attribute",
    # true, false
    "bot": "bool",
    # 0
    "bumplimit": "int",
    # mod
    "capcode": "string indexed attribute",
    # 393598265, #main, Rust Programmer's Club
    "channel": "text",
    # Miscellaneous
    "channel_category": "text",
    # 360581491907887100
    "channel_category_id": "string indexed attribute",
    # true, false
    "channel_category_nsfw": "bool",
    # 734229101216530600
    "channel_id": "string indexed attribute",
    # true, false
    "channel_nsfw": "bool",
    # 1
    "closed": "int",
    # GB
    "country": "string indexed attribute",
    # United Kingdom
    "country_name": "text",
    # 5
    "file_custom_spoiler": "int",
    # 1
    "file_deleted": "int",
    # .jpg
    "file_ext": "string indexed attribute",
    # 1024
    "file_h": "int",
    # 1
    "file_m_img": "int",
    # tlArbrZDj7kbheSKPyDU0w==
    "file_md5": "string indexed attribute",
    # 88967
    "file_size": "int",
    # 1
    "file_spoiler": "int",
    # 1662149436322819
    "file_tim": "string indexed attribute",
    # 250
    "file_tn_h": "int",
    # 241
    "file_tn_w": "int",
    # 1080
    "file_w": "int",
    # 6E646BED-297E-4B4F-9082-31EDADC49472
    "filename": "text",
    # Confederate
    "flag_name": "string indexed attribute",
    "guild": "text",  # LEGACY -> channel
    "guild_id": "string indexed attribute",  # LEGACY -> channel_id
    # 36180
    "guild_member_count": "int",  # ? -> channel_member_count
    # 9f7b2e6a0e9b
    "host": "text",
    # 2447746
    "id_reply": "string indexed attribute",  # resto
    # "522, trans rights shill", myname
    "ident": "text",
    # 0
    "imagelimit": "int",
    # 0
    "images": "int",
    # 0
    "mode": "string indexed attribute",
    # b0n3
    "modearg": "string indexed attribute",
    # The quick brown fox jumped over the lazy dog
    "msg": "text",
    # 393605030
    "msg_id": "string indexed attribute",
    # pol
    "net": "text",
    # 273534239310479360
    "net_id": "string indexed attribute",
    # André de Santa Cruz, santa
    "nick": "text",
    # 773802568324350000
    "nick_id": "string indexed attribute",
    # 1, 2, 3, 4, 5, 6, ...
    "num": "int",
    # 12
    "replies": "int",
    # redacted-hate-thread
    "semantic_url": "string indexed attribute",
    # -1 -> 1 as float
    "sentiment": "float",
    # 2022
    "since4pass": "int",
    # 4ch, irc, dis
    "src": "string indexed attribute",
    # true, false
    "status": "bool",
    # 1
    "sticky": "int",
    # 1000
    "sticky_cap": "int",
    # Redacted Hate Thread, Gorbachev is dead
    "sub": "string indexed attribute",
    # Loop
    "tag": "string indexed attribute",
    # 100
    "tail_size": "int",
    "time": "timestamp",  # LEGACY -> ts
    "tokens": "text",  # ???
    # 2022-09-02T16:10:36
    "ts": "timestamp",
    # msg, notice, update, who
    "type": "string indexed attribute",
    # 10
    "unique_ips": "int",
    # 1662149436
    "unix_time": "string indexed attribute",
    # Anonymous
    "user": "text",
    "user_id": "string indexed attribute",  # LEGACY -> nick_id
    # 1, 2
    "version_sentiment": "int",
    # 1, 2
    "version_tokens": "int",
}

View File

@ -1,23 +1,22 @@
# Python modules can't start with a number...
import ujson
import asyncio
import random
import string
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import aiohttp
import ujson
from bs4 import BeautifulSoup
from siphashc import siphash
import db
import util
from schemas.ch4_s import ATTRMAP
import aiohttp
import asyncio
from numpy import array_split
from math import ceil
from concurrent.futures import ProcessPoolExecutor
p = ProcessPoolExecutor(10)
class Chan4(object):
"""
4chan indexer, crawler and ingester.
@ -28,12 +27,12 @@ class Chan4(object):
self.log = util.get_logger(name)
self.api_endpoint = "https://a.4cdn.org"
#self.boards = ["out", "g", "a", "3", "pol"] #
# self.boards = ["out", "g", "a", "3", "pol"] #
self.boards = []
self.thread_list = {}
#self.thread_deferreds = []
#self.content_deferreds = []
# self.thread_deferreds = []
# self.content_deferreds = []
self.log.info(f"Starting crawler bot to {self.api_endpoint}")
@ -82,7 +81,10 @@ class Chan4(object):
await self.get_thread_lists(self.boards)
async def get_threads_content(self, thread_list):
thread_urls = {(board, thread): f"{board}/thread/{thread}.json" for board, thread in thread_list}
thread_urls = {
(board, thread): f"{board}/thread/{thread}.json"
for board, thread in thread_list
}
self.log.debug(f"Getting information for threads: {thread_urls}")
responses = await self.api_call(thread_urls)
self.log.debug(f"Got information for threads: {thread_urls}")
@ -101,8 +103,8 @@ class Chan4(object):
# with futures.ThreadPoolExecutor(max_workers=6) as executor:
# print("SUBMITTED THREAD FOR", len(posts))
# executor.submit(self.handle_posts, board, thread, posts)
#await self.handle_posts(board, thread, response["posts"])
#await asyncio.sleep(1)
# await self.handle_posts(board, thread, response["posts"])
# await asyncio.sleep(1)
await self.handle_posts_thread(all_posts)
@asyncio.coroutine
@ -158,7 +160,7 @@ class Chan4(object):
to_store.append(posts[key][index])
# print({name_map[name]: val for name, val in post.items()})
#print(f"Got posts: {len(posts)}")
# print(f"Got posts: {len(posts)}")
print("HANDLE POSTS DONE")
db.store_message_bulk(to_store)
print("STORE DB DONE")
@ -167,26 +169,20 @@ class Chan4(object):
async with session.get(url) as response:
try:
return (mapped, await response.json())
except:
except: # noqa
print("FETCH ERROR")
return (mapped, None)
async def bound_fetch(self, sem, url, session, mapped):
    """
    Run ``self.fetch`` for ``url`` while holding the semaphore ``sem``.

    :param sem: async context manager limiting concurrent fetches
    :param url: URL passed through to ``self.fetch``
    :param session: session object passed through to ``self.fetch``
    :param mapped: caller-supplied key returned alongside the result
    :return: whatever ``self.fetch`` returns, or ``(mapped, None)`` if it
        raised an ordinary exception
    """
    async with sem:
        try:
            return await self.fetch(url, session, mapped)
        except Exception:
            # Best-effort: report the failure and keep the batch going.
            # Deliberately not a bare ``except`` so task cancellation and
            # KeyboardInterrupt still propagate.
            print("BOUND ERROR")
            return (mapped, None)
async def api_call(self, methods={}):
headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0"
)
}
tasks = []
sem = asyncio.Semaphore(100)
connector = aiohttp.TCPConnector(limit=None)
@ -195,8 +191,7 @@ class Chan4(object):
url = f"{self.api_endpoint}/{method}"
self.log.debug(f"GET {url}")
task = asyncio.create_task(self.bound_fetch(sem, url, session, mapped))
#task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
# task = asyncio.ensure_future(self.bound_fetch(sem, url, session))
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses

View File

@ -5,7 +5,6 @@ import discord
import db
import util
from schemas.dis_s import ATTRMAP