import re
from base64 import b64encode

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet

from core import r

# Name of the hash-map key (passed to r.hmset / r.hmget) that maps an encoded
# hash back to the plaintext it was derived from.
_HASH_CACHE_KEY = "cache.hash"

# Tokens in user input that are treated as candidate encoded hashes.
_HASH_TOKEN_RE = re.compile(r"[A-Z0-9]{12,13}")

_BASE36_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"


class SearchDenied:
    """Marker stored in place of a search value that hash_lookup rejected.

    The rejected original is kept in ``value``.
    """

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f"{type(self).__name__}(value={self.value!r})"


def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two entries are duplicates when they agree on every key of ``check_keys``
    that they contain. The first occurrence is kept. Each run of dropped
    entries is reported by a ``{"type": "control", "hidden": <count>}`` entry
    placed before the next kept entry (or at the end of the list).

    :param data: iterable of dictionaries
    :param check_keys: keys whose values identify an entry
    :return: new list with duplicates replaced by control entries
    """
    seen = set()
    out = []
    dup_count = 0
    for entry in data:
        # Keep the key name next to the value: otherwise {"msg": 1} and
        # {"nick": 1} would yield the same tuple and be treated as duplicates.
        dedup_key = tuple((k, entry[k]) for k in check_keys if k in entry)
        if dedup_key in seen:
            dup_count += 1
            continue
        if dup_count > 0:
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(entry)
        seen.add(dedup_key)
    if dup_count > 0:
        out.append({"type": "control", "hidden": dup_count})
    return out


# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
#     {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
#         randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215


def base36encode(number, alphabet=_BASE36_ALPHABET):
    """Converts an integer to a base36 string.

    The base is ``len(alphabet)``, so a custom alphabet changes the radix.

    :raises TypeError: if ``number`` is not an integer
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    sign = ""
    if number < 0:
        sign = "-"
        number = -number

    base = len(alphabet)
    if number < base:
        return sign + alphabet[number]

    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])
    # Digits were produced least-significant first.
    return sign + "".join(reversed(digits))


def base36decode(number):
    """Converts a base36 string back to an integer."""
    return int(number, 36)


def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries, in place.

    Does nothing when ``user`` has the ``core.bypass_obfuscation`` permission.

    * Keys in ``settings.OBFUSCATE_FIELDS``: the leading
      ``settings.OBFUSCATE_KEEP_RATIO`` share of the value is kept and the
      rest is replaced by ``*``.
    * Keys in ``settings.OBFUSCATE_FIELDS_SEP``: the value is split on ``-``
      (or, failing that, ``:``); the first ``OBFUSCATE_DASH_NUM`` /
      ``OBFUSCATE_COLON_NUM`` segments are kept and the remaining segments
      are starred out, e.g. ``2022-02-02 -> 2022-02-**`` and
      ``14:11:12 -> 14:11:**``. A value containing neither separator is
      masked completely.
    """
    if user.has_perm("core.bypass_obfuscation"):
        return

    for item in data:
        for key, value in item.items():
            # Obfuscate a ratio of the field
            if key in settings.OBFUSCATE_FIELDS:
                length = len(value) - 1
                split = int(length * settings.OBFUSCATE_KEEP_RATIO)
                item[key] = value[:split] + "*" * len(value[split:])

            # Obfuscate value based on fields
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                if "-" in value:
                    sep = "-"
                    keep_num = settings.OBFUSCATE_DASH_NUM
                elif ":" in value:
                    sep = ":"
                    keep_num = settings.OBFUSCATE_COLON_NUM
                else:
                    # No known separator. Previously this path used unbound
                    # (or stale, from an earlier field) variables. The field
                    # is configured for obfuscation, so hide it entirely
                    # rather than leaking it unchanged.
                    item[key] = "*" * len(value)
                    continue

                segments = value.split(sep)
                masked = ["*" * len(segment) for segment in segments[keep_num:]]
                item[key] = sep.join([*segments[:keep_num], *masked])


def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts, a list of strings or a single dict with SipHash,
    in place.

    Does nothing when ``user`` has the ``core.bypass_hashing`` permission.

    Each hashed string is replaced by the base36 encoding of
    ``siphash(settings.HASHING_KEY, <string>)``. Keys listed in
    ``settings.WHITELIST_FIELDS`` or ``settings.NO_OBFUSCATE_PARAMS`` are
    left alone, as are ``None`` and boolean values; other integers are
    converted to ``str`` before hashing. The encoded -> plaintext pairs are
    written to the ``cache.hash`` hash map through ``r.hmset`` so that
    hash_lookup can reverse them.

    :param user: object exposing ``has_perm``
    :param data: list of dicts, list of strings, or a dict
    :param hash_keys: hash (and rename) the dictionary keys instead of
        replacing the values
    """
    if user.has_perm("core.bypass_hashing"):
        return

    hash_table = {}
    data_is_dict = isinstance(data, dict)
    if data_is_dict:
        # One single-key dict per entry so both input shapes share one loop.
        entries = [{k: data[k]} for k in data]
    else:
        # Snapshot, so replacing elements of ``data`` cannot disturb iteration.
        entries = list(data)

    for index, item in enumerate(entries):
        if isinstance(item, dict):
            # For dict input the single-key dicts are throwaway copies, so
            # writes must go to ``data`` itself. For a list of dicts ``item``
            # is the very object stored in ``data``.
            target = data if data_is_dict else item
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool is a subclass of int, so it has to be tested before
                # the int conversion or the skip can never trigger.
                if value is None or isinstance(value, bool):
                    continue
                if isinstance(value, int):
                    value = str(value)

                plaintext = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, plaintext))
                hash_table.setdefault(encoded, plaintext)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded

    if hash_table:
        r.hmset(_HASH_CACHE_KEY, hash_table)


