Compare commits

3 Commits: ab5e85c5c6 ... f774f4c2d2

| Author | SHA1 | Date |
|---|---|---|
| | f774f4c2d2 | |
| | e32b330ef4 | |
| | 8c596ec516 | |
.gitignore (vendored): 1 change

@@ -159,3 +159,4 @@ docker/data
 *.pem
 legacy/conf/live/
 legacy/conf/cert/
+stack.env
db.py: 14 changes

@@ -1,14 +1,19 @@
 import random
+from os import getenv

 import aioredis
 import orjson
+import redis

 # Kafka
 from aiokafka import AIOKafkaProducer
-import redis

 import util

+trues = ("true", "1", "t", True)
+
+MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
+
 # KAFKA_TOPIC = "msg"

 log = util.get_logger("db")
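The new `MONOLITH_KAFKA_ENABLED` flag is parsed by lowercasing the raw environment value and testing membership in `trues`. A minimal standalone sketch of that pattern; `env_flag` is an illustrative helper, not part of this diff:

```python
from os import getenv

# Truthy spellings accepted by the parsing in this diff; the original
# tuple also contains the bool True, which .lower() output never matches.
trues = ("true", "1", "t")


def env_flag(name: str, default: str = "false") -> bool:
    """Hypothetical helper: coerce an environment variable to a boolean."""
    return getenv(name, default).lower() in trues


# "true", "1" or "t" in any case enables the feature; unset or any
# other value leaves it disabled.
MONOLITH_KAFKA_ENABLED = env_flag("MONOLITH_KAFKA_ENABLED")
```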
@@ -44,7 +49,8 @@ KEYNAME = "queue"


 async def store_kafka_batch(data):
-    print("FAKE STORE KAFKA BATCH")
+    if not MONOLITH_KAFKA_ENABLED:
+        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
+        return
     # log.debug(f"Storing Kafka batch of {len(data)} messages")
     producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
@@ -116,7 +122,7 @@ async def queue_message(msg):
     Queue a message on the Redis buffer.
     """
     message = orjson.dumps(msg)
-    await ar.zadd(KEYNAME, message)
+    await ar.lpush(KEYNAME, message)


 async def queue_message_bulk(data):

@@ -125,4 +131,4 @@ async def queue_message_bulk(data):
     """
     for msg in data:
         message = orjson.dumps(msg)
-        await ar.zadd(KEYNAME, message)
+        await ar.lpush(KEYNAME, message)
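Both queue hunks swap `zadd` for `lpush`: the replaced `zadd` calls pass a member with no score, so the sorted-set API was being misused, and a plain Redis list is the simpler fit for a FIFO buffer. A rough sketch of the round trip under this scheme, assuming aioredis 2.x; the connection URL and the consumer side are illustrative, not part of the diff:

```python
import asyncio

import aioredis
import orjson

KEYNAME = "queue"


async def main():
    # Illustrative connection; db.py builds its own client as "ar".
    ar = aioredis.from_url("redis://localhost:6379")

    # Producer side, as in queue_message(): serialize and push to the head.
    await ar.lpush(KEYNAME, orjson.dumps({"msg": "hello"}))

    # Consumer side: popping from the tail yields FIFO order.
    raw = await ar.rpop(KEYNAME)
    print(orjson.loads(raw))


asyncio.run(main())
```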
@@ -9,17 +9,6 @@ services:
       - ${PORTAINER_GIT_DIR}:/code
     env_file:
       - ../stack.env
-    # volumes_from:
-    #   - tmp
-    # depends_on:
-    #   broker:
-    #     condition: service_started
-    #   kafka:
-    #     condition: service_healthy
-    #   tmp:
-    #     condition: service_started
-    #   redis:
-    #     condition: service_healthy

   threshold:
     image: pathogen/threshold:latest

@@ -28,7 +17,7 @@ services:
     volumes:
       - ${PORTAINER_GIT_DIR}:/code
       - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
       - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
     ports:
       - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
@@ -67,7 +67,7 @@ def parsemeta(numName, c):

 def queue_message(c):
     message = json.dumps(c)
-    main.g.sadd("queue", message)
+    main.g.lpush("queue", message)


 def event(
@@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
 # For timestamp processing
 from datetime import datetime
 from math import ceil
+from os import getenv

 import orjson
 import regex
@@ -51,6 +52,12 @@ import util
 # 4chan schema
 from schemas.ch4_s import ATTRMAP

+trues = ("true", "1", "t", True)
+
+MONOLITH_PROCESS_PERFSTATS = (
+    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
+)
+
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
     strip_tags,  #
@@ -267,6 +274,8 @@ def process_data(data):
         # Add the mutated message to the return buffer
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
+
+    if MONOLITH_PROCESS_PERFSTATS:
         log.debug("=====================================")
         log.debug(f"Sentiment: {sentiment_time}")
         log.debug(f"Regex: {regex_time}")
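The two hunks above add an env flag and gate the per-batch timing logs behind it; the counters are still accumulated unconditionally, only the logging is skipped. A condensed sketch of the pattern, with the loop body as a stub standing in for the real pipeline:

```python
import time
from os import getenv

trues = ("true", "1", "t", True)

MONOLITH_PROCESS_PERFSTATS = (
    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
)


def process_data(data):
    sentiment_time = 0.0
    for msg in data:
        start = time.process_time()
        # ... real per-message work (sentiment, regex, etc.) goes here ...
        sentiment_time += (time.process_time() - start) * 1000  # ms

    # Timings are always collected; only the output is flag-gated, so the
    # cost of gathering the stats themselves remains when the flag is off.
    if MONOLITH_PROCESS_PERFSTATS:
        print(f"Sentiment: {sentiment_time}")


process_data([{"example": 1}])
```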
@@ -36,7 +36,10 @@ class Ingest(object):
         items = []
         # for source in SOURCES:
         #     key = f"{KEYPREFIX}{source}"
-        chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE)
+        length = await db.ar.llen(KEYNAME)
+        start_num = length - CHUNK_SIZE
+        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
+        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
         if not chunk:
             return
         for item in chunk:
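This ingest hunk replaces the destructive `zpop` with a non-destructive read: `llen` plus `lrange` over the last `CHUNK_SIZE` entries, which under `lpush` writers is the oldest end of the list. Two properties worth noting: `lrange` removes nothing (the commented-out `rpop` variant would), and when the list is shorter than `CHUNK_SIZE` the negative `start_num` is interpreted as an offset from the tail, so the read can return fewer items than are queued. A sketch of the read path, assuming aioredis 2.x and an illustrative `CHUNK_SIZE`:

```python
import aioredis

KEYNAME = "queue"
CHUNK_SIZE = 1000  # illustrative; the real value is defined elsewhere


async def read_chunk(ar: aioredis.Redis):
    # Oldest CHUNK_SIZE entries sit at the tail when producers LPUSH.
    length = await ar.llen(KEYNAME)
    start_num = length - CHUNK_SIZE
    # Caution: if 0 < length < CHUNK_SIZE, start_num is negative and Redis
    # counts it back from the tail, so fewer than `length` items may come
    # back. LRANGE also leaves everything on the list.
    return await ar.lrange(KEYNAME, start_num, -1)
```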