neptune/core/views/helpers.py

308 lines
11 KiB
Python

import re
from base64 import b64encode
from random import randint
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet
from core import r
class SearchDenied:
    """
    Placeholder for a query value the user is not allowed to search on.

    hash_lookup() stores an instance in place of the original value and also
    returns it in its list of denied entries.

    :param key: name of the refused query field
    :param value: the value that was supplied for that field
    """

    def __init__(self, key, value):
        self.key, self.value = key, value
class LookupDenied:
    """
    Placeholder for a query value whose hashes must not be resolved.

    hash_lookup() stores an instance in place of the original value when
    hashes are found in a field listed in settings.TAG_SEARCH_DENY.

    :param key: name of the refused query field
    :param value: the value that was supplied for that field
    """

    def __init__(self, key, value):
        self.key, self.value = key, value
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two entries are duplicates when they agree on every key in
    ``check_keys``. A key that is absent from an entry only matches the same
    key being absent from another entry, so ``{"msg": "a"}`` and
    ``{"nick": "a"}`` are NOT treated as duplicates of each other.

    The first occurrence of each entry is kept. Every run of dropped
    duplicates is summarised by one control entry
    ``{"type": "control", "hidden": <count>}`` inserted before the next kept
    entry, or at the end of the output if nothing follows.

    :param data: iterable of dictionaries
    :param check_keys: keys that identify a duplicate; their values must be
        hashable
    :return: a new list; ``data`` itself is not modified
    """
    # Sentinel for "key not present" so that values stay aligned with their
    # position in check_keys instead of sliding left when a key is missing.
    missing = object()
    check_keys = tuple(check_keys)

    seen = set()
    out = []
    dup_count = 0
    for entry in data:
        dedupe_key = tuple(entry.get(k, missing) for k in check_keys)
        if dedupe_key in seen:
            dup_count += 1
            continue
        # A kept entry ends the current run of duplicates: report the run first.
        if dup_count > 0:
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(entry)
        seen.add(dedupe_key)
    # Duplicates at the very end still need their control entry.
    if dup_count > 0:
        out.append({"type": "control", "hidden": dup_count})
    return out
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
# randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """
    Render an integer as text using ``alphabet`` as the digit set.

    The radix is ``len(alphabet)``, which is 36 for the default alphabet.
    Negative numbers are written with a leading ``-``.

    :param number: integer to convert
    :param alphabet: digit characters, least significant value first
    :return: the encoded string
    :raises TypeError: if ``number`` is not an integer
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    radix = len(alphabet)
    prefix = "-" if number < 0 else ""
    remaining = -number if number < 0 else number

    # Single-digit values, zero included, need no division at all.
    if remaining < radix:
        return prefix + alphabet[remaining]

    # Peel off digits from the least significant end, then flip the order.
    digits = []
    while remaining != 0:
        remaining, digit = divmod(remaining, radix)
        digits.append(alphabet[digit])
    return prefix + "".join(reversed(digits))
def base36decode(number):
    """
    Parse a base-36 string back into an integer.

    :param number: text accepted by ``int(..., 36)``
    :return: the decoded integer
    """
    radix = 36
    return int(number, radix)
def _randomise_int(value, ratio):
    """Return a random integer within ``ratio`` of ``value``, bounds inclusive."""
    spread = value * ratio
    # For a negative value (or ratio) the two bounds come out in descending
    # order, which randint() rejects with ValueError, so order them first.
    low, high = sorted((int(value - spread), int(value + spread)))
    return randint(low, high)


def _is_randomisable(value):
    """True for real integers; bool is an int subclass but is a flag, not a number."""
    return isinstance(value, int) and not isinstance(value, bool)


def randomise_list(user, data):
    """
    Randomise data in a list of dictionaries.

    Integer values are replaced in place by a random integer within
    settings.RANDOMISE_RATIO of the original value. Booleans are left alone.

    * If ``data`` is a list, only keys listed in settings.RANDOMISE_FIELDS
      are randomised in each dictionary.
    * If ``data`` is a dict, every integer value is randomised regardless of
      its key.
    * Any other type is left untouched.

    Nothing is changed for users holding ``core.bypass_randomisation``.

    :param user: object exposing ``has_perm(perm)``
    :param data: list of dictionaries, or a single dictionary
    :return: None; ``data`` is modified in place
    """
    if user.has_perm("core.bypass_randomisation"):
        return
    if isinstance(data, list):
        for item in data:
            for key, value in item.items():
                if key in settings.RANDOMISE_FIELDS and _is_randomisable(value):
                    item[key] = _randomise_int(value, settings.RANDOMISE_RATIO)
    elif isinstance(data, dict):
        for key, value in data.items():
            # The field filter is intentionally not applied to a bare dict.
            if _is_randomisable(value):
                data[key] = _randomise_int(value, settings.RANDOMISE_RATIO)
