From 49f46c33bae17a9a1cdf3af8dbced0d583aef855 Mon Sep 17 00:00:00 2001
From: Mark Veidemanis
Date: Tue, 22 Nov 2022 20:15:02 +0000
Subject: [PATCH] Fully implement Elasticsearch indexing

---
 db.py                          | 115 ++++++++++++++++++---------------
 docker/docker-compose.prod.yml |  17 ++++-
 processing/process.py          |   2 +-
 requirements.txt               |   3 +-
 4 files changed, 81 insertions(+), 56 deletions(-)

diff --git a/db.py b/db.py
index f6b3774..7140b4d 100644
--- a/db.py
+++ b/db.py
@@ -1,20 +1,17 @@
-import random
 from os import getenv
 
 import aioredis
 import orjson
 import redis
 
-# Kafka
-from aiokafka import AIOKafkaProducer
+# Elasticsearch
+from elasticsearch import AsyncElasticsearch
 
 import util
 
 trues = ("true", "1", "t", True)
 
-MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
-
-# KAFKA_TOPIC = "msg"
+# INDEX = "msg"
 
 log = util.get_logger("db")
 
@@ -47,15 +44,56 @@ TYPES_META = ["who"]
 TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
 KEYNAME = "queue"
 
+ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
+ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
+ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
+ELASTICSEARCH_PORT = int(getenv("ELASTICSEARCH_PORT", "9200"))
+ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false").lower() in trues
+
+client = None
+
+# These are sometimes numeric, sometimes strings.
+# If they are seen to be numeric first, ES will erroneously
+# index them as "long" and then subsequently fail to index messages
+# with strings in the field.
+keyword_fields = ["nick_id", "user_id", "net_id"]
+
+mapping = {
+    "mappings": {
+        "properties": {
+            "ts": {"type": "date", "format": "epoch_second"},
+            "file_tim": {"type": "date", "format": "epoch_millis"},
+        }
+    }
+}
+for field in keyword_fields:
+    mapping["mappings"]["properties"][field] = {"type": "keyword"}
+
+
+async def initialise_elasticsearch():
+    """
+    Initialise the Elasticsearch client.
+    """
+    auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
+    scheme = "https" if ELASTICSEARCH_TLS else "http"
+    es_url = f"{scheme}://{ELASTICSEARCH_HOST}:{ELASTICSEARCH_PORT}"
+    client = AsyncElasticsearch(es_url, http_auth=auth, verify_certs=False)
+    for index in ("main", "restricted"):
+        if await client.indices.exists(index=index):
+            # Update the existing index with the mapping
+            await client.indices.put_mapping(
+                index=index, properties=mapping["mappings"]["properties"]
+            )
+        else:
+            await client.indices.create(index=index, mappings=mapping["mappings"])
+    return client
+
 
-async def store_kafka_batch(data):
-    if not MONOLITH_KAFKA_ENABLED:
-        log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
-        return
-    # log.debug(f"Storing Kafka batch of {len(data)} messages")
-    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
-    await producer.start()
-    topicmap = {}
+async def store_batch(data):
+    global client
+    if not client:
+        client = await initialise_elasticsearch()
+    indexmap = {}
     for msg in data:
         if msg["type"] in TYPES_MAIN:
             # index = "main"
@@ -68,7 +106,7 @@ async def store_kafka_batch(data):
             index = "internal"
             # schema = mc_s.schema_int
 
-        KAFKA_TOPIC = index
+        INDEX = index
 
         # if key in schema:
         #     if isinstance(value, int):
@@ -76,45 +114,20 @@ async def store_kafka_batch(data):
         #                 "text"
         #             ):
         #                 msg[key] = str(value)
-        body = orjson.dumps(msg)
+        # body = orjson.dumps(msg)
         if "ts" not in msg:
             raise Exception("No TS in msg")
-        if KAFKA_TOPIC not in topicmap:
-            topicmap[KAFKA_TOPIC] = [body]
+        if INDEX not in indexmap:
+            indexmap[INDEX] = [msg]
         else:
-            topicmap[KAFKA_TOPIC].append(body)
-
-    for topic, messages in topicmap.items():
-        batch = producer.create_batch()
-        for body in messages:
-            metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
-            if metadata is None:
-                partitions = await producer.partitions_for(topic)
-                partition = random.choice(tuple(partitions))
-                await producer.send_batch(batch, topic, partition=partition)
-                # log.debug(
-                #     (
-                #         f"{batch.record_count()} messages sent to topic "
-                #         f"{topic} partition {partition}"
-                #     )
-                # )
-                batch = producer.create_batch()
-                continue
-
-        partitions = await producer.partitions_for(topic)
-        partition = random.choice(tuple(partitions))
-        await producer.send_batch(batch, topic, partition=partition)
-        # log.debug(
-        #     (
-        #         f"{batch.record_count()} messages sent to topic "
-        #         f"{topic} partition {partition}"
-        #     )
-        # )
-    log.debug(
-        "Kafka batches sent: "
-        + ", ".join([tpc + ": " + str(len(topicmap[tpc])) for tpc in topicmap])
-    )
-    await producer.stop()
+            indexmap[INDEX].append(msg)
+
+    for index, index_messages in indexmap.items():
+        for message in index_messages:
+            result = await client.index(index=index, body=message)
+            if result["result"] != "created":
+                log.error(f"Indexing failed: {result}")
+    log.debug(f"Indexed {len(data)} messages in ES")
 
 
 async def queue_message(msg):
diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml
index dac8c23..98eed7c 100644
--- a/docker/docker-compose.prod.yml
+++ b/docker/docker-compose.prod.yml
@@ -9,6 +9,10 @@ services:
       - ${PORTAINER_GIT_DIR}:/code
     env_file:
       - ../stack.env
+    networks:
+      - default
+      - pathogen
+      - elastic
 
   threshold:
     image: pathogen/threshold:latest
@@ -30,6 +34,8 @@ services:
     # for development
     extra_hosts:
       - "host.docker.internal:host-gateway"
+    networks:
+      - default
 
   ssdb:
     image: tsl0922/ssdb
@@ -38,6 +44,8 @@ services:
       - "1289:1289"
     environment:
       - SSDB_PORT=1289
+    networks:
+      - default
 
   tmp:
     image: busybox
@@ -67,9 +75,12 @@ services:
       retries: 15
 
 networks:
-  default:
-    external:
-      name: pathogen
+  default:
+    driver: bridge
+  pathogen:
+    external: true
+  elastic:
+    external: true
 
 volumes:
   redis_data:
diff --git a/processing/process.py b/processing/process.py
index e9802c2..20f2eed 100644
--- a/processing/process.py
+++ b/processing/process.py
@@ -135,7 +135,7 @@ async def spawn_processing_threads(chunk, length):
                 f"{cores} threads: {len(flat_list)}"
             )
         )
-        await db.store_kafka_batch(flat_list)
+        await db.store_batch(flat_list)
 
     # log.debug(f"Finished processing {len_data} messages")
 
diff --git a/requirements.txt b/requirements.txt
index 5bc11b4..398d175 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@ python-dotenv
 #manticoresearch
 numpy
 aioredis[hiredis]
-aiokafka
+#aiokafka
 vaderSentiment
 polyglot
 pyicu
@@ -22,3 +22,4 @@ python-Levenshtein
 orjson
 uvloop
 numba
+elasticsearch[async]
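
Note: store_batch() above issues one client.index() call per message.
If indexing throughput becomes a bottleneck, the bulk helper that ships
with elasticsearch[async] could submit each batch in a single request.
A rough sketch only (store_batch_bulk is a hypothetical name; it reuses
the indexmap shape, client, and log from db.py above):

    from elasticsearch.helpers import async_bulk

    async def store_batch_bulk(indexmap, client):
        # Build one bulk action per message, routed to its target index.
        actions = [
            {"_index": index, "_source": message}
            for index, index_messages in indexmap.items()
            for message in index_messages
        ]
        # raise_on_error=False returns (success_count, errors) rather
        # than raising BulkIndexError on partial failures.
        successes, errors = await async_bulk(client, actions, raise_on_error=False)
        if errors:
            log.error(f"Bulk indexing failed for {len(errors)} messages: {errors}")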