from __future__ import annotations import base64 from django.conf import settings from django.contrib.auth import authenticate, get_user_model from django.http import HttpRequest, HttpResponse from django.views import View class ProsodyAuthBridge(View): """ Minimal external-auth bridge for Prosody. Returns plain text "1" or "0" per Prosody external auth protocol. """ http_method_names = ["get", "post"] def _denied(self) -> HttpResponse: return HttpResponse("0\n", content_type="text/plain") def _b64url_decode(self, value: str) -> str: raw = str(value or "").strip() if not raw: return "" padded = raw + "=" * (-len(raw) % 4) padded = padded.replace("-", "+").replace("_", "/") try: return base64.b64decode(padded.encode("ascii")).decode( "utf-8", errors="ignore" ) except Exception: return "" def _extract_line(self, request: HttpRequest) -> str: line_b64 = str(request.GET.get("line_b64") or "").strip() if line_b64: return self._b64url_decode(line_b64) body = (request.body or b"").decode("utf-8", errors="ignore").strip() if body: return body return str(request.POST.get("line") or "").strip() def post(self, request: HttpRequest) -> HttpResponse: remote_addr = str(request.META.get("REMOTE_ADDR") or "").strip() if remote_addr not in {"127.0.0.1", "::1"}: return self._denied() expected_secret = str(getattr(settings, "XMPP_SECRET", "") or "").strip() supplied_secret = str(request.headers.get("X-Prosody-Secret") or "").strip() if not supplied_secret: supplied_secret = str(request.GET.get("secret") or "").strip() secret_b64 = str(request.GET.get("secret_b64") or "").strip() if not supplied_secret and secret_b64: supplied_secret = self._b64url_decode(secret_b64) if not expected_secret or supplied_secret != expected_secret: return self._denied() line = self._extract_line(request) if not line: return self._denied() parts = line.split(":") if len(parts) < 3: return self._denied() command, username, _domain = parts[:3] password = ":".join(parts[3:]) if len(parts) > 3 else None if command == "auth": if not password: return self._denied() user = authenticate(username=username, password=password) ok = bool(user is not None and getattr(user, "is_active", False)) return HttpResponse("1\n" if ok else "0\n", content_type="text/plain") if command == "isuser": User = get_user_model() exists = bool(User.objects.filter(username=username).exists()) return HttpResponse("1\n" if exists else "0\n", content_type="text/plain") if command == "setpass": return self._denied() return self._denied() def get(self, request: HttpRequest) -> HttpResponse: return self.post(request)