import re
from base64 import b64encode
from random import randint

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet

from core import r

# Candidate hash tokens inside a search value: 12-13 upper-case base36 chars.
_HASH_TOKEN_RE = re.compile(r"[A-Z0-9]{12,13}")


class SearchDenied:
    """
    Marker stored in a query dict by hash_lookup when a search value is
    refused; keeps the offending key and the original value.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value


class LookupDenied:
    """
    Marker stored in a query dict by hash_lookup when a value contains hash
    tokens for a field listed in settings.TAG_SEARCH_DENY.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value


def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two items are duplicates when they hold equal values for every key of
    check_keys that is present in the item (absent keys are skipped, not
    compared). Every run of dropped duplicates is replaced by one
    {"type": "control", "hidden": <count>} marker, inserted before the next
    kept item or at the end of the output.

    Returns a new list; data is not modified.
    """
    seen = set()
    out = []
    dup_count = 0
    for item in data:
        dedupe_key = tuple(item[k] for k in check_keys if k in item)
        if dedupe_key in seen:
            dup_count += 1
            continue
        if dup_count > 0:
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(item)
        seen.add(dedupe_key)
    if dup_count > 0:
        out.append({"type": "control", "hidden": dup_count})
    return out


# Ad-hoc benchmark for dedup_list, kept for reference:
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
#     {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
#         randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215


def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """
    Converts an integer to a base36 string.

    The base is actually len(alphabet), so a custom alphabet changes it.
    Negative numbers are prefixed with "-".

    Raises TypeError when number is not an int.
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")
    sign = ""
    if number < 0:
        sign = "-"
        number = -number
    base = len(alphabet)
    if number < base:
        return sign + alphabet[number]
    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])
    # Digits were produced least-significant first.
    return sign + "".join(reversed(digits))


def base36decode(number):
    """Converts a base36 string back to an integer."""
    return int(number, 36)


def _randomise_int(value):
    """Return a random int within +/- settings.RANDOMISE_RATIO of value."""
    spread = value * settings.RANDOMISE_RATIO
    low = int(value - spread)
    high = int(value + spread)
    if low > high:
        # Negative values (or a negative ratio) invert the bounds, and
        # randint raises ValueError on an empty range.
        low, high = high, low
    return randint(low, high)


def _is_plain_int(value):
    """True for ints that are not bools (bool is a subclass of int)."""
    return isinstance(value, int) and not isinstance(value, bool)


def randomise_list(user, data):
    """
    Randomise data in a list of dictionaries.

    Mutates data in place and returns None. Does nothing when the user has
    the "core.bypass_randomisation" permission.

    - list input: int values under keys in settings.RANDOMISE_FIELDS are
      replaced by a random int within +/- settings.RANDOMISE_RATIO.
    - dict input: every int value is randomised, whatever its key.

    Booleans are left untouched.
    """
    if user.has_perm("core.bypass_randomisation"):
        return
    if isinstance(data, list):
        for item in data:
            for key, value in item.items():
                if key in settings.RANDOMISE_FIELDS and _is_plain_int(value):
                    item[key] = _randomise_int(value)
    elif isinstance(data, dict):
        for key, value in data.items():
            # The RANDOMISE_FIELDS filter is not applied to dict input:
            # if key in settings.RANDOMISE_FIELDS:
            if _is_plain_int(value):
                data[key] = _randomise_int(value)


def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries.

    Mutates data in place and returns None. Does nothing when the user has
    the "core.bypass_obfuscation" permission.

    - Keys in settings.OBFUSCATE_FIELDS keep a leading share of the value
      (settings.OBFUSCATE_KEEP_RATIO); the rest is replaced with "*".
    - Keys in settings.OBFUSCATE_FIELDS_SEP are split on "-" or ":"; the
      first OBFUSCATE_DASH_NUM / OBFUSCATE_COLON_NUM parts are kept and the
      remaining parts are starred out.
    - For each key of settings.COMBINE_FIELDS already present in an item,
      the value is rebuilt by concatenating the two source fields.
    """
    if user.has_perm("core.bypass_obfuscation"):
        return
    for item in data:
        for key, value in item.items():
            # Obfuscate a ratio of the field
            if key in settings.OBFUSCATE_FIELDS:
                length = len(value) - 1
                split = int(length * settings.OBFUSCATE_KEEP_RATIO)
                kept = value[:split]
                hidden = value[split:]
                item[key] = kept + "*" * len(hidden)
            # Obfuscate value based on fields
            # Example: 2022-02-02 -> 2022-02-**
            #          14:11:12   -> 14:11:**
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                if "-" in value:
                    sep = "-"
                    keep_num = settings.OBFUSCATE_DASH_NUM
                elif ":" in value:
                    sep = ":"
                    keep_num = settings.OBFUSCATE_COLON_NUM
                else:
                    # No separator: leave the value as is. Previously this
                    # raised NameError or reused the previous item's parts.
                    continue
                parts = value.split(sep)
                masked = ["*" * len(part) for part in parts[keep_num:]]
                item[key] = sep.join([*parts[:keep_num], *masked])

    for key in settings.COMBINE_FIELDS:
        for item in data:
            if key in item:
                k1, k2 = settings.COMBINE_FIELDS[key]
                if k1 in item and k2 in item:
                    item[key] = item[k1] + item[k2]


def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts, a list of strings or a dict with SipHash.

    Mutates data in place and returns None. Does nothing when the user has
    the "core.bypass_hashing" permission.

    Each hash is base36-encoded and swapped in for the original; the
    encoded -> original mapping is written to the "cache.hash" hash via
    r.hmset so hash_lookup can reverse it.

    - hash_keys=False: values are replaced by their hash.
    - hash_keys=True: dict keys are renamed to their hash; values are kept.

    Keys in settings.WHITELIST_FIELDS or settings.NO_OBFUSCATE_PARAMS are
    skipped, as are bool and None values. A dict item whose "src" is in
    settings.SAFE_SOURCES is skipped entirely.
    """
    if user.has_perm("core.bypass_hashing"):
        return
    cache = "cache.hash"
    hash_table = {}
    is_mapping = isinstance(data, dict)
    # Iterate over a snapshot so data can be mutated while walking it.
    if is_mapping:
        snapshot = [{k: data[k]} for k in data]
    else:
        snapshot = list(data)

    for index, item in enumerate(snapshot):
        if isinstance(item, dict):
            # Only dicts have a "src" field; on a str item this test would
            # be a substring match followed by a failing item["src"].
            if "src" in item and item["src"] in settings.SAFE_SOURCES:
                continue
            # For dict input the entries live in data itself; for a list
            # they live in the item (same object as data[index]).
            target = data if is_mapping else item
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool must be tested before int: bool is a subclass of int.
                if isinstance(value, bool) or value is None:
                    continue
                if isinstance(value, int):
                    value = str(value)
                original = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, original))
                hash_table.setdefault(encoded, original)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded

    if hash_table:
        # NOTE(review): if `r` is a redis-py client, hmset is deprecated in
        # favour of hset(name, mapping=...) - confirm before switching.
        r.hmset(cache, hash_table)


