89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate, get_user_model
|
|
from django.http import HttpRequest, HttpResponse
|
|
from django.views import View
|
|
|
|
|
|
class ProsodyAuthBridge(View):
|
|
"""
|
|
Minimal external-auth bridge for Prosody.
|
|
Returns plain text "1" or "0" per Prosody external auth protocol.
|
|
"""
|
|
|
|
http_method_names = ["get", "post"]
|
|
|
|
def _denied(self) -> HttpResponse:
|
|
return HttpResponse("0\n", content_type="text/plain")
|
|
|
|
def _b64url_decode(self, value: str) -> str:
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return ""
|
|
padded = raw + "=" * (-len(raw) % 4)
|
|
padded = padded.replace("-", "+").replace("_", "/")
|
|
try:
|
|
return base64.b64decode(padded.encode("ascii")).decode(
|
|
"utf-8", errors="ignore"
|
|
)
|
|
except Exception:
|
|
return ""
|
|
|
|
def _extract_line(self, request: HttpRequest) -> str:
|
|
line_b64 = str(request.GET.get("line_b64") or "").strip()
|
|
if line_b64:
|
|
return self._b64url_decode(line_b64)
|
|
body = (request.body or b"").decode("utf-8", errors="ignore").strip()
|
|
if body:
|
|
return body
|
|
return str(request.POST.get("line") or "").strip()
|
|
|
|
def post(self, request: HttpRequest) -> HttpResponse:
|
|
remote_addr = str(request.META.get("REMOTE_ADDR") or "").strip()
|
|
if remote_addr not in {"127.0.0.1", "::1"}:
|
|
return self._denied()
|
|
|
|
expected_secret = str(getattr(settings, "XMPP_SECRET", "") or "").strip()
|
|
supplied_secret = str(request.headers.get("X-Prosody-Secret") or "").strip()
|
|
if not supplied_secret:
|
|
supplied_secret = str(request.GET.get("secret") or "").strip()
|
|
secret_b64 = str(request.GET.get("secret_b64") or "").strip()
|
|
if not supplied_secret and secret_b64:
|
|
supplied_secret = self._b64url_decode(secret_b64)
|
|
if not expected_secret or supplied_secret != expected_secret:
|
|
return self._denied()
|
|
|
|
line = self._extract_line(request)
|
|
if not line:
|
|
return self._denied()
|
|
|
|
parts = line.split(":")
|
|
if len(parts) < 3:
|
|
return self._denied()
|
|
|
|
command, username, _domain = parts[:3]
|
|
password = ":".join(parts[3:]) if len(parts) > 3 else None
|
|
|
|
if command == "auth":
|
|
if not password:
|
|
return self._denied()
|
|
user = authenticate(username=username, password=password)
|
|
ok = bool(user is not None and getattr(user, "is_active", False))
|
|
return HttpResponse("1\n" if ok else "0\n", content_type="text/plain")
|
|
|
|
if command == "isuser":
|
|
User = get_user_model()
|
|
exists = bool(User.objects.filter(username=username).exists())
|
|
return HttpResponse("1\n" if exists else "0\n", content_type="text/plain")
|
|
|
|
if command == "setpass":
|
|
return self._denied()
|
|
|
|
return self._denied()
|
|
|
|
def get(self, request: HttpRequest) -> HttpResponse:
|
|
return self.post(request)
|