diff --git a/core/db/elastic.py b/core/db/elastic.py index 0ed9668..c0326e2 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -280,6 +280,20 @@ class ElasticsearchBackend(StorageBackend): self.log.error(f"Indexing failed: {result}") self.log.debug(f"Indexed {len(matches)} messages in ES") + def store_matches(self, matches): + """ + Store a list of matches in Elasticsearch. + :param index: The index to store the matches in. + :param matches: A list of matches to store. + """ + if self.client is None: + self.initialise() + for match in matches: + result = self.client.index(index=settings.INDEX_RULE_STORAGE, body=match) + if not result["result"] == "created": + self.log.error(f"Indexing failed: {result}") + self.log.debug(f"Indexed {len(matches)} messages in ES") + async def schedule_query_results(self, rule_object): """ Helper to run a scheduled query with reduced functionality and async. diff --git a/core/lib/rules.py b/core/lib/rules.py index 607ff5a..2a83353 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -275,12 +275,7 @@ class NotificationRuleData(object): return - async def ingest_matches(self, index, matches, meta, mode): - """ - Store all matches for an index. - :param index: the index to store the matches for - :param matches: the matches to store - """ + def reform_matches(self, index, matches, meta, mode): if not isinstance(matches, list): matches = [matches] matches_copy = matches.copy() @@ -291,7 +286,25 @@ class NotificationRuleData(object): matches_copy[match_index]["meta"] = meta matches_copy[match_index]["match_ts"] = match_ts matches_copy[match_index]["mode"] = mode - await self.db.async_store_matches(matches_copy) + return matches_copy + + async def ingest_matches(self, index, matches, meta, mode): + """ + Store all matches for an index. + :param index: the index to store the matches for + :param matches: the matches to store + """ + new_matches = self.reform_matches(index, matches, meta, mode) + await self.db.async_store_matches(new_matches) + + def ingest_matches_sync(self, index, matches, meta, mode): + """ + Store all matches for an index. + :param index: the index to store the matches for + :param matches: the matches to store + """ + new_matches = self.reform_matches(index, matches, meta, mode) + self.db.store_matches(new_matches) async def rule_matched(self, index, message, meta, mode): """ @@ -312,6 +325,25 @@ class NotificationRuleData(object): self.store_match(index, message) await self.ingest_matches(index, message, meta, mode) + def rule_matched_sync(self, index, message, meta, mode): + """ + A rule has matched. + If the previous run did not match, send a notification after formatting + the aggregations. + :param index: the index the rule matched on + :param message: the message object that matched + :param aggs: the aggregations that matched + """ + current_match = self.get_match(index, message) + log.debug(f"Rule matched: {index} - current match: {current_match}") + if current_match is False: + # Matched now, but not before + if "matched" not in meta: + meta["matched"] = self.format_aggs(meta["aggs"]) + rule_notify(self.object, index, message, meta) + self.store_match(index, message) + self.ingest_matches_sync(index, message, meta, mode) + async def rule_no_match(self, index=None): """ A rule has not matched. diff --git a/core/management/commands/processing.py b/core/management/commands/processing.py index 45a3d54..88e261f 100644 --- a/core/management/commands/processing.py +++ b/core/management/commands/processing.py @@ -1,5 +1,4 @@ import msgpack -from asgiref.sync import async_to_sync from django.core.management.base import BaseCommand from redis import StrictRedis @@ -83,13 +82,13 @@ def process_rules(data): meta = {"matched": matched, "total_hits": 1} # Parse the rule, we saved some work above to avoid doing this, - # but it makes delivering messages significantly easier as we ca + # but it makes delivering messages significantly easier as we can # use the same code as for scheduling. rule_data_object = NotificationRuleData(rule.user, rule, db=db) # rule_notify(rule, index, message, meta=meta) - print("ABOUT TO RUN ASYNC TO SYNC") - rule_matched = async_to_sync(rule_data_object.rule_matched) - rule_matched(index, message, meta=meta, mode="ondemand") + rule_data_object.rule_matched_sync( + index, message, meta=meta, mode="ondemand" + ) class Command(BaseCommand):