# neptune/core/views/helpers.py
#
# 754 lines
# 27 KiB
# Python

# import re
# from base64 import b64encode
# from random import randint
# from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
# from cryptography.hazmat.primitives.ciphers.modes import ECB
# from django.conf import settings
# from siphashc import siphash
# from sortedcontainers import SortedSet
import uuid
# from core import r
from django.core.exceptions import ImproperlyConfigured
from django.core.paginator import Paginator
from django.db.models import QuerySet
from django.http import Http404, HttpResponse, HttpResponseBadRequest
from django.urls import reverse
from django.views.generic.detail import DetailView
from django.views.generic.edit import CreateView, DeleteView, UpdateView
from django.views.generic.list import ListView
from rest_framework.parsers import FormParser
from core.util import logs
# Module-level logger obtained from the project's logging helper, keyed by
# this module's name.
log = logs.get_logger(__name__)
class RestrictedViewMixin:
    """
    Scope a generic view to the objects owned by the requesting user.

    Two hooks are overridden:

    * ``get_queryset`` narrows list/detail lookups to rows whose ``user``
      matches ``request.user``.
    * ``get_form_kwargs`` hands the request object to the form class; the
      remaining permission checks live in forms.py.
    """

    allow_empty = True
    queryset = None
    model = None
    paginate_by = None
    paginate_orphans = 0
    context_object_name = None
    paginator_class = Paginator
    page_kwarg = "page"
    ordering = None

    def get_queryset(self, **kwargs):
        """
        Return the view's objects, filtered by the requesting user.

        A ``queryset`` attribute takes precedence over ``model``. Only real
        ``QuerySet`` instances are filtered; any other value assigned to
        ``queryset`` is passed through untouched. Ordering from
        ``get_ordering()`` is applied when the view provides that hook.

        Raises:
            ImproperlyConfigured: if neither ``queryset`` nor ``model`` is set.
        """
        if self.queryset is None and self.model is None:
            raise ImproperlyConfigured(
                "%(cls)s is missing a QuerySet. Define "
                "%(cls)s.model, %(cls)s.queryset, or override "
                "%(cls)s.get_queryset()." % {"cls": self.__class__.__name__}
            )

        if self.queryset is not None:
            objects = self.queryset
            if isinstance(objects, QuerySet):
                objects = objects.filter(user=self.request.user)
        else:
            objects = self.model._default_manager.filter(user=self.request.user)

        # Not every view mixed with this class defines get_ordering().
        ordering = self.get_ordering() if hasattr(self, "get_ordering") else None
        if not ordering:
            return objects

        # A single field name is accepted as shorthand for a one-item tuple.
        if isinstance(ordering, str):
            ordering = (ordering,)
        return objects.order_by(*ordering)

    def get_form_kwargs(self):
        """
        Add the current request to the keyword arguments given to the form.

        The form needs it to only display members that belong to a given user.
        """
        form_kwargs = super().get_form_kwargs()
        form_kwargs["request"] = self.request
        return form_kwargs
class ObjectNameMixin(object):
    """
    Derive display titles and context object names for a view instance.

    With a ``model`` set, names come from the model's ``verbose_name`` and
    ``verbose_name_plural``. Without one, the view must define
    ``context_object_name`` and ``context_object_name_singular`` itself and
    the titles are derived from those.

    After ``__init__`` the instance has ``title``, ``title_singular``,
    ``context_object_name`` and ``context_object_name_singular``; the two
    context names have all spaces removed.
    """

    def __init__(self, *args, **kwargs):
        """
        Compute the naming attributes, then continue up the MRO.

        Raises:
            ImproperlyConfigured: if ``model`` is None and either context
                object name is unset. Previously this surfaced as an opaque
                ``AttributeError`` on ``None``.
        """
        if self.model is None:
            plural_name = getattr(self, "context_object_name", None)
            singular_name = getattr(self, "context_object_name_singular", None)
            if plural_name is None or singular_name is None:
                raise ImproperlyConfigured(
                    "%(cls)s must define model, or both "
                    "context_object_name and context_object_name_singular."
                    % {"cls": self.__class__.__name__}
                )
            self.title = plural_name.title()
            self.title_singular = singular_name.title()
        else:
            meta = self.model._meta
            self.title_singular = meta.verbose_name.title()  # Hook
            self.context_object_name_singular = self.title_singular.lower()  # hook
            self.title = meta.verbose_name_plural.title()  # Hooks
            self.context_object_name = self.title.lower()  # hooks

        # Context names are also used to build identifiers (event names,
        # element ids), so multi-word names are collapsed.
        self.context_object_name = self.context_object_name.replace(" ", "")
        self.context_object_name_singular = (
            self.context_object_name_singular.replace(" ", "")
        )
        super().__init__(*args, **kwargs)
