Fully implement Elasticsearch indexing

master
Mark Veidemanis 1 year ago
parent 052631c71f
commit 49f46c33ba
Signed by: m
GPG Key ID: 5ACFCEED46C0904F

113
db.py

@ -1,20 +1,17 @@
import random
from os import getenv
import aioredis
import orjson
import redis
# Kafka
from aiokafka import AIOKafkaProducer
# Elasticsearch
from elasticsearch import AsyncElasticsearch
import util
trues = ("true", "1", "t", True)
MONOLITH_KAFKA_ENABLED = getenv("MONOLITH_KAFKA_ENABLED", "false").lower() in trues
# KAFKA_TOPIC = "msg"
# INDEX = "msg"
log = util.get_logger("db")
@ -47,15 +44,54 @@ TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
KEYNAME = "queue"
ELASTICSEARCH_USERNAME = getenv("ELASTICSEARCH_USERNAME", "elastic")
ELASTICSEARCH_PASSWORD = getenv("ELASTICSEARCH_PASSWORD", "changeme")
ELASTICSEARCH_HOST = getenv("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_PORT = int(getenv("ELASTICSEARCH_PORT", "9200"))
ELASTICSEARCH_TLS = getenv("ELASTICSEARCH_TLS", "false") in trues
client = None
# These are sometimes numeric, sometimes strings.
# If they are seen to be numeric first, ES will erroneously
# index them as "long" and then subsequently fail to index messages
# with strings in the field.
keyword_fields = ["nick_id", "user_id", "net_id"]
mapping = {
"mappings": {
"properties": {
"ts": {"type": "date", "format": "epoch_second"},
"file_tim": {"type": "date", "format": "epoch_millis"},
}
}
}
for field in keyword_fields:
mapping["mappings"]["properties"][field] = {"type": "text"}
async def initialise_elasticsearch():
"""
Initialise the Elasticsearch client.
"""
auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
for index in ("main", "restricted"):
if await client.indices.exists(index=index):
# update index with mapping
await client.indices.put_mapping(
index=index, properties=mapping["mappings"]["properties"]
)
else:
await client.indices.create(index=index, mappings=mapping["mappings"])
return client
async def store_kafka_batch(data):
if not MONOLITH_KAFKA_ENABLED:
log.info(f"Not storing Kafka batch of length {len(data)}, Kafka is disabled.")
return
# log.debug(f"Storing Kafka batch of {len(data)} messages")
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await producer.start()
topicmap = {}
async def store_batch(data):
global client
if not client:
client = await initialise_elasticsearch()
indexmap = {}
for msg in data:
if msg["type"] in TYPES_MAIN:
# index = "main"
@ -68,7 +104,7 @@ async def store_kafka_batch(data):
index = "internal"
# schema = mc_s.schema_int
KAFKA_TOPIC = index
INDEX = index
# if key in schema:
# if isinstance(value, int):
@ -76,45 +112,20 @@ async def store_kafka_batch(data):
# "text"
# ):
# msg[key] = str(value)
body = orjson.dumps(msg)
# body = orjson.dumps(msg)
if "ts" not in msg:
raise Exception("No TS in msg")
if KAFKA_TOPIC not in topicmap:
topicmap[KAFKA_TOPIC] = [body]
if INDEX not in indexmap:
indexmap[INDEX] = [msg]
else:
topicmap[KAFKA_TOPIC].append(body)
for topic, messages in topicmap.items():
batch = producer.create_batch()
for body in messages:
metadata = batch.append(key=None, value=body, timestamp=msg["ts"])
if metadata is None:
partitions = await producer.partitions_for(topic)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, topic, partition=partition)
# log.debug(
# (
# f"{batch.record_count()} messages sent to topic "
# f"{topic} partition {partition}"
# )
# )
batch = producer.create_batch()
continue
partitions = await producer.partitions_for(topic)
partition = random.choice(tuple(partitions))
await producer.send_batch(batch, topic, partition=partition)
# log.debug(
# (
# f"{batch.record_count()} messages sent to topic "
# f"{topic} partition {partition}"
# )
# )
log.debug(
"Kafka batches sent: "
+ ", ".join([tpc + ": " + str(len(topicmap[tpc])) for tpc in topicmap])
)
await producer.stop()
indexmap[INDEX].append(msg)
for index, index_messages in indexmap.items():
for message in index_messages:
result = await client.index(index=index, body=message)
if not result["result"] == "created":
log.error(f"Indexing failed: {result}")
log.debug(f"Indexed {len(data)} messages in ES")
async def queue_message(msg):

@ -9,6 +9,10 @@ services:
- ${PORTAINER_GIT_DIR}:/code
env_file:
- ../stack.env
networks:
- default
- pathogen
- elastic
threshold:
image: pathogen/threshold:latest
@ -30,6 +34,8 @@ services:
# for development
extra_hosts:
- "host.docker.internal:host-gateway"
networks:
- default
ssdb:
image: tsl0922/ssdb
@ -38,6 +44,8 @@ services:
- "1289:1289"
environment:
- SSDB_PORT=1289
networks:
- default
tmp:
image: busybox
@ -67,9 +75,12 @@ services:
retries: 15
networks:
default:
external:
name: pathogen
default:
driver: bridge
pathogen:
external: true
elastic:
external: true
volumes:
redis_data:

@ -135,7 +135,7 @@ async def spawn_processing_threads(chunk, length):
f"{cores} threads: {len(flat_list)}"
)
)
await db.store_kafka_batch(flat_list)
await db.store_batch(flat_list)
# log.debug(f"Finished processing {len_data} messages")

@ -8,7 +8,7 @@ python-dotenv
#manticoresearch
numpy
aioredis[hiredis]
aiokafka
#aiokafka
vaderSentiment
polyglot
pyicu
@ -22,3 +22,4 @@ python-Levenshtein
orjson
uvloop
numba
elasticsearch[async]

Loading…
Cancel
Save