From 0d564788b60470db85cdc11f78b990d0c077a74b Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Thu, 9 Feb 2023 19:11:38 +0000 Subject: [PATCH] Implement policy parsing and add batch_id to rules --- core/lib/parsing.py | 1 + core/lib/rules.py | 94 +++++++++++++++++++++++--------- core/static/js/column-shifter.js | 3 +- 3 files changed, 71 insertions(+), 27 deletions(-) diff --git a/core/lib/parsing.py b/core/lib/parsing.py index 31ce031..ed9942a 100644 --- a/core/lib/parsing.py +++ b/core/lib/parsing.py @@ -118,6 +118,7 @@ def parse_source(user, query_params, raise_error=False): if source: sources = [source] else: + print("NOT SOURCE") sources = list(settings.MAIN_SOURCES) if user.has_perm("core.restricted_sources"): for source_iter in settings.SOURCES_RESTRICTED: diff --git a/core/lib/rules.py b/core/lib/rules.py index b44c937..961c897 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -8,6 +8,7 @@ try: except ImportError: from yaml import Loader, Dumper +import uuid from datetime import datetime import orjson @@ -286,12 +287,18 @@ class NotificationRuleData(object): matches = [matches] matches_copy = matches.copy() match_ts = datetime.utcnow().isoformat() + batch_id = uuid.uuid4() + + # Filter empty fields in meta + meta = {k: v for k, v in meta.items() if v} + for match_index, _ in enumerate(matches_copy): matches_copy[match_index]["index"] = index - matches_copy[match_index]["rule_uuid"] = self.object.id + matches_copy[match_index]["rule_id"] = str(self.object.id) matches_copy[match_index]["meta"] = meta matches_copy[match_index]["match_ts"] = match_ts matches_copy[match_index]["mode"] = mode + matches_copy[match_index]["batch_id"] = str(batch_id) return matches_copy async def ingest_matches(self, index, matches, meta, mode): @@ -324,15 +331,25 @@ class NotificationRuleData(object): current_match = self.get_match(index, message) log.debug(f"Rule matched: {index} - current match: {current_match}") - # Default policy: Trigger only when results change + last_run_had_matches = current_match is True + + if self.policy in ["change", "default"]: + # Change or Default policy, notifying only on new results + if last_run_had_matches: + # Last run had matches, and this one did too + # We don't need to notify + return + + elif self.policy == "always": + # Only here for completeness, we notify below by default + pass - if current_match is False: - # Matched now, but not before - if "matched" not in meta: - meta["matched"] = self.format_aggs(meta["aggs"]) - rule_notify(self.object, index, message, meta) - self.store_match(index, message) - await self.ingest_matches(index, message, meta, mode) + # We hit the return above if we don't need to notify + if "aggs" in meta and "matched" not in meta: + meta["matched"] = self.format_aggs(meta["aggs"]) + rule_notify(self.object, index, message, meta) + self.store_match(index, message) + await self.ingest_matches(index, message, meta, mode) def rule_matched_sync(self, index, message, meta, mode): """ @@ -346,14 +363,25 @@ class NotificationRuleData(object): current_match = self.get_match(index, message) log.debug(f"Rule matched: {index} - current match: {current_match}") - # Default policy: Trigger only when results change - if current_match is False: - # Matched now, but not before - if "matched" not in meta: - meta["matched"] = self.format_aggs(meta["aggs"]) - rule_notify(self.object, index, message, meta) - self.store_match(index, message) - self.ingest_matches_sync(index, message, meta, mode) + last_run_had_matches = current_match is True + + if self.policy in ["change", "default"]: + # Change or Default policy, notifying only on new results + if last_run_had_matches: + # Last run had matches, and this one did too + # We don't need to notify + return + + elif self.policy == "always": + # Only here for completeness, we notify below by default + pass + + # We hit the return above if we don't need to notify + if "aggs" in meta and "matched" not in meta: + meta["matched"] = self.format_aggs(meta["aggs"]) + rule_notify(self.object, index, message, meta) + self.store_match(index, message) + self.ingest_matches_sync(index, message, meta, mode) # No async helper for this one as we only need it for schedules async def rule_no_match(self, index=None, message=None): @@ -367,15 +395,29 @@ class NotificationRuleData(object): current_match = self.get_match(index) log.debug(f"Rule not matched: {index} - current match: {current_match}") - # Change policy: When there are no results following a successful run - if current_match is True: - # Matched before, but not now - if self.object.send_empty: - rule_notify(self.object, index, "no_match", None) - self.store_match(index, False) - await self.ingest_matches( - index=index, message={}, meta={"msg": message}, mode="schedule" - ) + last_run_had_matches = current_match is True + if self.policy in ["change", "default"]: + print("policy in change or default") + # Change or Default policy, notifying only on new results + if not last_run_had_matches: + print("last run did not have matches") + # Last run did not have matches, nor did this one + # We don't need to notify + return + + elif self.policy == "always": + print("policy is always") + # Only here for completeness, we notify below by default + pass + + # Matched before, but not now + if self.policy in ["change", "always"]: + print("policy in change or always") + rule_notify(self.object, index, "no_match", None) + self.store_match(index, False) + await self.ingest_matches( + index=index, matches=[{"msg": None}], meta={"msg": message}, mode="schedule" + ) async def run_schedule(self): """ diff --git a/core/static/js/column-shifter.js b/core/static/js/column-shifter.js index 1d1180a..7000d00 100644 --- a/core/static/js/column-shifter.js +++ b/core/static/js/column-shifter.js @@ -66,10 +66,11 @@ $(document).ready(function(){ "file_size": "off", "lang_code": "off", "tokens": "off", - "rule_uuid": "off", + "rule_id": "off", "index": "off", "meta": "off", "match_ts": "off", + "batch_id": "off", //"lang_name": "off", // "words_noun": "off", // "words_adj": "off",