class ObjectList(RestrictedViewMixin, ObjectNameMixin, ListView):
    """
    List the requesting user's objects inside a window-manager container.

    The ``type`` URL kwarg must be one of ``allowed_types`` and selects the
    container template ``wm/<type>.html``. For HTMX requests the list
    template is swapped in, either as the whole response or as the window
    content of the container.
    """

    allowed_types = ["modal", "widget", "window", "page"]
    # Template name exposed to the container as ``window_content``.
    window_content = "window-content/objects.html"
    # Template used for HTMX responses (see get()).
    list_template = None

    page_title = None
    page_subtitle = None

    list_url_name = None
    # Names resolved into kwargs for reverse(list_url_name). "type" and
    # "unique" come from values computed in get(); anything else is looked up
    # in the URL kwargs. Unresolvable names are skipped.
    list_url_args = ["type"]

    submit_url_name = None
    delete_all_url_name = None
    widget_options = None

    # copied from BaseListView
    def get(self, request, *args, **kwargs):
        """
        Render the object list for the requested container type.

        Returns:
            ``HttpResponseBadRequest`` if ``type`` is missing or not allowed,
            any ``HttpResponse`` returned by ``get_queryset`` as-is, or the
            rendered template response.

        Raises:
            Http404: if the list is empty and ``allow_empty`` is False.
        """
        requested_type = kwargs.get("type", None)
        if not requested_type:
            return HttpResponseBadRequest("No type specified")
        if requested_type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")

        self.request = request
        self.object_list = self.get_queryset(**kwargs)
        # get_queryset() overrides may short-circuit with a response. This
        # also covers HttpResponseBadRequest, which subclasses HttpResponse.
        if isinstance(self.object_list, HttpResponse):
            return self.object_list

        allow_empty = self.get_allow_empty()

        self.template_name = f"wm/{requested_type}.html"
        unique = str(uuid.uuid4())[:8]

        # The list URL is built with the type exactly as requested, i.e.
        # before "page" is mapped to "modal" below.
        url_arg_sources = {**kwargs, "type": requested_type, "unique": unique}
        list_url_args = {
            name: url_arg_sources[name]
            for name in self.list_url_args
            if name in url_arg_sources
        }

        # "page" keeps its own container template but is reported to the
        # templates (and the submit URL) as "modal".
        context_type = "modal" if requested_type == "page" else requested_type

        if not allow_empty:
            # When pagination is enabled and object_list is a queryset,
            # it's better to do a cheap query than to load the unpaginated
            # queryset in memory.
            if self.get_paginate_by(self.object_list) is not None and hasattr(
                self.object_list, "exists"
            ):
                is_empty = not self.object_list.exists()
            else:
                is_empty = not self.object_list
            if is_empty:
                raise Http404("Empty list")

        context = self.get_context_data()
        context.update(
            {
                "title": self.title + f" ({context_type})",
                "title_singular": self.title_singular,
                "unique": unique,
                "window_content": self.window_content,
                "list_template": self.list_template,
                "page_title": self.page_title,
                "page_subtitle": self.page_subtitle,
                "type": context_type,
                "context_object_name": self.context_object_name,
                "context_object_name_singular": self.context_object_name_singular,
            }
        )

        if self.submit_url_name is not None:
            context["submit_url"] = reverse(
                self.submit_url_name, kwargs={"type": context_type}
            )

        if self.list_url_name is not None:
            context["list_url"] = reverse(self.list_url_name, kwargs=list_url_args)

        if self.delete_all_url_name:
            context["delete_all_url"] = reverse(self.delete_all_url_name)

        if self.widget_options:
            context["widget_options"] = self.widget_options

        # Return partials for HTMX
        if self.request.htmx:
            # HX-Target may be absent from an HTMX request, so a missing
            # header must not raise.
            hx_target = request.headers.get("HX-Target")
            if hx_target == self.context_object_name + "-table":
                self.template_name = self.list_template
            elif requested_type == "page":
                self.template_name = self.list_template
            else:
                context["window_content"] = self.list_template
        return self.render_to_response(context)
