import re
from base64 import b64encode
from random import randint
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet
from core import r
class SearchDenied:
    """
    Marker stored in place of a search value the user may not query.

    ``hash_lookup`` substitutes an instance for a field's value when a user
    without ``core.bypass_hashing`` queries a field listed in
    ``settings.SEARCH_FIELDS_DENY``, or supplies a value that contains no
    hash tokens.

    ``key`` is the field name; ``value`` is whatever was stored under that
    field when it was rejected.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value

    def __repr__(self):
        # Without this the marker shows up as an opaque object address
        # whenever the surrounding query dict is logged or printed.
        return f"{type(self).__name__}(key={self.key!r}, value={self.value!r})"
class LookupDenied:
    """
    Marker stored in place of a value whose hash lookup is not permitted.

    ``hash_lookup`` substitutes an instance for a field's value when the
    field is listed in ``settings.TAG_SEARCH_DENY`` and the value contains
    hash tokens.

    ``key`` is the field name; ``value`` is whatever was stored under that
    field when it was rejected.
    """

    def __init__(self, key, value):
        self.key = key
        self.value = value

    def __repr__(self):
        # Without this the marker shows up as an opaque object address
        # whenever the surrounding query dict is logged or printed.
        return f"{type(self).__name__}(key={self.key!r}, value={self.value!r})"
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two rows are duplicates when they agree on every key of ``check_keys``
    that they contain. The first occurrence of each row is kept. Each run of
    dropped rows is replaced by one placeholder
    ``{"type": "control", "hidden": <count>}`` so the caller can show how
    many rows were hidden at that position.

    :param data: iterable of dicts
    :param check_keys: keys whose values form the identity of a row
    :return: new list; ``data`` itself is not modified
    """
    seen = set()
    out = []
    dup_count = 0
    for row in data:
        # Keys missing from the row are left out of the identity tuple.
        dedupe_key = tuple(row[k] for k in check_keys if k in row)
        if dedupe_key in seen:
            dup_count += 1
            continue
        if dup_count:
            # A fresh row ends the current run of duplicates: flush it.
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(row)
        seen.add(dedupe_key)
    if dup_count:
        # Duplicates at the very end of the input still need a placeholder.
        out.append({"type": "control", "hidden": dup_count})
    return out
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
# randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python
# # 1.0805372429895215
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """
    Convert an integer to a string in the base given by ``alphabet``.

    With the default alphabet this is upper-case base 36.

    :param number: integer to encode; negative numbers get a leading "-"
    :param alphabet: digit symbols, lowest value first; its length is the base
    :return: encoded string
    :raises TypeError: if ``number`` is not an integer
    :raises ValueError: if a multi-digit result is needed but ``alphabet``
        has fewer than two symbols
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    sign = ""
    if number < 0:
        sign = "-"
        number = -number

    base = len(alphabet)
    if number < base:
        # Single digit (this also covers zero).
        return sign + alphabet[number]
    if base < 2:
        # divmod by 1 never shrinks the number (endless loop) and divmod by
        # 0 raises; neither alphabet can express a multi-digit value.
        raise ValueError("alphabet must contain at least two symbols")

    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])
    # Digits were produced least-significant first.
    return sign + "".join(reversed(digits))
def base36decode(number):
    """Parse a base-36 string and return the integer it represents."""
    return int(number, base=36)
def _randomise_int(value, ratio):
    """Return a random integer within ``ratio`` of ``value`` on either side."""
    spread = value * ratio
    # Order the bounds: for negative values ``value - spread`` is the larger
    # one, and randint() raises ValueError on an empty range.
    low, high = sorted((int(value - spread), int(value + spread)))
    return randint(low, high)


def _is_randomisable(value):
    """True for real integers; bool is an int subclass and must stay a flag."""
    return isinstance(value, int) and not isinstance(value, bool)


def randomise_list(user, data):
    """
    Randomise data in a list of dictionaries.

    Integer values are replaced in place by a random integer within
    ``settings.RANDOMISE_RATIO`` of the original. For a list of dicts only
    the keys in ``settings.RANDOMISE_FIELDS`` are touched; for a single dict
    every integer value is. Other input types are left alone.

    :param user: object with ``has_perm``; holders of
        ``core.bypass_randomisation`` get their data back unchanged
    :param data: list of dicts, or a dict; modified in place
    :return: None
    """
    if user.has_perm("core.bypass_randomisation"):
        return
    ratio = settings.RANDOMISE_RATIO
    if isinstance(data, list):
        fields = settings.RANDOMISE_FIELDS
        for item in data:
            for key, value in item.items():
                if key in fields and _is_randomisable(value):
                    item[key] = _randomise_int(value, ratio)
    elif isinstance(data, dict):
        for key, value in data.items():
            # The field filter is intentionally not applied to a bare dict.
            if _is_randomisable(value):
                data[key] = _randomise_int(value, ratio)
def _mask_tail(value, keep_ratio):
    """Keep the leading ``keep_ratio`` share of ``value``; star out the rest."""
    keep = int((len(value) - 1) * keep_ratio)
    return value[:keep] + "*" * len(value[keep:])


def _mask_separated(value):
    """
    Star out the trailing components of a "-" or ":" separated value.

    Example: 2022-02-02 -> 2022-02-**
             14:11:12   -> 14:11:**
    """
    if "-" in value:
        sep, keep = "-", settings.OBFUSCATE_DASH_NUM
    elif ":" in value:
        sep, keep = ":", settings.OBFUSCATE_COLON_NUM
    else:
        # No recognised separator: there is no component structure to keep,
        # so hide the whole value rather than reuse state from another row.
        return "*" * len(value)
    parts = value.split(sep)
    hidden = ["*" * len(part) for part in parts[keep:]]
    return sep.join(parts[:keep] + hidden)


def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries.

    Rows are modified in place:

    * keys in ``settings.OBFUSCATE_FIELDS`` keep the leading
      ``settings.OBFUSCATE_KEEP_RATIO`` share of the value and have the rest
      replaced with ``*``;
    * keys in ``settings.OBFUSCATE_FIELDS_SEP`` keep the first
      ``OBFUSCATE_DASH_NUM`` / ``OBFUSCATE_COLON_NUM`` separated components
      and have the remaining ones starred out;
    * keys in ``settings.COMBINE_FIELDS`` are then rebuilt by concatenating
      the two (already obfuscated) source fields named there.

    :param user: object with ``has_perm``; holders of
        ``core.bypass_obfuscation`` get their data back unchanged
    :param data: list of dicts; modified in place
    :return: None
    """
    if user.has_perm("core.bypass_obfuscation"):
        return
    for item in data:
        for key, value in item.items():
            if key in settings.OBFUSCATE_FIELDS:
                item[key] = _mask_tail(value, settings.OBFUSCATE_KEEP_RATIO)
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                item[key] = _mask_separated(value)
    # Combined fields are derived from others, so recompute them from the
    # obfuscated parts instead of leaving the clear-text combination behind.
    for key in settings.COMBINE_FIELDS:
        for item in data:
            if key in item:
                k1, k2 = settings.COMBINE_FIELDS[key]
                if k1 in item and k2 in item:
                    item[key] = item[k1] + item[k2]