def _extract_hashes(value):
    """Return the hash tokens found in a str, or in the str values of a dict."""
    if isinstance(value, str):
        return _HASH_TOKEN_RE.findall(value)
    if isinstance(value, dict):
        tokens = []
        for inner in value.values():
            if isinstance(inner, str):
                tokens.extend(_HASH_TOKEN_RE.findall(inner))
        return tokens
    # Any other type cannot carry hash tokens.
    return []


def hash_lookup(user, data_dict):
    """
    Replace hash tokens in a query dict with their cached plaintext.

    Mutates data_dict in place and returns the list of SearchDenied /
    LookupDenied markers that were stored into it.

    - Keys in settings.SEARCH_FIELDS_DENY are denied unless the user has
      "core.bypass_hashing".
    - Non-whitelisted values without any hash token are denied for such
      users too, so they cannot inject plaintext searches.
    - Values with hash tokens under a key in settings.TAG_SEARCH_DENY are
      denied with LookupDenied for every user.
    - Remaining tokens are resolved through r.hmget on "cache.hash"; tokens
      missing from the cache are replaced with "ERR".
    """
    cache = "cache.hash"
    found_hashes = SortedSet()
    denied = []
    can_bypass = user.has_perm("core.bypass_hashing")

    for key, value in list(data_dict.items()):
        if key in settings.SEARCH_FIELDS_DENY and not can_bypass:
            data_dict[key] = SearchDenied(key=key, value=value)
            denied.append(data_dict[key])
            # Already denied: do not process it again, which used to wrap
            # the marker in a second SearchDenied and report it twice.
            continue
        if (
            key in settings.WHITELIST_FIELDS
            or key in settings.NO_OBFUSCATE_PARAMS
        ):
            continue
        if not value:
            continue

        hashes = _extract_hashes(value)
        if not hashes:
            # Otherwise the user could inject plaintext search queries
            if not can_bypass:
                data_dict[key] = SearchDenied(key=key, value=value)
                denied.append(data_dict[key])
            continue
        # There are hashes here but there shouldn't be!
        if key in settings.TAG_SEARCH_DENY:
            data_dict[key] = LookupDenied(key=key, value=value)
            denied.append(data_dict[key])
            continue
        found_hashes.update(hashes)

    if not found_hashes:
        return denied

    values = r.hmget(cache, *found_hashes)
    if not values:
        return denied

    plaintexts = []
    for raw in values:
        if raw is None:
            # Token not present in the cache.
            raw = b"ERR"
        # Accept both bytes and already-decoded str replies.
        plaintexts.append(raw.decode() if isinstance(raw, bytes) else raw)
    resolved = dict(zip(found_hashes, plaintexts))

    for key in data_dict:
        for token, plain in resolved.items():
            current = data_dict[key]
            if not current:
                continue
            if isinstance(current, str):
                if token in current:
                    data_dict[key] = current.replace(token, plain)
            elif isinstance(current, dict):
                for inner_key, inner_value in current.items():
                    if isinstance(inner_value, str) and token in inner_value:
                        current[inner_key] = inner_value.replace(token, plain)
    return denied


def encrypt_list(user, data, secret):
    """
    Encrypt the values of a list of dictionaries with AES.

    Mutates data in place and returns None. Does nothing when the user has
    the "core.bypass_encryption" permission.

    Values under keys not in settings.WHITELIST_FIELDS are UTF-8 encoded,
    padded PKCS#7-style to a 16-byte multiple, encrypted with secret as the
    AES key and stored back as a base64 string. Ints are stringified first;
    bool and None values are left untouched.
    """
    if user.has_perm("core.bypass_encryption"):
        return
    # NOTE(security): ECB maps identical plaintext blocks to identical
    # ciphertext blocks, so equal values stay recognisably equal. Kept
    # because changing the mode would change the output callers rely on.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(value, bool) or value is None:
                continue
            if isinstance(value, int):
                value = str(value)
            plaintext = value.encode("utf8", "replace")
            # Always pad, adding a full block when already aligned.
            pad_len = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad_len]) * pad_len
            encryptor = cipher.encryptor()
            ciphertext = encryptor.update(plaintext) + encryptor.finalize()
            item[key] = b64encode(ciphertext).decode("utf-8", "replace")