Add extra checks on hash lookups

This commit is contained in:
Mark Veidemanis 2022-08-27 12:20:36 +01:00
parent 850d00de19
commit c4f17dd5fb
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
3 changed files with 68 additions and 14 deletions

View File

@ -36,6 +36,8 @@ OBFUSCATE_DASH_NUM = 2
# DON'T obfuscate the last X fields of values separated by colons
OBFUSCATE_COLON_NUM = 1

# Keys that hash_lookup() refuses as search fields unless the user holds
# the hashing-bypass permission
SEARCH_FIELDS_DENY = [
    "ts",
    "date",
    "time",
]
# Common to encryption and hashing
WHITELIST_FIELDS = [
"ts",
@ -47,15 +49,32 @@ WHITELIST_FIELDS = [
"num_chans",
"num_users",
"online",
"tokens",
"src",
"exemption",
"hidden",
"type",
]
# Request parameters that are never obfuscated and that hash_lookup() skips
# when looking up hashes
NO_OBFUSCATE_PARAMS = [
    "query",
    # NOTE(review): "query_full" is commented out, so it is not exempt from
    # hash lookup; confirm this is intended.
    # "query_full",
    "size",
    "source",
    "sorting",
    "tags",
    "index",
    "dedup",
    "check_sentiment",
    "sentiment_method",
    "dates",
    "sort",
    "page",
]
# Don't allow tag search for these parameters
TAG_SEARCH_DENY = [
"query",
"query_full",
"size",
"source",
@ -66,8 +85,11 @@ NO_OBFUSCATE_PARAMS = [
"check_sentiment",
"sentiment_method",
"dates",
"sort",
"page",
]
OPENSEARCH_BLACKLISTED = {}
# URLs

View File

@ -6,6 +6,7 @@ from opensearchpy.exceptions import NotFoundError, RequestError
from core.lib.threshold import annotate_num_chans, annotate_num_users, annotate_online
from core.views.helpers import (
LookupDenied,
SearchDenied,
dedup_list,
encrypt_list,
@ -309,12 +310,28 @@ def query_results(
query_created = False
# Lookup the hash values but don't disclose them to the user
denied = []
if lookup_hashes:
if settings.HASHING:
query_params = deepcopy(query_params)
hash_lookup(request.user, query_params)
denied_q = hash_lookup(request.user, query_params)
denied.extend(denied_q)
if tags:
hash_lookup(request.user, tags)
denied_t = hash_lookup(request.user, tags)
denied.extend(denied_t)
message = []
for x in denied:
if isinstance(x, SearchDenied):
message.append(f"Permission denied to search by {x.key}: {x.value}")
elif isinstance(x, LookupDenied):
message.append(f"Tag {x.key}: {x.value} not expected here. Nice try.")
if denied:
print("DENIED DICT", message)
message = [f"{i}" for i in message]
message = "\n".join(message)
message_class = "danger"
return {"message": message, "class": message_class}
if request.user.is_anonymous:
sizes = settings.OPENSEARCH_MAIN_SIZES_ANON
@ -440,11 +457,6 @@ def query_results(
# search_query["query"]["bool"] = {"must": []}
for item in add_bool:
k, v = list(item.items())[0]
if isinstance(v, SearchDenied):
message = f"Access denied: search by protected field {k}: {v.value}"
message_class = "danger"
return {"message": message, "class": message_class}
search_query["query"]["bool"]["must"].append({"match_phrase": item})
if add_top:
for item in add_top:

View File

@ -11,7 +11,14 @@ from core import r
class SearchDenied:
    """
    Marker put in place of a search parameter the user may not search by.

    hash_lookup() stores an instance under the offending key and also
    returns it in its list of denials, so the caller can report both the
    field name and the rejected value.
    """

    def __init__(self, key, value):
        # key: name of the rejected parameter
        # value: the value the user supplied for it
        self.key = key
        self.value = value

    def __repr__(self):
        return f"{type(self).__name__}(key={self.key!r}, value={self.value!r})"
class LookupDenied:
    """
    Marker put in place of a parameter that contained hashes but must not.

    hash_lookup() creates one when hashes are found in a key listed in
    settings.TAG_SEARCH_DENY, stores it under that key and returns it in
    its list of denials.
    """

    def __init__(self, key, value):
        # key: name of the parameter the hashes were found in
        # value: the value the user supplied for it
        self.key = key
        self.value = value

    def __repr__(self):
        return f"{type(self).__name__}(key={self.key!r}, value={self.value!r})"
@ -84,7 +91,7 @@ def obfuscate_list(user, data):
"""
Obfuscate data in a list of dictionaries.
"""
if user.has_perm("core.bypass_obfuscation"):
if user.has_perm("bypass_obfuscation"):
return
for index, item in enumerate(data):
for key, value in item.items():
@ -123,7 +130,7 @@ def hash_list(user, data, hash_keys=False):
"""
Hash a list of dicts or a list with SipHash42.
"""
if user.has_perm("core.bypass_hashing"):
if user.has_perm("bypass_hashing"):
return
cache = "cache.hash"
hash_table = {}
@ -172,7 +179,12 @@ def hash_list(user, data, hash_keys=False):
def hash_lookup(user, data_dict):
cache = "cache.hash"
hash_list = SortedSet()
denied = []
for key, value in list(data_dict.items()):
if key in settings.SEARCH_FIELDS_DENY:
if not user.has_perm("bypass_hashing"):
data_dict[key] = SearchDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
if (
key not in settings.WHITELIST_FIELDS
and key not in settings.NO_OBFUSCATE_PARAMS
@ -193,8 +205,15 @@ def hash_lookup(user, data_dict):
if not hashes:
# Otherwise the user could inject plaintext search queries
if not user.has_perm("bypass_hashing"):
data_dict[key] = SearchDenied(value=data_dict[key])
# del data_dict[key]
data_dict[key] = SearchDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
continue
else:
# There are hashes here but there shouldn't be!
if key in settings.TAG_SEARCH_DENY:
data_dict[key] = LookupDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
continue
for hash in hashes:
hash_list.add(hash)
@ -220,10 +239,11 @@ def hash_lookup(user, data_dict):
for k2, v2 in data_dict[key].items():
if hash in v2:
data_dict[key][k2] = v2.replace(f"{hash}", total[hash])
return denied
def encrypt_list(user, data, secret):
if user.has_perm("core.bypass_encryption"):
if user.has_perm("bypass_encryption"):
return
cipher = Cipher(algorithms.AES(secret), ECB())
for index, item in enumerate(data):