class ObjectCreate(RestrictedViewMixin, ObjectNameMixin, CreateView):
    """
    Render and process a creation form inside a window-manager container.

    GET renders the form in ``wm/<type>.html``. POST renders
    ``partials/notify.html`` on success and re-renders the form through
    ``get`` on validation failure.
    """

    allowed_types = ["modal", "widget", "window", "page"]
    # Template name exposed to the container as ``window_content``.
    window_content = "window-content/object-form.html"
    parser_classes = [FormParser]

    page_title = None
    page_subtitle = None

    model = None
    submit_url_name = None
    # Names resolved into kwargs for reverse(submit_url_name). "type" and
    # "unique" come from values computed in get(); anything else is looked up
    # in the URL kwargs. Unresolvable names are skipped.
    submit_url_args = ["type"]

    request = None

    # Whether to hide the cancel button in the form
    hide_cancel = False

    def __init__(self, *args, **kwargs):
        """Set the view title to "Create <singular object name>"."""
        super().__init__(*args, **kwargs)
        self.title = "Create " + self.context_object_name_singular

    def post_save(self, obj):
        """Hook called with the saved object; does nothing by default."""
        pass

    def form_valid(self, form):
        """
        Save the new object for the requesting user and render a notice.

        The response carries an ``HX-Trigger`` header named
        ``<context_object_name_singular>Event``.

        Raises:
            Exception: if no request has been attached to the view.
        """
        if self.request is None:
            raise Exception("Request is None")
        obj = form.save(commit=False)
        # Ownership is always taken from the request, never from form data.
        obj.user = self.request.user
        obj.save()
        form.save_m2m()
        self.post_save(obj)
        context = {"message": "Object created", "class": "success"}
        response = self.render_to_response(context)
        response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
        return response

    def form_invalid(self, form):
        """If the form is invalid, render the invalid form."""
        return self.get(self.request, **self.kwargs, form=form)

    def get(self, request, *args, **kwargs):
        """
        Render the creation form for the requested container type.

        An already-bound ``form`` may be passed in ``kwargs`` (as
        ``form_invalid`` does) to show it instead of a fresh one.

        Returns:
            ``HttpResponseBadRequest`` if ``type`` is missing or not allowed,
            otherwise the rendered template response.
        """
        requested_type = kwargs.get("type", None)
        if not requested_type:
            return HttpResponseBadRequest("No type specified")
        if requested_type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        self.template_name = f"wm/{requested_type}.html"
        unique = str(uuid.uuid4())[:8]

        self.request = request
        self.kwargs = kwargs

        if requested_type == "widget":
            self.hide_cancel = True

        # "page" keeps its own container template but is reported to the
        # templates (and the submit URL) as "modal".
        context_type = "modal" if requested_type == "page" else requested_type

        self.object = None

        url_arg_sources = {**kwargs, "type": context_type, "unique": unique}
        submit_url_args = {
            name: url_arg_sources[name]
            for name in self.submit_url_args
            if name in url_arg_sources
        }
        submit_url = reverse(self.submit_url_name, kwargs=submit_url_args)

        context = self.get_context_data()
        form = kwargs.get("form", None)
        if form:
            context["form"] = form
        # NOTE(review): unlike ObjectUpdate, "title"/"title_singular" are not
        # added to the context here - confirm whether that is intentional.
        context["unique"] = unique
        context["window_content"] = self.window_content
        context["context_object_name"] = self.context_object_name
        context["context_object_name_singular"] = self.context_object_name_singular
        context["submit_url"] = submit_url
        context["type"] = context_type
        context["hide_cancel"] = self.hide_cancel
        if self.page_title:
            context["page_title"] = self.page_title
        if self.page_subtitle:
            context["page_subtitle"] = self.page_subtitle
        response = self.render_to_response(context)
        # response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
        return response

    def post(self, request, *args, **kwargs):
        """Process the submitted form, rendering results as a notification."""
        self.request = request
        self.template_name = "partials/notify.html"
        return super().post(request, *args, **kwargs)
