neptune/core/views/helpers.py

171 lines
5.2 KiB
Python
Raw Normal View History

2022-08-18 06:20:30 +00:00
import re
from base64 import b64encode
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives.ciphers.modes import ECB
from django.conf import settings
from siphashc import siphash
from sortedcontainers import SortedSet
from core import r
def dedup_list(data, check_keys):
    """
    Remove duplicate dictionaries from list.

    Two entries count as duplicates when they contain the same subset of
    ``check_keys`` and agree on the value of each of those keys. Entries
    that contain none of ``check_keys`` all share the empty key and are
    therefore duplicates of one another.

    Every run of dropped duplicates is summarised by a single
    ``{"type": "control", "hidden": <count>}`` marker inserted at the point
    where the run ended (before the next kept entry, or at the end of the
    list).

    :param data: iterable of dictionaries, in the order they should appear
    :param check_keys: keys whose values decide whether two entries match;
        the corresponding values must be hashable
    :return: a new list with first occurrences and control markers;
        ``data`` itself is not modified
    """
    seen = set()
    out = []
    dup_count = 0

    for entry in data:
        # Pair every value with its key: a bare tuple of values would make
        # {"msg": 1} and {"nick": 1} collide as (1,) even though they differ.
        dedupe_key = tuple((k, entry[k]) for k in check_keys if k in entry)
        if dedupe_key in seen:
            dup_count += 1
            continue
        if dup_count > 0:
            # Close the run of duplicates that preceded this entry.
            out.append({"type": "control", "hidden": dup_count})
            dup_count = 0
        out.append(entry)
        seen.add(dedupe_key)

    if dup_count > 0:
        # The input ended in the middle of a run of duplicates.
        out.append({"type": "control", "hidden": dup_count})
    return out
# Ad-hoc benchmark for dedup_list, kept for reference:
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
#     {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
#         randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215
2022-08-18 06:20:30 +00:00
def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
    """Converts an integer to a base36 string.

    The base is really ``len(alphabet)``; with the default alphabet that is
    36. Negative numbers are rendered with a leading ``-``.

    :param number: integer to encode
    :param alphabet: digit symbols, least significant value first
        (``alphabet[0]`` represents zero); needs at least two symbols
    :return: the encoded string
    :raises TypeError: if ``number`` is not an integer
    :raises ValueError: if ``alphabet`` has fewer than two symbols
    """
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    base = len(alphabet)
    if base < 2:
        # With a single symbol divmod(number, 1) never shrinks the number,
        # so the conversion loop below would never terminate.
        raise ValueError("alphabet must contain at least two symbols")

    sign = ""
    if number < 0:
        sign = "-"
        number = -number

    if number < base:
        # Single digit, including zero (which the loop would render as "").
        return sign + alphabet[number]

    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])
    # Digits were produced least significant first.
    return sign + "".join(reversed(digits))
def base36decode(number):
    """Parse a base36 string into an integer.

    Thin wrapper around the built-in ``int`` with base 36.

    :param number: string of base36 digits, optionally signed
    :return: the decoded integer
    """
    decoded = int(number, base=36)
    return decoded
def hash_list(data, hash_keys=False):
    """
    Hash a list of dicts or a list with SipHash42.

    ``data`` is rewritten in place: each value (or, with ``hash_keys``, each
    dictionary key) is replaced by the base36 encoding of its SipHash under
    ``settings.HASHING_KEY``. The ``hash -> original`` pairs collected on
    the way are written to the ``cache.hash`` hash through ``r``.

    Keys listed in ``settings.WHITELIST_FIELDS`` are left untouched, and so
    are entries whose value is ``None`` or a boolean. Integers are hashed
    through their decimal string form.

    :param data: a dict, a list of dicts, or a list of strings
    :param hash_keys: hash (and rename) the dictionary keys instead of the
        values
    :return: None
    """
    cache = "cache.hash"
    hash_table = {}

    is_mapping = isinstance(data, dict)
    if is_mapping:
        # Present each key/value pair as its own one-entry dict so a single
        # loop below serves both dict and list input.
        entries = [{k: data[k]} for k in data]
    else:
        # Shallow copy, so `data` can be rewritten while we iterate.
        entries = type(data)(data)

    for index, item in enumerate(entries):
        if isinstance(item, dict):
            # For dict input the pair lives in `data` itself; for list input
            # it lives in the dict stored at `index`. Indexing a dict by
            # position or popping a string key from a list would raise.
            target = data if is_mapping else data[index]
            for key, value in list(item.items()):
                if key in settings.WHITELIST_FIELDS:
                    continue
                # bool is a subclass of int, so it has to be filtered out
                # before the int -> str conversion or it is never skipped.
                if value is None or isinstance(value, bool):
                    continue
                if isinstance(value, int):
                    value = str(value)
                # NOTE(review): other value types (float, list, ...) are
                # passed to siphash as-is; confirm callers never send them.
                original = key if hash_keys else value
                encoded = base36encode(siphash(settings.HASHING_KEY, original))
                hash_table.setdefault(encoded, original)
                if hash_keys:
                    # Rename the dict key
                    target[encoded] = target.pop(key)
                else:
                    target[key] = encoded
        elif isinstance(item, str):
            encoded = base36encode(siphash(settings.HASHING_KEY, item))
            hash_table.setdefault(encoded, item)
            data[index] = encoded

    if hash_table:
        # NOTE(review): if `r` is a redis-py client, hmset() is deprecated
        # in favour of hset(name, mapping=...) - confirm before switching.
        r.hmset(cache, hash_table)
