Compare commits

...

3 Commits

6 changed files with 40 additions and 32 deletions

.gitignore

@@ -158,4 +158,5 @@ cython_debug/
 docker/data
 *.pem
 legacy/conf/live/
 legacy/conf/cert/
+stack.env

db.py

@@ -1,14 +1,19 @@
 import random
+from os import getenv
 import aioredis
 import orjson
+import redis
 # Kafka
 from aiokafka import AIOKafkaProducer
-import redis
 import util
+trues = ("true", "1", "t", True)
+MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
 # KAFKA_TOPIC = "msg"
 log = util.get_logger("db")
@ -44,8 +49,9 @@ KEYNAME = "queue"
async def store_kafka_batch(data): async def store_kafka_batch(data):
print("FAKE STORE KAFKA BATCH") if not MONOLITH_KAFKA_ENABLED:
return log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
return
# log.debug(f"Storing Kafka batch of {len(data)} messages") # log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092") producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start() await producer.start()
@@ -116,7 +122,7 @@ async def queue_message(msg):
     Queue a message on the Redis buffer.
     """
     message = orjson.dumps(msg)
-    await ar.zadd(KEYNAME, message)
+    await ar.lpush(KEYNAME, message)
 async def queue_message_bulk(data):
@@ -125,4 +131,4 @@ async def queue_message_bulk(data):
     """
     for msg in data:
         message = orjson.dumps(msg)
-        await ar.zadd(KEYNAME, message)
+        await ar.lpush(KEYNAME, message)
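
Taken together, the db.py changes above replace the sorted-set buffer (zadd) with a plain Redis list (lpush) and gate the Kafka path behind a MONOLITH_KAFKA_ENABLED flag parsed from the environment. A minimal, self-contained sketch of that producer-side pattern follows; it is not part of the diff, and the redis.asyncio client and localhost URL are stand-ins for the repo's own ar connection.

# Illustrative sketch only: env-flag gating plus list-based queueing as in db.py.
# Assumes redis-py >= 4.2 (redis.asyncio) as a stand-in for the repo's client.
from os import getenv

import orjson
import redis.asyncio as redis_async

trues = ("true", "1", "t", True)
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues

KEYNAME = "queue"
ar = redis_async.from_url("redis://localhost:6379")  # hypothetical address

async def queue_message(msg):
    # Serialise the message and push it onto the head of the Redis list buffer.
    await ar.lpush(KEYNAME, orjson.dumps(msg))

A list keeps insertion order without the per-member score a sorted set requires, which is why the zadd calls elsewhere in the diff become lpush.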

View File

@@ -9,17 +9,6 @@ services:
       - ${PORTAINER_GIT_DIR}:/code
     env_file:
       - ../stack.env
-    # volumes_from:
-    #   - tmp
-    # depends_on:
-    #   broker:
-    #     condition: service_started
-    #   kafka:
-    #     condition: service_healthy
-    #   tmp:
-    #     condition: service_started
-    #   redis:
-    #     condition: service_healthy
   threshold:
image: pathogen/threshold:latest image: pathogen/threshold:latest
@@ -28,7 +17,7 @@ services:
     volumes:
       - ${PORTAINER_GIT_DIR}:/code
       - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
       - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
     ports:
       - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"

View File

@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 def queue_message(c):
     message = json.dumps(c)
-    main.g.sadd("queue", message)
+    main.g.lpush("queue", message)
 def event(

View File

@@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
 # For timestamp processing
 from datetime import datetime
 from math import ceil
+from os import getenv
 import orjson
 import regex
@@ -51,6 +52,12 @@ import util
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
+trues = ("true", "1", "t", True)
+MONOLITH_PROCESS_PERFSTATS = (
+    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
+)
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
     strip_tags, #
@@ -267,17 +274,19 @@ def process_data(data):
         # Add the mutated message to the return buffer
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
-    log.debug("=====================================")
-    log.debug(f"Sentiment: {sentiment_time}")
-    log.debug(f"Regex: {regex_time}")
-    log.debug(f"Polyglot: {polyglot_time}")
-    log.debug(f"Date: {date_time}")
-    log.debug(f"NLP: {nlp_time}")
-    log.debug(f"Normalise: {normalise_time}")
-    log.debug(f"Hash: {hash_time}")
-    log.debug(f"Normal2: {normal2_time}")
-    log.debug(f"Soup: {soup_time}")
-    log.debug(f"Total: {total_time}")
-    log.debug("=====================================")
+    if MONOLITH_PROCESS_PERFSTATS:
+        log.debug("=====================================")
+        log.debug(f"Sentiment: {sentiment_time}")
+        log.debug(f"Regex: {regex_time}")
+        log.debug(f"Polyglot: {polyglot_time}")
+        log.debug(f"Date: {date_time}")
+        log.debug(f"NLP: {nlp_time}")
+        log.debug(f"Normalise: {normalise_time}")
+        log.debug(f"Hash: {hash_time}")
+        log.debug(f"Normal2: {normal2_time}")
+        log.debug(f"Soup: {soup_time}")
+        log.debug(f"Total: {total_time}")
+        log.debug("=====================================")
     return to_store

View File

@@ -36,7 +36,10 @@ class Ingest(object):
         items = []
         # for source in SOURCES:
         #     key = f"{KEYPREFIX}{source}"
-        chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE)
+        length = await db.ar.llen(KEYNAME)
+        start_num = length - CHUNK_SIZE
+        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
+        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
         if not chunk:
             return
         for item in chunk:
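
On the consumer side, the hunk above sizes the read with llen and pulls the newest CHUNK_SIZE entries with lrange. A rough standalone sketch of that read path follows, again with redis.asyncio as a stand-in; CHUNK_SIZE and the trailing ltrim/delete cleanup are illustrative assumptions and do not appear in the diff.

# Illustrative consumer sketch: batch-read the tail of the list that
# queue_message() pushes to. The ltrim/delete step is an assumption; the diff
# itself only shows llen + lrange (with rpop left as a commented alternative).
import orjson
import redis.asyncio as redis_async

ar = redis_async.from_url("redis://localhost:6379")  # hypothetical address
KEYNAME = "queue"
CHUNK_SIZE = 1000  # hypothetical batch size

async def get_chunk():
    length = await ar.llen(KEYNAME)
    start_num = max(length - CHUNK_SIZE, 0)
    chunk = await ar.lrange(KEYNAME, start_num, -1)
    if not chunk:
        return []
    if start_num > 0:
        # Keep everything before the slice we just read.
        await ar.ltrim(KEYNAME, 0, start_num - 1)
    else:
        # We read the whole list; clear it.
        await ar.delete(KEYNAME)
    return [orjson.loads(item) for item in chunk]

Note that the read-then-trim pair is not atomic; on Redis >= 6.2 a single RPOP with a count argument (the commented-out line in the diff) avoids that race.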