Fix mapping and make Threshold talk to SSDB

master
Mark Veidemanis 1 year ago
parent 1993c8f1d2
commit be0cf231b4
Signed by: m
GPG Key ID: 5ACFCEED46C0904F

20
db.py

@@ -57,7 +57,7 @@ client = None
 # with strings in the field.
 keyword_fields = ["nick_id", "user_id", "net_id"]
-mapping = {
+mapping_int = {
     "mappings": {
         "properties": {
             "ts": {"type": "date", "format": "epoch_second"},
@@ -65,24 +65,32 @@ mapping = {
         }
     }
 }
+mapping = dict(mapping_int)
 for field in keyword_fields:
     mapping["mappings"]["properties"][field] = {"type": "text"}
+del mapping_int["mappings"]["properties"]["file_tim"]
 async def initialise_elasticsearch():
     """
     Initialise the Elasticsearch client.
     """
     auth = (ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD)
     client = AsyncElasticsearch(ELASTICSEARCH_HOST, http_auth=auth, verify_certs=False)
-    for index in ("main", "restricted"):
+    for index in ("main", "restricted", "internal"):
+        if index == "internal":
+            map_dict = mapping_int
+        else:
+            map_dict = mapping
         if await client.indices.exists(index=index):
             # update index with mapping
             await client.indices.put_mapping(
-                index=index, properties=mapping["mappings"]["properties"]
+                index=index, properties=map_dict["mappings"]["properties"]
             )
         else:
-            await client.indices.create(index=index, mappings=mapping["mappings"])
+            await client.indices.create(index=index, mappings=map_dict["mappings"])
     return client
@@ -119,9 +127,13 @@ async def store_batch(data):
         else:
             indexmap[INDEX].append(msg)
+    print("KEYS", indexmap.keys())
     for index, index_messages in indexmap.items():
         for message in index_messages:
             result = await client.index(index=index, body=message)
+            if index == "internal":
+                print("INTERNAL RES", result)
             if not result["result"] == "created":
                 log.error(f"Indexing failed: {result}")
     log.debug(f"Indexed {len(data)} messages in ES")

@@ -4,6 +4,7 @@ from os import urandom
 from os.path import exists
 from string import digits
+import redis
 from redis import StrictRedis
 # List of errors ZNC can give us
@@ -121,7 +122,7 @@ def initConf():
 def initMain():
-    global r, g
+    global r, g, x
     initConf()
     r = StrictRedis(
         unix_socket_path=config["RedisSocket"], db=config["RedisDBEphemeral"]  # noqa
@@ -129,3 +130,5 @@ def initMain():
     g = StrictRedis(
         unix_socket_path=config["RedisSocket"], db=config["RedisDBPersistent"]
     )  # noqa
+    # SSDB for communication with Monolith
+    x = redis.from_url("redis://ssdb:1289", db=0)

@@ -67,7 +67,7 @@ def parsemeta(numName, c):
 def queue_message(c):
     message = json.dumps(c)
-    main.g.lpush("queue", message)
+    main.x.lpush("queue", message)
 def event(

Loading…
Cancel
Save