299 lines
10 KiB
Python
299 lines
10 KiB
Python
import re
|
|
from base64 import b64encode
|
|
from random import randint
|
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
|
from cryptography.hazmat.primitives.ciphers.modes import ECB
|
|
from django.conf import settings
|
|
from siphashc import siphash
|
|
from sortedcontainers import SortedSet
|
|
|
|
from core import r
|
|
|
|
|
|
class SearchDenied:
    """Record of a search parameter that was rejected.

    Holds the parameter name in ``key`` and the value that was supplied
    for it in ``value``; both are stored as given, without validation.
    """

    def __init__(self, key, value):
        self.key, self.value = key, value
|
|
|
|
|
|
class LookupDenied:
    """Record of a parameter whose hash lookup was rejected.

    Holds the parameter name in ``key`` and the value that was supplied
    for it in ``value``; both are stored as given, without validation.
    """

    def __init__(self, key, value):
        self.key, self.value = key, value
|
|
|
|
|
|
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two entries count as duplicates when they carry the same subset of
    ``check_keys`` with equal values. The first occurrence is kept; every
    later duplicate is dropped and counted. Whenever one or more entries
    have been dropped, a marker ``{"type": "control", "hidden": <count>}``
    is emitted before the next kept entry (or at the end of the list).

    :param data: iterable of dictionaries; values under ``check_keys``
        must be hashable.
    :param check_keys: iterable of keys that define an entry's identity.
    :return: a new list with kept entries and control markers, in order.
    """
    seen = set()
    out = []

    hidden = 0
    for entry in data:
        # Pair every value with its key name: a bare tuple of values would
        # make {"msg": "a"} and {"nick": "a"} look identical.
        signature = tuple((k, entry[k]) for k in check_keys if k in entry)
        if signature in seen:
            hidden += 1
            continue
        if hidden:
            out.append({"type": "control", "hidden": hidden})
            hidden = 0
        out.append(entry)
        seen.add(signature)
    if hidden:
        out.append({"type": "control", "hidden": hidden})
    return out
|
|
|
|
|
|
# from random import randint
|
|
# from timeit import timeit
|
|
# entries = 10000
|
|
# a = [
|
|
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
|
|
# randint(1, 2)} for x in range(entries)
|
|
# ]
|
|
# kk = ["msg", "nick"]
|
|
# call = lambda: dedup_list(a, kk)
|
|
# #print(timeit(call, number=10))
|
|
# print(dedup_list(a, kk))
|
|
|
|
# # sh-5.1$ python helpers.py
|
|
# # 1.0805372429895215
|
|
|
|
|
|
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """Render an integer as a string of digits drawn from ``alphabet``.

    With the default 36-character alphabet this is upper-case base 36.
    Negative numbers are prefixed with ``-``.

    :raises TypeError: if ``number`` is not an ``int``.
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    radix = len(alphabet)
    prefix = "-" if number < 0 else ""
    remaining = abs(number)

    # Single-digit values (including zero) map straight onto the alphabet.
    if remaining < radix:
        return prefix + alphabet[remaining]

    # Peel off the least significant digit each round, then flip the order.
    digits = []
    while remaining:
        remaining, idx = divmod(remaining, radix)
        digits.append(alphabet[idx])
    return prefix + "".join(reversed(digits))
|
|
|
|
|
|
def base36decode(number):
    """Parse a base-36 string (as accepted by ``int``) into an integer."""
    radix = 36
    return int(number, radix)
|
|
|
|
|
|
def _randomise_int(value):
    """Return a random integer within ``settings.RANDOMISE_RATIO`` of value."""
    spread = value * settings.RANDOMISE_RATIO
    # For a negative value the "minus" bound is the larger one; order the
    # bounds so randint never receives an empty range.
    low, high = sorted((int(value - spread), int(value + spread)))
    return randint(low, high)


def _is_plain_int(value):
    """Tell whether value is an int that is not a bool."""
    # bool is a subclass of int; flags must not be turned into random ints.
    return isinstance(value, int) and not isinstance(value, bool)


def randomise_list(user, data):
    """
    Randomise data in a list of dictionaries.

    Integer values are replaced in place by a random integer no further
    than ``settings.RANDOMISE_RATIO`` (as a fraction of the value) away
    from the original. Booleans and non-integer values are left alone.

    * When ``data`` is a list of dicts, only keys listed in
      ``settings.RANDOMISE_FIELDS`` are touched.
    * When ``data`` is a single dict, every integer value is touched,
      regardless of ``settings.RANDOMISE_FIELDS``.

    Nothing happens when the user holds ``core.bypass_randomisation``.

    :param user: object exposing ``has_perm(perm_name)``.
    :param data: list of dicts, or a dict; mutated in place.
    :return: None.
    """
    if user.has_perm("core.bypass_randomisation"):
        return
    if isinstance(data, list):
        for item in data:
            for key, value in item.items():
                if key in settings.RANDOMISE_FIELDS and _is_plain_int(value):
                    item[key] = _randomise_int(value)
    elif isinstance(data, dict):
        for key, value in data.items():
            if _is_plain_int(value):
                data[key] = _randomise_int(value)
