Finish implementing notification rules

master
Mark Veidemanis 1 year ago
parent f93d37d1c0
commit 4dd8224a77
Signed by: m
GPG Key ID: 5ACFCEED46C0904F

@ -0,0 +1,38 @@
import requests
from core.util import logs
NTFY_URL = "https://ntfy.sh"
log = logs.get_logger(__name__)
# Actual function to send a message to a topic
def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic=None):
if url is None:
url = NTFY_URL
headers = {"Title": "Fisk"}
if title:
headers["Title"] = title
if priority:
headers["Priority"] = priority
if tags:
headers["Tags"] = tags
requests.post(
f"{url}/{topic}",
data=msg,
headers=headers,
)
# Sendmsg helper to send a message to a user's notification settings
def sendmsg(user, *args, **kwargs):
notification_settings = user.get_notification_settings()
if notification_settings.ntfy_topic is None:
# No topic set, so don't send
return
else:
topic = notification_settings.ntfy_topic
raw_sendmsg(*args, **kwargs, url=notification_settings.ntfy_url, topic=topic)

@ -1,12 +1,76 @@
from core.db.storage import db
from yaml import load, dump
from yaml.scanner import ScannerError
from yaml import dump, load
from yaml.parser import ParserError
from yaml.scanner import ScannerError
from core.db.storage import db
from core.models import NotificationRule
try:
from yaml import CLoader as Loader, CDumper as Dumper
from yaml import CDumper as Dumper
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader, Dumper
from core.lib.notify import sendmsg
from core.util import logs
log = logs.get_logger("rules")
def rule_matched(rule, message, matched_fields):
title = f"Rule {rule.name} matched"
# Dump the message in YAML for readability
message = dump(message, Dumper=Dumper, default_flow_style=False)
matched_fields = ", ".join(matched_fields)
notify_message = f"{rule.name} matched on {matched_fields}\n{message}"
notify_message = notify_message.encode("utf-8", "replace")
sendmsg(rule.user, notify_message, title=title)
def process_rules(data):
all_rules = NotificationRule.objects.filter(enabled=True)
for index, index_messages in data.items():
for message in index_messages:
for rule in all_rules:
parsed_rule = rule.parse()
if "index" not in parsed_rule:
log.debug("No index specified in rule, skipping")
continue
if "source" not in parsed_rule:
log.debug("No source specified in rule, skipping")
continue
rule_index = parsed_rule["index"]
rule_source = parsed_rule["source"]
if not type(rule_index) == list:
rule_index = [rule_index]
if not type(rule_source) == list:
rule_source = [rule_source]
if index not in rule_index:
log.debug(f"{index} not in {rule_index}")
continue
if message["src"] not in rule_source:
log.debug(f"{message['src']} not in {rule_source}")
continue
rule_field_length = len(parsed_rule.keys())
matched_field_number = 0
matched_fields = []
for field, value in parsed_rule.items():
if not type(value) == list:
value = [value]
if field == "src":
continue
if field in message and message[field] in value:
matched_field_number += 1
matched_fields.append(field)
print("Matched field", field, message[field], value)
if matched_field_number == rule_field_length - 2:
rule_matched(rule, message, matched_fields)
class NotificationRuleData(object):
def __init__(self, user, data):
self.user = user
@ -57,4 +121,4 @@ class NotificationRuleData(object):
return dump(self.parsed, Dumper=Dumper)
def get_data(self):
return self.parsed
return self.parsed

@ -0,0 +1,22 @@
from django.core.management.base import BaseCommand, CommandError
from redis import StrictRedis
from core.util import logs
from core.lib.rules import process_rules
import msgpack
log = logs.get_logger("processing")
class Command(BaseCommand):
def handle(self, *args, **options):
r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
p = r.pubsub()
p.psubscribe("messages")
for message in p.listen():
if message:
if message["channel"] == b"messages":
data = message["data"]
try:
unpacked = msgpack.unpackb(data, raw=False)
except TypeError:
continue
process_rules(unpacked)

