# import re # from base64 import b64encode # from random import randint # from cryptography.hazmat.primitives.ciphers import Cipher, algorithms # from cryptography.hazmat.primitives.ciphers.modes import ECB # from django.conf import settings # from siphashc import siphash # from sortedcontainers import SortedSet # from core import r from django.conf import settings class SearchDenied: def __init__(self, key, value): self.key = key self.value = value class LookupDenied: def __init__(self, key, value): self.key = key self.value = value def remove_defaults(query_params): for field, value in list(query_params.items()): if field in settings.DRILLDOWN_DEFAULT_PARAMS: if value == settings.DRILLDOWN_DEFAULT_PARAMS[field]: del query_params[field] def add_defaults(query_params): for field, value in settings.DRILLDOWN_DEFAULT_PARAMS.items(): if field not in query_params: query_params[field] = value def dedup_list(data, check_keys): """ Remove duplicate dictionaries from list. """ seen = set() out = [] dup_count = 0 for x in data: dedupeKey = tuple(x[k] for k in check_keys if k in x) if dedupeKey in seen: dup_count += 1 continue if dup_count > 0: out.append({"type": "control", "hidden": dup_count}) dup_count = 0 out.append(x) seen.add(dedupeKey) if dup_count > 0: out.append({"type": "control", "hidden": dup_count}) return out # from random import randint # from timeit import timeit # entries = 10000 # a = [ # {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \ # randint(1, 2)} for x in range(entries) # ] # kk = ["msg", "nick"] # call = lambda: dedup_list(a, kk) # #print(timeit(call, number=10)) # print(dedup_list(a, kk)) # # sh-5.1$ python helpers.py # # 1.0805372429895215 # def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): # """Converts an integer to a base36 string.""" # if not isinstance(number, (int)): # raise TypeError("number must be an integer") # base36 = "" # sign = "" # if number < 0: # sign = "-" # number = -number # if 0 <= number < len(alphabet): # return sign + alphabet[number] # while number != 0: # number, i = divmod(number, len(alphabet)) # base36 = alphabet[i] + base36 # return sign + base36 # def base36decode(number): # return int(number, 36) # def randomise_list(user, data): # """ # Randomise data in a list of dictionaries. # """ # if user.has_perm("core.bypass_randomisation"): # return # if isinstance(data, list): # for index, item in enumerate(data): # for key, value in item.items(): # if key in settings.RANDOMISE_FIELDS: # if isinstance(value, int): # min_val = value - (value * settings.RANDOMISE_RATIO) # max_val = value + (value * settings.RANDOMISE_RATIO) # new_val = randint(int(min_val), int(max_val)) # data[index][key] = new_val # elif isinstance(data, dict): # for key, value in data.items(): # # if key in settings.RANDOMISE_FIELDS: # if isinstance(value, int): # min_val = value - (value * settings.RANDOMISE_RATIO) # max_val = value + (value * settings.RANDOMISE_RATIO) # new_val = randint(int(min_val), int(max_val)) # data[key] = new_val # def obfuscate_list(user, data): # """ # Obfuscate data in a list of dictionaries. # """ # if user.has_perm("core.bypass_obfuscation"): # return # for index, item in enumerate(data): # for key, value in item.items(): # # Obfuscate a ratio of the field # if key in settings.OBFUSCATE_FIELDS: # length = len(value) - 1 # split = int(length * settings.OBFUSCATE_KEEP_RATIO) # first_part = value[:split] # second_part = value[split:] # second_len = len(second_part) # second_part = "*" * second_len # data[index][key] = first_part + second_part # # Obfuscate value based on fields # # Example: 2022-02-02 -> 2022-02-** # # 14:11:12 -> 14:11:** # elif key in settings.OBFUSCATE_FIELDS_SEP: # if "-" in value: # sep = "-" # value_spl = value.split("-") # hide_num = settings.OBFUSCATE_DASH_NUM # elif ":" in value: # sep = ":" # value_spl = value.split(":") # hide_num = settings.OBFUSCATE_COLON_NUM # first_part = value_spl[:hide_num] # second_part = value_spl[hide_num:] # for index_x, x in enumerate(second_part): # x_len = len(x) # second_part[index_x] = "*" * x_len # result = sep.join([*first_part, *second_part]) # data[index][key] = result # for key in settings.COMBINE_FIELDS: # for index, item in enumerate(data): # if key in item: # k1, k2 = settings.COMBINE_FIELDS[key] # if k1 in item and k2 in item: # data[index][key] = item[k1] + item[k2] # def hash_list(user, data, hash_keys=False): # """ # Hash a list of dicts or a list with SipHash42. # """ # if user.has_perm("core.bypass_hashing"): # return # cache = "cache.hash" # hash_table = {} # if isinstance(data, dict): # data_copy = [{x: data[x]} for x in data] # else: # data_copy = type(data)((data)) # for index, item in enumerate(data_copy): # if "src" in item: # if item["src"] in settings.SAFE_SOURCES: # continue # if isinstance(item, dict): # for key, value in list(item.items()): # if ( # key not in settings.WHITELIST_FIELDS # and key not in settings.NO_OBFUSCATE_PARAMS # ): # if isinstance(value, int): # value = str(value) # if isinstance(value, bool): # continue # if value is None: # continue # if hash_keys: # hashed = siphash(settings.HASHING_KEY, key) # else: # hashed = siphash(settings.HASHING_KEY, value) # encoded = base36encode(hashed) # if encoded not in hash_table: # if hash_keys: # hash_table[encoded] = key # else: # hash_table[encoded] = value # if hash_keys: # # Rename the dict key # data[encoded] = data.pop(key) # else: # data[index][key] = encoded # elif isinstance(item, str): # hashed = siphash(settings.HASHING_KEY, item) # encoded = base36encode(hashed) # if encoded not in hash_table: # hash_table[encoded] = item # data[index] = encoded # if hash_table: # r.hmset(cache, hash_table) # def hash_lookup(user, data_dict, supplementary_data=None): # cache = "cache.hash" # hash_list = SortedSet() # denied = [] # for key, value in list(data_dict.items()): # if "source" in data_dict: # if data_dict["source"] in settings.SAFE_SOURCES: # continue # if "src" in data_dict: # if data_dict["src"] in settings.SAFE_SOURCES: # continue # if supplementary_data: # if "source" in supplementary_data: # if supplementary_data["source"] in settings.SAFE_SOURCES: # continue # if key in settings.SEARCH_FIELDS_DENY: # if not user.has_perm("core.bypass_hashing"): # data_dict[key] = SearchDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # if ( # key not in settings.WHITELIST_FIELDS # and key not in settings.NO_OBFUSCATE_PARAMS # ): # if not value: # continue # # hashes = re.findall("\|([^\|]*)\|", value) # noqa # if isinstance(value, str): # hashes = re.findall("[A-Z0-9]{12,13}", value) # elif isinstance(value, dict): # hashes = [] # for key, value in value.items(): # if not value: # continue # hashes_iter = re.findall("[A-Z0-9]{12,13}", value) # for h in hashes_iter: # hashes.append(h) # if not hashes: # # Otherwise the user could inject plaintext search queries # if not user.has_perm("core.bypass_hashing"): # data_dict[key] = SearchDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # continue # else: # # There are hashes here but there shouldn't be! # if key in settings.TAG_SEARCH_DENY: # data_dict[key] = LookupDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # continue # for hash in hashes: # hash_list.add(hash) # if hash_list: # values = r.hmget(cache, *hash_list) # if not values: # return # for index, val in enumerate(values): # if val is None: # values[index] = b"ERR" # values = [x.decode() for x in values] # total = dict(zip(hash_list, values)) # for key in data_dict.keys(): # for hash in total: # if data_dict[key]: # if isinstance(data_dict[key], str): # if hash in data_dict[key]: # data_dict[key] = data_dict[key].replace( # f"{hash}", total[hash] # ) # elif isinstance(data_dict[key], dict): # for k2, v2 in data_dict[key].items(): # if hash in v2: # data_dict[key][k2] = v2.repl # ace(f"{hash}", total[hash]) # return denied # def encrypt_list(user, data, secret): # if user.has_perm("core.bypass_encryption"): # return # cipher = Cipher(algorithms.AES(secret), ECB()) # for index, item in enumerate(data): # for key, value in item.items(): # if key not in settings.WHITELIST_FIELDS: # encryptor = cipher.encryptor() # if isinstance(value, int): # value = str(value) # if isinstance(value, bool): # continue # if value is None: # continue # decoded = value.encode("utf8", "replace") # length = 16 - (len(decoded) % 16) # decoded += bytes([length]) * length # ct = encryptor.update(decoded) + encryptor.finalize() # final_str = b64encode(ct) # data[index][key] = final_str.decode("utf-8", "replace")