Create sync versions of the message-ingest pathway to work around sync-only Django management commands
parent 1b1dbbc76c
commit f0455984ef
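Context for the change: a Django management command's handle() method is synchronous, so the async rule-matching pathway could previously only be reached through asgiref's async_to_sync bridge (removed at the bottom of this diff). This commit instead adds synchronous twins of the methods along that pathway. A minimal sketch of the two call styles, using stand-in classes rather than the project's real ones:

# Minimal sketch (stand-ins, not the project's real classes) of the two ways a
# sync-only management command can reach the rule-matching code.
from asgiref.sync import async_to_sync


class RuleData:
    async def rule_matched(self, index, message, meta, mode):
        print("async pathway:", index, mode)

    def rule_matched_sync(self, index, message, meta, mode):
        print("sync pathway:", index, mode)


def handle():  # stands in for BaseCommand.handle(), which is synchronous
    data = RuleData()

    # Before this commit: wrap the coroutine so plain sync code can drive it.
    async_to_sync(data.rule_matched)("main", {"msg": "hi"}, meta={}, mode="ondemand")

    # After this commit: call the dedicated sync twin directly.
    data.rule_matched_sync("main", {"msg": "hi"}, meta={}, mode="ondemand")


if __name__ == "__main__":
    handle()

The rest of the diff threads this choice through the storage backend and the rule data object.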
@@ -280,6 +280,20 @@ class ElasticsearchBackend(StorageBackend):
                 self.log.error(f"Indexing failed: {result}")
         self.log.debug(f"Indexed {len(matches)} messages in ES")
 
+    def store_matches(self, matches):
+        """
+        Store a list of matches in Elasticsearch.
+        :param index: The index to store the matches in.
+        :param matches: A list of matches to store.
+        """
+        if self.client is None:
+            self.initialise()
+        for match in matches:
+            result = self.client.index(index=settings.INDEX_RULE_STORAGE, body=match)
+            if not result["result"] == "created":
+                self.log.error(f"Indexing failed: {result}")
+        self.log.debug(f"Indexed {len(matches)} messages in ES")
+
     async def schedule_query_results(self, rule_object):
         """
         Helper to run a scheduled query with reduced functionality and async.
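The new store_matches is the synchronous counterpart of the async indexing path visible in the context lines: lazily initialise the client, index each match into settings.INDEX_RULE_STORAGE, and log any document that does not come back as "created". A rough, self-contained sketch of the same loop against a stubbed client (the stub and the literal index name are assumptions for illustration):

# Rough sketch of the sync store_matches loop with a stub in place of the real
# Elasticsearch client; the stub and the index name are assumptions, while the
# loop structure mirrors the diff.
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("es-backend")


class StubClient:
    def index(self, index, body):
        # The real client returns a dict whose "result" is "created" on success.
        return {"result": "created"}


def store_matches(client, index_name, matches):
    for match in matches:
        result = client.index(index=index_name, body=match)
        if result["result"] != "created":
            log.error(f"Indexing failed: {result}")
    log.debug(f"Indexed {len(matches)} messages in ES")


store_matches(StubClient(), "rule_storage", [{"rule": "r1", "msg": "hello"}])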
@@ -275,12 +275,7 @@ class NotificationRuleData(object):
 
             return
 
-    async def ingest_matches(self, index, matches, meta, mode):
-        """
-        Store all matches for an index.
-        :param index: the index to store the matches for
-        :param matches: the matches to store
-        """
+    def reform_matches(self, index, matches, meta, mode):
         if not isinstance(matches, list):
             matches = [matches]
         matches_copy = matches.copy()
@@ -291,7 +286,25 @@ class NotificationRuleData(object):
             matches_copy[match_index]["meta"] = meta
             matches_copy[match_index]["match_ts"] = match_ts
             matches_copy[match_index]["mode"] = mode
-        await self.db.async_store_matches(matches_copy)
+        return matches_copy
 
+    async def ingest_matches(self, index, matches, meta, mode):
+        """
+        Store all matches for an index.
+        :param index: the index to store the matches for
+        :param matches: the matches to store
+        """
+        new_matches = self.reform_matches(index, matches, meta, mode)
+        await self.db.async_store_matches(new_matches)
+
+    def ingest_matches_sync(self, index, matches, meta, mode):
+        """
+        Store all matches for an index.
+        :param index: the index to store the matches for
+        :param matches: the matches to store
+        """
+        new_matches = self.reform_matches(index, matches, meta, mode)
+        self.db.store_matches(new_matches)
+
     async def rule_matched(self, index, message, meta, mode):
         """
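The two hunks above split enrichment from storage: reform_matches stamps each match with meta, match_ts and mode and returns the copy, while the thin wrappers ingest_matches and ingest_matches_sync differ only in whether they await db.async_store_matches or call the new sync db.store_matches. A compressed sketch of that pattern (the Database stub and the timestamp value are assumptions; the method names and stamped fields come from the diff):

# Compressed sketch of the shared-transform / dual-storage pattern from this
# hunk. The Database stub and the timestamp format are assumptions.
import asyncio
from datetime import datetime, timezone


class Database:
    def store_matches(self, matches):
        print("sync store:", matches)

    async def async_store_matches(self, matches):
        print("async store:", matches)


class RuleData:
    def __init__(self, db):
        self.db = db

    def reform_matches(self, index, matches, meta, mode):
        if not isinstance(matches, list):
            matches = [matches]
        matches_copy = matches.copy()
        match_ts = datetime.now(timezone.utc).isoformat()  # exact value assumed
        for match_index, _ in enumerate(matches_copy):
            matches_copy[match_index]["meta"] = meta
            matches_copy[match_index]["match_ts"] = match_ts
            matches_copy[match_index]["mode"] = mode
        return matches_copy

    async def ingest_matches(self, index, matches, meta, mode):
        await self.db.async_store_matches(self.reform_matches(index, matches, meta, mode))

    def ingest_matches_sync(self, index, matches, meta, mode):
        self.db.store_matches(self.reform_matches(index, matches, meta, mode))


rd = RuleData(Database())
rd.ingest_matches_sync("main", {"msg": "hi"}, meta={"total_hits": 1}, mode="ondemand")
asyncio.run(rd.ingest_matches("main", {"msg": "hi"}, meta={"total_hits": 1}, mode="ondemand"))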
@@ -312,6 +325,25 @@ class NotificationRuleData(object):
             self.store_match(index, message)
             await self.ingest_matches(index, message, meta, mode)
 
+    def rule_matched_sync(self, index, message, meta, mode):
+        """
+        A rule has matched.
+        If the previous run did not match, send a notification after formatting
+        the aggregations.
+        :param index: the index the rule matched on
+        :param message: the message object that matched
+        :param aggs: the aggregations that matched
+        """
+        current_match = self.get_match(index, message)
+        log.debug(f"Rule matched: {index} - current match: {current_match}")
+        if current_match is False:
+            # Matched now, but not before
+            if "matched" not in meta:
+                meta["matched"] = self.format_aggs(meta["aggs"])
+            rule_notify(self.object, index, message, meta)
+            self.store_match(index, message)
+            self.ingest_matches_sync(index, message, meta, mode)
+
     async def rule_no_match(self, index=None):
         """
         A rule has not matched.
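rule_matched_sync mirrors the async rule_matched, and its body shows the edge-triggered notification logic: a notification is sent only when the rule matches now but did not match on the previous run (get_match returned False), after which the match is stored so repeats stay quiet. A small sketch of just that guard, with stubs standing in for get_match, rule_notify and the storage calls:

# Sketch of the edge-triggered guard in rule_matched_sync; the state handling
# here is a stub, only the "notify on False -> store -> suppress repeats" flow
# comes from the diff.
class RuleState:
    def __init__(self):
        self.previous = {}

    def get_match(self, index, message):
        return self.previous.get(index, False)

    def store_match(self, index, message):
        self.previous[index] = True

    def rule_matched_sync(self, index, message, meta, mode):
        current_match = self.get_match(index, message)
        if current_match is False:
            print(f"notify: {index} (was not matching before)")
            self.store_match(index, message)
        else:
            print(f"still matching: {index}, no new notification")


state = RuleState()
state.rule_matched_sync("main", {"msg": "hi"}, meta={}, mode="ondemand")  # notifies
state.rule_matched_sync("main", {"msg": "hi"}, meta={}, mode="ondemand")  # suppressed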
@@ -1,5 +1,4 @@
 import msgpack
-from asgiref.sync import async_to_sync
 from django.core.management.base import BaseCommand
 from redis import StrictRedis
 
@@ -83,13 +82,13 @@ def process_rules(data):
         meta = {"matched": matched, "total_hits": 1}
 
         # Parse the rule, we saved some work above to avoid doing this,
-        # but it makes delivering messages significantly easier as we ca
+        # but it makes delivering messages significantly easier as we can
         # use the same code as for scheduling.
         rule_data_object = NotificationRuleData(rule.user, rule, db=db)
         # rule_notify(rule, index, message, meta=meta)
-        print("ABOUT TO RUN ASYNC TO SYNC")
-        rule_matched = async_to_sync(rule_data_object.rule_matched)
-        rule_matched(index, message, meta=meta, mode="ondemand")
+        rule_data_object.rule_matched_sync(
+            index, message, meta=meta, mode="ondemand"
+        )
 
 
 class Command(BaseCommand):
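Worth noting at this final call site: besides swapping the async_to_sync bridge for a direct rule_matched_sync call, the hunk drops the stray debug print and, together with the import hunk above, leaves the management command with no asgiref usage at all.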