Switch database location and use mixins for CRUD
This commit is contained in:
parent
115c6dd1ad
commit
df273a6009
|
@ -2,6 +2,7 @@ from django import forms
|
|||
from django.contrib.auth.forms import UserCreationForm
|
||||
from django.core.exceptions import FieldDoesNotExist
|
||||
from django.forms import ModelForm
|
||||
from mixins.restrictions import RestrictedFormMixin
|
||||
|
||||
from core.db.storage import db
|
||||
from core.lib.parsing import QueryError
|
||||
|
@ -12,36 +13,6 @@ from .models import NotificationRule, NotificationSettings, User
|
|||
# flake8: noqa: E501
|
||||
|
||||
|
||||
class RestrictedFormMixin:
|
||||
"""
|
||||
This mixin is used to restrict the queryset of a form to the current user.
|
||||
The request object is passed from the view.
|
||||
Fieldargs is used to pass additional arguments to the queryset filter.
|
||||
"""
|
||||
|
||||
fieldargs = {}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
# self.fieldargs = {}
|
||||
self.request = kwargs.pop("request")
|
||||
super().__init__(*args, **kwargs)
|
||||
for field in self.fields:
|
||||
# Check it's not something like a CharField which has no queryset
|
||||
if not hasattr(self.fields[field], "queryset"):
|
||||
continue
|
||||
|
||||
model = self.fields[field].queryset.model
|
||||
# Check if the model has a user field
|
||||
try:
|
||||
model._meta.get_field("user")
|
||||
# Add the user to the queryset filters
|
||||
self.fields[field].queryset = model.objects.filter(
|
||||
user=self.request.user, **self.fieldargs.get(field, {})
|
||||
)
|
||||
except FieldDoesNotExist:
|
||||
pass
|
||||
|
||||
|
||||
class NewUserForm(UserCreationForm):
|
||||
email = forms.EmailField(required=True)
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@
|
|||
{% endblock %}
|
||||
|
||||
{% block modal_content %}
|
||||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
<div class="tabs is-toggle is-fullwidth is-info" id="tabs-{{ unique }}">
|
||||
<ul>
|
||||
<li class="is-active" data-tab="1">
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
{% endblock %}
|
||||
|
||||
{% block panel_content %}
|
||||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if cache is not None %}
|
||||
<span class="icon has-tooltip-bottom" data-tooltip="Cached">
|
||||
<i class="fa-solid fa-database"></i>
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
|
||||
<table
|
||||
class="table is-fullwidth is-hoverable"
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
hx-swap="outerHTML">
|
||||
</div>
|
||||
<div id="info">
|
||||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if item is not None %}
|
||||
<div class="content" style="max-height: 30em; overflow: auto;">
|
||||
<div class="table-container">
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{% extends "base.html" %}
|
||||
{% load static %}
|
||||
{% block content %}
|
||||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
<script src="{% static 'tabs.js' %}"></script>
|
||||
<style>
|
||||
.icon { border-bottom: 0px !important;}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if page_title is not None %}
|
||||
<h1 class="title is-4">{{ page_title }}</h1>
|
||||
{% endif %}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if page_title is not None %}
|
||||
<h1 class="title is-4">{{ page_title }}</h1>
|
||||
{% endif %}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if page_title is not None %}
|
||||
<h1 class="title is-4">{{ page_title }}</h1>
|
||||
{% endif %}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{% load static %}
|
||||
|
||||
{% include 'partials/notify.html' %}
|
||||
{% include 'mixins/partials/notify.html' %}
|
||||
{% if cache is not None %}
|
||||
<span class="icon has-tooltip-bottom" data-tooltip="Cached">
|
||||
<i class="fa-solid fa-database"></i>
|
||||
|
|
|
@ -1,753 +0,0 @@
|
|||
# import re
|
||||
# from base64 import b64encode
|
||||
# from random import randint
|
||||
|
||||
# from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||
# from cryptography.hazmat.primitives.ciphers.modes import ECB
|
||||
# from django.conf import settings
|
||||
# from siphashc import siphash
|
||||
# from sortedcontainers import SortedSet
|
||||
|
||||
import uuid
|
||||
|
||||
# from core import r
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.core.paginator import Paginator
|
||||
from django.db.models import QuerySet
|
||||
from django.http import Http404, HttpResponse, HttpResponseBadRequest
|
||||
from django.urls import reverse
|
||||
from django.views.generic.detail import DetailView
|
||||
from django.views.generic.edit import CreateView, DeleteView, UpdateView
|
||||
from django.views.generic.list import ListView
|
||||
from rest_framework.parsers import FormParser
|
||||
|
||||
from core.util import logs
|
||||
|
||||
log = logs.get_logger(__name__)
|
||||
|
||||
|
||||
class RestrictedViewMixin:
|
||||
"""
|
||||
This mixin overrides two helpers in order to pass the user object to the filters.
|
||||
get_queryset alters the objects returned for list views.
|
||||
get_form_kwargs passes the request object to the form class. Remaining permissions
|
||||
checks are in forms.py
|
||||
"""
|
||||
|
||||
allow_empty = True
|
||||
queryset = None
|
||||
model = None
|
||||
paginate_by = None
|
||||
paginate_orphans = 0
|
||||
context_object_name = None
|
||||
paginator_class = Paginator
|
||||
page_kwarg = "page"
|
||||
ordering = None
|
||||
|
||||
def get_queryset(self, **kwargs):
|
||||
"""
|
||||
This function is overriden to filter the objects by the requesting user.
|
||||
"""
|
||||
if self.queryset is not None:
|
||||
queryset = self.queryset
|
||||
if isinstance(queryset, QuerySet):
|
||||
# queryset = queryset.all()
|
||||
queryset = queryset.filter(user=self.request.user)
|
||||
elif self.model is not None:
|
||||
queryset = self.model._default_manager.filter(user=self.request.user)
|
||||
else:
|
||||
raise ImproperlyConfigured(
|
||||
"%(cls)s is missing a QuerySet. Define "
|
||||
"%(cls)s.model, %(cls)s.queryset, or override "
|
||||
"%(cls)s.get_queryset()." % {"cls": self.__class__.__name__}
|
||||
)
|
||||
if hasattr(self, "get_ordering"):
|
||||
ordering = self.get_ordering()
|
||||
if ordering:
|
||||
if isinstance(ordering, str):
|
||||
ordering = (ordering,)
|
||||
queryset = queryset.order_by(*ordering)
|
||||
|
||||
return queryset
|
||||
|
||||
def get_form_kwargs(self):
|
||||
"""Passes the request object to the form class.
|
||||
This is necessary to only display members that belong to a given user"""
|
||||
|
||||
kwargs = super().get_form_kwargs()
|
||||
kwargs["request"] = self.request
|
||||
return kwargs
|
||||
|
||||
|
||||
class ObjectNameMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
if self.model is None:
|
||||
self.title = self.context_object_name.title()
|
||||
self.title_singular = self.context_object_name_singular.title()
|
||||
else:
|
||||
self.title_singular = self.model._meta.verbose_name.title() # Hook
|
||||
self.context_object_name_singular = self.title_singular.lower() # hook
|
||||
self.title = self.model._meta.verbose_name_plural.title() # Hooks
|
||||
self.context_object_name = self.title.lower() # hooks
|
||||
|
||||
self.context_object_name = self.context_object_name.replace(" ", "")
|
||||
self.context_object_name_singular = (
|
||||
self.context_object_name_singular.replace(" ", "")
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class ObjectList(RestrictedViewMixin, ObjectNameMixin, ListView):
|
||||
allowed_types = ["modal", "widget", "window", "page"]
|
||||
window_content = "window-content/objects.html"
|
||||
list_template = None
|
||||
|
||||
page_title = None
|
||||
page_subtitle = None
|
||||
|
||||
list_url_name = None
|
||||
# WARNING: TAKEN FROM locals()
|
||||
list_url_args = ["type"]
|
||||
|
||||
submit_url_name = None
|
||||
|
||||
delete_all_url_name = None
|
||||
widget_options = None
|
||||
|
||||
# copied from BaseListView
|
||||
def get(self, request, *args, **kwargs):
|
||||
type = kwargs.get("type", None)
|
||||
if not type:
|
||||
return HttpResponseBadRequest("No type specified")
|
||||
if type not in self.allowed_types:
|
||||
return HttpResponseBadRequest("Invalid type specified")
|
||||
|
||||
self.request = request
|
||||
self.object_list = self.get_queryset(**kwargs)
|
||||
if isinstance(self.object_list, HttpResponse):
|
||||
return self.object_list
|
||||
if isinstance(self.object_list, HttpResponseBadRequest):
|
||||
return self.object_list
|
||||
allow_empty = self.get_allow_empty()
|
||||
|
||||
self.template_name = f"wm/{type}.html"
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
list_url_args = {}
|
||||
for arg in self.list_url_args:
|
||||
if arg in locals():
|
||||
list_url_args[arg] = locals()[arg]
|
||||
elif arg in kwargs:
|
||||
list_url_args[arg] = kwargs[arg]
|
||||
|
||||
orig_type = type
|
||||
if type == "page":
|
||||
type = "modal"
|
||||
|
||||
if not allow_empty:
|
||||
# When pagination is enabled and object_list is a queryset,
|
||||
# it's better to do a cheap query than to load the unpaginated
|
||||
# queryset in memory.
|
||||
if self.get_paginate_by(self.object_list) is not None and hasattr(
|
||||
self.object_list, "exists"
|
||||
):
|
||||
is_empty = not self.object_list.exists()
|
||||
else:
|
||||
is_empty = not self.object_list
|
||||
if is_empty:
|
||||
raise Http404("Empty list")
|
||||
|
||||
context = self.get_context_data()
|
||||
context["title"] = self.title + f" ({type})"
|
||||
context["title_singular"] = self.title_singular
|
||||
context["unique"] = unique
|
||||
context["window_content"] = self.window_content
|
||||
context["list_template"] = self.list_template
|
||||
context["page_title"] = self.page_title
|
||||
context["page_subtitle"] = self.page_subtitle
|
||||
context["type"] = type
|
||||
context["context_object_name"] = self.context_object_name
|
||||
context["context_object_name_singular"] = self.context_object_name_singular
|
||||
|
||||
if self.submit_url_name is not None:
|
||||
context["submit_url"] = reverse(self.submit_url_name, kwargs={"type": type})
|
||||
|
||||
if self.list_url_name is not None:
|
||||
context["list_url"] = reverse(self.list_url_name, kwargs=list_url_args)
|
||||
|
||||
if self.delete_all_url_name:
|
||||
context["delete_all_url"] = reverse(self.delete_all_url_name)
|
||||
if self.widget_options:
|
||||
context["widget_options"] = self.widget_options
|
||||
|
||||
# Return partials for HTMX
|
||||
if self.request.htmx:
|
||||
if request.headers["HX-Target"] == self.context_object_name + "-table":
|
||||
self.template_name = self.list_template
|
||||
elif orig_type == "page":
|
||||
self.template_name = self.list_template
|
||||
else:
|
||||
context["window_content"] = self.list_template
|
||||
return self.render_to_response(context)
|
||||
|
||||
|
||||
class ObjectCreate(RestrictedViewMixin, ObjectNameMixin, CreateView):
|
||||
allowed_types = ["modal", "widget", "window", "page"]
|
||||
window_content = "window-content/object-form.html"
|
||||
parser_classes = [FormParser]
|
||||
|
||||
page_title = None
|
||||
page_subtitle = None
|
||||
|
||||
model = None
|
||||
submit_url_name = None
|
||||
submit_url_args = ["type"]
|
||||
|
||||
request = None
|
||||
|
||||
# Whether to hide the cancel button in the form
|
||||
hide_cancel = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.title = "Create " + self.context_object_name_singular
|
||||
|
||||
def post_save(self, obj):
|
||||
pass
|
||||
|
||||
def form_valid(self, form):
|
||||
obj = form.save(commit=False)
|
||||
if self.request is None:
|
||||
raise Exception("Request is None")
|
||||
obj.user = self.request.user
|
||||
obj.save()
|
||||
form.save_m2m()
|
||||
self.post_save(obj)
|
||||
context = {"message": "Object created", "class": "success"}
|
||||
response = self.render_to_response(context)
|
||||
response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
def form_invalid(self, form):
|
||||
"""If the form is invalid, render the invalid form."""
|
||||
return self.get(self.request, **self.kwargs, form=form)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
type = kwargs.get("type", None)
|
||||
if not type:
|
||||
return HttpResponseBadRequest("No type specified")
|
||||
if type not in self.allowed_types:
|
||||
return HttpResponseBadRequest("Invalid type specified")
|
||||
self.template_name = f"wm/{type}.html"
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
self.request = request
|
||||
self.kwargs = kwargs
|
||||
|
||||
if type == "widget":
|
||||
self.hide_cancel = True
|
||||
|
||||
if type == "page":
|
||||
type = "modal"
|
||||
|
||||
self.object = None
|
||||
|
||||
submit_url_args = {}
|
||||
for arg in self.submit_url_args:
|
||||
if arg in locals():
|
||||
submit_url_args[arg] = locals()[arg]
|
||||
elif arg in kwargs:
|
||||
submit_url_args[arg] = kwargs[arg]
|
||||
submit_url = reverse(self.submit_url_name, kwargs=submit_url_args)
|
||||
|
||||
context = self.get_context_data()
|
||||
form = kwargs.get("form", None)
|
||||
if form:
|
||||
context["form"] = form
|
||||
context["unique"] = unique
|
||||
context["window_content"] = self.window_content
|
||||
context["context_object_name"] = self.context_object_name
|
||||
context["context_object_name_singular"] = self.context_object_name_singular
|
||||
context["submit_url"] = submit_url
|
||||
context["type"] = type
|
||||
context["hide_cancel"] = self.hide_cancel
|
||||
if self.page_title:
|
||||
context["page_title"] = self.page_title
|
||||
if self.page_subtitle:
|
||||
context["page_subtitle"] = self.page_subtitle
|
||||
response = self.render_to_response(context)
|
||||
# response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
self.request = request
|
||||
self.template_name = "partials/notify.html"
|
||||
return super().post(request, *args, **kwargs)
|
||||
|
||||
|
||||
class ObjectRead(RestrictedViewMixin, ObjectNameMixin, DetailView):
|
||||
allowed_types = ["modal", "widget", "window", "page"]
|
||||
window_content = "window-content/object.html"
|
||||
detail_template = "partials/generic-detail.html"
|
||||
|
||||
page_title = None
|
||||
page_subtitle = None
|
||||
|
||||
model = None
|
||||
# submit_url_name = None
|
||||
|
||||
detail_url_name = None
|
||||
# WARNING: TAKEN FROM locals()
|
||||
detail_url_args = ["type"]
|
||||
|
||||
request = None
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
type = kwargs.get("type", None)
|
||||
if not type:
|
||||
return HttpResponseBadRequest("No type specified")
|
||||
if type not in self.allowed_types:
|
||||
return HttpResponseBadRequest()
|
||||
self.template_name = f"wm/{type}.html"
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
|
||||
detail_url_args = {}
|
||||
for arg in self.detail_url_args:
|
||||
if arg in locals():
|
||||
detail_url_args[arg] = locals()[arg]
|
||||
elif arg in kwargs:
|
||||
detail_url_args[arg] = kwargs[arg]
|
||||
|
||||
self.request = request
|
||||
self.object = self.get_object(**kwargs)
|
||||
if isinstance(self.object, HttpResponse):
|
||||
return self.object
|
||||
|
||||
orig_type = type
|
||||
if type == "page":
|
||||
type = "modal"
|
||||
|
||||
context = self.get_context_data()
|
||||
|
||||
context["title"] = self.title + f" ({type})"
|
||||
context["title_singular"] = self.title_singular
|
||||
context["unique"] = unique
|
||||
context["window_content"] = self.window_content
|
||||
context["detail_template"] = self.detail_template
|
||||
if self.page_title:
|
||||
context["page_title"] = self.page_title
|
||||
if self.page_subtitle:
|
||||
context["page_subtitle"] = self.page_subtitle
|
||||
context["type"] = type
|
||||
context["context_object_name"] = self.context_object_name
|
||||
context["context_object_name_singular"] = self.context_object_name_singular
|
||||
|
||||
if self.detail_url_name is not None:
|
||||
context["detail_url"] = reverse(
|
||||
self.detail_url_name, kwargs=detail_url_args
|
||||
)
|
||||
|
||||
# Return partials for HTMX
|
||||
if self.request.htmx:
|
||||
if request.headers["HX-Target"] == self.context_object_name + "-info":
|
||||
self.template_name = self.detail_template
|
||||
elif orig_type == "page":
|
||||
self.template_name = self.detail_template
|
||||
else:
|
||||
context["window_content"] = self.detail_template
|
||||
|
||||
return self.render_to_response(context)
|
||||
|
||||
|
||||
class ObjectUpdate(RestrictedViewMixin, ObjectNameMixin, UpdateView):
|
||||
allowed_types = ["modal", "widget", "window", "page"]
|
||||
window_content = "window-content/object-form.html"
|
||||
parser_classes = [FormParser]
|
||||
|
||||
page_title = None
|
||||
page_subtitle = None
|
||||
|
||||
model = None
|
||||
submit_url_name = None
|
||||
submit_url_args = ["type", "pk"]
|
||||
|
||||
request = None
|
||||
|
||||
# Whether pk is required in the get request
|
||||
pk_required = True
|
||||
|
||||
# Whether to hide the cancel button in the form
|
||||
hide_cancel = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.title = "Update " + self.context_object_name_singular
|
||||
|
||||
def post_save(self, obj):
|
||||
pass
|
||||
|
||||
def form_valid(self, form):
|
||||
obj = form.save(commit=False)
|
||||
if self.request is None:
|
||||
raise Exception("Request is None")
|
||||
obj.save()
|
||||
form.save_m2m()
|
||||
self.post_save(obj)
|
||||
context = {"message": "Object updated", "class": "success"}
|
||||
response = self.render_to_response(context)
|
||||
response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
def form_invalid(self, form):
|
||||
"""If the form is invalid, render the invalid form."""
|
||||
return self.get(self.request, **self.kwargs, form=form)
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
self.request = request
|
||||
type = kwargs.get("type", None)
|
||||
pk = kwargs.get("pk", None)
|
||||
if not type:
|
||||
return HttpResponseBadRequest("No type specified")
|
||||
if not pk:
|
||||
if self.pk_required:
|
||||
return HttpResponseBadRequest("No pk specified")
|
||||
if type not in self.allowed_types:
|
||||
return HttpResponseBadRequest("Invalid type specified")
|
||||
self.template_name = f"wm/{type}.html"
|
||||
unique = str(uuid.uuid4())[:8]
|
||||
if type == "widget":
|
||||
self.hide_cancel = True
|
||||
|
||||
if type == "page":
|
||||
type = "modal"
|
||||
|
||||
self.object = self.get_object()
|
||||
|
||||
submit_url_args = {}
|
||||
for arg in self.submit_url_args:
|
||||
if arg in locals():
|
||||
submit_url_args[arg] = locals()[arg]
|
||||
elif arg in kwargs:
|
||||
submit_url_args[arg] = kwargs[arg]
|
||||
submit_url = reverse(self.submit_url_name, kwargs=submit_url_args)
|
||||
|
||||
context = self.get_context_data()
|
||||
form = kwargs.get("form", None)
|
||||
if form:
|
||||
context["form"] = form
|
||||
context["title"] = self.title + f" ({type})"
|
||||
context["title_singular"] = self.title_singular
|
||||
context["unique"] = unique
|
||||
context["window_content"] = self.window_content
|
||||
context["context_object_name"] = self.context_object_name
|
||||
context["context_object_name_singular"] = self.context_object_name_singular
|
||||
context["submit_url"] = submit_url
|
||||
context["type"] = type
|
||||
context["hide_cancel"] = self.hide_cancel
|
||||
if self.page_title:
|
||||
context["page_title"] = self.page_title
|
||||
if self.page_subtitle:
|
||||
context["page_subtitle"] = self.page_subtitle
|
||||
response = self.render_to_response(context)
|
||||
# response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
self.request = request
|
||||
self.template_name = "partials/notify.html"
|
||||
return super().post(request, *args, **kwargs)
|
||||
|
||||
|
||||
class ObjectDelete(RestrictedViewMixin, ObjectNameMixin, DeleteView):
|
||||
model = None
|
||||
template_name = "partials/notify.html"
|
||||
|
||||
# Overriden to prevent success URL from being used
|
||||
def delete(self, request, *args, **kwargs):
|
||||
"""
|
||||
Call the delete() method on the fetched object and then redirect to the
|
||||
success URL.
|
||||
"""
|
||||
self.object = self.get_object()
|
||||
# success_url = self.get_success_url()
|
||||
self.object.delete()
|
||||
context = {"message": "Object deleted", "class": "success"}
|
||||
response = self.render_to_response(context)
|
||||
response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
# This will be used in newer Django versions, until then we get a warning
|
||||
def form_valid(self, form):
|
||||
"""
|
||||
Call the delete() method on the fetched object.
|
||||
"""
|
||||
self.object = self.get_object()
|
||||
self.object.delete()
|
||||
context = {"message": "Object deleted", "class": "success"}
|
||||
response = self.render_to_response(context)
|
||||
response["HX-Trigger"] = f"{self.context_object_name_singular}Event"
|
||||
return response
|
||||
|
||||
|
||||
# from random import randint
|
||||
# from timeit import timeit
|
||||
# entries = 10000
|
||||
# a = [
|
||||
# {'ts': "sss", 'msg': randint(1, 2), str(randint(1, 2)): \
|
||||
# randint(1, 2)} for x in range(entries)
|
||||
# ]
|
||||
# kk = ["msg", "nick"]
|
||||
# call = lambda: dedup_list(a, kk)
|
||||
# #print(timeit(call, number=10))
|
||||
# print(dedup_list(a, kk))
|
||||
|
||||
# # sh-5.1$ python helpers.py
|
||||
# # 1.0805372429895215
|
||||
|
||||
|
||||
# def base36encode(number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
|
||||
# """Converts an integer to a base36 string."""
|
||||
# if not isinstance(number, (int)):
|
||||
# raise TypeError("number must be an integer")
|
||||
|
||||
# base36 = ""
|
||||
# sign = ""
|
||||
|
||||
# if number < 0:
|
||||
# sign = "-"
|
||||
# number = -number
|
||||
|
||||
# if 0 <= number < len(alphabet):
|
||||
# return sign + alphabet[number]
|
||||
|
||||
# while number != 0:
|
||||
# number, i = divmod(number, len(alphabet))
|
||||
# base36 = alphabet[i] + base36
|
||||
|
||||
# return sign + base36
|
||||
|
||||
|
||||
# def base36decode(number):
|
||||
# return int(number, 36)
|
||||
|
||||
|
||||
# def randomise_list(user, data):
|
||||
# """
|
||||
# Randomise data in a list of dictionaries.
|
||||
# """
|
||||
# if user.has_perm("core.bypass_randomisation"):
|
||||
# return
|
||||
# if isinstance(data, list):
|
||||
# for index, item in enumerate(data):
|
||||
# for key, value in item.items():
|
||||
# if key in settings.RANDOMISE_FIELDS:
|
||||
# if isinstance(value, int):
|
||||
# min_val = value - (value * settings.RANDOMISE_RATIO)
|
||||
# max_val = value + (value * settings.RANDOMISE_RATIO)
|
||||
# new_val = randint(int(min_val), int(max_val))
|
||||
# data[index][key] = new_val
|
||||
# elif isinstance(data, dict):
|
||||
# for key, value in data.items():
|
||||
# # if key in settings.RANDOMISE_FIELDS:
|
||||
# if isinstance(value, int):
|
||||
# min_val = value - (value * settings.RANDOMISE_RATIO)
|
||||
# max_val = value + (value * settings.RANDOMISE_RATIO)
|
||||
# new_val = randint(int(min_val), int(max_val))
|
||||
# data[key] = new_val
|
||||
|
||||
|
||||
# def obfuscate_list(user, data):
|
||||
# """
|
||||
# Obfuscate data in a list of dictionaries.
|
||||
# """
|
||||
# if user.has_perm("core.bypass_obfuscation"):
|
||||
# return
|
||||
# for index, item in enumerate(data):
|
||||
# for key, value in item.items():
|
||||
# # Obfuscate a ratio of the field
|
||||
# if key in settings.OBFUSCATE_FIELDS:
|
||||
# length = len(value) - 1
|
||||
# split = int(length * settings.OBFUSCATE_KEEP_RATIO)
|
||||
# first_part = value[:split]
|
||||
# second_part = value[split:]
|
||||
# second_len = len(second_part)
|
||||
# second_part = "*" * second_len
|
||||
# data[index][key] = first_part + second_part
|
||||
# # Obfuscate value based on fields
|
||||
# # Example: 2022-02-02 -> 2022-02-**
|
||||
# # 14:11:12 -> 14:11:**
|
||||
# elif key in settings.OBFUSCATE_FIELDS_SEP:
|
||||
# if "-" in value:
|
||||
# sep = "-"
|
||||
# value_spl = value.split("-")
|
||||
# hide_num = settings.OBFUSCATE_DASH_NUM
|
||||
# elif ":" in value:
|
||||
# sep = ":"
|
||||
# value_spl = value.split(":")
|
||||
# hide_num = settings.OBFUSCATE_COLON_NUM
|
||||
|
||||
# first_part = value_spl[:hide_num]
|
||||
# second_part = value_spl[hide_num:]
|
||||
# for index_x, x in enumerate(second_part):
|
||||
# x_len = len(x)
|
||||
# second_part[index_x] = "*" * x_len
|
||||
# result = sep.join([*first_part, *second_part])
|
||||
# data[index][key] = result
|
||||
# for key in settings.COMBINE_FIELDS:
|
||||
# for index, item in enumerate(data):
|
||||
# if key in item:
|
||||
# k1, k2 = settings.COMBINE_FIELDS[key]
|
||||
# if k1 in item and k2 in item:
|
||||
# data[index][key] = item[k1] + item[k2]
|
||||
|
||||
|
||||
# def hash_list(user, data, hash_keys=False):
|
||||
# """
|
||||
# Hash a list of dicts or a list with SipHash42.
|
||||
# """
|
||||
# if user.has_perm("core.bypass_hashing"):
|
||||
# return
|
||||
# cache = "cache.hash"
|
||||
# hash_table = {}
|
||||
# if isinstance(data, dict):
|
||||
# data_copy = [{x: data[x]} for x in data]
|
||||
# else:
|
||||
# data_copy = type(data)((data))
|
||||
# for index, item in enumerate(data_copy):
|
||||
# if "src" in item:
|
||||
# if item["src"] in settings.SAFE_SOURCES:
|
||||
# continue
|
||||
# if isinstance(item, dict):
|
||||
# for key, value in list(item.items()):
|
||||
# if (
|
||||
# key not in settings.WHITELIST_FIELDS
|
||||
# and key not in settings.NO_OBFUSCATE_PARAMS
|
||||
# ):
|
||||
# if isinstance(value, int):
|
||||
# value = str(value)
|
||||
# if isinstance(value, bool):
|
||||
# continue
|
||||
# if value is None:
|
||||
# continue
|
||||
# if hash_keys:
|
||||
# hashed = siphash(settings.HASHING_KEY, key)
|
||||
# else:
|
||||
# hashed = siphash(settings.HASHING_KEY, value)
|
||||
# encoded = base36encode(hashed)
|
||||
# if encoded not in hash_table:
|
||||
# if hash_keys:
|
||||
# hash_table[encoded] = key
|
||||
# else:
|
||||
# hash_table[encoded] = value
|
||||
# if hash_keys:
|
||||
# # Rename the dict key
|
||||
# data[encoded] = data.pop(key)
|
||||
# else:
|
||||
# data[index][key] = encoded
|
||||
# elif isinstance(item, str):
|
||||
# hashed = siphash(settings.HASHING_KEY, item)
|
||||
# encoded = base36encode(hashed)
|
||||
# if encoded not in hash_table:
|
||||
# hash_table[encoded] = item
|
||||
# data[index] = encoded
|
||||
# if hash_table:
|
||||
# r.hmset(cache, hash_table)
|
||||
|
||||
|
||||
# def hash_lookup(user, data_dict, supplementary_data=None):
|
||||
# cache = "cache.hash"
|
||||
# hash_list = SortedSet()
|
||||
# denied = []
|
||||
# for key, value in list(data_dict.items()):
|
||||
# if "source" in data_dict:
|
||||
# if data_dict["source"] in settings.SAFE_SOURCES:
|
||||
# continue
|
||||
# if "src" in data_dict:
|
||||
# if data_dict["src"] in settings.SAFE_SOURCES:
|
||||
# continue
|
||||
# if supplementary_data:
|
||||
# if "source" in supplementary_data:
|
||||
# if supplementary_data["source"] in settings.SAFE_SOURCES:
|
||||
# continue
|
||||
# if key in settings.SEARCH_FIELDS_DENY:
|
||||
# if not user.has_perm("core.bypass_hashing"):
|
||||
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
|
||||
# denied.append(data_dict[key])
|
||||
# if (
|
||||
# key not in settings.WHITELIST_FIELDS
|
||||
# and key not in settings.NO_OBFUSCATE_PARAMS
|
||||
# ):
|
||||
# if not value:
|
||||
# continue
|
||||
# # hashes = re.findall("\|([^\|]*)\|", value) # noqa
|
||||
# if isinstance(value, str):
|
||||
# hashes = re.findall("[A-Z0-9]{12,13}", value)
|
||||
# elif isinstance(value, dict):
|
||||
# hashes = []
|
||||
# for key, value in value.items():
|
||||
# if not value:
|
||||
# continue
|
||||
# hashes_iter = re.findall("[A-Z0-9]{12,13}", value)
|
||||
# for h in hashes_iter:
|
||||
# hashes.append(h)
|
||||
# if not hashes:
|
||||
# # Otherwise the user could inject plaintext search queries
|
||||
# if not user.has_perm("core.bypass_hashing"):
|
||||
# data_dict[key] = SearchDenied(key=key, value=data_dict[key])
|
||||
# denied.append(data_dict[key])
|
||||
# continue
|
||||
# else:
|
||||
# # There are hashes here but there shouldn't be!
|
||||
# if key in settings.TAG_SEARCH_DENY:
|
||||
# data_dict[key] = LookupDenied(key=key, value=data_dict[key])
|
||||
# denied.append(data_dict[key])
|
||||
# continue
|
||||
|
||||
# for hash in hashes:
|
||||
# hash_list.add(hash)
|
||||
|
||||
# if hash_list:
|
||||
# values = r.hmget(cache, *hash_list)
|
||||
# if not values:
|
||||
# return
|
||||
# for index, val in enumerate(values):
|
||||
# if val is None:
|
||||
# values[index] = b"ERR"
|
||||
# values = [x.decode() for x in values]
|
||||
# total = dict(zip(hash_list, values))
|
||||
# for key in data_dict.keys():
|
||||
# for hash in total:
|
||||
# if data_dict[key]:
|
||||
# if isinstance(data_dict[key], str):
|
||||
# if hash in data_dict[key]:
|
||||
# data_dict[key] = data_dict[key].replace(
|
||||
# f"{hash}", total[hash]
|
||||
# )
|
||||
# elif isinstance(data_dict[key], dict):
|
||||
# for k2, v2 in data_dict[key].items():
|
||||
# if hash in v2:
|
||||
# data_dict[key][k2] = v2.repl
|
||||
# ace(f"{hash}", total[hash])
|
||||
# return denied
|
||||
|
||||
|
||||
# def encrypt_list(user, data, secret):
|
||||
# if user.has_perm("core.bypass_encryption"):
|
||||
# return
|
||||
# cipher = Cipher(algorithms.AES(secret), ECB())
|
||||
# for index, item in enumerate(data):
|
||||
# for key, value in item.items():
|
||||
# if key not in settings.WHITELIST_FIELDS:
|
||||
# encryptor = cipher.encryptor()
|
||||
# if isinstance(value, int):
|
||||
# value = str(value)
|
||||
# if isinstance(value, bool):
|
||||
# continue
|
||||
# if value is None:
|
||||
# continue
|
||||
# decoded = value.encode("utf8", "replace")
|
||||
# length = 16 - (len(decoded) % 16)
|
||||
# decoded += bytes([length]) * length
|
||||
# ct = encryptor.update(decoded) + encryptor.finalize()
|
||||
# final_str = b64encode(ct)
|
||||
# data[index][key] = final_str.decode("utf-8", "replace")
|
|
@ -1,12 +1,12 @@
|
|||
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
|
||||
from django.shortcuts import render
|
||||
from mixins.views import ObjectCreate, ObjectDelete, ObjectList, ObjectUpdate
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from core.db.storage import db
|
||||
from core.forms import NotificationRuleForm, NotificationSettingsForm
|
||||
from core.lib.rules import NotificationRuleData
|
||||
from core.models import NotificationRule, NotificationSettings
|
||||
from core.views.helpers import ObjectCreate, ObjectDelete, ObjectList, ObjectUpdate
|
||||
|
||||
|
||||
# Notifications - we create a new notification settings object if there isn't one
|
||||
|
@ -70,7 +70,7 @@ class RuleClear(LoginRequiredMixin, PermissionRequiredMixin, APIView):
|
|||
permission_required = "use_rules"
|
||||
|
||||
def post(self, request, type, pk):
|
||||
template_name = "partials/notify.html"
|
||||
template_name = "mixins/partials/notify.html"
|
||||
rule = NotificationRule.objects.get(pk=pk, user=request.user)
|
||||
if isinstance(rule.match, dict):
|
||||
for index in rule.match:
|
||||
|
|
|
@ -42,7 +42,7 @@ services:
|
|||
- ${PORTAINER_GIT_DIR}:/code
|
||||
- ${PORTAINER_GIT_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini
|
||||
- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
|
||||
- ${APP_DATABASE_FILE}:/code/db.sqlite3
|
||||
- ${APP_DATABASE_FILE}:/conf/db.sqlite3
|
||||
- neptune_static:${STATIC_ROOT}
|
||||
env_file:
|
||||
- stack.env
|
||||
|
@ -72,7 +72,7 @@ services:
|
|||
- ${PORTAINER_GIT_DIR}:/code
|
||||
- ${PORTAINER_GIT_DIR}/docker/uwsgi.ini:/conf/uwsgi.ini
|
||||
- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
|
||||
- ${APP_DATABASE_FILE}:/code/db.sqlite3
|
||||
- ${APP_DATABASE_FILE}:/conf/db.sqlite3
|
||||
- neptune_static:${STATIC_ROOT}
|
||||
env_file:
|
||||
- stack.env
|
||||
|
@ -101,7 +101,7 @@ services:
|
|||
volumes:
|
||||
- ${PORTAINER_GIT_DIR}:/code
|
||||
- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
|
||||
- ${APP_DATABASE_FILE}:/code/db.sqlite3
|
||||
- ${APP_DATABASE_FILE}:/conf/db.sqlite3
|
||||
- neptune_static:${STATIC_ROOT}
|
||||
volumes_from:
|
||||
- tmp
|
||||
|
@ -120,7 +120,7 @@ services:
|
|||
volumes:
|
||||
- ${PORTAINER_GIT_DIR}:/code
|
||||
- ${APP_LOCAL_SETTINGS}:/code/app/local_settings.py
|
||||
- ${APP_DATABASE_FILE}:/code/db.sqlite3
|
||||
- ${APP_DATABASE_FILE}:/conf/db.sqlite3
|
||||
- neptune_static:${STATIC_ROOT}
|
||||
volumes_from:
|
||||
- tmp
|
||||
|
|
Loading…
Reference in New Issue