class ObjectRead(RestrictedViewMixin, ObjectNameMixin, DetailView):
    """
    Show a single object inside a window-manager container.

    The ``type`` URL kwarg must be one of ``allowed_types`` and selects the
    container template ``wm/<type>.html``. For HTMX requests the detail
    template is swapped in, either as the whole response or as the window
    content of the container.
    """

    allowed_types = ["modal", "widget", "window", "page"]
    # Template name exposed to the container as ``window_content``.
    window_content = "window-content/object.html"
    # Template used for HTMX responses (see get()).
    detail_template = "partials/generic-detail.html"

    page_title = None
    page_subtitle = None

    model = None
    # submit_url_name = None

    detail_url_name = None
    # Names resolved into kwargs for reverse(detail_url_name). "type" and
    # "unique" come from values computed in get(); anything else is looked up
    # in the URL kwargs. Unresolvable names are skipped.
    detail_url_args = ["type"]

    request = None

    def get(self, request, *args, **kwargs):
        """
        Render the object detail for the requested container type.

        Returns:
            ``HttpResponseBadRequest`` if ``type`` is missing or not allowed,
            any ``HttpResponse`` returned by ``get_object`` as-is, or the
            rendered template response.
        """
        requested_type = kwargs.get("type", None)
        if not requested_type:
            return HttpResponseBadRequest("No type specified")
        if requested_type not in self.allowed_types:
            # Same message as the sibling list/create/update views.
            return HttpResponseBadRequest("Invalid type specified")
        self.template_name = f"wm/{requested_type}.html"
        unique = str(uuid.uuid4())[:8]

        # The detail URL is built with the type exactly as requested, i.e.
        # before "page" is mapped to "modal" below.
        url_arg_sources = {**kwargs, "type": requested_type, "unique": unique}
        detail_url_args = {
            name: url_arg_sources[name]
            for name in self.detail_url_args
            if name in url_arg_sources
        }

        self.request = request
        # NOTE(review): the URL kwargs are forwarded to get_object();
        # presumably subclasses override it to accept them - confirm.
        self.object = self.get_object(**kwargs)
        if isinstance(self.object, HttpResponse):
            return self.object

        # "page" keeps its own container template but is reported to the
        # templates as "modal".
        context_type = "modal" if requested_type == "page" else requested_type

        context = self.get_context_data()
        context["title"] = self.title + f" ({context_type})"
        context["title_singular"] = self.title_singular
        context["unique"] = unique
        context["window_content"] = self.window_content
        context["detail_template"] = self.detail_template
        if self.page_title:
            context["page_title"] = self.page_title
        if self.page_subtitle:
            context["page_subtitle"] = self.page_subtitle
        context["type"] = context_type
        context["context_object_name"] = self.context_object_name
        context["context_object_name_singular"] = self.context_object_name_singular

        if self.detail_url_name is not None:
            context["detail_url"] = reverse(
                self.detail_url_name, kwargs=detail_url_args
            )

        # Return partials for HTMX
        if self.request.htmx:
            # HX-Target may be absent from an HTMX request, so a missing
            # header must not raise.
            hx_target = request.headers.get("HX-Target")
            if hx_target == self.context_object_name + "-info":
                self.template_name = self.detail_template
            elif requested_type == "page":
                self.template_name = self.detail_template
            else:
                context["window_content"] = self.detail_template

        return self.render_to_response(context)