def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts, a list of strings, or a dict with SipHash.

    Every value (or every key when ``hash_keys`` is true) outside
    ``settings.WHITELIST_FIELDS`` and ``settings.NO_OBFUSCATE_PARAMS`` is
    replaced in place by the base-36 encoding of its keyed hash. The mapping
    from hash to original text is written to the ``cache.hash`` hash via
    ``r.hmset`` so it can be resolved again later.

    Integer values are hashed as their decimal string. ``None`` and boolean
    values are left untouched.

    :param user: object with ``has_perm``; holders of ``core.bypass_hashing``
        get their data back unchanged
    :param data: list of dicts, list of strings, or dict; modified in place
    :param hash_keys: hash (rename) the dict keys instead of the values
    :return: None
    """
    if user.has_perm("core.bypass_hashing"):
        return
    cache = "cache.hash"
    hash_table = {}
    data_is_dict = isinstance(data, dict)
    if data_is_dict:
        # Walk a dict one single-entry dict at a time so both input shapes
        # share the loop below.
        data_copy = [{k: data[k]} for k in data]
    else:
        # Shallow copy: the loop assigns into ``data`` while iterating.
        data_copy = type(data)(data)
    for index, item in enumerate(data_copy):
        if isinstance(item, dict):
            # Write back to the container that really holds the entry: the
            # dict itself, or the row at this position of the list.
            target = data if data_is_dict else data[index]
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool must be tested before int: bool is an int subclass
                # and would otherwise be stringified and hashed.
                if value is None or isinstance(value, bool):
                    continue
                if isinstance(value, int):
                    value = str(value)
                plain = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, plain))
                hash_table.setdefault(encoded, plain)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded
    if hash_table:
        r.hmset(cache, hash_table)
# Shape of the tokens produced by hash_list(): 12-13 upper-case base-36 chars.
_HASH_TOKEN_RE = re.compile(r"[A-Z0-9]{12,13}")


def _extract_hash_tokens(value):
    """Return every hash-shaped token in a string or in a dict's string values."""
    if isinstance(value, str):
        return _HASH_TOKEN_RE.findall(value)
    if isinstance(value, dict):
        tokens = []
        for nested in value.values():
            if isinstance(nested, str):
                tokens.extend(_HASH_TOKEN_RE.findall(nested))
        return tokens
    # Any other type cannot carry a token.
    return []