@ -1,8 +1,8 @@
# Generated by Django 4.1.3 on 2023-01-12 15:12
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):

@ -1,8 +1,8 @@
# Generated by Django 4.1.3 on 2023-01-12 15:25
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):

@ -3,9 +3,16 @@ import logging
import stripe
from django.contrib.auth.models import AbstractUser
from django.db import models
from yaml import load
from yaml.parser import ParserError
from yaml.scanner import ScannerError
from core.lib.customers import get_or_create, update_customer_fields
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
logger = logging.getLogger(__name__)
@ -60,6 +67,9 @@ class User(AbstractUser):
plan_list = [plan.name for plan in self.plans.all()]
return plan in plan_list
def get_notification_settings(self):
return NotificationSettings.objects.get_or_create(user=self)[0]
class Session(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
@ -131,7 +141,14 @@ class NotificationRule(models.Model):
data = models.TextField()
def __str__(self):
return f"{self.user} - {self.rule}"
return f"{self.user} - {self.name}"
def parse(self):
try:
parsed = load(self.data, Loader=Loader)
except (ScannerError, ParserError) as e:
raise ValueError(f"Invalid YAML: {e}")
return parsed
class NotificationSettings(models.Model):

@ -1,8 +1,9 @@
from django.contrib.auth.mixins import LoginRequiredMixin
from core.forms import NotificationRuleForm, NotificationSettingsForm
from core.models import NotificationRule, NotificationSettings
from core.views.helpers import ObjectCreate, ObjectDelete, ObjectList, ObjectUpdate
from core.views.helpers import ObjectCreate, ObjectList, ObjectUpdate, ObjectDelete
# Notifications - we create a new notification settings object if there isn't one
# Hence, there is only an update view, not a create view.
@ -28,6 +29,7 @@ class NotificationsUpdate(LoginRequiredMixin, ObjectUpdate):
)
return notification_settings
class RuleList(LoginRequiredMixin, ObjectList):
list_template = "partials/rule-list.html"
model = NotificationRule
@ -54,4 +56,4 @@ class RuleUpdate(LoginRequiredMixin, ObjectUpdate):
class RuleDelete(LoginRequiredMixin, ObjectDelete):
model = NotificationRule
model = NotificationRule

@ -30,6 +30,34 @@ services:
- pathogen
- elastic
processing:
image: pathogen/neptune:latest
container_name: processing_neptune
build:
context: .
args:
OPERATION: ${OPERATION}
command: sh -c '. /venv/bin/activate && python manage.py processing'
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${PORTAINER_GIT_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini
#- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
- ${APP_DATABASE_FILE}:/code/db.sqlite3
- neptune_static:${STATIC_ROOT}
env_file:
- stack.env
volumes_from:
- tmp
depends_on:
redis:
condition: service_healthy
migration:
condition: service_started
collectstatic:
condition: service_started
networks:
- default
migration:
image: pathogen/neptune:latest
container_name: migration_neptune
@ -62,6 +90,8 @@ services:
- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
- ${APP_DATABASE_FILE}:/code/db.sqlite3
- neptune_static:${STATIC_ROOT}
volumes_from:
- tmp
env_file:
- stack.env
depends_on:
@ -108,7 +138,7 @@ services:
soft: 65535
hard: 65535
volumes:
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
- ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf
volumes_from:
- tmp
healthcheck:
@ -116,6 +146,9 @@ services:
interval: 2s
timeout: 2s
retries: 15
networks:
- default
- pathogen
networks:
default:

@ -1,2 +1,5 @@
unixsocket /var/run/socks/redis.sock
unixsocketperm 777
unixsocketperm 777
# For Monolith PubSub
port 6379

@ -18,3 +18,4 @@ sortedcontainers
django-debug-toolbar
django-debug-toolbar-template-profiler
orjson
msgpack

Loading…
Cancel
Save