import re
from base64 import b64encode

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet

from core import r

# Name of the hash in `r` (presumably a Redis client -- verify in `core`) that
# maps base36-encoded digests back to the text they were computed from.
_HASH_CACHE_KEY = "cache.hash"

# Runs of 12-13 upper-case alphanumerics are treated as candidate digests.
# Previous pattern, kept for reference:
# hashes = re.findall("\|([^\|]*)\|", value)  # noqa
_HASH_TOKEN_RE = re.compile(r"[A-Z0-9]{12,13}")

# AES block size in bytes, used for the manual padding in encrypt_list().
_AES_BLOCK_BYTES = 16

# Placeholder for a check key that an entry does not have, so that values of
# different keys can never line up in the same tuple position.
_MISSING = object()


def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two entries are duplicates when they agree, key by key, on every name in
    ``check_keys`` (a key that is absent only matches another absent key).
    The first occurrence is kept. Every run of dropped entries is replaced by
    one marker ``{"type": "control", "hidden": <count>}``, inserted before the
    next kept entry or at the end of the list.

    :param data: iterable of dict-like entries; the compared values must be
        hashable
    :param check_keys: iterable of key names to compare on
    :return: a new list; ``data`` itself is not modified
    """
    seen = set()
    out = []
    dup_count = 0
    for entry in data:
        # Keep one slot per check key. Dropping missing keys would let
        # {"msg": 1} and {"nick": 1} collapse to the same tuple.
        dedupe_key = tuple(
            entry[k] if k in entry else _MISSING for k in check_keys
        )
        if dedupe_key in seen:
            dup_count += 1
            continue
        if dup_count:
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(entry)
        seen.add(dedupe_key)
    if dup_count:
        out.append({"type": "control", "hidden": dup_count})
    return out


# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
#     {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
#         randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))

# # sh-5.1$ python helpers.py
# # 1.0805372429895215


def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """Converts an integer to a base36 string.

    The base is really ``len(alphabet)``; the default alphabet gives base 36.
    Negative numbers are encoded as ``"-"`` followed by the magnitude.

    :raises TypeError: if ``number`` is not an ``int``
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    sign = ""
    if number < 0:
        sign = "-"
        number = -number

    base = len(alphabet)
    # Single-digit values, including zero which the loop below cannot emit.
    if number < base:
        return sign + alphabet[number]

    digits = []
    while number:
        number, i = divmod(number, base)
        digits.append(alphabet[i])
    # Digits were produced least-significant first.
    return sign + "".join(reversed(digits))


def base36decode(number):
    """Converts a base36 string back to an integer (inverse of base36encode)."""
    return int(number, 36)


