Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
11 changed files with 392 additions and 118 deletions
Showing only changes of commit 663a26778d - Show all commits

58
db.py
View File

@ -1,16 +1,68 @@
import json
from pprint import pprint
import manticoresearch
from manticoresearch.rest import ApiException
from redis import StrictRedis from redis import StrictRedis
import util import util
from schemas.mc_s import schema
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
log = util.get_logger("db") log = util.get_logger("db")
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
def store_message(msg):
    """
    Store a message into Manticore

    The message is normalised in place before being sent: fields whose value
    is None are removed, and integer values whose column type in ``schema``
    starts with "string" are converted to ``str``. The result is posted as a
    single-document bulk insert into the ``main`` index. API failures are
    logged and not re-raised.

    :param msg: dict
    """
    log.info(f"store_message() {msg}")

    # normalise fields (iterate over a copy, msg is modified in the loop)
    for key, value in list(msg.items()):
        if value is None:
            del msg[key]
            # Nothing left to normalise for a removed field
            continue
        if (
            key in schema
            and isinstance(value, int)
            and schema[key].startswith("string")
        ):
            msg[key] = str(value)

    body = [{"insert": {"index": "main", "doc": msg}}]

    # The bulk call is given one JSON document per line, each terminated
    # by a newline
    body_post = "".join(json.dumps(item) + "\n" for item in body)

    try:
        # Bulk index operations
        api_response = api_instance.bulk(body_post)
    except ApiException as e:
        # Report through the module logger rather than stdout
        log.error(f"Exception when calling IndexApi->bulk: {e}")
    else:
        log.debug(f"Bulk insert response: {api_response}")
def update_schema():
    """Placeholder: performs no work and returns None."""
    return None
def create_index(api_client):
    """
    Create the ``main`` table if it does not already exist.

    One column definition is generated per entry in ``schema`` (name followed
    by its type declaration) and the resulting ``create table if not exists``
    statement, using the columnar engine, is printed and then executed
    through the Manticore utils API.

    :param api_client: client object handed to manticoresearch.UtilsApi
    """
    util_instance = manticoresearch.UtilsApi(api_client)

    column_defs = []
    for column_name, column_type in schema.items():
        column_defs.append(f"{column_name} {column_type}")
    schema_types = ", ".join(column_defs)

    create_query = (
        f"create table if not exists main({schema_types}) engine='columnar'"
    )
    print("Schema types", create_query)
    util_instance.sql(create_query)
# Redis client connected through the local unix socket, database 0
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
# Import-time side effect: issues the "create table if not exists main"
# statement so the table store_message() inserts into is present
create_index(api_client)
# Currently a no-op placeholder
update_schema()

View File

@ -10,6 +10,28 @@ services:
- .env - .env
volumes_from: volumes_from:
- tmp - tmp
depends_on:
- "db"
- "redis"
db:
image: manticoresearch/manticore
restart: always
expose:
- 9308
- 9312
ulimits:
nproc: 65535
nofile:
soft: 65535
hard: 65535
memlock:
soft: -1
hard: -1
environment:
- MCL=1
volumes:
- ./docker/data:/var/lib/manticore
tmp: tmp:
image: busybox image: busybox

View File

@ -1,7 +1,7 @@
wheel wheel
treq
beautifulsoup4 beautifulsoup4
redis redis
siphashc siphashc
aiohttp aiohttp[speedups]
python-dotenv python-dotenv
manticoresearch

View File

