Implement ingesting to Redis from Threshold

This commit is contained in:
Mark Veidemanis 2022-09-07 07:20:30 +01:00
parent e3b2e1f36d
commit 79b1bee9e4
10 changed files with 46 additions and 50 deletions

4
db.py
View File

@ -41,7 +41,7 @@ def store_message(msg):
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
print(api_response)
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
@ -80,7 +80,7 @@ def store_message_bulk(data):
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
print(api_response)
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)

View File

@ -13,6 +13,25 @@ services:
depends_on:
- db
threshold:
image: pathogen/threshold:latest
build: ./legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- .env
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
volumes_from:
- tmp
db:
image: manticoresearch/manticore

View File

@ -18,8 +18,8 @@
"Key": "key.pem",
"Certificate": "cert.pem",
"RedisSocket": "/var/run/redis/redis.sock",
"RedisDBEphemeral": 2,
"RedisDBPersistent": 3,
"RedisDBEphemeral": 1,
"RedisDBPersistent": 0,
"UsePassword": false,
"ConnectOnCreate": false,
"AutoReg": false,
@ -31,12 +31,13 @@
"User": "x",
"Password": "x"
},
"Logstash": {
"Host": "127.0.0.1",
"Port": "4000"
"Ingest": {
"Key": "queue.irc",
"Enabled": true
},
"ChanKeep": {
"Enabled": false,
"Provision": false,
"MaxRelay": 30,
"SigSwitch": 20
},

View File

@ -1,27 +0,0 @@
import logging
from json import dumps
import logstash
import main
logger = None
def init_logstash():
global logger
logger = logging.getLogger("ingest")
logger.setLevel(logging.INFO)
logger.addHandler(
logstash.TCPLogstashHandler(
main.config["Logstash"]["Host"],
int(main.config["Logstash"]["Port"]),
version=1,
)
)
def sendLogstashNotification(text):
if logger is not None:
logger.info(dumps(text))
return True
return False

View File

@ -11,8 +11,8 @@ RUN chown pathogen:pathogen /venv
USER pathogen
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY requirements.prod.txt /code/
WORKDIR /code/legacy
COPY requirements.prod.txt /code/legacy
RUN python -m venv /venv
RUN . /venv/bin/activate && pip install -r requirements.prod.txt
CMD . /venv/bin/activate && exec python /code/threshold
RUN . /venv/bin/activate && pip install -r /code/legacy/requirements.prod.txt
CMD . /venv/bin/activate && exec python /code/legacy/threshold

View File

@ -1,2 +0,0 @@
unixsocket /var/run/redis/redis.sock
unixsocketperm 777

View File

@ -3,7 +3,6 @@ twisted
pyOpenSSL
redis
pyYaML
python-logstash
service_identity
siphashc
Klein

View File

@ -1,5 +1,6 @@
import json
import main
from core.logstash import sendLogstashNotification
from core.relay import sendRelayNotification
from modules import userinfo
from utils.dedup import dedup
@ -64,6 +65,12 @@ def parsemeta(numName, c):
)
def queue_message(c):
message = json.dumps(c)
print("APPENDING MESSAGE", message)
main.g.sadd(main.config["Ingest"]["Key"], message)
def event(
numName, c
): # yes I'm using a short variable because otherwise it goes off the screen
@ -75,8 +82,9 @@ def event(
if "muser" in c.keys():
del c["muser"]
sortedKeys = {k: c[k] for k in order if k in c} # Sort dict keys according to order
sortedKeys["src"] = "irc"
if main.config["Logstash"]["Enabled"]:
sendLogstashNotification(sortedKeys)
sendRelayNotification(sortedKeys)
# sortedKeys = {k: c[k] for k in order if k in c} # Sort dict keys according to order
# sortedKeys["src"] = "irc"
c["src"] = "irc"
if main.config["Ingest"]["Enabled"]:
queue_message(c)
sendRelayNotification(c)

View File

@ -4,7 +4,6 @@ twisted
pyOpenSSL
redis
pyYaML
python-logstash
service_identity
siphashc
Klein

View File

@ -5,7 +5,6 @@ from os import getenv
from signal import SIGINT, signal
from sys import stderr, stdout
import core.logstash
import main
import modules.counters
from api.views import API
@ -36,7 +35,7 @@ if "--migrate" in sys.argv:
loadCommands()
core.logstash.init_logstash()
# core.logstash.init_logstash()
signal(SIGINT, handler) # Handle Ctrl-C and run the cleanup routine
stdout = getwriter("utf8")(stdout) # this is a generic fix but we all know
stderr = getwriter("utf8")(stderr) # it's just for the retards on Rizon using