def _mask_tail(value, keep_ratio):
    """Keep the leading ``keep_ratio`` share of ``value`` and star out the rest."""
    split = int((len(value) - 1) * keep_ratio)
    return value[:split] + "*" * len(value[split:])


def _mask_separated(value):
    """
    Star out the trailing components of a ``-`` or ``:`` separated value.

    Example: 2022-02-02 -> 2022-02-**
             14:11:12   -> 14:11:**
    """
    if "-" in value:
        sep = "-"
        keep = settings.OBFUSCATE_DASH_NUM
    elif ":" in value:
        sep = ":"
        keep = settings.OBFUSCATE_COLON_NUM
    else:
        # No recognised separator: there is nothing to split on, so hide the
        # whole value rather than leak it or reuse state from another field.
        return "*" * len(value)
    parts = value.split(sep)
    masked = ["*" * len(part) for part in parts[keep:]]
    return sep.join([*parts[:keep], *masked])


def obfuscate_list(user, data):
    """
    Obfuscate data in a list of dictionaries.

    For every dictionary in ``data``, in place:

    * keys in settings.OBFUSCATE_FIELDS keep roughly the first
      settings.OBFUSCATE_KEEP_RATIO of their characters; the rest become ``*``
    * keys in settings.OBFUSCATE_FIELDS_SEP are split on ``-`` (else ``:``);
      the first OBFUSCATE_DASH_NUM / OBFUSCATE_COLON_NUM components are kept
      and the remaining ones are starred out. A value containing neither
      separator is starred out completely.

    Afterwards every key of settings.COMBINE_FIELDS that is present in an
    entry is rebuilt as the concatenation of the two fields it maps to, so
    that it reflects their obfuscated values.

    Nothing is changed for users holding ``core.bypass_obfuscation``.

    :param user: object exposing ``has_perm(perm)``
    :param data: list of dictionaries with string values in the affected fields
    :return: None; ``data`` is modified in place
    """
    if user.has_perm("core.bypass_obfuscation"):
        return
    for item in data:
        for key, value in item.items():
            if key in settings.OBFUSCATE_FIELDS:
                item[key] = _mask_tail(value, settings.OBFUSCATE_KEEP_RATIO)
            elif key in settings.OBFUSCATE_FIELDS_SEP:
                item[key] = _mask_separated(value)
    for key in settings.COMBINE_FIELDS:
        for item in data:
            if key in item:
                k1, k2 = settings.COMBINE_FIELDS[key]
                if k1 in item and k2 in item:
                    item[key] = item[k1] + item[k2]
def hash_list(user, data, hash_keys=False):
    """
    Hash a list of dicts, a list of strings, or a single dict with SipHash.

    Each hashed item is replaced in place by the base36 encoding of
    ``siphash(settings.HASHING_KEY, item)``. The mapping from encoded hash
    back to plaintext is written to the ``cache.hash`` hash via ``r.hmset``
    so it can be resolved again later.

    * dict entries: keys in settings.WHITELIST_FIELDS or
      settings.NO_OBFUSCATE_PARAMS are skipped, as are ``None`` and boolean
      values; integers are hashed as their decimal string. An entry whose
      ``src`` is in settings.SAFE_SOURCES is skipped entirely.
    * string entries are hashed as they are.
    * with ``hash_keys=True`` the dictionary KEYS are hashed and renamed
      instead of the values.

    Nothing is changed for users holding ``core.bypass_hashing``.

    :param user: object exposing ``has_perm(perm)``
    :param data: list of dicts, list of strings, or a single dict
    :param hash_keys: hash and rename keys rather than replacing values
    :return: None; ``data`` is modified in place
    """
    if user.has_perm("core.bypass_hashing"):
        return
    cache = "cache.hash"
    hash_table = {}
    data_is_dict = isinstance(data, dict)
    if data_is_dict:
        # Present a dict as single-pair dicts so one loop handles both shapes.
        data_copy = [{x: data[x]} for x in data]
    else:
        data_copy = type(data)(data)
    for index, item in enumerate(data_copy):
        if isinstance(item, dict):
            if "src" in item and item["src"] in settings.SAFE_SOURCES:
                continue
            # Writes go to the caller's dict: ``data`` itself when a dict was
            # passed, otherwise the entry at this position in the list.
            target = data if data_is_dict else data[index]
            for key, value in list(item.items()):
                if (
                    key in settings.WHITELIST_FIELDS
                    or key in settings.NO_OBFUSCATE_PARAMS
                ):
                    continue
                # bool must be tested before int: bool is an int subclass.
                if value is None or isinstance(value, bool):
                    continue
                if isinstance(value, int):
                    value = str(value)
                plaintext = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, plaintext))
                hash_table.setdefault(encoded, plaintext)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded
    if hash_table:
        r.hmset(cache, hash_table)
def _find_hashes(value):
    """Return every hash-shaped token in a string, or in the string values of a dict."""
    if isinstance(value, str):
        return re.findall("[A-Z0-9]{12,13}", value)
    if isinstance(value, dict):
        found = []
        for sub_value in value.values():
            if isinstance(sub_value, str):
                found.extend(re.findall("[A-Z0-9]{12,13}", sub_value))
        return found
    return []