@ -1,36 +1,13 @@
import asyncio import asyncio
import signal
import sys
from os import getenv from os import getenv
from twisted.internet import asyncioreactor
import util import util
from sources.ch4 import Chan4 from sources.ch4 import Chan4
from sources.dis import DiscordClient from sources.dis import DiscordClient
loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# asyncioreactor.install(asyncio.new_event_loop())
asyncioreactor.install(loop) # noqa
from twisted.internet import reactor, task # noqa
# Doesn't quite work but better than nothing
def stop(*args):
loop.stop()
reactor.stop()
sys.exit()
signal.signal(signal.SIGINT, stop)
# loop.add_signal_handler(signal.SIGINT, functools.partial(stop, loop))
# For development # For development
if not getenv("DISCORD_TOKEN", None): if not getenv("DISCORD_TOKEN", None):
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
log = util.get_logger("monolith") log = util.get_logger("monolith")
@ -42,23 +19,25 @@ if not token:
raise Exception("No Discord token provided") raise Exception("No Discord token provided")
async def start(): async def main(loop):
log.info("Starting Discord handler.") log.info("Starting Discord handler.")
client = DiscordClient(loop=loop) client = DiscordClient()
loop.create_task(client.start(token)) loop.create_task(client.start(token))
#client.run(token)
log.info("Starting 4chan handler.") # log.info("Starting 4chan handler.")
chan = Chan4() # chan = Chan4()
running = chan.run() # #running = chan.run()
deferred = task.ensureDeferred(running) # chan.run()
reactor.callLater(0.1, deferred.callback, "") #deferred.addCallback(lambda: None)
#reactor.callLater(0.1, deferred.callback, None)
loop.create_task(start())
# reactor.run() loop = asyncio.get_event_loop()
reactor.run() loop.create_task(main(loop))
#reactor.run()
try: try:
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -1,8 +1,9 @@
wheel wheel
pre-commit pre-commit
treq
beautifulsoup4 beautifulsoup4
redis redis
siphashc siphashc
aiohttp aiohttp[speedups]
python-dotenv python-dotenv
manticoresearch

22
schemas/ch4_s.py Normal file
View File

@ -0,0 +1,22 @@
# Translation table used by the 4chan crawler when renaming the keys of a
# post: source field name -> field name under which the value is stored.
ATTRMAP = {
    # Post identity, author and content
    "no": "msg_id",
    "now": "ts",
    "name": "user",
    "trip": "nick",
    "id": "nick_id",
    "resto": "id_reply",
    "com": "msg",
    # Attachment fields, all stored under a file_ prefix
    "ext": "file_ext",
    "w": "file_w",
    "h": "file_h",
    "tn_w": "file_tn_w",
    "tn_h": "file_tn_h",
    "tim": "file_tim",
    "fsize": "file_size",
    "md5": "file_md5",
    "filedeleted": "file_deleted",
    "spoiler": "file_spoiler",
    "custom_spoiler": "file_custom_spoiler",
    "m_img": "file_m_img",
    # Kept separately from "now" -> "ts"
    "time": "unix_time",
}

19
schemas/dis_s.py Normal file
View File

@ -0,0 +1,19 @@
# Stored field name -> dotted attribute path.
# NOTE(review): presumably each path is resolved against a Discord message
# object by DiscordClient (sources.dis imports this table) - confirm there.
ATTRMAP = {
    # Message
    "msg": "content",
    "msg_id": "id",
    # Author
    "nick": "author.name",
    "host": "author.discriminator",
    "ident": "author.nick",
    "time": "created_at",
    # Channel
    "channel": "channel.name",
    "channel_nsfw": "channel.nsfw",
    "bot": "author.bot",
    "nick_id": "author.id",
    "channel_id": "channel.id",
    # Guild, stored under the "net" names
    "net": "author.guild.name",
    "net_id": "author.guild.id",
    "guild_member_count": "author.guild.member_count",
    # Channel category
    "channel_category": "channel.category.name",
    "channel_category_id": "channel.category.id",
    "channel_category_nsfw": "channel.category.nsfw",
}

199
schemas/mc_s.py Normal file
View File

