From 4dd8224a77d4e85df2c0fe004d1c93394c35b0cd Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Thu, 12 Jan 2023 07:20:48 +0000 Subject: [PATCH] Finish implementing notification rules --- core/lib/notify.py | 38 ++++++++++ core/lib/rules.py | 74 ++++++++++++++++++-- core/management/__init__.py | 0 core/management/commands/__init__.py | 0 core/management/commands/processing.py | 22 ++++++ core/migrations/0012_notificationrule.py | 2 +- core/migrations/0013_notificationsettings.py | 2 +- core/models.py | 19 ++++- core/views/notifications.py | 6 +- docker-compose.yml | 35 ++++++++- docker/redis.conf | 5 +- requirements.txt | 1 + 12 files changed, 192 insertions(+), 12 deletions(-) create mode 100644 core/lib/notify.py create mode 100644 core/management/__init__.py create mode 100644 core/management/commands/__init__.py create mode 100644 core/management/commands/processing.py diff --git a/core/lib/notify.py b/core/lib/notify.py new file mode 100644 index 0000000..8546b51 --- /dev/null +++ b/core/lib/notify.py @@ -0,0 +1,38 @@ +import requests + +from core.util import logs + +NTFY_URL = "https://ntfy.sh" + +log = logs.get_logger(__name__) + + +# Actual function to send a message to a topic +def raw_sendmsg(msg, title=None, priority=None, tags=None, url=None, topic=None): + if url is None: + url = NTFY_URL + headers = {"Title": "Fisk"} + if title: + headers["Title"] = title + if priority: + headers["Priority"] = priority + if tags: + headers["Tags"] = tags + requests.post( + f"{url}/{topic}", + data=msg, + headers=headers, + ) + + +# Sendmsg helper to send a message to a user's notification settings +def sendmsg(user, *args, **kwargs): + notification_settings = user.get_notification_settings() + + if notification_settings.ntfy_topic is None: + # No topic set, so don't send + return + else: + topic = notification_settings.ntfy_topic + + raw_sendmsg(*args, **kwargs, url=notification_settings.ntfy_url, topic=topic) diff --git a/core/lib/rules.py b/core/lib/rules.py index f3ba41f..741ce34 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -1,12 +1,76 @@ -from core.db.storage import db -from yaml import load, dump -from yaml.scanner import ScannerError +from yaml import dump, load from yaml.parser import ParserError +from yaml.scanner import ScannerError + +from core.db.storage import db +from core.models import NotificationRule + try: - from yaml import CLoader as Loader, CDumper as Dumper + from yaml import CDumper as Dumper + from yaml import CLoader as Loader except ImportError: from yaml import Loader, Dumper +from core.lib.notify import sendmsg +from core.util import logs + +log = logs.get_logger("rules") + + +def rule_matched(rule, message, matched_fields): + title = f"Rule {rule.name} matched" + + # Dump the message in YAML for readability + message = dump(message, Dumper=Dumper, default_flow_style=False) + matched_fields = ", ".join(matched_fields) + + notify_message = f"{rule.name} matched on {matched_fields}\n{message}" + notify_message = notify_message.encode("utf-8", "replace") + sendmsg(rule.user, notify_message, title=title) + + +def process_rules(data): + all_rules = NotificationRule.objects.filter(enabled=True) + + for index, index_messages in data.items(): + for message in index_messages: + for rule in all_rules: + parsed_rule = rule.parse() + if "index" not in parsed_rule: + log.debug("No index specified in rule, skipping") + continue + if "source" not in parsed_rule: + log.debug("No source specified in rule, skipping") + continue + rule_index = parsed_rule["index"] + rule_source = parsed_rule["source"] + if not type(rule_index) == list: + rule_index = [rule_index] + if not type(rule_source) == list: + rule_source = [rule_source] + if index not in rule_index: + log.debug(f"{index} not in {rule_index}") + continue + if message["src"] not in rule_source: + log.debug(f"{message['src']} not in {rule_source}") + continue + + rule_field_length = len(parsed_rule.keys()) + matched_field_number = 0 + matched_fields = [] + for field, value in parsed_rule.items(): + if not type(value) == list: + value = [value] + if field == "src": + continue + if field in message and message[field] in value: + matched_field_number += 1 + matched_fields.append(field) + print("Matched field", field, message[field], value) + if matched_field_number == rule_field_length - 2: + rule_matched(rule, message, matched_fields) + + class NotificationRuleData(object): def __init__(self, user, data): self.user = user @@ -57,4 +121,4 @@ class NotificationRuleData(object): return dump(self.parsed, Dumper=Dumper) def get_data(self): - return self.parsed \ No newline at end of file + return self.parsed diff --git a/core/management/__init__.py b/core/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/management/commands/__init__.py b/core/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py new file mode 100644 index 0000000..ae019df --- /dev/null +++ b/core/management/commands/processing.py @@ -0,0 +1,22 @@ +from django.core.management.base import BaseCommand, CommandError +from redis import StrictRedis +from core.util import logs +from core.lib.rules import process_rules +import msgpack + +log = logs.get_logger("processing") + +class Command(BaseCommand): + def handle(self, *args, **options): + r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0) + p = r.pubsub() + p.psubscribe("messages") + for message in p.listen(): + if message: + if message["channel"] == b"messages": + data = message["data"] + try: + unpacked = msgpack.unpackb(data, raw=False) + except TypeError: + continue + process_rules(unpacked) diff --git a/core/migrations/0012_notificationrule.py b/core/migrations/0012_notificationrule.py index 6aea12a..db69bb3 100644 --- a/core/migrations/0012_notificationrule.py +++ b/core/migrations/0012_notificationrule.py @@ -1,8 +1,8 @@ # Generated by Django 4.1.3 on 2023-01-12 15:12 +import django.db.models.deletion from django.conf import settings from django.db import migrations, models -import django.db.models.deletion class Migration(migrations.Migration): diff --git a/core/migrations/0013_notificationsettings.py b/core/migrations/0013_notificationsettings.py index 4350968..ee44954 100644 --- a/core/migrations/0013_notificationsettings.py +++ b/core/migrations/0013_notificationsettings.py @@ -1,8 +1,8 @@ # Generated by Django 4.1.3 on 2023-01-12 15:25 +import django.db.models.deletion from django.conf import settings from django.db import migrations, models -import django.db.models.deletion class Migration(migrations.Migration): diff --git a/core/models.py b/core/models.py index 9281104..13b0c92 100644 --- a/core/models.py +++ b/core/models.py @@ -3,9 +3,16 @@ import logging import stripe from django.contrib.auth.models import AbstractUser from django.db import models +from yaml import load +from yaml.parser import ParserError +from yaml.scanner import ScannerError from core.lib.customers import get_or_create, update_customer_fields +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader logger = logging.getLogger(__name__) @@ -60,6 +67,9 @@ class User(AbstractUser): plan_list = [plan.name for plan in self.plans.all()] return plan in plan_list + def get_notification_settings(self): + return NotificationSettings.objects.get_or_create(user=self)[0] + class Session(models.Model): user = models.ForeignKey(User, on_delete=models.CASCADE) @@ -131,7 +141,14 @@ class NotificationRule(models.Model): data = models.TextField() def __str__(self): - return f"{self.user} - {self.rule}" + return f"{self.user} - {self.name}" + + def parse(self): + try: + parsed = load(self.data, Loader=Loader) + except (ScannerError, ParserError) as e: + raise ValueError(f"Invalid YAML: {e}") + return parsed class NotificationSettings(models.Model): diff --git a/core/views/notifications.py b/core/views/notifications.py index 565589e..0275b36 100644 --- a/core/views/notifications.py +++ b/core/views/notifications.py @@ -1,8 +1,9 @@ from django.contrib.auth.mixins import LoginRequiredMixin + from core.forms import NotificationRuleForm, NotificationSettingsForm from core.models import NotificationRule, NotificationSettings +from core.views.helpers import ObjectCreate, ObjectDelete, ObjectList, ObjectUpdate -from core.views.helpers import ObjectCreate, ObjectList, ObjectUpdate, ObjectDelete # Notifications - we create a new notification settings object if there isn't one # Hence, there is only an update view, not a create view. @@ -28,6 +29,7 @@ class NotificationsUpdate(LoginRequiredMixin, ObjectUpdate): ) return notification_settings + class RuleList(LoginRequiredMixin, ObjectList): list_template = "partials/rule-list.html" model = NotificationRule @@ -54,4 +56,4 @@ class RuleUpdate(LoginRequiredMixin, ObjectUpdate): class RuleDelete(LoginRequiredMixin, ObjectDelete): - model = NotificationRule \ No newline at end of file + model = NotificationRule diff --git a/docker-compose.yml b/docker-compose.yml index 04830f3..b222c71 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,6 +30,34 @@ services: - pathogen - elastic + processing: + image: pathogen/neptune:latest + container_name: processing_neptune + build: + context: . + args: + OPERATION: ${OPERATION} + command: sh -c '. /venv/bin/activate && python manage.py processing' + volumes: + - ${PORTAINER_GIT_DIR}:/code + - ${PORTAINER_GIT_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini + #- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py + - ${APP_DATABASE_FILE}:/code/db.sqlite3 + - neptune_static:${STATIC_ROOT} + env_file: + - stack.env + volumes_from: + - tmp + depends_on: + redis: + condition: service_healthy + migration: + condition: service_started + collectstatic: + condition: service_started + networks: + - default + migration: image: pathogen/neptune:latest container_name: migration_neptune @@ -62,6 +90,8 @@ services: - ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py - ${APP_DATABASE_FILE}:/code/db.sqlite3 - neptune_static:${STATIC_ROOT} + volumes_from: + - tmp env_file: - stack.env depends_on: @@ -108,7 +138,7 @@ services: soft: 65535 hard: 65535 volumes: - - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf + - ${PORTAINER_GIT_DIR}/docker/redis.conf:/etc/redis.conf volumes_from: - tmp healthcheck: @@ -116,6 +146,9 @@ services: interval: 2s timeout: 2s retries: 15 + networks: + - default + - pathogen networks: default: diff --git a/docker/redis.conf b/docker/redis.conf index 424a612..7f886a2 100644 --- a/docker/redis.conf +++ b/docker/redis.conf @@ -1,2 +1,5 @@ unixsocket /var/run/socks/redis.sock -unixsocketperm 777 \ No newline at end of file +unixsocketperm 777 + +# For Monolith PubSub +port 6379 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e3e698a..0d27460 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ sortedcontainers django-debug-toolbar django-debug-toolbar-template-profiler orjson +msgpack