def hash_lookup(data_dict):
    """
    Resolve ``|hash|`` placeholders in the values of ``data_dict`` in place.

    Every ``|...|`` token found in a string value is looked up in the
    ``cache.hash`` hash through ``r``, and the token (including its pipes)
    is replaced by the cached original. Tokens missing from the cache are
    replaced by the literal ``ERR``. Falsy and non-string values are left
    untouched.

    :param data_dict: mapping whose string values may contain placeholders
    :return: None
    """
    cache = "cache.hash"

    # Sorted, de-duplicated tokens: the order must be stable because the
    # hmget() results are matched back to the tokens by position.
    tokens = SortedSet()
    for value in data_dict.values():
        if not value or not isinstance(value, str):
            continue
        for token in re.findall(r"\|([^\|]*)\|", value):
            tokens.add(token)
    if not tokens:
        return

    values = r.hmget(cache, *tokens)
    if not values:
        return

    resolved = {}
    for token, raw in zip(tokens, values):
        if not raw:
            # Cache miss. Substituting before decoding must not leave a str
            # in a list that is then .decode()d, so handle it separately.
            resolved[token] = "ERR"
        elif isinstance(raw, bytes):
            resolved[token] = raw.decode()
        else:
            # NOTE(review): presumably only reached when the client already
            # decodes responses to str - confirm against the `r` setup.
            resolved[token] = raw

    for key, value in data_dict.items():
        if not value or not isinstance(value, str):
            continue
        for token, original in resolved.items():
            value = value.replace(f"|{token}|", original)
        # Re-assigning an existing key does not resize the dict, so this is
        # safe while iterating over it.
        data_dict[key] = value
def encrypt_list(data, secret):
    """
    Encrypt the non-whitelisted values of a list of dicts in place.

    Each value is UTF-8 encoded, padded to a multiple of 16 bytes with N
    bytes of value N, encrypted with AES in ECB mode under ``secret``, and
    stored back as a base64 string.

    Keys listed in ``settings.WHITELIST_FIELDS`` are left untouched, and so
    are values that are ``None`` or booleans. Integers are encrypted through
    their decimal string form.

    :param data: list of dictionaries to rewrite
    :param secret: AES key bytes, passed straight to ``algorithms.AES``
    :return: None
    """
    # SECURITY NOTE(review): ECB is deterministic - equal plaintexts yield
    # equal ciphertexts and nothing is authenticated. Left as-is because
    # whatever decrypts this data depends on it; confirm this is intended.
    cipher = Cipher(algorithms.AES(secret), ECB())
    for item in data:
        for key, value in item.items():
            if key in settings.WHITELIST_FIELDS:
                continue
            # bool is a subclass of int, so it has to be filtered out
            # before the int -> str conversion or it is never skipped.
            if value is None or isinstance(value, bool):
                continue
            if isinstance(value, int):
                value = str(value)

            plaintext = value.encode("utf8", "replace")
            # Always pads, adding a full 16-byte block when the input is
            # already block-aligned.
            pad = 16 - (len(plaintext) % 16)
            plaintext += bytes([pad]) * pad

            # An encryptor is single-use, so create one per value - and only
            # once we know the value is actually going to be encrypted.
            encryptor = cipher.encryptor()
            ct = encryptor.update(plaintext) + encryptor.finalize()
            # Overwriting an existing key keeps the dict size constant, so
            # this is safe while iterating over item.items().
            item[key] = b64encode(ct).decode("utf-8", "replace")