def hash_lookup(user, data_dict, supplementary_data=None):
    """
    Resolve hashes in query parameters back to plaintext, in place.

    Each field of ``data_dict`` is screened first:

    * a key in settings.SEARCH_FIELDS_DENY is replaced by a SearchDenied
      unless the user holds ``core.bypass_hashing``
    * keys in settings.WHITELIST_FIELDS or settings.NO_OBFUSCATE_PARAMS and
      empty values are left alone
    * a value containing no hash is replaced by a SearchDenied unless the
      user holds ``core.bypass_hashing`` (otherwise plaintext search terms
      could be injected)
    * a value containing hashes for a key in settings.TAG_SEARCH_DENY is
      replaced by a LookupDenied

    The hashes of all remaining fields are fetched from the ``cache.hash``
    hash with ``r.hmget`` and substituted into the string values (and string
    values of nested dicts). Hashes missing from the cache become ``ERR``.

    Nothing is done when ``source`` in ``data_dict`` or in
    ``supplementary_data`` is one of settings.SAFE_SOURCES.

    :param user: object exposing ``has_perm(perm)``
    :param data_dict: query parameters; modified in place
    :param supplementary_data: optional extra mapping consulted for ``source``
    :return: list of the SearchDenied / LookupDenied objects created
    """
    cache = "cache.hash"
    found_hashes = SortedSet()
    denied = []

    # Safe sources are never hashed, so there is nothing to look up.
    for params in (data_dict, supplementary_data):
        if params and "source" in params:
            if params["source"] in settings.SAFE_SOURCES:
                return denied

    for key, value in list(data_dict.items()):
        if key in settings.SEARCH_FIELDS_DENY:
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(key=key, value=value)
                denied.append(data_dict[key])
                # Already refused: do not wrap or report this field again.
                continue
        if key in settings.WHITELIST_FIELDS or key in settings.NO_OBFUSCATE_PARAMS:
            continue
        if not value:
            continue
        hashes = _find_hashes(value)
        if not hashes:
            # Otherwise the user could inject plaintext search queries
            if not user.has_perm("core.bypass_hashing"):
                data_dict[key] = SearchDenied(key=key, value=value)
                denied.append(data_dict[key])
            continue
        # There are hashes here but there shouldn't be!
        if key in settings.TAG_SEARCH_DENY:
            data_dict[key] = LookupDenied(key=key, value=value)
            denied.append(data_dict[key])
            continue
        found_hashes.update(hashes)

    if found_hashes:
        values = r.hmget(cache, *found_hashes)
        if not values:
            return denied
        plaintexts = []
        for val in values:
            if val is None:
                # Hash not present in the cache.
                plaintexts.append("ERR")
            elif isinstance(val, bytes):
                plaintexts.append(val.decode())
            else:
                plaintexts.append(val)
        total = dict(zip(found_hashes, plaintexts))
        for key in data_dict:
            field = data_dict[key]
            if not field:
                continue
            if isinstance(field, str):
                for token, plaintext in total.items():
                    field = field.replace(token, plaintext)
                data_dict[key] = field
            elif isinstance(field, dict):
                for token, plaintext in total.items():
                    for k2, v2 in field.items():
                        if isinstance(v2, str) and token in v2:
                            field[k2] = v2.replace(token, plaintext)
    return denied
def encrypt_list(user, data, secret):
    """
    Encrypt the values of a list of dictionaries with AES, in place.

    Every value whose key is not in settings.WHITELIST_FIELDS is replaced by
    the base64 text of its AES ciphertext. ``None`` and boolean values are
    left alone; integers are encrypted as their decimal string.

    Nothing is changed for users holding ``core.bypass_encryption``.

    :param user: object exposing ``has_perm(perm)``
    :param data: list of dictionaries with string or integer values
    :param secret: AES key bytes, passed straight to ``algorithms.AES``
    :return: None; ``data`` is modified in place
    """
    if user.has_perm("core.bypass_encryption"):
        return
    # NOTE(review): ECB maps equal plaintext blocks to equal ciphertext
    # blocks, so equal field values stay recognisable after encryption.
    # Presumably that is relied on for matching values; confirm before
    # switching to another mode.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool must be tested before int: bool is an int subclass.
            if value is None or isinstance(value, bool):
                continue
            if isinstance(value, int):
                value = str(value)
            plaintext = value.encode("utf8", "replace")
            # PKCS#7-style padding up to the 16-byte AES block size: append
            # N bytes of value N, a full block when already aligned.
            pad = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad]) * pad
            # A fresh encryptor per value; created only once we know we need it.
            encryptor = cipher.encryptor()
            ct = encryptor.update(plaintext) + encryptor.finalize()
            item[key] = b64encode(ct).decode("utf-8", "replace")