neptune/core/views/helpers.py

247 lines
8.3 KiB
Python

import logging
import re
from base64 import b64encode

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet

from core import r
class SearchDenied:
    """
    Marker object wrapping a search value that was refused.

    ``hash_lookup`` stores an instance in place of a query value that
    contains no recognisable hash; the refused value is kept in ``value``.
    """

    def __init__(self, value):
        # The refused value, stored unchanged.
        self.value = value
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from a list.

    Two items are duplicates when they carry the same subset of
    ``check_keys`` and agree on the value of each of those keys. The first
    occurrence is kept; each run of dropped duplicates is summarised by a
    marker entry ``{"type": "control", "hidden": <count>}`` inserted where
    the run ended (or appended at the end of the output).

    :param data: iterable of dictionaries
    :param check_keys: keys whose values decide whether two items are equal
    :return: new list with duplicates replaced by control markers
    :raises TypeError: if a compared value is unhashable
    """
    seen = set()
    out = []
    dup_count = 0
    for item in data:
        # Pair every value with its key name: comparing bare values would
        # make {"msg": "a"} and {"nick": "a"} look identical.
        dedup_key = tuple((k, item[k]) for k in check_keys if k in item)
        if dedup_key in seen:
            dup_count += 1
            continue
        if dup_count:
            # Flush the pending run before emitting the next unique item.
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(item)
        seen.add(dedup_key)
    if dup_count:
        out.append({"type": "control", "hidden": dup_count})
    return out
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
# randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """
    Render an integer as a string of digits taken from ``alphabet``.

    The base is ``len(alphabet)`` (36 with the default alphabet). Negative
    numbers are prefixed with ``-``.

    :raises TypeError: if ``number`` is not an integer
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    prefix = "-" if number < 0 else ""
    magnitude = -number if number < 0 else number
    base = len(alphabet)

    # Single-digit values (including zero) map straight onto the alphabet.
    if magnitude < base:
        return prefix + alphabet[magnitude]

    # Collect digits least-significant first, then reverse them.
    digits = []
    while magnitude != 0:
        magnitude, remainder = divmod(magnitude, base)
        digits.append(alphabet[remainder])
    digits.reverse()
    return prefix + "".join(digits)
def base36decode(number):
    """Parse a base36 string (as accepted by ``int``) into an integer."""
    return int(number, base=36)
def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries, in place.

    Nothing is changed when ``user`` holds ``core.bypass_obfuscation``.

    * Keys listed in ``settings.OBFUSCATE_FIELDS`` keep a leading share of
      the value (``settings.OBFUSCATE_KEEP_RATIO``); the rest becomes ``*``.
    * Keys listed in ``settings.OBFUSCATE_FIELDS_SEP`` are split on ``-``
      (or, failing that, ``:``); the first ``OBFUSCATE_DASH_NUM`` /
      ``OBFUSCATE_COLON_NUM`` segments are kept and the remaining segments
      are starred out, e.g. ``2022-02-02 -> 2022-02-**``.

    Values that are not strings, and separator-field values containing
    neither separator, are left untouched.

    :param user: object exposing ``has_perm``
    :param data: list of dictionaries, modified in place
    :return: None
    """
    if user.has_perm("core.bypass_obfuscation"):
        return
    for item in data:
        # Only existing keys are reassigned, so mutating while iterating
        # ``items()`` is safe (the dict never changes size).
        for key, value in item.items():
            if not isinstance(value, str):
                # len() / substring tests below only make sense for text.
                continue
            if key in settings.OBFUSCATE_FIELDS:
                # Obfuscate a ratio of the field
                keep = int((len(value) - 1) * settings.OBFUSCATE_KEEP_RATIO)
                hidden = value[keep:]
                item[key] = value[:keep] + "*" * len(hidden)
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                # Obfuscate value based on fields
                # Example: 2022-02-02 -> 2022-02-**
                #          14:11:12 -> 14:11:**
                if "-" in value:
                    sep = "-"
                    hide_num = settings.OBFUSCATE_DASH_NUM
                elif ":" in value:
                    sep = ":"
                    hide_num = settings.OBFUSCATE_COLON_NUM
                else:
                    # No separator: there is nothing to split on. Skipping
                    # avoids reusing the separator/segments of a previously
                    # processed field (or crashing on the first one).
                    continue
                segments = value.split(sep)
                masked = ["*" * len(seg) for seg in segments[hide_num:]]
                item[key] = sep.join([*segments[:hide_num], *masked])
def hash_list(user, data, hash_keys=False):
    """
    Replace values (or keys) in ``data`` with a base36-encoded SipHash.

    ``data`` is modified in place and may be a list of dicts, a list of
    strings, or a single dict. Nothing happens when ``user`` holds
    ``core.bypass_hashing``.

    For dict items, keys found in ``settings.WHITELIST_FIELDS`` or
    ``settings.NO_OBFUSCATE_PARAMS`` are skipped, as are ``bool`` and
    ``None`` values. Integers are stringified before hashing. With
    ``hash_keys`` the dict *key* is hashed and renamed; otherwise the value
    is replaced by its hash.

    Every ``hash -> plaintext`` pair produced is written to the
    ``cache.hash`` hash via ``r.hmset`` so it can be reversed later.

    :param user: object exposing ``has_perm``
    :param data: list of dicts, list of strings, or dict; modified in place
    :param hash_keys: hash dictionary keys instead of values
    :return: None
    """
    if user.has_perm("core.bypass_hashing"):
        return
    cache = "cache.hash"
    hash_table = {}
    data_is_dict = isinstance(data, dict)
    if data_is_dict:
        # Treat every key/value pair of a single dict as its own item.
        data_copy = [{x: data[x]} for x in data]
    else:
        # Shallow copy: elements of ``data`` are replaced while iterating.
        data_copy = type(data)(data)
    for index, item in enumerate(data_copy):
        if isinstance(item, dict):
            # The dict that actually receives the changes: ``data`` itself
            # for dict input, otherwise the list element. (Indexing a dict
            # input by position, or popping a string key from a list, fails.)
            target = data if data_is_dict else data[index]
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool must be tested before int: bool is an int subclass,
                # so converting ints first would stringify (and then hash)
                # booleans instead of skipping them.
                if isinstance(value, bool) or value is None:
                    continue
                if isinstance(value, int):
                    value = str(value)
                plaintext = key if hash_keys else value
                hashed = siphash(settings.HASHING_KEY, plaintext)
                encoded = base36encode(hashed)
                # Keep the first plaintext seen for a given hash.
                hash_table.setdefault(encoded, plaintext)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            hashed = siphash(settings.HASHING_KEY, item)
            encoded = base36encode(hashed)
            hash_table.setdefault(encoded, item)
            data[index] = encoded
    if hash_table:
        # NOTE(review): if ``r`` is a redis-py client, ``hmset`` is
        # deprecated in favour of ``hset(name, mapping=...)`` - confirm the
        # client version before switching.
        r.hmset(cache, hash_table)
def _find_hashes(value):
    """Return hash-looking tokens found in a string or in a dict's string values."""
    pattern = r"[A-Z0-9]{12,13}"
    if isinstance(value, str):
        return re.findall(pattern, value)
    if isinstance(value, dict):
        found = []
        for nested in value.values():
            # Empty and non-text entries cannot contain a hash.
            if isinstance(nested, str):
                found.extend(re.findall(pattern, nested))
        return found
    return []