class ObjectUpdate(RestrictedViewMixin, ObjectNameMixin, UpdateView):
    """
    Render and process an edit form inside a window-manager container.

    GET renders the form in ``wm/<type>.html``. POST renders
    ``partials/notify.html`` on success and re-renders the form through
    ``get`` on validation failure.
    """

    allowed_types = ["modal", "widget", "window", "page"]
    # Template name exposed to the container as ``window_content``.
    window_content = "window-content/object-form.html"
    parser_classes = [FormParser]

    page_title = None
    page_subtitle = None

    model = None
    submit_url_name = None
    # Names resolved into kwargs for reverse(submit_url_name). "type", "pk"
    # and "unique" come from values computed in get(); anything else is
    # looked up in the URL kwargs. Unresolvable names are skipped.
    submit_url_args = ["type", "pk"]

    request = None

    # Whether pk is required in the get request
    pk_required = True

    # Whether to hide the cancel button in the form
    hide_cancel = False

    def __init__(self, *args, **kwargs):
        """Set the view title to "Update <singular object name>"."""
        super().__init__(*args, **kwargs)
        self.title = "Update " + self.context_object_name_singular

    def post_save(self, obj):
        """Hook called with the saved object; does nothing by default."""
        pass

    def form_valid(self, form):
        """
        Save the edited object and render a success notice.

        The response carries an ``HX-Trigger`` header named
        ``<context_object_name_singular>Event``.

        Raises:
            Exception: if no request has been attached to the view.
        """
        if self.request is None:
            raise Exception("Request is None")
        obj = form.save(commit=False)
        obj.save()
        form.save_m2m()
        self.post_save(obj)
        context = {"message": "Object updated", "class": "success"}
        response = self.render_to_response(context)
        response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
        return response

    def form_invalid(self, form):
        """If the form is invalid, render the invalid form."""
        return self.get(self.request, **self.kwargs, form=form)

    def get(self, request, *args, **kwargs):
        """
        Render the edit form for the requested container type.

        An already-bound ``form`` may be passed in ``kwargs`` (as
        ``form_invalid`` does) to show it instead of a fresh one.

        Returns:
            ``HttpResponseBadRequest`` if ``type`` is missing or not allowed,
            or if ``pk`` is missing while ``pk_required`` is True; otherwise
            the rendered template response.
        """
        self.request = request
        requested_type = kwargs.get("type", None)
        pk = kwargs.get("pk", None)
        if not requested_type:
            return HttpResponseBadRequest("No type specified")
        if not pk and self.pk_required:
            return HttpResponseBadRequest("No pk specified")
        if requested_type not in self.allowed_types:
            return HttpResponseBadRequest("Invalid type specified")
        self.template_name = f"wm/{requested_type}.html"
        unique = str(uuid.uuid4())[:8]
        if requested_type == "widget":
            self.hide_cancel = True

        # "page" keeps its own container template but is reported to the
        # templates (and the submit URL) as "modal".
        context_type = "modal" if requested_type == "page" else requested_type

        self.object = self.get_object()

        # "pk" is offered even when it is None (pk_required = False), exactly
        # as the values computed above; such views are expected to adjust
        # submit_url_args accordingly.
        url_arg_sources = {
            **kwargs,
            "type": context_type,
            "pk": pk,
            "unique": unique,
        }
        submit_url_args = {
            name: url_arg_sources[name]
            for name in self.submit_url_args
            if name in url_arg_sources
        }
        submit_url = reverse(self.submit_url_name, kwargs=submit_url_args)

        context = self.get_context_data()
        form = kwargs.get("form", None)
        if form:
            context["form"] = form
        context["title"] = self.title + f" ({context_type})"
        context["title_singular"] = self.title_singular
        context["unique"] = unique
        context["window_content"] = self.window_content
        context["context_object_name"] = self.context_object_name
        context["context_object_name_singular"] = self.context_object_name_singular
        context["submit_url"] = submit_url
        context["type"] = context_type
        context["hide_cancel"] = self.hide_cancel
        if self.page_title:
            context["page_title"] = self.page_title
        if self.page_subtitle:
            context["page_subtitle"] = self.page_subtitle
        response = self.render_to_response(context)
        # response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
        return response

    def post(self, request, *args, **kwargs):
        """Process the submitted form, rendering results as a notification."""
        self.request = request
        self.template_name = "partials/notify.html"
        return super().post(request, *args, **kwargs)
class ObjectDelete(RestrictedViewMixin, ObjectNameMixin, DeleteView):
    """
    Delete one of the requesting user's objects and render a notification.

    No redirect is issued: both entry points render ``partials/notify.html``
    and set an ``HX-Trigger`` header named
    ``<context_object_name_singular>Event``.
    """

    model = None
    template_name = "partials/notify.html"

    def _delete_and_notify(self):
        """Fetch the object, delete it and build the notification response."""
        self.object = self.get_object()
        self.object.delete()
        context = {"message": "Object deleted", "class": "success"}
        response = self.render_to_response(context)
        response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
        return response

    # Overridden to prevent success URL from being used
    def delete(self, request, *args, **kwargs):
        """
        Call the delete() method on the fetched object and render the
        notification instead of redirecting to the success URL.
        """
        return self._delete_and_notify()

    # This will be used in newer Django versions, until then we get a warning
    def form_valid(self, form):
        """
        Call the delete() method on the fetched object and render the
        notification.
        """
        return self._delete_and_notify()