@ -0,0 +1,199 @@
# Column definitions for the Manticore "main" table:
# column name -> type declaration.
# db.create_index() joins these pairs into the "create table" statement and
# db.store_message() converts integer values to str for every column whose
# declaration starts with "string".
# The comment above each entry shows example values.
schema = {
    "id": "bigint",
    # e.g. 1
    "archived": "int",
    # e.g. 1662150538
    "archived_on": "string indexed attribute",
    # e.g. CF
    "board_flag": "string indexed attribute",
    # e.g. true, false
    "bot": "bool",
    # e.g. 0
    "bumplimit": "int",
    # e.g. mod
    "capcode": "string indexed attribute",
    # e.g. 393598265, #main, Rust Programmer's Club
    "channel": "text",
    # e.g. Miscellaneous
    "channel_category": "text",
    # e.g. 360581491907887100
    "channel_category_id": "string indexed attribute",
    # e.g. true, false
    "channel_category_nsfw": "bool",
    # e.g. 734229101216530600
    "channel_id": "string indexed attribute",
    # e.g. true, false
    "channel_nsfw": "bool",
    # e.g. 1
    "closed": "int",
    # e.g. GB
    "country": "string indexed attribute",
    # e.g. United Kingdom
    "country_name": "text",
    # e.g. 5
    "file_custom_spoiler": "int",
    # e.g. 1
    "file_deleted": "int",
    # e.g. .jpg
    "file_ext": "string indexed attribute",
    # e.g. 1024
    "file_h": "int",
    # e.g. 1
    "file_m_img": "int",
    # e.g. tlArbrZDj7kbheSKPyDU0w==
    "file_md5": "string indexed attribute",
    # e.g. 88967
    "file_size": "int",
    # e.g. 1
    "file_spoiler": "int",
    # e.g. 1662149436322819
    "file_tim": "string indexed attribute",
    # e.g. 250
    "file_tn_h": "int",
    # e.g. 241
    "file_tn_w": "int",
    # e.g. 1080
    "file_w": "int",
    # e.g. 6E646BED-297E-4B4F-9082-31EDADC49472
    "filename": "text",
    # e.g. Confederate
    "flag_name": "string indexed attribute",
    "guild": "text",  # LEGACY -> channel
    "guild_id": "string indexed attribute",  # LEGACY -> channel_id
    # e.g. 36180
    "guild_member_count": "int",  # ? -> channel_member_count
    # e.g. 9f7b2e6a0e9b
    "host": "text",
    # e.g. 2447746
    "id_reply": "string indexed attribute",  # resto
    # e.g. "522, trans rights shill", myname
    "ident": "text",
    # e.g. 0
    "imagelimit": "int",
    # e.g. 0
    "images": "int",
    # e.g. 0
    "mode": "string indexed attribute",
    # e.g. b0n3
    "modearg": "string indexed attribute",
    # e.g. The quick brown fox jumped over the lazy dog
    "msg": "text",
    # e.g. 393605030
    "msg_id": "string indexed attribute",
    # e.g. pol
    "net": "text",
    # e.g. 273534239310479360
    "net_id": "string indexed attribute",
    # e.g. André de Santa Cruz, santa
    "nick": "text",
    # e.g. 773802568324350000
    "nick_id": "string indexed attribute",
    # e.g. 1, 2, 3, 4, 5, 6, ...
    "num": "int",
    # e.g. 12
    "replies": "int",
    # e.g. redacted-hate-thread
    "semantic_url": "string indexed attribute",
    # e.g. -1 -> 1 as float
    "sentiment": "float",
    # e.g. 2022
    "since4pass": "int",
    # e.g. 4ch, irc, dis
    "src": "string indexed attribute",
    # e.g. true, false
    "status": "bool",
    # e.g. 1
    "sticky": "int",
    # e.g. 1000
    "sticky_cap": "int",
    # e.g. Redacted Hate Thread, Gorbachev is dead
    "sub": "string indexed attribute",
    # e.g. Loop
    "tag": "string indexed attribute",
    # e.g. 100
    "tail_size": "int",
    "time": "timestamp",  # LEGACY -> ts
    "tokens": "text",  # ???
    # e.g. 2022-09-02T16:10:36
    "ts": "timestamp",
    # e.g. msg, notice, update, who
    "type": "string indexed attribute",
    # e.g. 10
    "unique_ips": "int",
    # e.g. 1662149436
    "unix_time": "string indexed attribute",
    # e.g. Anonymous
    "user": "text",
    "user_id": "string indexed attribute",  # LEGACY -> nick_id
    # e.g. 1, 2
    "version_sentiment": "int",
    # e.g. 1, 2
    "version_tokens": "int",
}

View File