def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts or a list with SipHash.

    ``data`` is modified in place and nothing is returned. Each replaced
    string becomes the base36 encoding of its SipHash digest (keyed with
    ``settings.HASHING_KEY``), and the digest -> original mapping is written
    to the ``cache.hash`` hash in ``r`` so that hash_lookup() can reverse it.

    Keys listed in ``settings.WHITELIST_FIELDS`` are left alone, as are
    entries whose value is ``None`` or a ``bool``. Integers are hashed as
    their decimal string.

    :param user: object with ``has_perm``; users holding
        ``core.bypass_hashing`` get their data back untouched
    :param data: a dict, a list of dicts, or a list of strings
    :param hash_keys: when true, rename the dict keys to their digest instead
        of replacing the values
    """
    if user.has_perm("core.bypass_hashing"):
        return

    hash_table = {}
    data_is_dict = isinstance(data, dict)
    if data_is_dict:
        # One single-entry dict per key so both input shapes share the loop.
        data_copy = [{x: data[x]} for x in data]
    else:
        # Iterate over a shallow copy because `data` is rewritten as we go.
        data_copy = type(data)(data)

    for index, item in enumerate(data_copy):
        if isinstance(item, dict):
            # The container that actually has to be rewritten: the dict
            # itself for dict input, otherwise the dict at this position.
            # (Indexing a dict by position, or popping a key from a list,
            # would fail.)
            target = data if data_is_dict else data[index]
            for key, value in list(item.items()):
                if key in settings.WHITELIST_FIELDS:
                    continue
                # bool is a subclass of int, so it has to be ruled out before
                # the int -> str conversion or it would never be skipped.
                if isinstance(value, bool) or value is None:
                    continue
                if isinstance(value, int):
                    value = str(value)

                plaintext = key if hash_keys else value
                encoded = base36encode(
                    siphash(settings.HASHING_KEY, plaintext)
                )
                hash_table.setdefault(encoded, plaintext)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded

    if hash_table:
        # NOTE(review): if `r` is a redis-py client, hmset() is deprecated
        # there in favour of hset(name, mapping=...) -- confirm the installed
        # version before switching.
        r.hmset(_HASH_CACHE_KEY, hash_table)


def hash_lookup(data_dict):
    """
    Replace digests found in the values of ``data_dict`` with their originals.

    Every run of 12-13 upper-case alphanumerics inside a string value is
    looked up in the ``cache.hash`` hash in ``r``. Known digests are replaced
    by the stored text and unknown ones by ``"ERR"``. ``data_dict`` is
    modified in place and nothing is returned; empty and non-string values
    are left untouched.
    """
    candidates = SortedSet()
    for value in data_dict.values():
        if not value or not isinstance(value, str):
            continue
        for token in _HASH_TOKEN_RE.findall(value):
            candidates.add(token)
    if not candidates:
        return

    values = r.hmget(_HASH_CACHE_KEY, *candidates)
    if not values:
        return

    # hmget answers in request order, so zip pairs each token with its reply.
    resolved = {}
    for token, raw in zip(candidates, values):
        if raw is None:
            resolved[token] = "ERR"
        elif isinstance(raw, bytes):
            resolved[token] = raw.decode()
        else:
            # Already text, e.g. a client configured to decode responses.
            resolved[token] = raw

    for key in data_dict:
        text = data_dict[key]
        if not text or not isinstance(text, str):
            continue
        changed = False
        for token, plaintext in resolved.items():
            if token in text:
                text = text.replace(token, plaintext)
                changed = True
        if changed:
            data_dict[key] = text


def encrypt_list(user, data, secret):
    """
    Encrypt the values of a list of dicts in place with AES.

    Every value whose key is not in ``settings.WHITELIST_FIELDS`` is replaced
    by the base64 text of its AES-ECB ciphertext. Integers are encrypted as
    their decimal string; ``None`` and ``bool`` values are left untouched.
    Nothing is returned.

    :param user: object with ``has_perm``; users holding
        ``core.bypass_encryption`` get their data back untouched
    :param data: indexable sequence of dicts
    :param secret: AES key, passed straight to ``algorithms.AES``
    """
    if user.has_perm("core.bypass_encryption"):
        return

    # NOTE(review): ECB maps equal plaintext blocks to equal ciphertext
    # blocks. Changing the mode would change every ciphertext produced here,
    # so this is flagged rather than replaced.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for index, item in enumerate(data):
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool is a subclass of int, so it has to be ruled out before
            # the int -> str conversion or it would never be skipped.
            if isinstance(value, bool) or value is None:
                continue
            if isinstance(value, int):
                value = str(value)

            plaintext = value.encode("utf8", "replace")
            # Pad to a whole number of blocks with N bytes of value N; input
            # that is already block-aligned gets one full block of padding.
            pad_len = _AES_BLOCK_BYTES - (len(plaintext) % _AES_BLOCK_BYTES)
            plaintext += bytes([pad_len]) * pad_len

            encryptor = cipher.encryptor()
            ciphertext = encryptor.update(plaintext) + encryptor.finalize()
            data[index][key] = b64encode(ciphertext).decode("utf-8", "replace")