Implement policy parsing and add batch_id to rules
This commit is contained in:
parent
fd10a4ba8e
commit
0d564788b6
|
@ -118,6 +118,7 @@ def parse_source(user, query_params, raise_error=False):
|
||||||
if source:
|
if source:
|
||||||
sources = [source]
|
sources = [source]
|
||||||
else:
|
else:
|
||||||
|
print("NOT SOURCE")
|
||||||
sources = list(settings.MAIN_SOURCES)
|
sources = list(settings.MAIN_SOURCES)
|
||||||
if user.has_perm("core.restricted_sources"):
|
if user.has_perm("core.restricted_sources"):
|
||||||
for source_iter in settings.SOURCES_RESTRICTED:
|
for source_iter in settings.SOURCES_RESTRICTED:
|
||||||
|
|
|
@ -8,6 +8,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from yaml import Loader, Dumper
|
from yaml import Loader, Dumper
|
||||||
|
|
||||||
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import orjson
|
import orjson
|
||||||
|
@ -286,12 +287,18 @@ class NotificationRuleData(object):
|
||||||
matches = [matches]
|
matches = [matches]
|
||||||
matches_copy = matches.copy()
|
matches_copy = matches.copy()
|
||||||
match_ts = datetime.utcnow().isoformat()
|
match_ts = datetime.utcnow().isoformat()
|
||||||
|
batch_id = uuid.uuid4()
|
||||||
|
|
||||||
|
# Filter empty fields in meta
|
||||||
|
meta = {k: v for k, v in meta.items() if v}
|
||||||
|
|
||||||
for match_index, _ in enumerate(matches_copy):
|
for match_index, _ in enumerate(matches_copy):
|
||||||
matches_copy[match_index]["index"] = index
|
matches_copy[match_index]["index"] = index
|
||||||
matches_copy[match_index]["rule_uuid"] = self.object.id
|
matches_copy[match_index]["rule_id"] = str(self.object.id)
|
||||||
matches_copy[match_index]["meta"] = meta
|
matches_copy[match_index]["meta"] = meta
|
||||||
matches_copy[match_index]["match_ts"] = match_ts
|
matches_copy[match_index]["match_ts"] = match_ts
|
||||||
matches_copy[match_index]["mode"] = mode
|
matches_copy[match_index]["mode"] = mode
|
||||||
|
matches_copy[match_index]["batch_id"] = str(batch_id)
|
||||||
return matches_copy
|
return matches_copy
|
||||||
|
|
||||||
async def ingest_matches(self, index, matches, meta, mode):
|
async def ingest_matches(self, index, matches, meta, mode):
|
||||||
|
@ -324,15 +331,25 @@ class NotificationRuleData(object):
|
||||||
current_match = self.get_match(index, message)
|
current_match = self.get_match(index, message)
|
||||||
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
||||||
|
|
||||||
# Default policy: Trigger only when results change
|
last_run_had_matches = current_match is True
|
||||||
|
|
||||||
if current_match is False:
|
if self.policy in ["change", "default"]:
|
||||||
# Matched now, but not before
|
# Change or Default policy, notifying only on new results
|
||||||
if "matched" not in meta:
|
if last_run_had_matches:
|
||||||
meta["matched"] = self.format_aggs(meta["aggs"])
|
# Last run had matches, and this one did too
|
||||||
rule_notify(self.object, index, message, meta)
|
# We don't need to notify
|
||||||
self.store_match(index, message)
|
return
|
||||||
await self.ingest_matches(index, message, meta, mode)
|
|
||||||
|
elif self.policy == "always":
|
||||||
|
# Only here for completeness, we notify below by default
|
||||||
|
pass
|
||||||
|
|
||||||
|
# We hit the return above if we don't need to notify
|
||||||
|
if "aggs" in meta and "matched" not in meta:
|
||||||
|
meta["matched"] = self.format_aggs(meta["aggs"])
|
||||||
|
rule_notify(self.object, index, message, meta)
|
||||||
|
self.store_match(index, message)
|
||||||
|
await self.ingest_matches(index, message, meta, mode)
|
||||||
|
|
||||||
def rule_matched_sync(self, index, message, meta, mode):
|
def rule_matched_sync(self, index, message, meta, mode):
|
||||||
"""
|
"""
|
||||||
|
@ -346,14 +363,25 @@ class NotificationRuleData(object):
|
||||||
current_match = self.get_match(index, message)
|
current_match = self.get_match(index, message)
|
||||||
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
log.debug(f"Rule matched: {index} - current match: {current_match}")
|
||||||
|
|
||||||
# Default policy: Trigger only when results change
|
last_run_had_matches = current_match is True
|
||||||
if current_match is False:
|
|
||||||
# Matched now, but not before
|
if self.policy in ["change", "default"]:
|
||||||
if "matched" not in meta:
|
# Change or Default policy, notifying only on new results
|
||||||
meta["matched"] = self.format_aggs(meta["aggs"])
|
if last_run_had_matches:
|
||||||
rule_notify(self.object, index, message, meta)
|
# Last run had matches, and this one did too
|
||||||
self.store_match(index, message)
|
# We don't need to notify
|
||||||
self.ingest_matches_sync(index, message, meta, mode)
|
return
|
||||||
|
|
||||||
|
elif self.policy == "always":
|
||||||
|
# Only here for completeness, we notify below by default
|
||||||
|
pass
|
||||||
|
|
||||||
|
# We hit the return above if we don't need to notify
|
||||||
|
if "aggs" in meta and "matched" not in meta:
|
||||||
|
meta["matched"] = self.format_aggs(meta["aggs"])
|
||||||
|
rule_notify(self.object, index, message, meta)
|
||||||
|
self.store_match(index, message)
|
||||||
|
self.ingest_matches_sync(index, message, meta, mode)
|
||||||
|
|
||||||
# No async helper for this one as we only need it for schedules
|
# No async helper for this one as we only need it for schedules
|
||||||
async def rule_no_match(self, index=None, message=None):
|
async def rule_no_match(self, index=None, message=None):
|
||||||
|
@ -367,15 +395,29 @@ class NotificationRuleData(object):
|
||||||
current_match = self.get_match(index)
|
current_match = self.get_match(index)
|
||||||
log.debug(f"Rule not matched: {index} - current match: {current_match}")
|
log.debug(f"Rule not matched: {index} - current match: {current_match}")
|
||||||
|
|
||||||
# Change policy: When there are no results following a successful run
|
last_run_had_matches = current_match is True
|
||||||
if current_match is True:
|
if self.policy in ["change", "default"]:
|
||||||
# Matched before, but not now
|
print("policy in change or default")
|
||||||
if self.object.send_empty:
|
# Change or Default policy, notifying only on new results
|
||||||
rule_notify(self.object, index, "no_match", None)
|
if not last_run_had_matches:
|
||||||
self.store_match(index, False)
|
print("last run did not have matches")
|
||||||
await self.ingest_matches(
|
# Last run did not have matches, nor did this one
|
||||||
index=index, message={}, meta={"msg": message}, mode="schedule"
|
# We don't need to notify
|
||||||
)
|
return
|
||||||
|
|
||||||
|
elif self.policy == "always":
|
||||||
|
print("policy is always")
|
||||||
|
# Only here for completeness, we notify below by default
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Matched before, but not now
|
||||||
|
if self.policy in ["change", "always"]:
|
||||||
|
print("policy in change or always")
|
||||||
|
rule_notify(self.object, index, "no_match", None)
|
||||||
|
self.store_match(index, False)
|
||||||
|
await self.ingest_matches(
|
||||||
|
index=index, matches=[{"msg": None}], meta={"msg": message}, mode="schedule"
|
||||||
|
)
|
||||||
|
|
||||||
async def run_schedule(self):
|
async def run_schedule(self):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -66,10 +66,11 @@ $(document).ready(function(){
|
||||||
"file_size": "off",
|
"file_size": "off",
|
||||||
"lang_code": "off",
|
"lang_code": "off",
|
||||||
"tokens": "off",
|
"tokens": "off",
|
||||||
"rule_uuid": "off",
|
"rule_id": "off",
|
||||||
"index": "off",
|
"index": "off",
|
||||||
"meta": "off",
|
"meta": "off",
|
||||||
"match_ts": "off",
|
"match_ts": "off",
|
||||||
|
"batch_id": "off",
|
||||||
//"lang_name": "off",
|
//"lang_name": "off",
|
||||||
// "words_noun": "off",
|
// "words_noun": "off",
|
||||||
// "words_adj": "off",
|
// "words_adj": "off",
|
||||||
|
|
Loading…
Reference in New Issue