def _unhash_text(text, plaintext_by_token):
    """Replace each known token occurring in ``text`` with its plaintext."""
    for token, plaintext in plaintext_by_token.items():
        if token in text:
            text = text.replace(token, plaintext)
    return text


def hash_lookup(user, data_dict):
    """
    Resolve hash tokens in a query dict back to their original text.

    Values are scanned for tokens, the tokens are looked up in the
    ``cache.hash`` hash via ``r.hmget`` and substituted in place. Tokens
    missing from the cache become the text ``ERR``. Fields the user may not
    use are replaced by marker objects instead:

    * ``SearchDenied`` when the field is in ``settings.SEARCH_FIELDS_DENY``,
      or when the value holds no token (otherwise the user could inject
      plaintext search queries) and the user lacks ``core.bypass_hashing``;
    * ``LookupDenied`` when the value holds tokens but the field is in
      ``settings.TAG_SEARCH_DENY``.

    Fields in ``settings.WHITELIST_FIELDS`` or
    ``settings.NO_OBFUSCATE_PARAMS`` and empty values are not scanned.

    :param user: object with ``has_perm``
    :param data_dict: query parameters; modified in place
    :return: list of the marker objects stored into ``data_dict``
    """
    cache = "cache.hash"
    tokens = set()
    denied = []

    def deny(marker_cls, field):
        marker = marker_cls(key=field, value=data_dict[field])
        data_dict[field] = marker
        denied.append(marker)

    for key, value in list(data_dict.items()):
        if key in settings.SEARCH_FIELDS_DENY:
            if not user.has_perm("core.bypass_hashing"):
                deny(SearchDenied, key)
                # Already rejected: do not wrap the marker a second time.
                continue
        if (
            key in settings.WHITELIST_FIELDS
            or key in settings.NO_OBFUSCATE_PARAMS
        ):
            continue
        if not value:
            continue
        found = _extract_hash_tokens(value)
        if not found:
            # Otherwise the user could inject plaintext search queries
            if not user.has_perm("core.bypass_hashing"):
                deny(SearchDenied, key)
            continue
        # There are hashes here but there shouldn't be!
        if key in settings.TAG_SEARCH_DENY:
            deny(LookupDenied, key)
            continue
        tokens.update(found)

    if not tokens:
        return denied

    # A sorted set gives one stable order for both the request and the zip.
    ordered = SortedSet(tokens)
    values = r.hmget(cache, *ordered)
    if not values:
        return denied
    plaintext_by_token = {}
    for token, raw in zip(ordered, values):
        if raw is None:
            raw = b"ERR"
        # NOTE(review): replies are assumed to be bytes; str is passed
        # through in case the client is configured to decode - confirm.
        plaintext_by_token[token] = raw.decode() if isinstance(raw, bytes) else raw

    for key, current in data_dict.items():
        if isinstance(current, str):
            data_dict[key] = _unhash_text(current, plaintext_by_token)
        elif isinstance(current, dict):
            for sub_key, sub_value in current.items():
                if isinstance(sub_value, str):
                    current[sub_key] = _unhash_text(sub_value, plaintext_by_token)
    return denied
def encrypt_list(user, data, secret):
    """
    Encrypt the values in a list of dictionaries with AES.

    Every value whose key is not in ``settings.WHITELIST_FIELDS`` is replaced
    in place by the base64 text of its ciphertext. Integer values are
    encrypted as their decimal string. ``None`` and boolean values are left
    untouched.

    :param user: object with ``has_perm``; holders of
        ``core.bypass_encryption`` get their data back unchanged
    :param data: list of dicts; modified in place
    :param secret: AES key bytes, passed to ``algorithms.AES``
    :return: None
    """
    if user.has_perm("core.bypass_encryption"):
        return
    # NOTE(review): ECB encrypts equal plaintext blocks to equal ciphertext
    # blocks, so repeated values are recognisable. Left as is because
    # changing the mode alters every produced value - confirm whether that
    # determinism is relied upon before switching.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool must be tested before int: bool is an int subclass and
            # would otherwise be stringified and encrypted.
            if value is None or isinstance(value, bool):
                continue
            if isinstance(value, int):
                value = str(value)
            plaintext = value.encode("utf8", "replace")
            # Pad to the 16-byte AES block size; each pad byte holds the pad
            # length, and a full extra block is added when already aligned.
            pad_len = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad_len]) * pad_len
            # An encryptor is single-use once finalized, so make one per value.
            encryptor = cipher.encryptor()
            ct = encryptor.update(plaintext) + encryptor.finalize()
            item[key] = b64encode(ct).decode("utf-8", "replace")