Increase platform abstraction cohesion
This commit is contained in:
88
core/views/prosody.py
Normal file
88
core/views/prosody.py
Normal file
@@ -0,0 +1,88 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import authenticate, get_user_model
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.views import View
|
||||
|
||||
|
||||
class ProsodyAuthBridge(View):
|
||||
"""
|
||||
Minimal external-auth bridge for Prosody.
|
||||
Returns plain text "1" or "0" per Prosody external auth protocol.
|
||||
"""
|
||||
|
||||
http_method_names = ["get", "post"]
|
||||
|
||||
def _denied(self) -> HttpResponse:
|
||||
return HttpResponse("0\n", content_type="text/plain")
|
||||
|
||||
def _b64url_decode(self, value: str) -> str:
|
||||
raw = str(value or "").strip()
|
||||
if not raw:
|
||||
return ""
|
||||
padded = raw + "=" * (-len(raw) % 4)
|
||||
padded = padded.replace("-", "+").replace("_", "/")
|
||||
try:
|
||||
return base64.b64decode(padded.encode("ascii")).decode(
|
||||
"utf-8", errors="ignore"
|
||||
)
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def _extract_line(self, request: HttpRequest) -> str:
|
||||
line_b64 = str(request.GET.get("line_b64") or "").strip()
|
||||
if line_b64:
|
||||
return self._b64url_decode(line_b64)
|
||||
body = (request.body or b"").decode("utf-8", errors="ignore").strip()
|
||||
if body:
|
||||
return body
|
||||
return str(request.POST.get("line") or "").strip()
|
||||
|
||||
def post(self, request: HttpRequest) -> HttpResponse:
|
||||
remote_addr = str(request.META.get("REMOTE_ADDR") or "").strip()
|
||||
if remote_addr not in {"127.0.0.1", "::1"}:
|
||||
return self._denied()
|
||||
|
||||
expected_secret = str(getattr(settings, "XMPP_SECRET", "") or "").strip()
|
||||
supplied_secret = str(request.headers.get("X-Prosody-Secret") or "").strip()
|
||||
if not supplied_secret:
|
||||
supplied_secret = str(request.GET.get("secret") or "").strip()
|
||||
secret_b64 = str(request.GET.get("secret_b64") or "").strip()
|
||||
if not supplied_secret and secret_b64:
|
||||
supplied_secret = self._b64url_decode(secret_b64)
|
||||
if not expected_secret or supplied_secret != expected_secret:
|
||||
return self._denied()
|
||||
|
||||
line = self._extract_line(request)
|
||||
if not line:
|
||||
return self._denied()
|
||||
|
||||
parts = line.split(":")
|
||||
if len(parts) < 3:
|
||||
return self._denied()
|
||||
|
||||
command, username, _domain = parts[:3]
|
||||
password = ":".join(parts[3:]) if len(parts) > 3 else None
|
||||
|
||||
if command == "auth":
|
||||
if not password:
|
||||
return self._denied()
|
||||
user = authenticate(username=username, password=password)
|
||||
ok = bool(user is not None and getattr(user, "is_active", False))
|
||||
return HttpResponse("1\n" if ok else "0\n", content_type="text/plain")
|
||||
|
||||
if command == "isuser":
|
||||
User = get_user_model()
|
||||
exists = bool(User.objects.filter(username=username).exists())
|
||||
return HttpResponse("1\n" if exists else "0\n", content_type="text/plain")
|
||||
|
||||
if command == "setpass":
|
||||
return self._denied()
|
||||
|
||||
return self._denied()
|
||||
|
||||
def get(self, request: HttpRequest) -> HttpResponse:
|
||||
return self.post(request)
|
||||
Reference in New Issue
Block a user