def hash_lookup(user, data_dict):
    """
    Replace hashes inside ``data_dict`` values with their cached plaintext.

    Every non-empty value whose key is not in ``settings.WHITELIST_FIELDS``
    or ``settings.NO_OBFUSCATE_PARAMS`` is scanned for tokens matching
    ``[A-Z0-9]{12,13}``. A value without any such token is replaced by a
    ``SearchDenied`` wrapper unless ``user`` holds ``core.bypass_hashing``.

    The collected tokens are looked up in the ``cache.hash`` hash via
    ``r.hmget``; tokens missing from the cache resolve to ``"ERR"``. Each
    token is then substituted in every string value (and in string values
    of nested dicts) of ``data_dict``, in place.

    :param user: object exposing ``has_perm``
    :param data_dict: dictionary of query parameters, modified in place
    :return: None
    """
    cache = "cache.hash"
    found_hashes = SortedSet()
    for key, value in list(data_dict.items()):
        if (
            key in settings.WHITELIST_FIELDS
            or key in settings.NO_OBFUSCATE_PARAMS
        ):
            continue
        if not value:
            continue
        # Computed fresh for every value and without rebinding ``key``, so
        # the denial below always targets the entry being inspected.
        hashes = _find_hashes(value)
        if not hashes:
            # Otherwise the user could inject plaintext search queries
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(value=data_dict[key])
            continue
        found_hashes.update(hashes)

    if not found_hashes:
        return
    values = r.hmget(cache, *found_hashes)
    if not values:
        return
    # Unknown hashes come back as None; substitute a visible marker.
    decoded = [(b"ERR" if val is None else val).decode() for val in values]
    total = dict(zip(found_hashes, decoded))

    logger = logging.getLogger(__name__)
    for key in data_dict:
        for token, plaintext in total.items():
            current = data_dict[key]
            if not current:
                continue
            if isinstance(current, str):
                if token in current:
                    logger.debug("Replacing %s with %s", current, plaintext)
                    data_dict[key] = current.replace(token, plaintext)
            elif isinstance(current, dict):
                for k2, v2 in current.items():
                    # Nested values may be None or non-text; only strings
                    # can contain a token.
                    if isinstance(v2, str) and token in v2:
                        logger.debug("Replacing %s with %s", v2, plaintext)
                        current[k2] = v2.replace(token, plaintext)
def encrypt_list(user, data, secret):
    """
    Encrypt the values of a list of dictionaries in place.

    Nothing happens when ``user`` holds ``core.bypass_encryption``. Keys in
    ``settings.WHITELIST_FIELDS`` are skipped, as are ``bool`` and ``None``
    values. Integers are stringified first. Each remaining value is UTF-8
    encoded, padded to a multiple of 16 bytes (each pad byte holds the pad
    length), encrypted with AES in ECB mode and stored base64-encoded.

    :param user: object exposing ``has_perm``
    :param data: list of dictionaries, modified in place
    :param secret: AES key bytes passed to ``algorithms.AES``
    :return: None
    """
    if user.has_perm("core.bypass_encryption"):
        return
    # NOTE(review): ECB is deterministic - equal plaintexts give equal
    # ciphertexts. Presumably intentional so encrypted values stay
    # comparable; confirm before changing the mode.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool must be tested before int: bool is an int subclass, so
            # converting ints first would stringify (and then encrypt)
            # booleans instead of skipping them.
            if isinstance(value, bool) or value is None:
                continue
            if isinstance(value, int):
                value = str(value)
            plaintext = value.encode("utf8", "replace")
            # Always adds 1..16 bytes, so the input is block-aligned.
            pad_len = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad_len]) * pad_len
            # A fresh encryptor per value; created only when needed.
            encryptor = cipher.encryptor()
            ct = encryptor.update(plaintext) + encryptor.finalize()
            item[key] = b64encode(ct).decode("utf-8", "replace")