Implement indexing into Apache Druid #1

Closed
m wants to merge 263 commits from druid into master
3 changed files with 14 additions and 13 deletions
Showing only changes of commit 9c9d49dcd2 - Show all commits

8
db.py
View File

@ -41,8 +41,8 @@ def store_message(msg):
# print(body_post) # print(body_post)
try: try:
# Bulk index operations # Bulk index operations
api_instance.bulk(body_post, async_req=True) api_response = api_instance.bulk(body_post) # , async_req=True
# print(api_response) print(api_response)
except ApiException as e: except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e) print("Exception when calling IndexApi->bulk: %s\n" % e)
@ -82,8 +82,8 @@ def store_message_bulk(data):
# print(body_post) # print(body_post)
try: try:
# Bulk index operations # Bulk index operations
api_instance.bulk(body_post, async_req=True) api_response = api_instance.bulk(body_post) # , async_req=True
# print(api_response) print(api_response)
except ApiException as e: except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e) print("Exception when calling IndexApi->bulk: %s\n" % e)
print("FINISHED PROCESSING SPLIT") print("FINISHED PROCESSING SPLIT")

View File

@ -4,21 +4,18 @@ import random
import string import string
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from datetime import datetime from datetime import datetime
from math import ceil
import aiohttp import aiohttp
import ujson import ujson
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from numpy import array_split
from siphashc import siphash from siphashc import siphash
import db import db
import util import util
from schemas.ch4_s import ATTRMAP from schemas.ch4_s import ATTRMAP
from numpy import array_split
from math import ceil
# CONFIGURATION # # CONFIGURATION #
# Number of 4chan threads to request at once # Number of 4chan threads to request at once
@ -40,6 +37,7 @@ CPU_THREADS = 2
p = ProcessPoolExecutor(CPU_THREADS) p = ProcessPoolExecutor(CPU_THREADS)
class Chan4(object): class Chan4(object):
""" """
4chan indexer, crawler and ingester. 4chan indexer, crawler and ingester.
@ -107,7 +105,7 @@ class Chan4(object):
print("SUBMITTED THREADS FOR", len(threads)) print("SUBMITTED THREADS FOR", len(threads))
await self.get_threads_content(threads) await self.get_threads_content(threads)
await asyncio.sleep(THREADS_DELAY) await asyncio.sleep(THREADS_DELAY)
#await self.get_threads_content(to_get) # await self.get_threads_content(to_get)
def take_items(self, dict_list, n): def take_items(self, dict_list, n):
i = 0 i = 0
@ -119,7 +117,7 @@ class Chan4(object):
i += 1 i += 1
if i == n: if i == n:
raise StopIteration raise StopIteration
except: except StopIteration:
print("Take items took", i, "items") print("Take items took", i, "items")
async def get_threads_content(self, thread_list): async def get_threads_content(self, thread_list):
@ -163,7 +161,7 @@ class Chan4(object):
# print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS")
# await self.handle_posts_thread(posts) # await self.handle_posts_thread(posts)
#await self.handle_posts_thread(all_posts) # await self.handle_posts_thread(all_posts)
@asyncio.coroutine @asyncio.coroutine
def handle_posts_thread(self, posts): def handle_posts_thread(self, posts):
@ -213,6 +211,9 @@ class Chan4(object):
posts[key][index]["msg"] = msg posts[key][index]["msg"] = msg
posts[key][index]["src"] = "4ch" posts[key][index]["src"] = "4ch"
posts[key][index]["net"] = board
posts[key][index]["channel"] = thread
to_store.append(posts[key][index]) to_store.append(posts[key][index])
# print({name_map[name]: val for name, val in post.items()}) # print({name_map[name]: val for name, val in post.items()})

View File

@ -35,7 +35,7 @@ class DiscordClient(discord.Client):
return return
a = self.recurse_dict(message) a = self.recurse_dict(message)
#a["ts"] = a["time"].isoformat() # a["ts"] = a["time"].isoformat()
a["ts"] = int(a["time"].timestamp()) a["ts"] = int(a["time"].timestamp())
del a["time"] del a["time"]
a["type"] = "msg" a["type"] = "msg"