Remove redaction stuff

This commit is contained in:
2022-08-26 07:20:30 +01:00
parent cc20c545dd
commit bdee5a2aae
5 changed files with 404 additions and 396 deletions

View File

@@ -1,14 +1,14 @@
import re
from base64 import b64encode
from random import randint
# import re
# from base64 import b64encode
# from random import randint
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet
# from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
# from cryptography.hazmat.primitives.ciphers.modes import ECB
# from django.conf import settings
# from siphashc import siphash
# from sortedcontainers import SortedSet
from core import r
# from core import r
class SearchDenied:
@@ -62,248 +62,249 @@ def dedup_list(data, check_keys):
# # 1.0805372429895215
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
"""Converts an integer to a base36 string."""
if not isinstance(number, (int)):
raise TypeError("number must be an integer")
# def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
# """Converts an integer to a base36 string."""
# if not isinstance(number, (int)):
# raise TypeError("number must be an integer")
base36 = ""
sign = ""
# base36 = ""
# sign = ""
if number < 0:
sign = "-"
number = -number
# if number < 0:
# sign = "-"
# number = -number
if 0 <= number < len(alphabet):
return sign + alphabet[number]
# if 0 <= number < len(alphabet):
# return sign + alphabet[number]
while number != 0:
number, i = divmod(number, len(alphabet))
base36 = alphabet[i] + base36
# while number != 0:
# number, i = divmod(number, len(alphabet))
# base36 = alphabet[i] + base36
return sign + base36
# return sign + base36
def base36decode(number):
return int(number, 36)
# def base36decode(number):
# return int(number, 36)
def randomise_list(user, data):
"""
Randomise data in a list of dictionaries.
"""
if user.has_perm("core.bypass_randomisation"):
return
if isinstance(data, list):
for index, item in enumerate(data):
for key, value in item.items():
if key in settings.RANDOMISE_FIELDS:
if isinstance(value, int):
min_val = value - (value * settings.RANDOMISE_RATIO)
max_val = value + (value * settings.RANDOMISE_RATIO)
new_val = randint(int(min_val), int(max_val))
data[index][key] = new_val
elif isinstance(data, dict):
for key, value in data.items():
# if key in settings.RANDOMISE_FIELDS:
if isinstance(value, int):
min_val = value - (value * settings.RANDOMISE_RATIO)
max_val = value + (value * settings.RANDOMISE_RATIO)
new_val = randint(int(min_val), int(max_val))
data[key] = new_val
# def randomise_list(user, data):
# """
# Randomise data in a list of dictionaries.
# """
# if user.has_perm("core.bypass_randomisation"):
# return
# if isinstance(data, list):
# for index, item in enumerate(data):
# for key, value in item.items():
# if key in settings.RANDOMISE_FIELDS:
# if isinstance(value, int):
# min_val = value - (value * settings.RANDOMISE_RATIO)
# max_val = value + (value * settings.RANDOMISE_RATIO)
# new_val = randint(int(min_val), int(max_val))
# data[index][key] = new_val
# elif isinstance(data, dict):
# for key, value in data.items():
# # if key in settings.RANDOMISE_FIELDS:
# if isinstance(value, int):
# min_val = value - (value * settings.RANDOMISE_RATIO)
# max_val = value + (value * settings.RANDOMISE_RATIO)
# new_val = randint(int(min_val), int(max_val))
# data[key] = new_val
def obfuscate_list(user, data):
"""
Obfuscate data in a list of dictionaries.
"""
if user.has_perm("core.bypass_obfuscation"):
return
for index, item in enumerate(data):
for key, value in item.items():
# Obfuscate a ratio of the field
if key in settings.OBFUSCATE_FIELDS:
length = len(value) - 1
split = int(length * settings.OBFUSCATE_KEEP_RATIO)
first_part = value[:split]
second_part = value[split:]
second_len = len(second_part)
second_part = "*" * second_len
data[index][key] = first_part + second_part
# Obfuscate value based on fields
# Example: 2022-02-02 -> 2022-02-**
# 14:11:12 -> 14:11:**
elif key in settings.OBFUSCATE_FIELDS_SEP:
if "-" in value:
sep = "-"
value_spl = value.split("-")
hide_num = settings.OBFUSCATE_DASH_NUM
elif ":" in value:
sep = ":"
value_spl = value.split(":")
hide_num = settings.OBFUSCATE_COLON_NUM
# def obfuscate_list(user, data):
# """
# Obfuscate data in a list of dictionaries.
# """
# if user.has_perm("core.bypass_obfuscation"):
# return
# for index, item in enumerate(data):
# for key, value in item.items():
# # Obfuscate a ratio of the field
# if key in settings.OBFUSCATE_FIELDS:
# length = len(value) - 1
# split = int(length * settings.OBFUSCATE_KEEP_RATIO)
# first_part = value[:split]
# second_part = value[split:]
# second_len = len(second_part)
# second_part = "*" * second_len
# data[index][key] = first_part + second_part
# # Obfuscate value based on fields
# # Example: 2022-02-02 -> 2022-02-**
# # 14:11:12 -> 14:11:**
# elif key in settings.OBFUSCATE_FIELDS_SEP:
# if "-" in value:
# sep = "-"
# value_spl = value.split("-")
# hide_num = settings.OBFUSCATE_DASH_NUM
# elif ":" in value:
# sep = ":"
# value_spl = value.split(":")
# hide_num = settings.OBFUSCATE_COLON_NUM
first_part = value_spl[:hide_num]
second_part = value_spl[hide_num:]
for index_x, x in enumerate(second_part):
x_len = len(x)
second_part[index_x] = "*" * x_len
result = sep.join([*first_part, *second_part])
data[index][key] = result
for key in settings.COMBINE_FIELDS:
for index, item in enumerate(data):
if key in item:
k1, k2 = settings.COMBINE_FIELDS[key]
if k1 in item and k2 in item:
data[index][key] = item[k1] + item[k2]
# first_part = value_spl[:hide_num]
# second_part = value_spl[hide_num:]
# for index_x, x in enumerate(second_part):
# x_len = len(x)
# second_part[index_x] = "*" * x_len
# result = sep.join([*first_part, *second_part])
# data[index][key] = result
# for key in settings.COMBINE_FIELDS:
# for index, item in enumerate(data):
# if key in item:
# k1, k2 = settings.COMBINE_FIELDS[key]
# if k1 in item and k2 in item:
# data[index][key] = item[k1] + item[k2]
def hash_list(user, data, hash_keys=False):
"""
Hash a list of dicts or a list with SipHash42.
"""
if user.has_perm("core.bypass_hashing"):
return
cache = "cache.hash"
hash_table = {}
if isinstance(data, dict):
data_copy = [{x: data[x]} for x in data]
else:
data_copy = type(data)((data))
for index, item in enumerate(data_copy):
if "src" in item:
if item["src"] in settings.SAFE_SOURCES:
continue
if isinstance(item, dict):
for key, value in list(item.items()):
if (
key not in settings.WHITELIST_FIELDS
and key not in settings.NO_OBFUSCATE_PARAMS
):
if isinstance(value, int):
value = str(value)
if isinstance(value, bool):
continue
if value is None:
continue
if hash_keys:
hashed = siphash(settings.HASHING_KEY, key)
else:
hashed = siphash(settings.HASHING_KEY, value)
encoded = base36encode(hashed)
if encoded not in hash_table:
if hash_keys:
hash_table[encoded] = key
else:
hash_table[encoded] = value
if hash_keys:
# Rename the dict key
data[encoded] = data.pop(key)
else:
data[index][key] = encoded
elif isinstance(item, str):
hashed = siphash(settings.HASHING_KEY, item)
encoded = base36encode(hashed)
if encoded not in hash_table:
hash_table[encoded] = item
data[index] = encoded
if hash_table:
r.hmset(cache, hash_table)
# def hash_list(user, data, hash_keys=False):
# """
# Hash a list of dicts or a list with SipHash42.
# """
# if user.has_perm("core.bypass_hashing"):
# return
# cache = "cache.hash"
# hash_table = {}
# if isinstance(data, dict):
# data_copy = [{x: data[x]} for x in data]
# else:
# data_copy = type(data)((data))
# for index, item in enumerate(data_copy):
# if "src" in item:
# if item["src"] in settings.SAFE_SOURCES:
# continue
# if isinstance(item, dict):
# for key, value in list(item.items()):
# if (
# key not in settings.WHITELIST_FIELDS
# and key not in settings.NO_OBFUSCATE_PARAMS
# ):
# if isinstance(value, int):
# value = str(value)
# if isinstance(value, bool):
# continue
# if value is None:
# continue
# if hash_keys:
# hashed = siphash(settings.HASHING_KEY, key)
# else:
# hashed = siphash(settings.HASHING_KEY, value)
# encoded = base36encode(hashed)
# if encoded not in hash_table:
# if hash_keys:
# hash_table[encoded] = key
# else:
# hash_table[encoded] = value
# if hash_keys:
# # Rename the dict key
# data[encoded] = data.pop(key)
# else:
# data[index][key] = encoded
# elif isinstance(item, str):
# hashed = siphash(settings.HASHING_KEY, item)
# encoded = base36encode(hashed)
# if encoded not in hash_table:
# hash_table[encoded] = item
# data[index] = encoded
# if hash_table:
# r.hmset(cache, hash_table)
def hash_lookup(user, data_dict, supplementary_data=None):
cache = "cache.hash"
hash_list = SortedSet()
denied = []
for key, value in list(data_dict.items()):
if "source" in data_dict:
if data_dict["source"] in settings.SAFE_SOURCES:
continue
if "src" in data_dict:
if data_dict["src"] in settings.SAFE_SOURCES:
continue
if supplementary_data:
if "source" in supplementary_data:
if supplementary_data["source"] in settings.SAFE_SOURCES:
continue
if key in settings.SEARCH_FIELDS_DENY:
if not user.has_perm("core.bypass_hashing"):
data_dict[key] = SearchDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
if (
key not in settings.WHITELIST_FIELDS
and key not in settings.NO_OBFUSCATE_PARAMS
):
if not value:
continue
# hashes = re.findall("\|([^\|]*)\|", value) # noqa
if isinstance(value, str):
hashes = re.findall("[A-Z0-9]{12,13}", value)
elif isinstance(value, dict):
hashes = []
for key, value in value.items():
if not value:
continue
hashes_iter = re.findall("[A-Z0-9]{12,13}", value)
for h in hashes_iter:
hashes.append(h)
if not hashes:
# Otherwise the user could inject plaintext search queries
if not user.has_perm("core.bypass_hashing"):
data_dict[key] = SearchDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
continue
else:
# There are hashes here but there shouldn't be!
if key in settings.TAG_SEARCH_DENY:
data_dict[key] = LookupDenied(key=key, value=data_dict[key])
denied.append(data_dict[key])
continue
# def hash_lookup(user, data_dict, supplementary_data=None):
# cache = "cache.hash"
# hash_list = SortedSet()
# denied = []
# for key, value in list(data_dict.items()):
# if "source" in data_dict:
# if data_dict["source"] in settings.SAFE_SOURCES:
# continue
# if "src" in data_dict:
# if data_dict["src"] in settings.SAFE_SOURCES:
# continue
# if supplementary_data:
# if "source" in supplementary_data:
# if supplementary_data["source"] in settings.SAFE_SOURCES:
# continue
# if key in settings.SEARCH_FIELDS_DENY:
# if not user.has_perm("core.bypass_hashing"):
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# if (
# key not in settings.WHITELIST_FIELDS
# and key not in settings.NO_OBFUSCATE_PARAMS
# ):
# if not value:
# continue
# # hashes = re.findall("\|([^\|]*)\|", value) # noqa
# if isinstance(value, str):
# hashes = re.findall("[A-Z0-9]{12,13}", value)
# elif isinstance(value, dict):
# hashes = []
# for key, value in value.items():
# if not value:
# continue
# hashes_iter = re.findall("[A-Z0-9]{12,13}", value)
# for h in hashes_iter:
# hashes.append(h)
# if not hashes:
# # Otherwise the user could inject plaintext search queries
# if not user.has_perm("core.bypass_hashing"):
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# continue
# else:
# # There are hashes here but there shouldn't be!
# if key in settings.TAG_SEARCH_DENY:
# data_dict[key] = LookupDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# continue
for hash in hashes:
hash_list.add(hash)
# for hash in hashes:
# hash_list.add(hash)
if hash_list:
values = r.hmget(cache, *hash_list)
if not values:
return
for index, val in enumerate(values):
if val is None:
values[index] = b"ERR"
values = [x.decode() for x in values]
total = dict(zip(hash_list, values))
for key in data_dict.keys():
for hash in total:
if data_dict[key]:
if isinstance(data_dict[key], str):
if hash in data_dict[key]:
data_dict[key] = data_dict[key].replace(
f"{hash}", total[hash]
)
elif isinstance(data_dict[key], dict):
for k2, v2 in data_dict[key].items():
if hash in v2:
data_dict[key][k2] = v2.replace(f"{hash}", total[hash])
return denied
# if hash_list:
# values = r.hmget(cache, *hash_list)
# if not values:
# return
# for index, val in enumerate(values):
# if val is None:
# values[index] = b"ERR"
# values = [x.decode() for x in values]
# total = dict(zip(hash_list, values))
# for key in data_dict.keys():
# for hash in total:
# if data_dict[key]:
# if isinstance(data_dict[key], str):
# if hash in data_dict[key]:
# data_dict[key] = data_dict[key].replace(
# f"{hash}", total[hash]
# )
# elif isinstance(data_dict[key], dict):
# for k2, v2 in data_dict[key].items():
# if hash in v2:
# data_dict[key][k2] = v2.replace(f"{hash}", total[hash])
# return denied
def encrypt_list(user, data, secret):
if user.has_perm("core.bypass_encryption"):
return
cipher = Cipher(algorithms.AES(secret), ECB())
for index, item in enumerate(data):
for key, value in item.items():
if key not in settings.WHITELIST_FIELDS:
encryptor = cipher.encryptor()
if isinstance(value, int):
value = str(value)
if isinstance(value, bool):
continue
if value is None:
continue
decoded = value.encode("utf8", "replace")
length = 16 - (len(decoded) % 16)
decoded += bytes([length]) * length
ct = encryptor.update(decoded) + encryptor.finalize()
final_str = b64encode(ct)
data[index][key] = final_str.decode("utf-8", "replace")
# def encrypt_list(user, data, secret):
# if user.has_perm("core.bypass_encryption"):
# return
# cipher = Cipher(algorithms.AES(secret), ECB())
# for index, item in enumerate(data):
# for key, value in item.items():
# if key not in settings.WHITELIST_FIELDS:
# encryptor = cipher.encryptor()
# if isinstance(value, int):
# value = str(value)
# if isinstance(value, bool):
# continue
# if value is None:
# continue
# decoded = value.encode("utf8", "replace")
# length = 16 - (len(decoded) % 16)
# decoded += bytes([length]) * length
# ct = encryptor.update(decoded) + encryptor.finalize()
# final_str = b64encode(ct)
# data[index][key] = final_str.decode("utf-8", "replace")