Ingest notification matches to ES

This commit is contained in:
Mark Veidemanis 2023-02-02 20:04:55 +00:00
parent 79b4512546
commit df1e82c5f2
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
5 changed files with 80 additions and 9 deletions

1
.gitignore vendored
View File

@ -154,4 +154,5 @@ cython_debug/
.idea/ .idea/
.bash_history .bash_history
.python_history
.vscode/ .vscode/

View File

@ -20,6 +20,24 @@ from core.lib.parsing import (
parse_source, parse_source,
) )
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]

# Date fields and their ES formats.
# NOTE(review): "file_tim" looks like a truncated "file_time" — confirm
# against the documents actually being indexed before renaming it.
_date_properties = {
    "ts": {"type": "date", "format": "epoch_second"},
    "match_ts": {"type": "date", "format": "iso8601"},
    "file_tim": {"type": "date", "format": "epoch_millis"},
}

# Index mapping for the rule storage index: date fields first, then the
# mixed-type fields forced to "text" so a numeric first sighting can
# never pin them to "long".
mapping = {
    "mappings": {
        "properties": {
            **_date_properties,
            **{field: {"type": "text"} for field in keyword_fields},
        }
    }
}
class ElasticsearchBackend(StorageBackend): class ElasticsearchBackend(StorageBackend):
def __init__(self): def __init__(self):
@ -41,12 +59,24 @@ class ElasticsearchBackend(StorageBackend):
""" """
Inititialise the Elasticsearch API endpoint in async mode. Inititialise the Elasticsearch API endpoint in async mode.
""" """
global mapping
auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch( client = AsyncElasticsearch(
settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False
) )
self.async_client = client self.async_client = client
# Create the rule storage indices
if await client.indices.exists(index=settings.INDEX_RULE_STORAGE):
await client.indices.put_mapping(
index=settings.INDEX_RULE_STORAGE,
properties=mapping["mappings"]["properties"],
)
else:
await client.indices.create(
index=settings.INDEX_RULE_STORAGE, mappings=mapping["mappings"]
)
def construct_context_query( def construct_context_query(
self, index, net, channel, src, num, size, type=None, nicks=None self, index, net, channel, src, num, size, type=None, nicks=None
): ):
@ -232,6 +262,23 @@ class ElasticsearchBackend(StorageBackend):
return err return err
return response return response
async def async_store_matches(self, matches):
    """
    Store a list of matches in Elasticsearch.

    Lazily initialises the async client on first use, then indexes each
    match into the rule storage index one document at a time, logging an
    error for any document ES does not report as created.
    :param matches: a list of match dicts to store.
    """
    # The client is created on demand: __init__ cannot await.
    if self.async_client is None:
        await self.async_initialise()
    for match in matches:
        result = await self.async_client.index(
            index=settings.INDEX_RULE_STORAGE, body=match
        )
        # ES answers "created" for a successfully indexed new document.
        if result["result"] != "created":
            self.log.error(f"Indexing failed: {result}")
    self.log.debug(f"Indexed {len(matches)} messages in ES")
async def schedule_query_results(self, rule_object): async def schedule_query_results(self, rule_object):
""" """
Helper to run a scheduled query with reduced functionality and async. Helper to run a scheduled query with reduced functionality and async.

View File

@ -8,6 +8,8 @@ try:
except ImportError: except ImportError:
from yaml import Loader, Dumper from yaml import Loader, Dumper
from datetime import datetime
import orjson import orjson
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync
from siphashc import siphash from siphashc import siphash
@ -271,9 +273,27 @@ class NotificationRuleData(object):
op, value = self.aggs[agg_name] op, value = self.aggs[agg_name]
new_aggs[agg_name] = f"{agg['value']}{op}{value}" new_aggs[agg_name] = f"{agg['value']}{op}{value}"
return new_aggs return
async def ingest_matches(self, index, matches, meta):
    """
    Annotate and store all matches for an index.

    Each match is copied and tagged with its source index, this rule's
    UUID, the query metadata, and a single shared match timestamp before
    being handed to the storage backend.
    :param index: the index the matches came from
    :param matches: a match dict, or list of match dicts, to store
    :param meta: metadata describing the matched query
    """
    if not isinstance(matches, list):
        matches = [matches]
    # Copy each match dict so the annotations below do not leak into the
    # dicts the caller still holds — a plain list.copy() would be shallow
    # and mutate the originals.
    annotated = [dict(match) for match in matches]
    # One timestamp for the whole batch, ISO8601 to match the ES mapping.
    match_ts = datetime.utcnow().isoformat()
    for match in annotated:
        match["index"] = index
        match["rule_uuid"] = self.object.id
        match["meta"] = meta
        match["match_ts"] = match_ts
    await self.db.async_store_matches(annotated)
async def rule_matched(self, index, message, meta):
""" """
A rule has matched. A rule has matched.
If the previous run did not match, send a notification after formatting If the previous run did not match, send a notification after formatting
@ -289,8 +309,9 @@ class NotificationRuleData(object):
meta["matched"] = self.format_aggs(meta["aggs"]) meta["matched"] = self.format_aggs(meta["aggs"])
rule_notify(self.object, index, message, meta) rule_notify(self.object, index, message, meta)
self.store_match(index, message) self.store_match(index, message)
await self.ingest_matches(index, message, meta)
def rule_no_match(self, index=None): async def rule_no_match(self, index=None):
""" """
A rule has not matched. A rule has not matched.
If the previous run did match, send a notification if configured to notify If the previous run did match, send a notification if configured to notify
@ -315,11 +336,11 @@ class NotificationRuleData(object):
response = await self.db.schedule_query_results(self) response = await self.db.schedule_query_results(self)
if not response: if not response:
# No results in the result_map # No results in the result_map
self.rule_no_match() await self.rule_no_match()
for index, (meta, results) in response.items(): for index, (meta, results) in response.items():
if not results: if not results:
# Falsy results, no matches # Falsy results, no matches
self.rule_not_matched(index) await self.rule_no_match(index)
# Add the match values of all aggregations to a list # Add the match values of all aggregations to a list
aggs_for_index = [] aggs_for_index = []
@ -332,10 +353,10 @@ class NotificationRuleData(object):
if len(aggs_for_index) == len(self.aggs.keys()): if len(aggs_for_index) == len(self.aggs.keys()):
if all(aggs_for_index): if all(aggs_for_index):
# All aggs have matched # All aggs have matched
self.rule_matched(index, results[: self.object.amount], meta) await self.rule_matched(index, results[: self.object.amount], meta)
continue continue
# Default branch, since the happy path has a continue keyword # Default branch, since the happy path has a continue keyword
self.rule_no_match(index) await self.rule_no_match(index)
def test_schedule(self): def test_schedule(self):
""" """

View File

@ -1,8 +1,9 @@
# Generated by Django 4.1.5 on 2023-02-02 19:08 # Generated by Django 4.1.5 on 2023-02-02 19:08
from django.db import migrations, models
import uuid import uuid
from django.db import migrations, models
class Migration(migrations.Migration): class Migration(migrations.Migration):

View File

@ -1,8 +1,9 @@
# Generated by Django 4.1.5 on 2023-02-02 19:35 # Generated by Django 4.1.5 on 2023-02-02 19:35
from django.db import migrations, models
import uuid import uuid
from django.db import migrations, models
class Migration(migrations.Migration): class Migration(migrations.Migration):