From be0cf231b49ed85911411c4ee9b04e2c3cd14ac4 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Tue, 22 Nov 2022 21:42:35 +0000 Subject: [PATCH] Fix mapping and make Threshold talk to SSDB --- db.py | 20 ++++++++++++++++---- legacy/main.py | 5 ++++- legacy/modules/monitor.py | 2 +- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/db.py b/db.py index 8576aef..4a82cd1 100644 --- a/db.py +++ b/db.py @@ -57,7 +57,7 @@ client = None # with strings in the field. keyword_fields = ["nick_id", "user_id", "net_id"] -mapping = { +mapping_int = { "mappings": { "properties": { "ts": {"type": "date", "format": "epoch_second"}, @@ -65,24 +65,32 @@ mapping = { } } } +mapping = dict(mapping_int) for field in keyword_fields: mapping["mappings"]["properties"][field] = {"type": "text"} +del mapping_int["mappings"]["properties"]["file_tim"] + + async def initialise_elasticsearch(): """ Initialise the Elasticsearch client. """ auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD) client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False) - for index in ("main", "restricted"): + for index in ("main", "restricted", "internal"): + if index == "internal": + map_dict = mapping_int + else: + map_dict = mapping if await client.indices.exists(index=index): # update index with mapping await client.indices.put_mapping( - index=index, properties=mapping["mappings"]["properties"] + index=index, properties=map_dict["mappings"]["properties"] ) else: - await client.indices.create(index=index, mappings=mapping["mappings"]) + await client.indices.create(index=index, mappings=map_dict["mappings"]) return client @@ -119,9 +127,13 @@ async def store_batch(data): else: indexmap[INDEX].append(msg) + print("KEYS", indexmap.keys()) + for index, index_messages in indexmap.items(): for message in index_messages: result = await client.index(index=index, body=message) + if index == "internal": + print("INTERNAL RES", result) if not result["result"] == "created": log.error(f"Indexing failed: {result}") log.debug(f"Indexed {len(data)} messages in ES") diff --git a/legacy/main.py b/legacy/main.py index 142b1fa..3239227 100644 --- a/legacy/main.py +++ b/legacy/main.py @@ -4,6 +4,7 @@ from os import urandom from os.path import exists from string import digits +import redis from redis import StrictRedis # List of errors ZNC can give us @@ -121,7 +122,7 @@ def initConf(): def initMain(): - global r, g + global r, g, x initConf() r = StrictRedis( unix_socket_path=config["RedisSocket"], db=config["RedisDBEphemeral"] # noqa @@ -129,3 +130,5 @@ def initMain(): g = StrictRedis( unix_socket_path=config["RedisSocket"], db=config["RedisDBPersistent"] ) # noqa + # SSDB for communication with Monolith + x = redis.from_url("redis://ssdb:1289", db=0) diff --git a/legacy/modules/monitor.py b/legacy/modules/monitor.py index c9189b3..fb57253 100644 --- a/legacy/modules/monitor.py +++ b/legacy/modules/monitor.py @@ -67,7 +67,7 @@ def parsemeta(numName, c): def queue_message(c): message = json.dumps(c) - main.g.lpush("queue", message) + main.x.lpush("queue", message) def event(