# from random import randint
# from timeit import timeit
# entries = 10000
# a = [
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
# randint(1, 2)} for x in range(entries)
# ]
# kk = ["msg", "nick"]
# call = lambda: dedup_list(a, kk)
# #print(timeit(call, number=10))
# print(dedup_list(a, kk))
# # sh-5.1$ python helpers.py
# # 1.0805372429895215
# def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
# """Converts an integer to a base36 string."""
# if not isinstance(number, (int)):
# raise TypeError("number must be an integer")
# base36 = ""
# sign = ""
# if number < 0:
# sign = "-"
# number = -number
# if 0 <= number < len(alphabet):
# return sign + alphabet[number]
# while number != 0:
# number, i = divmod(number, len(alphabet))
# base36 = alphabet[i] + base36
# return sign + base36
# def base36decode(number):
# return int(number, 36)
# def randomise_list(user, data):
# """
# Randomise data in a list of dictionaries.
# """
# if user.has_perm("core.bypass_randomisation"):
# return
# if isinstance(data, list):
# for index, item in enumerate(data):
# for key, value in item.items():
# if key in settings.RANDOMISE_FIELDS:
# if isinstance(value, int):
# min_val = value - (value * settings.RANDOMISE_RATIO)
# max_val = value + (value * settings.RANDOMISE_RATIO)
# new_val = randint(int(min_val), int(max_val))
# data[index][key] = new_val
# elif isinstance(data, dict):
# for key, value in data.items():
# # if key in settings.RANDOMISE_FIELDS:
# if isinstance(value, int):
# min_val = value - (value * settings.RANDOMISE_RATIO)
# max_val = value + (value * settings.RANDOMISE_RATIO)
# new_val = randint(int(min_val), int(max_val))
# data[key] = new_val
# def obfuscate_list(user, data):
# """
# Obfuscate data in a list of dictionaries.
# """
# if user.has_perm("core.bypass_obfuscation"):
# return
# for index, item in enumerate(data):
# for key, value in item.items():
# # Obfuscate a ratio of the field
# if key in settings.OBFUSCATE_FIELDS:
# length = len(value) - 1
# split = int(length * settings.OBFUSCATE_KEEP_RATIO)
# first_part = value[:split]
# second_part = value[split:]
# second_len = len(second_part)
# second_part = "*" * second_len
# data[index][key] = first_part + second_part
# # Obfuscate value based on fields
# # Example: 2022-02-02 -> 2022-02-**
# # 14:11:12 -> 14:11:**
# elif key in settings.OBFUSCATE_FIELDS_SEP:
# if "-" in value:
# sep = "-"
# value_spl = value.split("-")
# hide_num = settings.OBFUSCATE_DASH_NUM
# elif ":" in value:
# sep = ":"
# value_spl = value.split(":")
# hide_num = settings.OBFUSCATE_COLON_NUM
# first_part = value_spl[:hide_num]
# second_part = value_spl[hide_num:]
# for index_x, x in enumerate(second_part):
# x_len = len(x)
# second_part[index_x] = "*" * x_len
# result = sep.join([*first_part, *second_part])
# data[index][key] = result
# for key in settings.COMBINE_FIELDS:
# for index, item in enumerate(data):
# if key in item:
# k1, k2 = settings.COMBINE_FIELDS[key]
# if k1 in item and k2 in item:
# data[index][key] = item[k1] + item[k2]
# def hash_list(user, data, hash_keys=False):
# """
# Hash a list of dicts or a list with SipHash42.
# """
# if user.has_perm("core.bypass_hashing"):
# return
# cache = "cache.hash"
# hash_table = {}
# if isinstance(data, dict):
# data_copy = [{x: data[x]} for x in data]
# else:
# data_copy = type(data)((data))
# for index, item in enumerate(data_copy):
# if "src" in item:
# if item["src"] in settings.SAFE_SOURCES:
# continue
# if isinstance(item, dict):
# for key, value in list(item.items()):
# if (
# key not in settings.WHITELIST_FIELDS
# and key not in settings.NO_OBFUSCATE_PARAMS
# ):
# if isinstance(value, int):
# value = str(value)
# if isinstance(value, bool):
# continue
# if value is None:
# continue
# if hash_keys:
# hashed = siphash(settings.HASHING_KEY, key)
# else:
# hashed = siphash(settings.HASHING_KEY, value)
# encoded = base36encode(hashed)
# if encoded not in hash_table:
# if hash_keys:
# hash_table[encoded] = key
# else:
# hash_table[encoded] = value
# if hash_keys:
# # Rename the dict key
# data[encoded] = data.pop(key)
# else:
# data[index][key] = encoded
# elif isinstance(item, str):
# hashed = siphash(settings.HASHING_KEY, item)
# encoded = base36encode(hashed)
# if encoded not in hash_table:
# hash_table[encoded] = item
# data[index] = encoded
# if hash_table:
# r.hmset(cache, hash_table)
# def hash_lookup(user, data_dict, supplementary_data=None):
# cache = "cache.hash"
# hash_list = SortedSet()
# denied = []
# for key, value in list(data_dict.items()):
# if "source" in data_dict:
# if data_dict["source"] in settings.SAFE_SOURCES:
# continue
# if "src" in data_dict:
# if data_dict["src"] in settings.SAFE_SOURCES:
# continue
# if supplementary_data:
# if "source" in supplementary_data:
# if supplementary_data["source"] in settings.SAFE_SOURCES:
# continue
# if key in settings.SEARCH_FIELDS_DENY:
# if not user.has_perm("core.bypass_hashing"):
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# if (
# key not in settings.WHITELIST_FIELDS
# and key not in settings.NO_OBFUSCATE_PARAMS
# ):
# if not value:
# continue
# # hashes = re.findall("\|([^\|]*)\|", value) # noqa
# if isinstance(value, str):
# hashes = re.findall("[A-Z0-9]{12,13}", value)
# elif isinstance(value, dict):
# hashes = []
# for key, value in value.items():
# if not value:
# continue
# hashes_iter = re.findall("[A-Z0-9]{12,13}", value)
# for h in hashes_iter:
# hashes.append(h)
# if not hashes:
# # Otherwise the user could inject plaintext search queries
# if not user.has_perm("core.bypass_hashing"):
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# continue
# else:
# # There are hashes here but there shouldn't be!
# if key in settings.TAG_SEARCH_DENY:
# data_dict[key] = LookupDenied(key=key, value=data_dict[key])
# denied.append(data_dict[key])
# continue
# for hash in hashes:
# hash_list.add(hash)
# if hash_list:
# values = r.hmget(cache, *hash_list)
# if not values:
# return
# for index, val in enumerate(values):
# if val is None:
# values[index] = b"ERR"
# values = [x.decode() for x in values]
# total = dict(zip(hash_list, values))
# for key in data_dict.keys():
# for hash in total:
# if data_dict[key]:
# if isinstance(data_dict[key], str):
# if hash in data_dict[key]:
# data_dict[key] = data_dict[key].replace(
# f"{hash}", total[hash]
# )
# elif isinstance(data_dict[key], dict):
# for k2, v2 in data_dict[key].items():
# if hash in v2:
# data_dict[key][k2] = v2.replace(
# f"{hash}", total[hash]
# )
# return denied
# def encrypt_list(user, data, secret):
# if user.has_perm("core.bypass_encryption"):
# return
# cipher = Cipher(algorithms.AES(secret), ECB())
# for index, item in enumerate(data):
# for key, value in item.items():
# if key not in settings.WHITELIST_FIELDS:
# encryptor = cipher.encryptor()
# if isinstance(value, int):
# value = str(value)
# if isinstance(value, bool):
# continue
# if value is None:
# continue
# decoded = value.encode("utf8", "replace")
# length = 16 - (len(decoded) % 16)
# decoded += bytes([length]) * length
# ct = encryptor.update(decoded) + encryptor.finalize()
# final_str = b64encode(ct)
# data[index][key] = final_str.decode("utf-8", "replace")