@ -8,11 +8,10 @@ from typing import Any, Dict
import treq import treq
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from siphashc import siphash from siphashc import siphash
from twisted.internet.defer import inlineCallbacks
import db import db
import util import util
from schemas.ch4_s import ATTRMAP
class Chan4(object): class Chan4(object):
""" """
@ -24,10 +23,11 @@ class Chan4(object):
self.log = util.get_logger(name) self.log = util.get_logger(name)
self.api_endpoint = "https://a.4cdn.org" self.api_endpoint = "https://a.4cdn.org"
self.boards = [] self.boards = ["out"]
self.thread_list = {} self.thread_list = {}
self.thread_deferreds = [] #self.thread_deferreds = []
#self.content_deferreds = []
self.log.info(f"Starting crawler bot to {self.api_endpoint}") self.log.info(f"Starting crawler bot to {self.api_endpoint}")
@ -45,61 +45,74 @@ class Chan4(object):
@inlineCallbacks @inlineCallbacks
def run(self): def run(self):
yield self.get_board_list() yield self.get_board_list()
yield self.get_thread_lists()
yield self.get_thread_contents()
def got_thread_lists(self, thread_lists):
print("GOT THREAD LIST", thread_lists)
# Instead of while True, do it again!
d = self.get_thread_lists()
d.addCallback(self.got_thread_lists)
# @inlineCallbacks
# def mainloop(self):
# while True:
# yield self.get_thread_lists()
# yield self.get_thread_contents()
@inlineCallbacks
def get_board_list(self): def get_board_list(self):
self.log.info("Getting board list") self.log.debug("Getting board list")
response = self.api_call("boards.json") response = self.api_call("boards.json")
response.addCallback(self.got_board_list) response.addCallback(self.got_board_list)
return response yield response
@inlineCallbacks
def got_board_list(self, board_list): def got_board_list(self, board_list):
if board_list["success"]: if board_list["success"]:
for board in board_list["response"]["boards"]: for board in board_list["response"]["boards"]:
self.boards.append(board["board"]) self.boards.append(board["board"])
self.log.debug(f"Got boards: {self.boards}")
d = self.get_thread_lists()
d.addCallback(self.got_thread_lists)
yield d
@inlineCallbacks @inlineCallbacks
def get_thread_lists(self): def get_thread_lists(self):
thread_deferreds = []
for board in self.boards: for board in self.boards:
yield self.get_thread_list(board) d = self.get_thread_list(board)
# self.thread_deferreds.append(d) d.addCallback(self.got_thread_list, board)
# yield defer.gatherResults(self.thread_deferreds) thread_deferreds.append(d)
# self.thread_deferreds = []
# self.log.info("Finished getting thread lists")
@inlineCallbacks yield defer.gatherResults(thread_deferreds)
def get_thread_contents(self):
for board in self.thread_list.keys():
for page in self.thread_list[board]:
for threads in page["threads"]:
no = threads["no"]
yield self.get_thread_content(board, no)
# self.content_deferreds.append(d)
# al = yield defer.gatherResults(self.content_deferreds)
# self.content_deferreds = []
# self.log.info("Finished getting content")
def get_thread_list(self, board): def get_thread_list(self, board):
self.log.info(f"Getting thread list for {board}") self.log.debug(f"Getting thread list for {board}")
response = self.api_call(f"{board}/catalog.json") response = self.api_call(f"{board}/catalog.json")
response.addCallback(self.got_thread_list, board)
return response return response
def got_thread_list(self, thread_list, board): def got_thread_list(self, thread_list, board):
if not thread_list:
self.log.error(f"Thread list invalid: {thread_list} {board}")
return
if thread_list["success"]: if thread_list["success"]:
self.thread_list[board] = thread_list["response"] #self.thread_list[board] = thread_list["response"]
for page in thread_list["response"]:
for threads in page["threads"]:
no = threads["no"]
d = self.get_thread_content(board, no)
d.addCallback(self.got_thread_content, board, no)
self.log.info(f"Got thread list for {board}: {len(thread_list)}") self.log.info(f"Got thread list for {board}: {len(thread_list)}")
def get_thread_content(self, board, thread): def get_thread_content(self, board, thread):
self.log.info(f"Getting information for thread {thread} on board {board}") self.log.debug(f"Getting information for thread {thread} on board {board}")
response = self.api_call(f"{board}/thread/{thread}.json") response = self.api_call(f"{board}/thread/{thread}.json")
response.addCallback(self.got_thread_content, board, thread)
return response return response
def got_thread_content(self, thread_content, board, thread): def got_thread_content(self, thread_content, board, thread):
if not thread_content:
self.log.error(f"Thread content invalid: {thread_content} {board} {thread}")
return
if thread_content["success"]: if thread_content["success"]:
self.log.info(f"Got thread content for thread {thread} on board {board}") self.log.debug(f"Got thread content for thread {thread} on board {board}")
for post in thread_content["response"]["posts"]: for post in thread_content["response"]["posts"]:
# print(post) # print(post)
self.handle_post(board, thread, post) self.handle_post(board, thread, post)
@ -112,28 +125,6 @@ class Chan4(object):
) )
def handle_post(self, board, thread, post): def handle_post(self, board, thread, post):
name_map = {
"no": "msg_id",
"now": "ts",
"name": "user",
"trip": "nick",
"id": "nick_id",
"resto": "id_reply",
"com": "msg",
"ext": "file_ext",
"w": "file_w",
"h": "file_h",
"tn_w": "file_tn_w",
"tn_h": "file_tn_h",
"tim": "file_tim",
"fsize": "file_size",
"md5": "file_md5",
"filedeleted": "file_deleted",
"spoiler": "file_spoiler",
"custom_spoiler": "file_custom_spoiler",
"m_img": "file_m_img",
"time": "unix_time",
}
post["type"] = "msg" post["type"] = "msg"
# Calculate hash for post # Calculate hash for post
@ -152,8 +143,8 @@ class Chan4(object):
# Check if hash exists # Check if hash exists
# Store the hash # Store the hash
for key, value in list(post.items()): for key, value in list(post.items()):
if key in name_map: if key in ATTRMAP:
post[name_map[key]] = post[key] post[ATTRMAP[key]] = post[key]
del post[key] del post[key]
if "ts" in post: if "ts" in post:
old_time = post["ts"] old_time = post["ts"]
@ -175,19 +166,25 @@ class Chan4(object):
# print({name_map[name]: val for name, val in post.items()}) # print({name_map[name]: val for name, val in post.items()})
db.store_message(post) db.store_message(post)
def dump(self, *args, **kwargs):
    """
    Log the received positional and keyword arguments as an error.

    api_call() registers this as the errback of its request, passing the
    request URL as the ``url`` keyword. Returns None.
    """
    message = f"Error: {args} {kwargs}"
    self.log.error(message)
