diff --git a/core/lib/rules.py b/core/lib/rules.py index 6b71863..607ff5a 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -275,7 +275,7 @@ class NotificationRuleData(object): return - async def ingest_matches(self, index, matches, meta): + async def ingest_matches(self, index, matches, meta, mode): """ Store all matches for an index. :param index: the index to store the matches for @@ -290,9 +290,10 @@ class NotificationRuleData(object): matches_copy[match_index]["rule_uuid"] = self.object.id matches_copy[match_index]["meta"] = meta matches_copy[match_index]["match_ts"] = match_ts + matches_copy[match_index]["mode"] = mode await self.db.async_store_matches(matches_copy) - async def rule_matched(self, index, message, meta): + async def rule_matched(self, index, message, meta, mode): """ A rule has matched. If the previous run did not match, send a notification after formatting @@ -305,10 +306,11 @@ class NotificationRuleData(object): log.debug(f"Rule matched: {index} - current match: {current_match}") if current_match is False: # Matched now, but not before - meta["matched"] = self.format_aggs(meta["aggs"]) + if "matched" not in meta: + meta["matched"] = self.format_aggs(meta["aggs"]) rule_notify(self.object, index, message, meta) self.store_match(index, message) - await self.ingest_matches(index, message, meta) + await self.ingest_matches(index, message, meta, mode) async def rule_no_match(self, index=None): """ @@ -352,7 +354,9 @@ class NotificationRuleData(object): if len(aggs_for_index) == len(self.aggs.keys()): if all(aggs_for_index): # All aggs have matched - await self.rule_matched(index, results[: self.object.amount], meta) + await self.rule_matched( + index, results[: self.object.amount], meta, mode="schedule" + ) continue # Default branch, since the happy path has a continue keyword await self.rule_no_match(index) diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index 6bf522f..45a3d54 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -1,8 +1,10 @@ import msgpack +from asgiref.sync import async_to_sync from django.core.management.base import BaseCommand from redis import StrictRedis -from core.lib.rules import rule_notify +from core.db.storage import db +from core.lib.rules import NotificationRuleData from core.models import NotificationRule from core.util import logs @@ -79,7 +81,15 @@ def process_rules(data): # Subtract 2, 1 for source and 1 for index if matched_field_number == rule_field_length - 2: meta = {"matched": matched, "total_hits": 1} - rule_notify(rule, index, message, meta=meta) + + # Parse the rule, we saved some work above to avoid doing this, + # but it makes delivering messages significantly easier as we ca + # use the same code as for scheduling. + rule_data_object = NotificationRuleData(rule.user, rule, db=db) + # rule_notify(rule, index, message, meta=meta) + print("ABOUT TO RUN ASYNC TO SYNC") + rule_matched = async_to_sync(rule_data_object.rule_matched) + rule_matched(index, message, meta=meta, mode="ondemand") class Command(BaseCommand): diff --git a/core/views/ui/tables.py b/core/views/ui/tables.py index 35acd8f..eb6c639 100644 --- a/core/views/ui/tables.py +++ b/core/views/ui/tables.py @@ -83,6 +83,7 @@ class DrilldownTable(Table): index = Column() meta = Column() match_ts = Column() + mode = Column() template_name = "ui/drilldown/table_results.html" paginate_by = settings.DRILLDOWN_RESULTS_PER_PAGE diff --git a/docker-compose.yml b/docker-compose.yml index b4a77e3..7e1b865 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,6 +57,8 @@ services: condition: service_started networks: - default + - pathogen + - elastic scheduling: image: pathogen/neptune:latest