def hash_lookup(user, data_dict):
    """
    Replace hashes found in ``data_dict`` by their cached plaintext, in place.

    For every key outside ``settings.WHITELIST_FIELDS`` and
    ``settings.NO_OBFUSCATE_PARAMS`` the (string, or dict of strings) value is
    scanned for 12-13 character ``[A-Z0-9]`` tokens. A non-empty value without
    any token is replaced by a ``SearchDenied`` wrapper unless ``user`` has
    the ``core.bypass_hashing`` permission. All tokens are then resolved with
    a single ``r.hmget`` on ``cache.hash``; tokens missing from the cache are
    replaced by ``"ERR"``.

    :param user: object exposing ``has_perm``
    :param data_dict: mapping of search parameters, modified in place
    """
    tokens = SortedSet()
    for key, value in list(data_dict.items()):
        if (
            key in settings.WHITELIST_FIELDS
            or key in settings.NO_OBFUSCATE_PARAMS
        ):
            continue
        if not value:
            continue

        # Reset per key: a value that is neither str nor dict must not reuse
        # the tokens collected for the previous key.
        hashes = []
        if isinstance(value, str):
            hashes = _HASH_TOKEN_RE.findall(value)
        elif isinstance(value, dict):
            # Distinct loop name: rebinding ``key``/``value`` here made the
            # SearchDenied assignment below hit the wrong entry.
            for sub_value in value.values():
                if sub_value and isinstance(sub_value, str):
                    hashes.extend(_HASH_TOKEN_RE.findall(sub_value))

        if not hashes:
            # Otherwise the user could inject plaintext search queries
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(value=data_dict[key])
        tokens.update(hashes)

    if not tokens:
        return

    values = r.hmget(_HASH_CACHE_KEY, *tokens)
    if not values:
        return
    # NOTE(review): assumes ``r`` returns bytes (no response decoding) --
    # confirm against the client configuration in ``core``.
    plaintexts = [(b"ERR" if val is None else val).decode() for val in values]
    total = dict(zip(tokens, plaintexts))

    for key in data_dict:
        for token, plaintext in total.items():
            current = data_dict[key]
            if not current:
                continue
            if isinstance(current, str):
                if token in current:
                    data_dict[key] = current.replace(token, plaintext)
            elif isinstance(current, dict):
                for sub_key, sub_value in current.items():
                    if isinstance(sub_value, str) and token in sub_value:
                        current[sub_key] = sub_value.replace(token, plaintext)


def encrypt_list(user, data, secret):
    """
    Encrypt the values of a list of dictionaries, in place.

    Does nothing when ``user`` has the ``core.bypass_encryption`` permission.

    Every value whose key is not in ``settings.WHITELIST_FIELDS`` is UTF-8
    encoded, padded to a multiple of 16 bytes (each pad byte holds the pad
    length), encrypted with AES and stored back base64 encoded. ``None`` and
    boolean values are left untouched; other integers are converted to
    ``str`` first.

    :param user: object exposing ``has_perm``
    :param data: list of dictionaries
    :param secret: AES key bytes
    """
    if user.has_perm("core.bypass_encryption"):
        return

    # NOTE(security): ECB is deterministic -- equal plaintexts give equal
    # ciphertexts. Possibly intended so encrypted values stay comparable;
    # confirm before relying on this for confidentiality. Left unchanged
    # because consumers of the ciphertext depend on the current scheme.
    cipher = Cipher(algorithms.AES(secret), ECB())
    block_size = 16

    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool is a subclass of int, so it has to be tested before the
            # int conversion or the skip can never trigger.
            if value is None or isinstance(value, bool):
                continue
            if isinstance(value, int):
                value = str(value)

            raw = value.encode("utf8", "replace")
            pad_len = block_size - (len(raw) % block_size)
            raw += bytes([pad_len]) * pad_len

            encryptor = cipher.encryptor()
            ciphertext = encryptor.update(raw) + encryptor.finalize()
            item[key] = b64encode(ciphertext).decode("utf-8", "replace")