@ -294,11 +294,10 @@ class ElasticsearchBackend(StorageBackend):
self . log . error ( f " Indexing failed: { result } " )
self . log . debug ( f " Indexed { len ( matches ) } messages in ES " )
async def schedule_query_results ( self , rule_object ) :
def prepare_ schedule_query( self , rule_object ) :
"""
Helper to run a scheduled query with reduced functionality and async .
Helper to run a scheduled query with reduced functionality .
"""
data = rule_object . parsed
if " tags " in data :
@ -310,8 +309,6 @@ class ElasticsearchBackend(StorageBackend):
query = data [ " query " ] [ 0 ]
data [ " query " ] = query
result_map = { }
add_bool = [ ]
add_top = [ ]
if " source " in data :
@ -352,34 +349,13 @@ class ElasticsearchBackend(StorageBackend):
" avg " : { " field " : " sentiment " } ,
}
}
for index in data [ " index " ] :
if " message " in search_query :
self . log . error ( f " Error parsing query: { search_query [ ' message ' ] } " )
continue
response = await self . async_run_query (
rule_object . user ,
search_query ,
index = index ,
)
self . log . debug ( f " Running scheduled query on { index } : { search_query } " )
# self.log.debug(f"Response from scheduled query: {response}")
if isinstance ( response , Exception ) :
error = response . info [ " error " ] [ " root_cause " ] [ 0 ] [ " reason " ]
self . log . error ( f " Error running scheduled search: { error } " )
raise QueryError ( error )
if len ( response [ " hits " ] [ " hits " ] ) == 0 :
# No results, skip
continue
meta , response = self . parse ( response , meta = True )
# print("Parsed response", response)
if " message " in response :
self . log . error ( f " Error running scheduled search: { response [ ' message ' ] } " )
continue
result_map [ index ] = ( meta , response )
return search_query
# Average aggregation check
# Could probably do this in elasticsearch
def schedule_check_aggregations ( self , rule_object , result_map ) :
"""
Check the results of a scheduled query for aggregations .
"""
for index , ( meta , result ) in result_map . items ( ) :
# Default to true, if no aggs are found, we still want to match
match = True
@ -412,6 +388,71 @@ class ElasticsearchBackend(StorageBackend):
return result_map
def schedule_query_results_test_sync ( self , rule_object ) :
"""
Helper to run a scheduled query test with reduced functionality .
Sync version for running from Django forms .
Does not return results .
"""
data = rule_object . parsed
search_query = self . prepare_schedule_query ( rule_object )
for index in data [ " index " ] :
if " message " in search_query :
self . log . error ( f " Error parsing test query: { search_query [ ' message ' ] } " )
continue
response = self . run_query (
rule_object . user ,
search_query ,
index = index ,
)
self . log . debug ( f " Running scheduled test query on { index } : { search_query } " )
# self.log.debug(f"Response from scheduled query: {response}")
if isinstance ( response , Exception ) :
error = response . info [ " error " ] [ " root_cause " ] [ 0 ] [ " reason " ]
self . log . error ( f " Error running test scheduled search: { error } " )
raise QueryError ( error )
async def schedule_query_results ( self , rule_object ) :
"""
Helper to run a scheduled query with reduced functionality and async .
"""
result_map = { }
data = rule_object . parsed
search_query = self . prepare_schedule_query ( rule_object )
for index in data [ " index " ] :
if " message " in search_query :
self . log . error ( f " Error parsing query: { search_query [ ' message ' ] } " )
continue
response = await self . async_run_query (
rule_object . user ,
search_query ,
index = index ,
)
self . log . debug ( f " Running scheduled query on { index } : { search_query } " )
# self.log.debug(f"Response from scheduled query: {response}")
if isinstance ( response , Exception ) :
error = response . info [ " error " ] [ " root_cause " ] [ 0 ] [ " reason " ]
self . log . error ( f " Error running scheduled search: { error } " )
raise QueryError ( error )
if len ( response [ " hits " ] [ " hits " ] ) == 0 :
# No results, skip
continue
meta , response = self . parse ( response , meta = True )
# print("Parsed response", response)
if " message " in response :
self . log . error ( f " Error running scheduled search: { response [ ' message ' ] } " )
continue
result_map [ index ] = ( meta , response )
# Average aggregation check
# Could probably do this in elasticsearch
result_map = self . schedule_check_aggregations ( rule_object , result_map )
return result_map
def query_results (
self ,
request ,
@ -424,7 +465,6 @@ class ElasticsearchBackend(StorageBackend):
dedup_fields = None ,
tags = None ,
) :
add_bool = [ ]
add_top = [ ]
add_top_negative = [ ]