"""Helpers for creating an OpenSearch client and running the main search query."""

from django.conf import settings
from opensearchpy import OpenSearch


def initialise_opensearch():
    """Create an ``OpenSearch`` client configured from Django settings.

    Reads ``OPENSEARCH_USERNAME``, ``OPENSEARCH_PASSWORD``, ``OPENSEARCH_URL``,
    ``OPENSEARCH_PORT`` and ``OPENSEARCH_TLS`` from ``settings``.

    Returns:
        The newly constructed ``OpenSearch`` client instance.
    """
    credentials = (settings.OPENSEARCH_USERNAME, settings.OPENSEARCH_PASSWORD)
    node = {"host": settings.OPENSEARCH_URL, "port": settings.OPENSEARCH_PORT}

    # NOTE(review): certificate verification, hostname assertion and the
    # related SSL warning are all switched off below; no client certificate,
    # client key or CA bundle is passed. Confirm this is intended for every
    # environment this module is deployed to.
    return OpenSearch(
        hosts=[node],
        http_compress=False,  # gzip compression of request bodies is turned off
        http_auth=credentials,
        use_ssl=settings.OPENSEARCH_TLS,
        verify_certs=False,
        ssl_assert_hostname=False,
        ssl_show_warn=False,
    )


def construct_query(query, fields, results):
    """Build the search request body for a ``query_string`` query.

    Args:
        query: The query text placed under ``query_string.query``.
        fields: Fields to search. Any falsy value (``None``, empty list, ...)
            falls back to ``settings.OPENSEARCH_MAIN_SEARCH_FIELDS``.
        results: Value for the request's ``size``. Any falsy value
            (``None``, ``0``, ...) falls back to ``5``.

    Returns:
        A dict with ``size`` and ``query`` keys, ready to be passed as the
        ``body`` of a search call.
    """
    search_fields = fields or settings.OPENSEARCH_MAIN_SEARCH_FIELDS
    size = results or 5

    # Options not set here: default_field, type, minimum_should_match,
    # enable_position_increments, max_determinized_states.
    query_string = {
        "query": query,
        "fields": search_fields,
        "fuzziness": "AUTO",
        "fuzzy_transpositions": True,
        "fuzzy_max_expansions": 50,
        "fuzzy_prefix_length": 0,
        "default_operator": "or",
        "analyzer": "standard",
        "lenient": True,
        "boost": 1,
        "allow_leading_wildcard": True,
        "phrase_slop": 3,
        "quote_field_suffix": "",
        "quote_analyzer": "standard",
        "analyze_wildcard": False,
        "auto_generate_synonyms_phrase_query": True,
    }

    return {"size": size, "query": {"query_string": query_string}}


def run_main_query(client, query, fields=None, results=None):
    """Run a ``query_string`` search against the main index.

    Args:
        client: Object exposing ``search(body=..., index=...)``; presumably
            the client returned by ``initialise_opensearch`` - confirm
            against callers.
        query: The query text.
        fields: Optional fields to search; see ``construct_query`` for the
            fallback applied when omitted.
        results: Optional number of hits to request; see ``construct_query``
            for the fallback applied when omitted.

    Returns:
        Whatever ``client.search`` returns for the constructed body and
        ``settings.OPENSEARCH_INDEX_MAIN``.
    """
    body = construct_query(query, fields, results)
    return client.search(body=body, index=settings.OPENSEARCH_INDEX_MAIN)