diff --git a/core/db/elastic.py b/core/db/elastic.py
index 03107e2..c05fc42 100644
--- a/core/db/elastic.py
+++ b/core/db/elastic.py
@@ -309,20 +309,20 @@ class ElasticsearchBackend(StorageBackend):
             if len(response["hits"]["hits"]) == 0:
                 # No results, skip
                 continue
-            aggs, response = self.parse(response, aggs=True)
+            meta, response = self.parse(response, meta=True)
             if "message" in response:
                 self.log.error(f"Error running scheduled search: {response['message']}")
                 continue
-            result_map[index] = (aggs, response)
+            result_map[index] = (meta, response)
 
         # Average aggregation check
         # Could probably do this in elasticsearch
-        for index, (aggs, result) in result_map.items():
+        for index, (meta, result) in result_map.items():
             # Default to true, if no aggs are found, we still want to match
             match = True
             for agg_name, (operator, number) in rule_object.aggs.items():
-                if agg_name in aggs:
-                    agg_value = aggs[agg_name]["value"]
+                if agg_name in meta["aggs"]:
+                    agg_value = meta["aggs"][agg_name]["value"]
 
                     # TODO: simplify this, match is default to True
                     if operator == ">":
@@ -345,7 +345,7 @@ class ElasticsearchBackend(StorageBackend):
                 else:
                     # No aggregation found, but it is required
                     match = False
-            result_map[index][0][agg_name]["match"] = match
+            result_map[index][0]["aggs"].setdefault(agg_name, {})["match"] = match
 
         return result_map
 
diff --git a/core/db/processing.py b/core/db/processing.py
index 11b1656..0f41a7b 100644
--- a/core/db/processing.py
+++ b/core/db/processing.py
@@ -58,7 +58,7 @@ def annotate_results(results):
         item["num_chans"] = num_chans[item["nick"]]
 
 
-def parse_results(results, aggs=None):
+def parse_results(results, meta=None):
     results_parsed = []
     stringify = ["host", "channel"]
     if "hits" in results.keys():
@@ -110,13 +110,13 @@
             else:
                 element["time"] = time
             results_parsed.append(element)
-    if aggs:
-        aggregations = {}
+    if meta:
+        meta = {"aggs": {}}
         if "aggregations" in results:
             for field in ["avg_sentiment"]:
                 # Add other number fields here
                 if field in results["aggregations"]:
-                    aggregations[field] = results["aggregations"][field]
-        return (aggregations, results_parsed)
+                    meta["aggs"][field] = results["aggregations"][field]
+        return (meta, results_parsed)
     return results_parsed
 
diff --git a/core/lib/rules.py b/core/lib/rules.py
index fa1b26e..1159c0b 100644
--- a/core/lib/rules.py
+++ b/core/lib/rules.py
@@ -292,7 +292,7 @@ class NotificationRuleData(object):
         if not response:
             # No results in the result_map
             self.rule_no_match()
-        for index, (aggs, results) in response.items():
+        for index, (meta, results) in response.items():
             if not results:
                 # Falsy results, no matches
                 self.rule_not_matched(index)
@@ -300,15 +300,17 @@
             # Add the match values of all aggregations to a list
             aggs_for_index = []
             for agg_name in self.aggs.keys():
-                if agg_name in aggs:
-                    if "match" in aggs[agg_name]:
-                        aggs_for_index.append(aggs[agg_name]["match"])
+                if agg_name in meta["aggs"]:
+                    if "match" in meta["aggs"][agg_name]:
+                        aggs_for_index.append(meta["aggs"][agg_name]["match"])
 
             # All required aggs are present
             if len(aggs_for_index) == len(self.aggs.keys()):
                 if all(aggs_for_index):
                     # All aggs have matched
-                    self.rule_matched(index, results[: self.object.amount], aggs)
+                    self.rule_matched(
+                        index, results[: self.object.amount], meta["aggs"]
+                    )
                     continue
             # Default branch, since the happy path has a continue keyword
             self.rule_no_match(index)
@@ -536,7 +538,7 @@
         try:
             self.parsed = load(self.data, Loader=Loader)
         except (ScannerError, ParserError) as e:
-            raise RuleParseError("data", f"Invalid YAML: {e}")
+            raise RuleParseError(f"Invalid YAML: {e}", "data")
 
     def __str__(self):
         """