Implement ingesting to Redis from Threshold
This commit is contained in:
parent
a6b5348224
commit
49784dfbe5
4
db.py
4
db.py
|
@ -41,7 +41,7 @@ def store_message(msg):
|
||||||
try:
|
try:
|
||||||
# Bulk index operations
|
# Bulk index operations
|
||||||
api_response = api_instance.bulk(body_post) # , async_req=True
|
api_response = api_instance.bulk(body_post) # , async_req=True
|
||||||
print(api_response)
|
# print(api_response)
|
||||||
except ApiException as e:
|
except ApiException as e:
|
||||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ def store_message_bulk(data):
|
||||||
try:
|
try:
|
||||||
# Bulk index operations
|
# Bulk index operations
|
||||||
api_response = api_instance.bulk(body_post) # , async_req=True
|
api_response = api_instance.bulk(body_post) # , async_req=True
|
||||||
print(api_response)
|
# print(api_response)
|
||||||
except ApiException as e:
|
except ApiException as e:
|
||||||
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
print("Exception when calling IndexApi->bulk: %s\n" % e)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,25 @@ services:
|
||||||
depends_on:
|
depends_on:
|
||||||
- db
|
- db
|
||||||
|
|
||||||
|
threshold:
|
||||||
|
image: pathogen/threshold:latest
|
||||||
|
build: ./legacy/docker
|
||||||
|
volumes:
|
||||||
|
- ${PORTAINER_GIT_DIR}:/code
|
||||||
|
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
|
||||||
|
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
|
||||||
|
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
|
||||||
|
ports:
|
||||||
|
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
|
||||||
|
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
|
||||||
|
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
|
||||||
|
env_file:
|
||||||
|
- .env
|
||||||
|
# for development
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
volumes_from:
|
||||||
|
- tmp
|
||||||
|
|
||||||
db:
|
db:
|
||||||
image: manticoresearch/manticore
|
image: manticoresearch/manticore
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
"Key": "key.pem",
|
"Key": "key.pem",
|
||||||
"Certificate": "cert.pem",
|
"Certificate": "cert.pem",
|
||||||
"RedisSocket": "/var/run/redis/redis.sock",
|
"RedisSocket": "/var/run/redis/redis.sock",
|
||||||
"RedisDBEphemeral": 2,
|
"RedisDBEphemeral": 1,
|
||||||
"RedisDBPersistent": 3,
|
"RedisDBPersistent": 0,
|
||||||
"UsePassword": false,
|
"UsePassword": false,
|
||||||
"ConnectOnCreate": false,
|
"ConnectOnCreate": false,
|
||||||
"AutoReg": false,
|
"AutoReg": false,
|
||||||
|
@ -31,12 +31,13 @@
|
||||||
"User": "x",
|
"User": "x",
|
||||||
"Password": "x"
|
"Password": "x"
|
||||||
},
|
},
|
||||||
"Logstash": {
|
"Ingest": {
|
||||||
"Host": "127.0.0.1",
|
"Key": "queue.irc",
|
||||||
"Port": "4000"
|
"Enabled": true
|
||||||
},
|
},
|
||||||
"ChanKeep": {
|
"ChanKeep": {
|
||||||
"Enabled": false,
|
"Enabled": false,
|
||||||
|
"Provision": false,
|
||||||
"MaxRelay": 30,
|
"MaxRelay": 30,
|
||||||
"SigSwitch": 20
|
"SigSwitch": 20
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,27 +0,0 @@
|
||||||
import logging
|
|
||||||
from json import dumps
|
|
||||||
|
|
||||||
import logstash
|
|
||||||
import main
|
|
||||||
|
|
||||||
logger = None
|
|
||||||
|
|
||||||
|
|
||||||
def init_logstash():
|
|
||||||
global logger
|
|
||||||
logger = logging.getLogger("ingest")
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
logger.addHandler(
|
|
||||||
logstash.TCPLogstashHandler(
|
|
||||||
main.config["Logstash"]["Host"],
|
|
||||||
int(main.config["Logstash"]["Port"]),
|
|
||||||
version=1,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def sendLogstashNotification(text):
|
|
||||||
if logger is not None:
|
|
||||||
logger.info(dumps(text))
|
|
||||||
return True
|
|
||||||
return False
|
|
|
@ -11,8 +11,8 @@ RUN chown pathogen:pathogen /venv
|
||||||
USER pathogen
|
USER pathogen
|
||||||
ENV PYTHONDONTWRITEBYTECODE=1
|
ENV PYTHONDONTWRITEBYTECODE=1
|
||||||
ENV PYTHONUNBUFFERED=1
|
ENV PYTHONUNBUFFERED=1
|
||||||
WORKDIR /code
|
WORKDIR /code/legacy
|
||||||
COPY requirements.prod.txt /code/
|
COPY requirements.prod.txt /code/legacy
|
||||||
RUN python -m venv /venv
|
RUN python -m venv /venv
|
||||||
RUN . /venv/bin/activate && pip install -r requirements.prod.txt
|
RUN . /venv/bin/activate && pip install -r /code/legacy/requirements.prod.txt
|
||||||
CMD . /venv/bin/activate && exec python /code/threshold
|
CMD . /venv/bin/activate && exec python /code/legacy/threshold
|
|
@ -1,2 +0,0 @@
|
||||||
unixsocket /var/run/redis/redis.sock
|
|
||||||
unixsocketperm 777
|
|
|
@ -3,7 +3,6 @@ twisted
|
||||||
pyOpenSSL
|
pyOpenSSL
|
||||||
redis
|
redis
|
||||||
pyYaML
|
pyYaML
|
||||||
python-logstash
|
|
||||||
service_identity
|
service_identity
|
||||||
siphashc
|
siphashc
|
||||||
Klein
|
Klein
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
|
import json
|
||||||
|
|
||||||
import main
|
import main
|
||||||
from core.logstash import sendLogstashNotification
|
|
||||||
from core.relay import sendRelayNotification
|
from core.relay import sendRelayNotification
|
||||||
from modules import userinfo
|
from modules import userinfo
|
||||||
from utils.dedup import dedup
|
from utils.dedup import dedup
|
||||||
|
@ -64,6 +65,12 @@ def parsemeta(numName, c):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def queue_message(c):
|
||||||
|
message = json.dumps(c)
|
||||||
|
print("APPENDING MESSAGE", message)
|
||||||
|
main.g.sadd(main.config["Ingest"]["Key"], message)
|
||||||
|
|
||||||
|
|
||||||
def event(
|
def event(
|
||||||
numName, c
|
numName, c
|
||||||
): # yes I'm using a short variable because otherwise it goes off the screen
|
): # yes I'm using a short variable because otherwise it goes off the screen
|
||||||
|
@ -75,8 +82,9 @@ def event(
|
||||||
|
|
||||||
if "muser" in c.keys():
|
if "muser" in c.keys():
|
||||||
del c["muser"]
|
del c["muser"]
|
||||||
sortedKeys = {k: c[k] for k in order if k in c} # Sort dict keys according to order
|
# sortedKeys = {k: c[k] for k in order if k in c} # Sort dict keys according to order
|
||||||
sortedKeys["src"] = "irc"
|
# sortedKeys["src"] = "irc"
|
||||||
if main.config["Logstash"]["Enabled"]:
|
c["src"] = "irc"
|
||||||
sendLogstashNotification(sortedKeys)
|
if main.config["Ingest"]["Enabled"]:
|
||||||
sendRelayNotification(sortedKeys)
|
queue_message(c)
|
||||||
|
sendRelayNotification(c)
|
||||||
|
|
|
@ -4,7 +4,6 @@ twisted
|
||||||
pyOpenSSL
|
pyOpenSSL
|
||||||
redis
|
redis
|
||||||
pyYaML
|
pyYaML
|
||||||
python-logstash
|
|
||||||
service_identity
|
service_identity
|
||||||
siphashc
|
siphashc
|
||||||
Klein
|
Klein
|
||||||
|
|
|
@ -5,7 +5,6 @@ from os import getenv
|
||||||
from signal import SIGINT, signal
|
from signal import SIGINT, signal
|
||||||
from sys import stderr, stdout
|
from sys import stderr, stdout
|
||||||
|
|
||||||
import core.logstash
|
|
||||||
import main
|
import main
|
||||||
import modules.counters
|
import modules.counters
|
||||||
from api.views import API
|
from api.views import API
|
||||||
|
@ -36,7 +35,7 @@ if "--migrate" in sys.argv:
|
||||||
loadCommands()
|
loadCommands()
|
||||||
|
|
||||||
|
|
||||||
core.logstash.init_logstash()
|
# core.logstash.init_logstash()
|
||||||
signal(SIGINT, handler) # Handle Ctrl-C and run the cleanup routine
|
signal(SIGINT, handler) # Handle Ctrl-C and run the cleanup routine
|
||||||
stdout = getwriter("utf8")(stdout) # this is a generic fix but we all know
|
stdout = getwriter("utf8")(stdout) # this is a generic fix but we all know
|
||||||
stderr = getwriter("utf8")(stderr) # it's just for the retards on Rizon using
|
stderr = getwriter("utf8")(stderr) # it's just for the retards on Rizon using
|
||||||
|
|
Loading…
Reference in New Issue