|
|
|
|
|
|
def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries.

    The dictionaries are modified in place in three ways:

    * Keys in ``settings.OBFUSCATE_FIELDS``: the leading part of the value
      (sized by ``settings.OBFUSCATE_KEEP_RATIO``) is kept and the rest is
      replaced with ``*``.
    * Keys in ``settings.OBFUSCATE_FIELDS_SEP``: the value is split on
      ``-`` (or, failing that, ``:``); the first
      ``settings.OBFUSCATE_DASH_NUM`` / ``settings.OBFUSCATE_COLON_NUM``
      segments are kept and the remaining segments are starred out, e.g.
      ``2022-02-02 -> 2022-02-**`` or ``14:11:12 -> 14:11:**``.
      A value with neither separator is masked entirely.
    * Keys in ``settings.COMBINE_FIELDS``: when the key and both of its
      source keys are present, the key is rebuilt by concatenating the two
      (already obfuscated) source values.

    Nothing happens when the user holds ``core.bypass_obfuscation``.

    :param user: object exposing ``has_perm(perm_name)``.
    :param data: list of dicts with string values under the obfuscated
        keys; mutated in place.
    :return: None.
    """
    if user.has_perm("core.bypass_obfuscation"):
        return
    for item in data:
        for key, value in item.items():
            # Obfuscate a ratio of the field
            if key in settings.OBFUSCATE_FIELDS:
                keep = int((len(value) - 1) * settings.OBFUSCATE_KEEP_RATIO)
                item[key] = value[:keep] + "*" * len(value[keep:])
            # Obfuscate value based on fields
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                if "-" in value:
                    sep = "-"
                    hide_num = settings.OBFUSCATE_DASH_NUM
                elif ":" in value:
                    sep = ":"
                    hide_num = settings.OBFUSCATE_COLON_NUM
                else:
                    # No known separator: hide everything instead of reusing
                    # the segments left over from a previous record.
                    item[key] = "*" * len(value)
                    continue

                segments = value.split(sep)
                masked = ["*" * len(seg) for seg in segments[hide_num:]]
                item[key] = sep.join([*segments[:hide_num], *masked])
    for key in settings.COMBINE_FIELDS:
        for item in data:
            if key in item:
                k1, k2 = settings.COMBINE_FIELDS[key]
                if k1 in item and k2 in item:
                    item[key] = item[k1] + item[k2]
|
|
|
|
|
|
def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts or a list with SipHash.

    Values (or keys, with ``hash_keys=True``) are replaced in place by the
    base36-encoded SipHash of the original text, keyed with
    ``settings.HASHING_KEY``. The mapping from each hash back to its
    plaintext is written to the ``cache.hash`` hash through ``r.hmset``
    (``r`` presumably being a Redis client; confirm in ``core``).

    Skipped entirely: keys listed in ``settings.WHITELIST_FIELDS`` or
    ``settings.NO_OBFUSCATE_PARAMS``, and any entry whose value is a bool
    or ``None``. Integers are hashed via their decimal string.

    Nothing happens when the user holds ``core.bypass_hashing``.

    :param user: object exposing ``has_perm(perm_name)``.
    :param data: a list of dicts, a list of strings, or (together with
        ``hash_keys=True``) a single dict; mutated in place.
        NOTE(review): a list of dicts with ``hash_keys=True`` and a dict
        with ``hash_keys=False`` do not look supported by the indexing
        below; verify against callers.
    :param hash_keys: hash and rename the dict keys instead of replacing
        the values.
    :return: None.
    """
    if user.has_perm("core.bypass_hashing"):
        return
    cache = "cache.hash"
    hash_table = {}
    # Iterate over a snapshot so that renaming/replacing entries in `data`
    # cannot disturb the iteration.
    if isinstance(data, dict):
        data_copy = [{x: data[x]} for x in data]
    else:
        data_copy = type(data)((data))
    for index, item in enumerate(data_copy):
        if isinstance(item, dict):
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool is a subclass of int, so it has to be ruled out
                # before the int -> str conversion or it is never skipped.
                if isinstance(value, bool) or value is None:
                    continue
                if isinstance(value, int):
                    value = str(value)
                plain = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, plain))
                hash_table.setdefault(encoded, plain)
                if hash_keys:
                    # Rename the dict key
                    data[encoded] = data.pop(key)
                else:
                    data[index][key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded
    if hash_table:
        r.hmset(cache, hash_table)
|
|
|
|
|
|
def hash_lookup(user, data_dict):
    """
    Resolve hashes inside search parameters back to their plaintext.

    Every string value (and every string inside a dict value) of
    ``data_dict`` is scanned for 12-13 character upper-case alphanumeric
    tokens. Those tokens are looked up in the ``cache.hash`` hash through
    ``r.hmget`` and substituted in place with the stored plaintext, or
    with ``ERR`` when the cache has no entry.

    Parameters are refused, replaced in ``data_dict`` by a marker object
    and reported in the returned list, when:

    * the key is in ``settings.SEARCH_FIELDS_DENY`` and the user lacks
      ``core.bypass_hashing`` (``SearchDenied``);
    * the value contains no hash at all and the user lacks
      ``core.bypass_hashing``, since plaintext would otherwise be
      injected into the query (``SearchDenied``);
    * the value contains hashes but the key is in
      ``settings.TAG_SEARCH_DENY`` (``LookupDenied``).

    Keys in ``settings.WHITELIST_FIELDS`` or
    ``settings.NO_OBFUSCATE_PARAMS`` and empty values are not inspected.

    :param user: object exposing ``has_perm(perm_name)``.
    :param data_dict: dict of search parameters; mutated in place.
    :return: list of ``SearchDenied`` / ``LookupDenied`` markers, empty
        when nothing was refused.
    """
    cache = "cache.hash"
    hash_pattern = re.compile(r"[A-Z0-9]{12,13}")
    found = set()
    denied = []
    for key, value in list(data_dict.items()):
        if key in settings.SEARCH_FIELDS_DENY:
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(key=key, value=data_dict[key])
                denied.append(data_dict[key])
                # Already refused; do not wrap or report it a second time.
                continue
        if (
            key in settings.WHITELIST_FIELDS
            or key in settings.NO_OBFUSCATE_PARAMS
        ):
            continue
        if not value:
            continue

        # Start from an empty list so an unexpected value type is treated
        # as "no hashes" rather than inheriting the previous key's hashes.
        hashes = []
        if isinstance(value, str):
            hashes = hash_pattern.findall(value)
        elif isinstance(value, dict):
            # Distinct loop name: `key`/`value` must keep referring to the
            # outer entry for the denial handling below.
            for inner in value.values():
                if not inner:
                    continue
                hashes.extend(hash_pattern.findall(inner))

        if not hashes:
            # Otherwise the user could inject plaintext search queries
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(key=key, value=data_dict[key])
                denied.append(data_dict[key])
                continue
        elif key in settings.TAG_SEARCH_DENY:
            # There are hashes here but there shouldn't be!
            data_dict[key] = LookupDenied(key=key, value=data_dict[key])
            denied.append(data_dict[key])
            continue

        found.update(hashes)

    if found:
        # Fixed order so the replies can be zipped back onto their hashes.
        ordered = sorted(found)
        values = r.hmget(cache, *ordered)
        if not values:
            return denied
        total = {
            token: ("ERR" if raw is None else raw.decode())
            for token, raw in zip(ordered, values)
        }
        for key, current in data_dict.items():
            if isinstance(current, str):
                for token, plain in total.items():
                    current = current.replace(token, plain)
                data_dict[key] = current
            elif isinstance(current, dict):
                for inner_key, inner in current.items():
                    # Empty/None entries were never scanned; leave them be.
                    if not isinstance(inner, str):
                        continue
                    for token, plain in total.items():
                        inner = inner.replace(token, plain)
                    current[inner_key] = inner
    return denied
|
|
|
|
|
|
def encrypt_list(user, data, secret):
    """
    Encrypt the values in a list of dictionaries with AES.

    Every value whose key is not in ``settings.WHITELIST_FIELDS`` is
    replaced in place by the base64 text of its AES-ECB ciphertext.
    Integers are encrypted via their decimal string; bools and ``None``
    are left untouched. The UTF-8 plaintext is padded to a 16-byte
    boundary with PKCS#7-style padding (``n`` bytes of value ``n``).

    Nothing happens when the user holds ``core.bypass_encryption``.

    :param user: object exposing ``has_perm(perm_name)``.
    :param data: list of dicts; mutated in place.
    :param secret: AES key bytes, passed straight to ``algorithms.AES``.
    :return: None.
    """
    if user.has_perm("core.bypass_encryption"):
        return
    # NOTE(review): ECB is deterministic: equal plaintext blocks produce
    # equal ciphertext blocks, which leaks equality between values. Kept
    # unchanged because switching modes would break whatever decrypts this.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool is a subclass of int, so it has to be ruled out before
            # the int -> str conversion or it is never skipped.
            if isinstance(value, bool) or value is None:
                continue
            if isinstance(value, int):
                value = str(value)
            plaintext = value.encode("utf8", "replace")
            pad = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad]) * pad
            # One encryptor per value, created only once we know we need it.
            encryptor = cipher.encryptor()
            ct = encryptor.update(plaintext) + encryptor.finalize()
            item[key] = b64encode(ct).decode("utf-8", "replace")
|