Compare commits

3 commits: ab5e85c5c6 ... f774f4c2d2

Author | SHA1 | Date
---|---|---
 | f774f4c2d2 |
 | e32b330ef4 |
 | 8c596ec516 |
.gitignore (vendored): 3 changed lines

@@ -158,4 +158,5 @@ cython_debug/
 docker/data
 *.pem
 legacy/conf/live/
 legacy/conf/cert/
+stack.env
db.py: 16 changed lines

@@ -1,14 +1,19 @@
 import random
+from os import getenv
 
 import aioredis
 import orjson
+import redis
 
 # Kafka
 from aiokafka import AIOKafkaProducer
-import redis
 
 import util
 
+trues = ("true", "1", "t", True)
+
+MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
+
 # KAFKA_TOPIC = "msg"
 
 log = util.get_logger("db")
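The new MONOLITH_KAFKA_ENABLED flag (and the matching MONOLITH_PROCESS_PERFSTATS flag further down) parses a boolean from the environment with a truthy-string tuple. A minimal sketch of how it evaluates, using the names from the diff; the exported value is hypothetical:

```python
from os import getenv

# Accepted truthy forms, as in the diff. The bare `True` entry is redundant:
# getenv() only ever returns a string (or the default), never a bool.
trues = ("true", "1", "t", True)

# Unset or unrecognised values fall back to "false", so the flag defaults to off.
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues

print(MONOLITH_KAFKA_ENABLED)  # True only if e.g. MONOLITH_KAFKA_ENABLED=1 is exported
```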
@@ -44,8 +49,9 @@ KEYNAME = "queue"
 
 
 async def store_kafka_batch(data):
-    print("FAKE STORE KAFKA BATCH")
-    return
+    if not MONOLITH_KAFKA_ENABLED:
+        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
+        return
     # log.debug(f"Storing Kafka batch of {len(data)} messages")
     producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
     await producer.start()
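With the flag off, store_kafka_batch() now logs and returns before any producer is created, so the stack can run without a Kafka broker. A stripped-down sketch of that control flow; the producer setup is elided and print stands in for the logger:

```python
import asyncio

MONOLITH_KAFKA_ENABLED = False  # assume the env flag parsed to off


async def store_kafka_batch(data):
    if not MONOLITH_KAFKA_ENABLED:
        # Bail out before AIOKafkaProducer is constructed or started.
        print(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
        return
    ...  # producer = AIOKafkaProducer(...), await producer.start(), etc.


asyncio.run(store_kafka_batch([{"msg": "hello"}]))
```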
@@ -116,7 +122,7 @@ async def queue_message(msg):
     Queue a message on the Redis buffer.
     """
     message = orjson.dumps(msg)
-    await ar.zadd(KEYNAME, message)
+    await ar.lpush(KEYNAME, message)
 
 
 async def queue_message_bulk(data):
@@ -125,4 +131,4 @@ async def queue_message_bulk(data):
     """
     for msg in data:
         message = orjson.dumps(msg)
-        await ar.zadd(KEYNAME, message)
+        await ar.lpush(KEYNAME, message)
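Both queue_message() and queue_message_bulk() switch the buffer from a sorted set (ZADD) to a plain list (LPUSH). A quick illustration with the synchronous redis client; the key name comes from the diff, the connection details are assumed:

```python
import orjson
import redis

KEYNAME = "queue"
r = redis.Redis()  # assumed: local Redis on the default port

for msg in ({"msg": "one"}, {"msg": "two"}):
    r.lpush(KEYNAME, orjson.dumps(msg))  # LPUSH prepends, so the newest sits at the head

# The oldest message is at the tail, which is where the ingest side reads from
print(r.lrange(KEYNAME, -1, -1))  # [b'{"msg":"one"}']
```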
@@ -9,17 +9,6 @@ services:
       - ${PORTAINER_GIT_DIR}:/code
     env_file:
       - ../stack.env
-    # volumes_from:
-    #   - tmp
-    # depends_on:
-    #   broker:
-    #     condition: service_started
-    #   kafka:
-    #     condition: service_healthy
-    #   tmp:
-    #     condition: service_started
-    #   redis:
-    #     condition: service_healthy
 
   threshold:
     image: pathogen/threshold:latest
@@ -28,7 +17,7 @@ services:
     volumes:
       - ${PORTAINER_GIT_DIR}:/code
       - ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
-      #- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
+      - ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
       - ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
     ports:
       - "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 
 def queue_message(c):
     message = json.dumps(c)
-    main.g.sadd("queue", message)
+    main.g.lpush("queue", message)
 
 
 def event(
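The legacy path gets the matching SADD to LPUSH change. The distinction matters for a queue: a set dedupes and is unordered, while a list keeps duplicates and insertion order. A small demonstration with throwaway keys, again assuming a local Redis:

```python
import redis

r = redis.Redis()  # assumed local instance

# SADD dedupes: queueing the same payload twice stores it once
r.sadd("demo:set", '{"msg": "hi"}', '{"msg": "hi"}')
print(r.scard("demo:set"))  # 1

# LPUSH keeps duplicates and ordering, which is what a message buffer needs
r.lpush("demo:list", '{"msg": "hi"}', '{"msg": "hi"}')
print(r.llen("demo:list"))  # 2
```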
@@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor
 # For timestamp processing
 from datetime import datetime
 from math import ceil
+from os import getenv
 
 import orjson
 import regex
@@ -51,6 +52,12 @@ import util
 # 4chan schema
 from schemas.ch4_s import ATTRMAP
 
+trues = ("true", "1", "t", True)
+
+MONOLITH_PROCESS_PERFSTATS = (
+    getenv("MONOLITH_PROCESS_PERFSTATS", "false").lower() in trues
+)
+
 CUSTOM_FILTERS = [
     lambda x: x.lower(),
     strip_tags,  #
@@ -267,17 +274,19 @@ def process_data(data):
         # Add the mutated message to the return buffer
         to_store.append(msg)
         total_time += (time.process_time() - total_start) * 1000
-        log.debug("=====================================")
-        log.debug(f"Sentiment: {sentiment_time}")
-        log.debug(f"Regex: {regex_time}")
-        log.debug(f"Polyglot: {polyglot_time}")
-        log.debug(f"Date: {date_time}")
-        log.debug(f"NLP: {nlp_time}")
-        log.debug(f"Normalise: {normalise_time}")
-        log.debug(f"Hash: {hash_time}")
-        log.debug(f"Normal2: {normal2_time}")
-        log.debug(f"Soup: {soup_time}")
-        log.debug(f"Total: {total_time}")
-        log.debug("=====================================")
+
+        if MONOLITH_PROCESS_PERFSTATS:
+            log.debug("=====================================")
+            log.debug(f"Sentiment: {sentiment_time}")
+            log.debug(f"Regex: {regex_time}")
+            log.debug(f"Polyglot: {polyglot_time}")
+            log.debug(f"Date: {date_time}")
+            log.debug(f"NLP: {nlp_time}")
+            log.debug(f"Normalise: {normalise_time}")
+            log.debug(f"Hash: {hash_time}")
+            log.debug(f"Normal2: {normal2_time}")
+            log.debug(f"Soup: {soup_time}")
+            log.debug(f"Total: {total_time}")
+            log.debug("=====================================")
 
     return to_store
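The per-stage timing output in process_data() is now emitted only when MONOLITH_PROCESS_PERFSTATS is set, which keeps the hot loop quiet by default. A reduced sketch of the pattern; a single timer stands in for the many stage timers in the real function:

```python
import logging
import time

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("process")

MONOLITH_PROCESS_PERFSTATS = False  # parsed from the env flag; assumed off here


def process_data(data):
    to_store = []
    for msg in data:
        total_start = time.process_time()
        to_store.append(msg)  # stand-in for the real mutation pipeline
        total_time = (time.process_time() - total_start) * 1000

        if MONOLITH_PROCESS_PERFSTATS:
            # The f-strings are only formatted when stats are wanted
            log.debug(f"Total: {total_time}")
    return to_store


print(process_data([{"msg": "hi"}]))
```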
@@ -36,7 +36,10 @@ class Ingest(object):
         items = []
         # for source in SOURCES:
         #     key = f"{KEYPREFIX}{source}"
-        chunk = await db.ar.zpop(KEYNAME, CHUNK_SIZE)
+        length = await db.ar.llen(KEYNAME)
+        start_num = length - CHUNK_SIZE
+        chunk = await db.ar.lrange(KEYNAME, start_num, -1)
+        # chunk = await db.ar.rpop(KEYNAME, CHUNK_SIZE)
         if not chunk:
             return
         for item in chunk:
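The ingest loop now sizes its chunk with LLEN and reads it with LRANGE instead of popping from a sorted set. Note that LRANGE, unlike the old ZPOP, does not remove what it returns. A standalone sketch with the synchronous client; CHUNK_SIZE here is a demo assumption:

```python
import redis

KEYNAME = "queue"
CHUNK_SIZE = 3  # assumed demo value; the real constant lives in the ingest module

r = redis.Redis()  # assumed local instance
r.delete(KEYNAME)
for i in range(5):
    r.lpush(KEYNAME, f'{{"msg": {i}}}')  # newest ends up at the head

length = r.llen(KEYNAME)                  # 5
start_num = length - CHUNK_SIZE           # index of the first of the last CHUNK_SIZE items
chunk = r.lrange(KEYNAME, start_num, -1)  # the CHUNK_SIZE oldest messages, at the tail
print(chunk)  # [b'{"msg": 2}', b'{"msg": 1}', b'{"msg": 0}']
```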