diff --git a/.gitignore b/.gitignore index e1105e8..b63f740 100644 --- a/.gitignore +++ b/.gitignore @@ -154,4 +154,5 @@ cython_debug/ .idea/ .bash_history +.python_history .vscode/ diff --git a/core/db/elastic.py b/core/db/elastic.py index 7f24079..177f037 100644 --- a/core/db/elastic.py +++ b/core/db/elastic.py @@ -20,6 +20,24 @@ from core.lib.parsing import ( parse_source, ) +# These are sometimes numeric, sometimes strings. +# If they are seen to be numeric first, ES will erroneously +# index them as "long" and then subsequently fail to index messages +# with strings in the field. +keyword_fields = ["nick_id", "user_id", "net_id"] + +mapping = { + "mappings": { + "properties": { + "ts": {"type": "date", "format": "epoch_second"}, + "match_ts": {"type": "date", "format": "iso8601"}, + "file_tim": {"type": "date", "format": "epoch_millis"}, + } + } +} +for field in keyword_fields: + mapping["mappings"]["properties"][field] = {"type": "text"} + class ElasticsearchBackend(StorageBackend): def __init__(self): @@ -41,12 +59,24 @@ class ElasticsearchBackend(StorageBackend): """ Inititialise the Elasticsearch API endpoint in async mode. """ + global mapping auth = (settings.ELASTICSEARCH_USERNAME, settings.ELASTICSEARCH_PASSWORD) client = AsyncElasticsearch( settings.ELASTICSEARCH_URL, http_auth=auth, verify_certs=False ) self.async_client = client + # Create the rule storage indices + if await client.indices.exists(index=settings.INDEX_RULE_STORAGE): + await client.indices.put_mapping( + index=settings.INDEX_RULE_STORAGE, + properties=mapping["mappings"]["properties"], + ) + else: + await client.indices.create( + index=settings.INDEX_RULE_STORAGE, mappings=mapping["mappings"] + ) + def construct_context_query( self, index, net, channel, src, num, size, type=None, nicks=None ): @@ -232,6 +262,23 @@ class ElasticsearchBackend(StorageBackend): return err return response + async def async_store_matches(self, matches): + """ + Store a list of matches in Elasticsearch. + :param index: The index to store the matches in. + :param matches: A list of matches to store. + """ + if self.async_client is None: + await self.async_initialise() + for match in matches: + print("INDEXING", match) + result = await self.async_client.index( + index=settings.INDEX_RULE_STORAGE, body=match + ) + if not result["result"] == "created": + self.log.error(f"Indexing failed: {result}") + self.log.debug(f"Indexed {len(matches)} messages in ES") + async def schedule_query_results(self, rule_object): """ Helper to run a scheduled query with reduced functionality and async. diff --git a/core/lib/rules.py b/core/lib/rules.py index 4f36d61..ca94bd1 100644 --- a/core/lib/rules.py +++ b/core/lib/rules.py @@ -8,6 +8,8 @@ try: except ImportError: from yaml import Loader, Dumper +from datetime import datetime + import orjson from asgiref.sync import async_to_sync from siphashc import siphash @@ -271,9 +273,27 @@ class NotificationRuleData(object): op, value = self.aggs[agg_name] new_aggs[agg_name] = f"{agg['value']}{op}{value}" - return new_aggs + return + + async def ingest_matches(self, index, matches, meta): + """ + Store all matches for an index. + :param index: the index to store the matches for + :param matches: the matches to store + """ + if not isinstance(matches, list): + matches = [matches] + matches_copy = matches.copy() + print("MATHCES COPY: ", matches_copy) + match_ts = datetime.utcnow().isoformat() + for match_index, _ in enumerate(matches_copy): + matches_copy[match_index]["index"] = index + matches_copy[match_index]["rule_uuid"] = self.object.id + matches_copy[match_index]["meta"] = meta + matches_copy[match_index]["match_ts"] = match_ts + await self.db.async_store_matches(matches_copy) - def rule_matched(self, index, message, meta): + async def rule_matched(self, index, message, meta): """ A rule has matched. If the previous run did not match, send a notification after formatting @@ -289,8 +309,9 @@ class NotificationRuleData(object): meta["matched"] = self.format_aggs(meta["aggs"]) rule_notify(self.object, index, message, meta) self.store_match(index, message) + await self.ingest_matches(index, message, meta) - def rule_no_match(self, index=None): + async def rule_no_match(self, index=None): """ A rule has not matched. If the previous run did match, send a notification if configured to notify @@ -315,11 +336,11 @@ class NotificationRuleData(object): response = await self.db.schedule_query_results(self) if not response: # No results in the result_map - self.rule_no_match() + await self.rule_no_match() for index, (meta, results) in response.items(): if not results: # Falsy results, no matches - self.rule_not_matched(index) + await self.rule_no_match(index) # Add the match values of all aggregations to a list aggs_for_index = [] @@ -332,10 +353,10 @@ class NotificationRuleData(object): if len(aggs_for_index) == len(self.aggs.keys()): if all(aggs_for_index): # All aggs have matched - self.rule_matched(index, results[: self.object.amount], meta) + await self.rule_matched(index, results[: self.object.amount], meta) continue # Default branch, since the happy path has a continue keyword - self.rule_no_match(index) + await self.rule_no_match(index) def test_schedule(self): """ diff --git a/core/migrations/0024_alter_notificationrule_id.py b/core/migrations/0024_alter_notificationrule_id.py index c7faa82..8aa34f8 100644 --- a/core/migrations/0024_alter_notificationrule_id.py +++ b/core/migrations/0024_alter_notificationrule_id.py @@ -1,8 +1,9 @@ # Generated by Django 4.1.5 on 2023-02-02 19:08 -from django.db import migrations, models import uuid +from django.db import migrations, models + class Migration(migrations.Migration): diff --git a/core/migrations/0025_alter_notificationrule_id.py b/core/migrations/0025_alter_notificationrule_id.py index d9fa6dc..6ef374f 100644 --- a/core/migrations/0025_alter_notificationrule_id.py +++ b/core/migrations/0025_alter_notificationrule_id.py @@ -1,8 +1,9 @@ # Generated by Django 4.1.5 on 2023-02-02 19:35 -from django.db import migrations, models import uuid +from django.db import migrations, models + class Migration(migrations.Migration):