diff --git a/db.py b/db.py index 04730d6..42ac771 100644 --- a/db.py +++ b/db.py @@ -41,8 +41,8 @@ def store_message(msg): # print(body_post) try: # Bulk index operations - api_instance.bulk(body_post, async_req=True) - # print(api_response) + api_response = api_instance.bulk(body_post) # , async_req=True + print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) @@ -82,8 +82,8 @@ def store_message_bulk(data): # print(body_post) try: # Bulk index operations - api_instance.bulk(body_post, async_req=True) - # print(api_response) + api_response = api_instance.bulk(body_post) # , async_req=True + print(api_response) except ApiException as e: print("Exception when calling IndexApi->bulk: %s\n" % e) print("FINISHED PROCESSING SPLIT") diff --git a/sources/ch4.py b/sources/ch4.py index 79083c4..ef6e366 100644 --- a/sources/ch4.py +++ b/sources/ch4.py @@ -4,21 +4,18 @@ import random import string from concurrent.futures import ProcessPoolExecutor from datetime import datetime +from math import ceil import aiohttp import ujson from bs4 import BeautifulSoup +from numpy import array_split from siphashc import siphash import db import util from schemas.ch4_s import ATTRMAP -from numpy import array_split -from math import ceil - - - # CONFIGURATION # # Number of 4chan threads to request at once @@ -40,6 +37,7 @@ CPU_THREADS = 2 p = ProcessPoolExecutor(CPU_THREADS) + class Chan4(object): """ 4chan indexer, crawler and ingester. @@ -107,7 +105,7 @@ class Chan4(object): print("SUBMITTED THREADS FOR", len(threads)) await self.get_threads_content(threads) await asyncio.sleep(THREADS_DELAY) - #await self.get_threads_content(to_get) + # await self.get_threads_content(to_get) def take_items(self, dict_list, n): i = 0 @@ -119,7 +117,7 @@ class Chan4(object): i += 1 if i == n: raise StopIteration - except: + except StopIteration: print("Take items took", i, "items") async def get_threads_content(self, thread_list): @@ -163,7 +161,7 @@ class Chan4(object): # print("SPAWNED THREAD TO PROCESS", len(posts), "POSTS") # await self.handle_posts_thread(posts) - #await self.handle_posts_thread(all_posts) + # await self.handle_posts_thread(all_posts) @asyncio.coroutine def handle_posts_thread(self, posts): @@ -213,6 +211,9 @@ class Chan4(object): posts[key][index]["msg"] = msg posts[key][index]["src"] = "4ch" + posts[key][index]["net"] = board + posts[key][index]["channel"] = thread + to_store.append(posts[key][index]) # print({name_map[name]: val for name, val in post.items()}) diff --git a/sources/dis.py b/sources/dis.py index a3104b5..0b8b2f6 100644 --- a/sources/dis.py +++ b/sources/dis.py @@ -35,7 +35,7 @@ class DiscordClient(discord.Client): return a = self.recurse_dict(message) - #a["ts"] = a["time"].isoformat() + # a["ts"] = a["time"].isoformat() a["ts"] = int(a["time"].timestamp()) del a["time"] a["type"] = "msg"