Compare commits

...

3 Commits

6 changed files with 40 additions and 32 deletions

.gitignore vendored
View File

@@ -158,4 +158,5 @@ cython_debug/
docker/data
*.pem
legacy/conf/live/
legacy/conf/cert/
stack.env

db.py
View File

@@ -1,14 +1,19 @@
import random
from os import getenv
import aioredis
import orjson
import redis
# Kafka
from aiokafka import AIOKafkaProducer
import redis
import util
trues = ("true", "1", "t", True)
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
# KAFKA_TOPIC = "msg"
log = util.get_logger("db")
@@ -44,8 +49,9 @@ KEYNAME = "queue"
async def store_kafka_batch(data):
print("FAKE STORE KAFKA BATCH")
return
if not MONOLITH_KAFKA_ENABLED:
log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
return
# log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
@@ -116,7 +122,7 @@ async def queue_message(msg):
    Queue a message on the Redis buffer.
    """
    message = orjson.dumps(msg)
    await ar.zadd(KEYNAME, message)
    await ar.lpush(KEYNAME, message)
async def queue_message_bulk(data):
@@ -125,4 +131,4 @@ async def queue_message_bulk(data):
"""
for msg in data:
message = orjson.dumps(msg)
await ar.zadd(KEYNAME, message)
await ar.lpush(KEYNAME, message)
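
The net effect of the db.py changes: the Kafka producer is gated behind MONOLITH_KAFKA_ENABLED, and the Redis buffer moves from a sorted set (zadd) to a plain list (lpush). A rough standalone sketch of the new write path, assuming redis-py's asyncio client; the connection URL and client setup are illustrative, not taken from the repository:

import orjson
import redis.asyncio as redis_async  # redis-py >= 4.2

KEYNAME = "queue"
ar = redis_async.from_url("redis://localhost:6379")  # placeholder URL

async def queue_message(msg):
    # Serialise and push onto the head of the Redis list buffer
    await ar.lpush(KEYNAME, orjson.dumps(msg))

async def queue_message_bulk(data):
    # Same pattern for a batch of messages
    for msg in data:
        await ar.lpush(KEYNAME, orjson.dumps(msg))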

View File

@@ -9,17 +9,6 @@ services:
      - ${PORTAINER_GIT_DIR}:/code
    env_file:
      - ../stack.env
    # volumes_from:
    #   - tmp
    # depends_on:
    #   broker:
    #     condition: service_started
    #   kafka:
    #     condition: service_healthy
    #   tmp:
    #     condition: service_started
    #   redis:
    #     condition: service_healthy

  threshold:
    image: pathogen/threshold:latest
@@ -28,7 +17,7 @@ services:
    volumes:
      - ${PORTAINER_GIT_DIR}:/code
      - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
      - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
      - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
    ports:
      - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"

View File

@@ -67,7 +67,7 @@ def parsemeta(numName, c):
def queue_message(c):
    message = json.dumps(c)
    main.g.sadd("queue", message)
    main.g.lpush("queue", message)

def event(
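
This is the same buffer change as in db.py, applied to the synchronous client code: moving from a Redis set (sadd) to a list (lpush) preserves insertion order and no longer collapses duplicate messages. A minimal sketch, assuming main.g is a standard synchronous redis.Redis client as the call site suggests:

import json
import redis

g = redis.Redis()  # placeholder connection; in the real code this lives on main.g

def queue_message(c):
    message = json.dumps(c)
    g.lpush("queue", message)  # list semantics: ordered, duplicates kept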

View File

@@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
# For timestamp processing
from datetime import datetime
from math import ceil
from os import getenv
import orjson
import regex
@@ -51,6 +52,12 @@ import util
# 4chan schema
from schemas.ch4_s import ATTRMAP
trues = ("true", "1", "t", True)
MONOLITH_PROCESS_PERFSTATS = (
    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
)

CUSTOM_FILTERS = [
    lambda x: x.lower(),
    strip_tags, #
@@ -267,17 +274,19 @@ def process_data(data):
        # Add the mutated message to the return buffer
        to_store.append(msg)
        total_time += (time.process_time() - total_start) * 1000

    log.debug("=====================================")
    log.debug(f"Sentiment: {sentiment_time}")
    log.debug(f"Regex: {regex_time}")
    log.debug(f"Polyglot: {polyglot_time}")
    log.debug(f"Date: {date_time}")
    log.debug(f"NLP: {nlp_time}")
    log.debug(f"Normalise: {normalise_time}")
    log.debug(f"Hash: {hash_time}")
    log.debug(f"Normal2: {normal2_time}")
    log.debug(f"Soup: {soup_time}")
    log.debug(f"Total: {total_time}")
    log.debug("=====================================")
    if MONOLITH_PROCESS_PERFSTATS:
        log.debug("=====================================")
        log.debug(f"Sentiment: {sentiment_time}")
        log.debug(f"Regex: {regex_time}")
        log.debug(f"Polyglot: {polyglot_time}")
        log.debug(f"Date: {date_time}")
        log.debug(f"NLP: {nlp_time}")
        log.debug(f"Normalise: {normalise_time}")
        log.debug(f"Hash: {hash_time}")
        log.debug(f"Normal2: {normal2_time}")
        log.debug(f"Soup: {soup_time}")
        log.debug(f"Total: {total_time}")
        log.debug("=====================================")
    return to_store
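
The timers gated here follow an accumulate-then-report pattern over the whole batch. A simplified, hypothetical illustration of that pattern (not the actual process_data body; only the variable names and the env-var gate mirror the diff):

import time
from os import getenv

trues = ("true", "1", "t", True)
MONOLITH_PROCESS_PERFSTATS = getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues

def process_batch(data):
    to_store = []
    total_time = 0
    for msg in data:
        total_start = time.process_time()
        # ... per-message processing stages accumulate their own timers here ...
        to_store.append(msg)
        total_time += (time.process_time() - total_start) * 1000  # milliseconds
    if MONOLITH_PROCESS_PERFSTATS:
        print(f"Total: {total_time}")  # the real code emits each stage via log.debug
    return to_store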

View File

@@ -36,7 +36,10 @@ class Ingest(object):
        items = []
        # for source in SOURCES:
        #     key = f"{KEYPREFIX}{source}"
        chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE)
        length = await db.ar.llen(KEYNAME)
        start_num = length - CHUNK_SIZE
        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
        if not chunk:
            return
        for item in chunk:
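
On the consumer side, the sorted-set zpop is replaced by a length check plus lrange over the tail of the list. A rough sketch of that pattern, assuming db.ar is the async Redis client from db.py; read_chunk, the CHUNK_SIZE value, and the max(..., 0) clamp are illustrative, and trimming the consumed range is not shown:

import orjson

import db  # the module shown above, exposing the async Redis client as db.ar

KEYNAME = "queue"
CHUNK_SIZE = 1000  # illustrative value

async def read_chunk():
    # lpush adds at the head, so the oldest messages sit at the tail of the list
    length = await db.ar.llen(KEYNAME)
    start_num = max(length - CHUNK_SIZE, 0)
    chunk = await db.ar.lrange(KEYNAME, start_num, -1)
    if not chunk:
        return []
    return [orjson.loads(item) for item in chunk]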