@inlineCallbacks @inlineCallbacks
def callback_api_call(self, response, result): def callback_api_call(self, response, result):
result["status"] = response.code
try: try:
text = yield response.content() text = yield response.content()
except: # noqa except: # noqa
self.log.error("Error with API call") self.log.error("Error with API call")
return return False
#print("RESP TEXT", text)
try: try:
result["response"] = json.loads(text) result["response"] = json.loads(text)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
result["success"] = "ERROR" result["success"] = "ERROR"
result["message"] = "Error parsing JSON." result["message"] = "Error parsing JSON."
return result return result
#print("RESP AFTER JSON", result)
result["status"] = response.code result["status"] = response.code
if response.code == 200: if response.code == 200:
result["success"] = True result["success"] = True
@ -208,9 +205,10 @@ class Chan4(object):
response = treq.get(url, headers=headers) response = treq.get(url, headers=headers)
result: Dict[str, Any] = { result: Dict[str, Any] = {
"success": False, "success": False,
"message": "Invalid Method", "message": "Call not successful",
"response": None, "response": None,
"status": None, "status": None,
} }
response.addCallback(self.callback_api_call, result) response.addCallback(self.callback_api_call, result)
response.addErrback(self.dump, url=url)
return response return response

View File

@ -6,25 +6,7 @@ import discord
import db import db
import util import util
ATTRMAP = { from schemas.dis_s import ATTRMAP
"msg": "content",
"msg_id": "id",
"nick": "author.name",
"host": "author.discriminator",
"ident": "author.nick",
"time": "created_at",
"channel": "channel.name",
"channel_nsfw": "channel.nsfw",
"bot": "author.bot",
"user_id": "author.id",
"channel_id": "channel.id",
"net": "author.guild.name",
"net_id": "author.guild.id",
"guild_member_count": "author.guild.member_count",
"channel_category": "channel.category.name",
"channel_category_id": "channel.category.id",
"channel_category_nsfw": "channel.category.nsfw",
}
class DiscordClient(discord.Client): class DiscordClient(discord.Client):

View File

@ -3,7 +3,7 @@ import logging
log = logging.getLogger("util") log = logging.getLogger("util")
debug = True debug = False
# Color definitions # Color definitions
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)