# import re # from base64 import b64encode # from random import randint # from cryptography.hazmat.primitives.ciphers import Cipher, algorithms # from cryptography.hazmat.primitives.ciphers.modes import ECB # from django.conf import settings # from siphashc import siphash # from sortedcontainers import SortedSet import uuid # from core import r from django.conf import settings from django.core.exceptions import ImproperlyConfigured from django.core.paginator import Paginator from django.db.models import QuerySet from django.http import Http404, HttpResponse, HttpResponseBadRequest from django.urls import reverse from django.views.generic.detail import DetailView from django.views.generic.edit import CreateView, DeleteView, UpdateView from django.views.generic.list import ListView from rest_framework.parsers import FormParser from core.util import logs log = logs.get_logger(__name__) class RestrictedViewMixin: """ This mixin overrides two helpers in order to pass the user object to the filters. get_queryset alters the objects returned for list views. get_form_kwargs passes the request object to the form class. Remaining permissions checks are in forms.py """ allow_empty = True queryset = None model = None paginate_by = None paginate_orphans = 0 context_object_name = None paginator_class = Paginator page_kwarg = "page" ordering = None def get_queryset(self, **kwargs): """ This function is overriden to filter the objects by the requesting user. """ if self.queryset is not None: queryset = self.queryset if isinstance(queryset, QuerySet): # queryset = queryset.all() queryset = queryset.filter(user=self.request.user) elif self.model is not None: queryset = self.model._default_manager.filter(user=self.request.user) else: raise ImproperlyConfigured( "%(cls)s is missing a QuerySet. Define " "%(cls)s.model, %(cls)s.queryset, or override " "%(cls)s.get_queryset()." % {"cls": self.__class__.__name__} ) if hasattr(self, "get_ordering"): ordering = self.get_ordering() if ordering: if isinstance(ordering, str): ordering = (ordering,) queryset = queryset.order_by(*ordering) return queryset def get_form_kwargs(self): """Passes the request object to the form class. This is necessary to only display members that belong to a given user""" kwargs = super().get_form_kwargs() kwargs["request"] = self.request return kwargs class ObjectNameMixin(object): def __init__(self, *args, **kwargs): if self.model is None: self.title = self.context_object_name.title() self.title_singular = self.context_object_name_singular.title() else: self.title_singular = self.model._meta.verbose_name.title() # Hook self.context_object_name_singular = self.title_singular.lower() # hook self.title = self.model._meta.verbose_name_plural.title() # Hooks self.context_object_name = self.title.lower() # hooks self.context_object_name = self.context_object_name.replace(" ", "") self.context_object_name_singular = ( self.context_object_name_singular.replace(" ", "") ) super().__init__(*args, **kwargs) class ObjectList(RestrictedViewMixin, ObjectNameMixin, ListView): allowed_types = ["modal", "widget", "window", "page"] window_content = "window-content/objects.html" list_template = None page_title = None page_subtitle = None list_url_name = None # WARNING: TAKEN FROM locals() list_url_args = ["type"] submit_url_name = None delete_all_url_name = None widget_options = None # copied from BaseListView def get(self, request, *args, **kwargs): type = kwargs.get("type", None) if not type: return HttpResponseBadRequest("No type specified") if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") self.request = request self.object_list = self.get_queryset(**kwargs) if isinstance(self.object_list, HttpResponse): return self.object_list if isinstance(self.object_list, HttpResponseBadRequest): return self.object_list allow_empty = self.get_allow_empty() self.template_name = f"wm/{type}.html" unique = str(uuid.uuid4())[:8] list_url_args = {} for arg in self.list_url_args: if arg in locals(): list_url_args[arg] = locals()[arg] elif arg in kwargs: list_url_args[arg] = kwargs[arg] orig_type = type if type == "page": type = "modal" if not allow_empty: # When pagination is enabled and object_list is a queryset, # it's better to do a cheap query than to load the unpaginated # queryset in memory. if self.get_paginate_by(self.object_list) is not None and hasattr( self.object_list, "exists" ): is_empty = not self.object_list.exists() else: is_empty = not self.object_list if is_empty: raise Http404("Empty list") context = self.get_context_data() context["title"] = self.title + f" ({type})" context["title_singular"] = self.title_singular context["unique"] = unique context["window_content"] = self.window_content context["list_template"] = self.list_template context["page_title"] = self.page_title context["page_subtitle"] = self.page_subtitle context["type"] = type context["context_object_name"] = self.context_object_name context["context_object_name_singular"] = self.context_object_name_singular if self.submit_url_name is not None: context["submit_url"] = reverse(self.submit_url_name, kwargs={"type": type}) if self.list_url_name is not None: context["list_url"] = reverse(self.list_url_name, kwargs=list_url_args) if self.delete_all_url_name: context["delete_all_url"] = reverse(self.delete_all_url_name) if self.widget_options: context["widget_options"] = self.widget_options # Return partials for HTMX if self.request.htmx: if request.headers["HX-Target"] == self.context_object_name + "-table": self.template_name = self.list_template elif orig_type == "page": self.template_name = self.list_template else: context["window_content"] = self.list_template return self.render_to_response(context) class ObjectCreate(RestrictedViewMixin, ObjectNameMixin, CreateView): allowed_types = ["modal", "widget", "window", "page"] window_content = "window-content/object-form.html" parser_classes = [FormParser] page_title = None page_subtitle = None model = None submit_url_name = None submit_url_args = ["type"] request = None # Whether to hide the cancel button in the form hide_cancel = False def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.title = "Create " + self.context_object_name_singular def post_save(self, obj): pass def form_valid(self, form): obj = form.save(commit=False) if self.request is None: raise Exception("Request is None") obj.user = self.request.user obj.save() form.save_m2m() self.post_save(obj) context = {"message": "Object created", "class": "success"} response = self.render_to_response(context) response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response def form_invalid(self, form): """If the form is invalid, render the invalid form.""" return self.get(self.request, **self.kwargs, form=form) def get(self, request, *args, **kwargs): type = kwargs.get("type", None) if not type: return HttpResponseBadRequest("No type specified") if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") self.template_name = f"wm/{type}.html" unique = str(uuid.uuid4())[:8] self.request = request self.kwargs = kwargs if type == "widget": self.hide_cancel = True if type == "page": type = "modal" self.object = None submit_url_args = {} for arg in self.submit_url_args: if arg in locals(): submit_url_args[arg] = locals()[arg] elif arg in kwargs: submit_url_args[arg] = kwargs[arg] submit_url = reverse(self.submit_url_name, kwargs=submit_url_args) context = self.get_context_data() form = kwargs.get("form", None) if form: context["form"] = form context["unique"] = unique context["window_content"] = self.window_content context["context_object_name"] = self.context_object_name context["context_object_name_singular"] = self.context_object_name_singular context["submit_url"] = submit_url context["type"] = type context["hide_cancel"] = self.hide_cancel if self.page_title: context["page_title"] = self.page_title if self.page_subtitle: context["page_subtitle"] = self.page_subtitle response = self.render_to_response(context) # response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response def post(self, request, *args, **kwargs): self.request = request self.template_name = "partials/notify.html" return super().post(request, *args, **kwargs) class ObjectRead(RestrictedViewMixin, ObjectNameMixin, DetailView): allowed_types = ["modal", "widget", "window", "page"] window_content = "window-content/object.html" detail_template = "partials/generic-detail.html" page_title = None page_subtitle = None model = None # submit_url_name = None detail_url_name = None # WARNING: TAKEN FROM locals() detail_url_args = ["type"] request = None def get(self, request, *args, **kwargs): type = kwargs.get("type", None) if not type: return HttpResponseBadRequest("No type specified") if type not in self.allowed_types: return HttpResponseBadRequest() self.template_name = f"wm/{type}.html" unique = str(uuid.uuid4())[:8] detail_url_args = {} for arg in self.detail_url_args: if arg in locals(): detail_url_args[arg] = locals()[arg] elif arg in kwargs: detail_url_args[arg] = kwargs[arg] self.request = request self.object = self.get_object(**kwargs) if isinstance(self.object, HttpResponse): return self.object orig_type = type if type == "page": type = "modal" context = self.get_context_data() context["title"] = self.title + f" ({type})" context["title_singular"] = self.title_singular context["unique"] = unique context["window_content"] = self.window_content context["detail_template"] = self.detail_template if self.page_title: context["page_title"] = self.page_title if self.page_subtitle: context["page_subtitle"] = self.page_subtitle context["type"] = type context["context_object_name"] = self.context_object_name context["context_object_name_singular"] = self.context_object_name_singular if self.detail_url_name is not None: context["detail_url"] = reverse( self.detail_url_name, kwargs=detail_url_args ) # Return partials for HTMX if self.request.htmx: if request.headers["HX-Target"] == self.context_object_name + "-info": self.template_name = self.detail_template elif orig_type == "page": self.template_name = self.detail_template else: context["window_content"] = self.detail_template return self.render_to_response(context) class ObjectUpdate(RestrictedViewMixin, ObjectNameMixin, UpdateView): allowed_types = ["modal", "widget", "window", "page"] window_content = "window-content/object-form.html" parser_classes = [FormParser] page_title = None page_subtitle = None model = None submit_url_name = None submit_url_args = ["type", "pk"] request = None # Whether pk is required in the get request pk_required = True # Whether to hide the cancel button in the form hide_cancel = False def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.title = "Update " + self.context_object_name_singular def post_save(self, obj): pass def form_valid(self, form): obj = form.save(commit=False) if self.request is None: raise Exception("Request is None") obj.save() form.save_m2m() self.post_save(obj) context = {"message": "Object updated", "class": "success"} response = self.render_to_response(context) response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response def form_invalid(self, form): """If the form is invalid, render the invalid form.""" return self.get(self.request, **self.kwargs, form=form) def get(self, request, *args, **kwargs): self.request = request type = kwargs.get("type", None) pk = kwargs.get("pk", None) if not type: return HttpResponseBadRequest("No type specified") if not pk: if self.pk_required: return HttpResponseBadRequest("No pk specified") if type not in self.allowed_types: return HttpResponseBadRequest("Invalid type specified") self.template_name = f"wm/{type}.html" unique = str(uuid.uuid4())[:8] if type == "widget": self.hide_cancel = True if type == "page": type = "modal" self.object = self.get_object() submit_url_args = {} for arg in self.submit_url_args: if arg in locals(): submit_url_args[arg] = locals()[arg] elif arg in kwargs: submit_url_args[arg] = kwargs[arg] submit_url = reverse(self.submit_url_name, kwargs=submit_url_args) context = self.get_context_data() form = kwargs.get("form", None) if form: context["form"] = form context["title"] = self.title + f" ({type})" context["title_singular"] = self.title_singular context["unique"] = unique context["window_content"] = self.window_content context["context_object_name"] = self.context_object_name context["context_object_name_singular"] = self.context_object_name_singular context["submit_url"] = submit_url context["type"] = type context["hide_cancel"] = self.hide_cancel if self.page_title: context["page_title"] = self.page_title if self.page_subtitle: context["page_subtitle"] = self.page_subtitle response = self.render_to_response(context) # response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response def post(self, request, *args, **kwargs): self.request = request self.template_name = "partials/notify.html" return super().post(request, *args, **kwargs) class ObjectDelete(RestrictedViewMixin, ObjectNameMixin, DeleteView): model = None template_name = "partials/notify.html" # Overriden to prevent success URL from being used def delete(self, request, *args, **kwargs): """ Call the delete() method on the fetched object and then redirect to the success URL. """ self.object = self.get_object() # success_url = self.get_success_url() self.object.delete() context = {"message": "Object deleted", "class": "success"} response = self.render_to_response(context) response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response # This will be used in newer Django versions, until then we get a warning def form_valid(self, form): """ Call the delete() method on the fetched object. """ self.object = self.get_object() self.object.delete() context = {"message": "Object deleted", "class": "success"} response = self.render_to_response(context) response["HX-Trigger"] = f"{self.context_object_name_singular}Event" return response class SearchDenied: def __init__(self, key, value): self.key = key self.value = value class LookupDenied: def __init__(self, key, value): self.key = key self.value = value def remove_defaults(query_params): for field, value in list(query_params.items()): if field in settings.DRILLDOWN_DEFAULT_PARAMS: if value == settings.DRILLDOWN_DEFAULT_PARAMS[field]: del query_params[field] def add_defaults(query_params): for field, value in settings.DRILLDOWN_DEFAULT_PARAMS.items(): if field not in query_params: query_params[field] = value def dedup_list(data, check_keys): """ Remove duplicate dictionaries from list. """ seen = set() out = [] dup_count = 0 for x in data: dedupeKey = tuple(x[k] for k in check_keys if k in x) if dedupeKey in seen: dup_count += 1 continue if dup_count > 0: out.append({"type": "control", "hidden": dup_count}) dup_count = 0 out.append(x) seen.add(dedupeKey) if dup_count > 0: out.append({"type": "control", "hidden": dup_count}) return out # from random import randint # from timeit import timeit # entries = 10000 # a = [ # {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \ # randint(1, 2)} for x in range(entries) # ] # kk = ["msg", "nick"] # call = lambda: dedup_list(a, kk) # #print(timeit(call, number=10)) # print(dedup_list(a, kk)) # # sh-5.1$ python helpers.py # # 1.0805372429895215 # def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): # """Converts an integer to a base36 string.""" # if not isinstance(number, (int)): # raise TypeError("number must be an integer") # base36 = "" # sign = "" # if number < 0: # sign = "-" # number = -number # if 0 <= number < len(alphabet): # return sign + alphabet[number] # while number != 0: # number, i = divmod(number, len(alphabet)) # base36 = alphabet[i] + base36 # return sign + base36 # def base36decode(number): # return int(number, 36) # def randomise_list(user, data): # """ # Randomise data in a list of dictionaries. # """ # if user.has_perm("core.bypass_randomisation"): # return # if isinstance(data, list): # for index, item in enumerate(data): # for key, value in item.items(): # if key in settings.RANDOMISE_FIELDS: # if isinstance(value, int): # min_val = value - (value * settings.RANDOMISE_RATIO) # max_val = value + (value * settings.RANDOMISE_RATIO) # new_val = randint(int(min_val), int(max_val)) # data[index][key] = new_val # elif isinstance(data, dict): # for key, value in data.items(): # # if key in settings.RANDOMISE_FIELDS: # if isinstance(value, int): # min_val = value - (value * settings.RANDOMISE_RATIO) # max_val = value + (value * settings.RANDOMISE_RATIO) # new_val = randint(int(min_val), int(max_val)) # data[key] = new_val # def obfuscate_list(user, data): # """ # Obfuscate data in a list of dictionaries. # """ # if user.has_perm("core.bypass_obfuscation"): # return # for index, item in enumerate(data): # for key, value in item.items(): # # Obfuscate a ratio of the field # if key in settings.OBFUSCATE_FIELDS: # length = len(value) - 1 # split = int(length * settings.OBFUSCATE_KEEP_RATIO) # first_part = value[:split] # second_part = value[split:] # second_len = len(second_part) # second_part = "*" * second_len # data[index][key] = first_part + second_part # # Obfuscate value based on fields # # Example: 2022-02-02 -> 2022-02-** # # 14:11:12 -> 14:11:** # elif key in settings.OBFUSCATE_FIELDS_SEP: # if "-" in value: # sep = "-" # value_spl = value.split("-") # hide_num = settings.OBFUSCATE_DASH_NUM # elif ":" in value: # sep = ":" # value_spl = value.split(":") # hide_num = settings.OBFUSCATE_COLON_NUM # first_part = value_spl[:hide_num] # second_part = value_spl[hide_num:] # for index_x, x in enumerate(second_part): # x_len = len(x) # second_part[index_x] = "*" * x_len # result = sep.join([*first_part, *second_part]) # data[index][key] = result # for key in settings.COMBINE_FIELDS: # for index, item in enumerate(data): # if key in item: # k1, k2 = settings.COMBINE_FIELDS[key] # if k1 in item and k2 in item: # data[index][key] = item[k1] + item[k2] # def hash_list(user, data, hash_keys=False): # """ # Hash a list of dicts or a list with SipHash42. # """ # if user.has_perm("core.bypass_hashing"): # return # cache = "cache.hash" # hash_table = {} # if isinstance(data, dict): # data_copy = [{x: data[x]} for x in data] # else: # data_copy = type(data)((data)) # for index, item in enumerate(data_copy): # if "src" in item: # if item["src"] in settings.SAFE_SOURCES: # continue # if isinstance(item, dict): # for key, value in list(item.items()): # if ( # key not in settings.WHITELIST_FIELDS # and key not in settings.NO_OBFUSCATE_PARAMS # ): # if isinstance(value, int): # value = str(value) # if isinstance(value, bool): # continue # if value is None: # continue # if hash_keys: # hashed = siphash(settings.HASHING_KEY, key) # else: # hashed = siphash(settings.HASHING_KEY, value) # encoded = base36encode(hashed) # if encoded not in hash_table: # if hash_keys: # hash_table[encoded] = key # else: # hash_table[encoded] = value # if hash_keys: # # Rename the dict key # data[encoded] = data.pop(key) # else: # data[index][key] = encoded # elif isinstance(item, str): # hashed = siphash(settings.HASHING_KEY, item) # encoded = base36encode(hashed) # if encoded not in hash_table: # hash_table[encoded] = item # data[index] = encoded # if hash_table: # r.hmset(cache, hash_table) # def hash_lookup(user, data_dict, supplementary_data=None): # cache = "cache.hash" # hash_list = SortedSet() # denied = [] # for key, value in list(data_dict.items()): # if "source" in data_dict: # if data_dict["source"] in settings.SAFE_SOURCES: # continue # if "src" in data_dict: # if data_dict["src"] in settings.SAFE_SOURCES: # continue # if supplementary_data: # if "source" in supplementary_data: # if supplementary_data["source"] in settings.SAFE_SOURCES: # continue # if key in settings.SEARCH_FIELDS_DENY: # if not user.has_perm("core.bypass_hashing"): # data_dict[key] = SearchDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # if ( # key not in settings.WHITELIST_FIELDS # and key not in settings.NO_OBFUSCATE_PARAMS # ): # if not value: # continue # # hashes = re.findall("\|([^\|]*)\|", value) # noqa # if isinstance(value, str): # hashes = re.findall("[A-Z0-9]{12,13}", value) # elif isinstance(value, dict): # hashes = [] # for key, value in value.items(): # if not value: # continue # hashes_iter = re.findall("[A-Z0-9]{12,13}", value) # for h in hashes_iter: # hashes.append(h) # if not hashes: # # Otherwise the user could inject plaintext search queries # if not user.has_perm("core.bypass_hashing"): # data_dict[key] = SearchDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # continue # else: # # There are hashes here but there shouldn't be! # if key in settings.TAG_SEARCH_DENY: # data_dict[key] = LookupDenied(key=key, value=data_dict[key]) # denied.append(data_dict[key]) # continue # for hash in hashes: # hash_list.add(hash) # if hash_list: # values = r.hmget(cache, *hash_list) # if not values: # return # for index, val in enumerate(values): # if val is None: # values[index] = b"ERR" # values = [x.decode() for x in values] # total = dict(zip(hash_list, values)) # for key in data_dict.keys(): # for hash in total: # if data_dict[key]: # if isinstance(data_dict[key], str): # if hash in data_dict[key]: # data_dict[key] = data_dict[key].replace( # f"{hash}", total[hash] # ) # elif isinstance(data_dict[key], dict): # for k2, v2 in data_dict[key].items(): # if hash in v2: # data_dict[key][k2] = v2.repl # ace(f"{hash}", total[hash]) # return denied # def encrypt_list(user, data, secret): # if user.has_perm("core.bypass_encryption"): # return # cipher = Cipher(algorithms.AES(secret), ECB()) # for index, item in enumerate(data): # for key, value in item.items(): # if key not in settings.WHITELIST_FIELDS: # encryptor = cipher.encryptor() # if isinstance(value, int): # value = str(value) # if isinstance(value, bool): # continue # if value is None: # continue # decoded = value.encode("utf8", "replace") # length = 16 - (len(decoded) % 16) # decoded += bytes([length]) * length # ct = encryptor.update(decoded) + encryptor.finalize() # final_str = b64encode(ct) # data[index][key] = final_str.decode("utf-8", "replace")