From c4f17dd5fbc3f1f785d998dee8d7f8d27bbcba4e Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sat, 27 Aug 2022 12:20:36 +0100 Subject: [PATCH] Add extra checks on hash lookups --- app/local_settings.example.py | 24 +++++++++++++++++++++++- core/lib/opensearch.py | 26 +++++++++++++++++++------- core/views/helpers.py | 32 ++++++++++++++++++++++++++------ 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/app/local_settings.example.py b/app/local_settings.example.py index 2e24b81..6e2169e 100644 --- a/app/local_settings.example.py +++ b/app/local_settings.example.py @@ -36,6 +36,8 @@ OBFUSCATE_DASH_NUM = 2 # DON'T obfuscate the last X fields of values separates by colons OBFUSCATE_COLON_NUM = 1 +SEARCH_FIELDS_DENY = ["ts", "date", "time"] + # Common to encryption and hashing WHITELIST_FIELDS = [ "ts", @@ -47,15 +49,32 @@ WHITELIST_FIELDS = [ "num_chans", "num_users", "online", - "tokens", "src", "exemption", "hidden", + "type", ] # Don't obfuscate these parameters, or lookup hashes in them NO_OBFUSCATE_PARAMS = [ "query", +# "query_full", + "size", + "source", + "sorting", + "tags", + "index", + "dedup", + "check_sentiment", + "sentiment_method", + "dates", + "sort", + "page", +] + +# Don't allow tag search for these parameters +TAG_SEARCH_DENY = [ + "query", "query_full", "size", "source", @@ -66,8 +85,11 @@ NO_OBFUSCATE_PARAMS = [ "check_sentiment", "sentiment_method", "dates", + "sort", + "page", ] + OPENSEARCH_BLACKLISTED = {} # URLs diff --git a/core/lib/opensearch.py b/core/lib/opensearch.py index 503a86c..d9fcd18 100644 --- a/core/lib/opensearch.py +++ b/core/lib/opensearch.py @@ -6,6 +6,7 @@ from opensearchpy.exceptions import NotFoundError, RequestError from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online from core.views.helpers import ( + LookupDenied, SearchDenied, dedup_list, encrypt_list, @@ -309,12 +310,28 @@ def query_results( query_created = False # Lookup the hash values but don't disclose them to the user + denied = [] if lookup_hashes: if settings.HASHING: query_params = deepcopy(query_params) - hash_lookup(request.user, query_params) + denied_q = hash_lookup(request.user, query_params) + denied.extend(denied_q) if tags: - hash_lookup(request.user, tags) + denied_t = hash_lookup(request.user, tags) + denied.extend(denied_t) + + message = [] + for x in denied: + if isinstance(x, SearchDenied): + message.append(f"Permission denied to search by {x.key}: {x.value}") + elif isinstance(x, LookupDenied): + message.append(f"Tag {x.key}: {x.value} not expected here. Nice try.") + if denied: + print("DENIED DICT", message) + message = [f"{i}" for i in message] + message = "\n".join(message) + message_class = "danger" + return {"message": message, "class": message_class} if request.user.is_anonymous: sizes = settings.OPENSEARCH_MAIN_SIZES_ANON @@ -440,11 +457,6 @@ def query_results( # search_query["query"]["bool"] = {"must": []} for item in add_bool: - k, v = list(item.items())[0] - if isinstance(v, SearchDenied): - message = f"Access denied: search by protected field {k}: {v.value}" - message_class = "danger" - return {"message": message, "class": message_class} search_query["query"]["bool"]["must"].append({"match_phrase": item}) if add_top: for item in add_top: diff --git a/core/views/helpers.py b/core/views/helpers.py index 85052f7..ea1af94 100644 --- a/core/views/helpers.py +++ b/core/views/helpers.py @@ -11,7 +11,14 @@ from core import r class SearchDenied: - def __init__(self, value): + def __init__(self, key, value): + self.key = key + self.value = value + + +class LookupDenied: + def __init__(self, key, value): + self.key = key self.value = value @@ -84,7 +91,7 @@ def obfuscate_list(user, data): """ Obfuscate data in a list of dictionaries. """ - if user.has_perm("core.bypass_obfuscation"): + if user.has_perm("bypass_obfuscation"): return for index, item in enumerate(data): for key, value in item.items(): @@ -123,7 +130,7 @@ def hash_list(user, data, hash_keys=False): """ Hash a list of dicts or a list with SipHash42. """ - if user.has_perm("core.bypass_hashing"): + if user.has_perm("bypass_hashing"): return cache = "cache.hash" hash_table = {} @@ -172,7 +179,12 @@ def hash_list(user, data, hash_keys=False): def hash_lookup(user, data_dict): cache = "cache.hash" hash_list = SortedSet() + denied = [] for key, value in list(data_dict.items()): + if key in settings.SEARCH_FIELDS_DENY: + if not user.has_perm("bypass_hashing"): + data_dict[key] = SearchDenied(key=key, value=data_dict[key]) + denied.append(data_dict[key]) if ( key not in settings.WHITELIST_FIELDS and key not in settings.NO_OBFUSCATE_PARAMS @@ -193,8 +205,15 @@ def hash_lookup(user, data_dict): if not hashes: # Otherwise the user could inject plaintext search queries if not user.has_perm("bypass_hashing"): - data_dict[key] = SearchDenied(value=data_dict[key]) - # del data_dict[key] + data_dict[key] = SearchDenied(key=key, value=data_dict[key]) + denied.append(data_dict[key]) + continue + else: + # There are hashes here but there shouldn't be! + if key in settings.TAG_SEARCH_DENY: + data_dict[key] = LookupDenied(key=key, value=data_dict[key]) + denied.append(data_dict[key]) + continue for hash in hashes: hash_list.add(hash) @@ -220,10 +239,11 @@ def hash_lookup(user, data_dict): for k2, v2 in data_dict[key].items(): if hash in v2: data_dict[key][k2] = v2.replace(f"{hash}", total[hash]) + return denied def encrypt_list(user, data, secret): - if user.has_perm("core.bypass_encryption"): + if user.has_perm("bypass_encryption"): return cipher = Cipher(algorithms.AES(secret), ECB()) for index, item in enumerate(data):