Improve data security by mandating token search

This commit is contained in:
2022-08-26 17:16:55 +01:00
parent e85fa910aa
commit 3f02c61463
9 changed files with 309 additions and 143 deletions

View File

@@ -10,6 +10,11 @@ from sortedcontainers import SortedSet
from core import r
class SearchDenied:
def __init__(self, value):
self.value = value
def dedup_list(data, check_keys):
"""
Remove duplicate dictionaries from list.
@@ -90,7 +95,10 @@ def hash_list(user, data, hash_keys=False):
for index, item in enumerate(data_copy):
if isinstance(item, dict):
for key, value in list(item.items()):
if key not in settings.WHITELIST_FIELDS:
if (
key not in settings.WHITELIST_FIELDS
and key not in settings.NO_OBFUSCATE_PARAMS
):
if isinstance(value, int):
value = str(value)
if isinstance(value, bool):
@@ -122,18 +130,35 @@ def hash_list(user, data, hash_keys=False):
r.hmset(cache, hash_table)
def hash_lookup(data_dict):
def hash_lookup(user, data_dict):
cache = "cache.hash"
hash_list = SortedSet()
for key, value in data_dict.items():
if not value:
continue
# hashes = re.findall("\|([^\|]*)\|", value) # noqa
hashes = re.findall("[A-Z0-9]{12,13}", value)
if not hashes:
continue
for hash in hashes:
hash_list.add(hash)
for key, value in list(data_dict.items()):
if (
key not in settings.WHITELIST_FIELDS
and key not in settings.NO_OBFUSCATE_PARAMS
):
if not value:
continue
# hashes = re.findall("\|([^\|]*)\|", value) # noqa
if isinstance(value, str):
hashes = re.findall("[A-Z0-9]{12,13}", value)
elif isinstance(value, dict):
hashes = []
for key, value in value.items():
if not value:
continue
hashes_iter = re.findall("[A-Z0-9]{12,13}", value)
for h in hashes_iter:
hashes.append(h)
if not hashes:
# Otherwise the user could inject plaintext search queries
if not user.has_perm("bypass_hashing"):
data_dict[key] = SearchDenied(value=data_dict[key])
# del data_dict[key]
for hash in hashes:
hash_list.add(hash)
if hash_list:
values = r.hmget(cache, *hash_list)
@@ -147,8 +172,17 @@ def hash_lookup(data_dict):
for key in data_dict.keys():
for hash in total:
if data_dict[key]:
if hash in data_dict[key]:
data_dict[key] = data_dict[key].replace(f"{hash}", total[hash])
if isinstance(data_dict[key], str):
if hash in data_dict[key]:
print("Replacing", data_dict[key], "with", total[hash])
data_dict[key] = data_dict[key].replace(
f"{hash}", total[hash]
)
elif isinstance(data_dict[key], dict):
for k2, v2 in data_dict[key].items():
if hash in v2:
print("Replacing", v2, "with", total[hash])
data_dict[key][k2] = v2.replace(f"{hash}", total[hash])
def encrypt_list(user, data, secret):

View File

@@ -55,6 +55,21 @@ def create_tags(query):
return tags
def parse_tags(tags_pre):
"""
Parse the tags from the variable tags_pre.
"""
tags = {}
tags_spl = tags_pre.split(",")
if tags_spl:
for tag in tags_spl:
tag = tag.split(": ")
if len(tag) == 2:
key, val = tag
tags[key] = val
return tags
def make_table(context):
table = DrilldownTable(context["object_list"])
context["table"] = table
@@ -78,6 +93,8 @@ def make_graph(results):
def drilldown_search(request, return_context=False, template=None):
extra_params = {}
if not template:
template_name = "ui/drilldown/table_results.html"
else:
@@ -105,6 +122,10 @@ def drilldown_search(request, return_context=False, template=None):
query_params.update(tmp_post)
query_params.update(tmp_get)
# URI we're passing to the template for linking
if "csrfmiddlewaretoken" in query_params:
del query_params["csrfmiddlewaretoken"]
# Parse the dates
if "dates" in query_params:
dates = parse_dates(query_params["dates"])
@@ -118,24 +139,34 @@ def drilldown_search(request, return_context=False, template=None):
query_params["to_time"] = dates["to_time"]
if "query" in query_params:
context = query_results(request, query_params)
# Remove null values
if query_params["query"] == "":
del query_params["query"]
# Turn the query into tags for populating the taglist
# tags = create_tags(query_params["query"])
# context["tags"] = tags
# else:
# context = {"object_list": []}
# Turn the query into tags for populating the taglist
tags = create_tags(query_params["query"])
context["tags"] = tags
else:
context = {"object_list": []}
# Remove null values
if "query_full" in query_params:
if query_params["query_full"] == "":
del query_params["query_full"]
if "tags" in query_params:
if query_params["tags"] == "":
del query_params["tags"]
else:
tags = parse_tags(query_params["tags"])
extra_params["tags"] = tags
context = query_results(request, query_params, **extra_params)
# Valid sizes
context["sizes"] = sizes
# URI we're passing to the template for linking
if "csrfmiddlewaretoken" in query_params:
del query_params["csrfmiddlewaretoken"]
url_params = urllib.parse.urlencode(query_params)
context["client_uri"] = url_params
context["params"] = query_params
if "message" in context:
response = render(request, template_name, context)
@@ -158,6 +189,17 @@ def drilldown_search(request, return_context=False, template=None):
clean_url_params = urllib.parse.urlencode(clean_params)
context["uri"] = clean_url_params
# Warn users trying to use query string that the simple query supersedes it
if all([x in query_params for x in ["query", "query_full"]]):
context["message"] = (
"You are searching with both query types. "
"The simple query will be used. "
"The full query will be ignored. "
"Remove the text from the simple query if you wish "
"to use the full query."
)
context["class"] = "warning"
response = render(request, template_name, context)
if request.GET:
if request.htmx:
@@ -260,7 +302,7 @@ class DrilldownContextModal(APIView):
# Lookup the hash values but don't disclose them to the user
if settings.HASHING:
SAFE_PARAMS = deepcopy(query_params)
hash_lookup(SAFE_PARAMS)
hash_lookup(request.user, SAFE_PARAMS)
else:
SAFE_PARAMS = query_params
@@ -383,7 +425,7 @@ class ThresholdInfoModal(APIView):
# Lookup the hash values but don't disclose them to the user
if settings.HASHING:
SAFE_PARAMS = request.data.dict()
hash_lookup(SAFE_PARAMS)
hash_lookup(request.user, SAFE_PARAMS)
safe_net = SAFE_PARAMS["net"]
safe_nick = SAFE_PARAMS["nick"]
safe_channel = SAFE_PARAMS["channel"]