Files
GIA/core/tests/test_xmpp_integration.py
2026-03-08 22:08:55 +00:00

582 lines
21 KiB
Python

"""
Integration tests for XMPP connectivity.
These tests require the GIA stack to be running (Prosody + gia app). They probe
both the XEP-0114 component port and the c2s (client-to-server) port on 5222,
mirroring exactly the flow a phone XMPP client uses:
TCP connect → STARTTLS → TLS upgrade (with cert check) → SASL PLAIN auth
Tests are skipped automatically when XMPP settings are absent (e.g. in CI
environments without a running stack).
"""
from __future__ import annotations
import base64
import hashlib
import http.client
import re
import socket
import ssl
import time
import unittest
import urllib.parse
import xml.etree.ElementTree as ET
from django.conf import settings
from django.test import SimpleTestCase
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _xmpp_configured() -> bool:
return bool(
getattr(settings, "XMPP_JID", None)
and getattr(settings, "XMPP_SECRET", None)
and getattr(settings, "XMPP_ADDRESS", None)
)
def _xmpp_address() -> str:
return str(settings.XMPP_ADDRESS)
def _xmpp_component_port() -> int:
return int(getattr(settings, "XMPP_PORT", None) or 8888)
def _xmpp_c2s_port() -> int:
"""Standard XMPP c2s port (same as a phone client would use)."""
return int(getattr(settings, "XMPP_C2S_PORT", None) or 5222)
def _xmpp_domain() -> str:
"""The VirtualHost domain, derived from XMPP_JID or XMPP_DOMAIN."""
domain = getattr(settings, "XMPP_DOMAIN", None)
if domain:
return str(domain)
jid = str(settings.XMPP_JID)
# Component JIDs may be subdomains; derive the parent domain when needed.
parts = jid.split(".")
if len(parts) > 2:
return ".".join(parts[1:])
return jid
def _prosody_auth_endpoint() -> str:
"""URL of the Django auth bridge that Prosody calls for c2s authentication."""
return str(
getattr(
settings,
"PROSODY_AUTH_ENDPOINT",
"http://127.0.0.1:8090/internal/prosody/auth/",
)
)
def _recv_until(
sock: socket.socket,
patterns: list[bytes],
timeout: float = 8.0,
max_bytes: int = 16384,
) -> bytes:
"""Read from sock until one of the byte patterns appears or timeout/max_bytes hit."""
buf = b""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
sock.settimeout(max(0.1, remaining))
try:
chunk = sock.recv(4096)
except socket.timeout:
break
if not chunk:
break
buf += chunk
if any(p in buf for p in patterns):
break
if len(buf) >= max_bytes:
break
return buf
def _component_handshake(
address: str, port: int, jid: str, secret: str, timeout: float = 5.0
) -> tuple[bool, str]:
"""
Attempt an XEP-0114 external component handshake.
Returns (success, message).
"""
stream_open = (
"<?xml version='1.0'?>"
f"<stream:stream xmlns='jabber:component:accept' "
f"xmlns:stream='http://etherx.jabber.org/streams' "
f"to='{jid}'>"
)
try:
with socket.create_connection((address, port), timeout=timeout) as sock:
sock.settimeout(timeout)
sock.sendall(stream_open.encode())
buf = _recv_until(sock, [b"id="], timeout=timeout)
header_text = buf.decode(errors="replace")
try:
root = ET.fromstring(header_text + "</stream:stream>")
stream_id = root.get("id", "")
except ET.ParseError:
m = re.search(r'\bid=["\']([^"\']+)["\']', header_text)
stream_id = m.group(1) if m else ""
if not stream_id:
return False, f"No stream id in header: {header_text[:200]}"
token = hashlib.sha1((stream_id + secret).encode()).hexdigest()
sock.sendall(f"<handshake>{token}</handshake>".encode())
response = _recv_until(
sock, [b"<handshake", b"<stream:error"], timeout=timeout
)
resp_text = response.decode(errors="replace")
if "<handshake/>" in resp_text or "<handshake />" in resp_text:
return True, "Handshake accepted"
if "conflict" in resp_text and "already connected" in resp_text:
return True, "Credentials valid (component already connected)"
if "not-authorized" in resp_text:
return False, "Handshake rejected: not-authorized"
if response:
return False, f"Unexpected response: {resp_text[:200]}"
return False, "No response received after handshake"
except socket.timeout:
return False, f"Timed out connecting to {address}:{port}"
except ConnectionRefusedError:
return False, f"Connection refused to {address}:{port}"
except OSError as exc:
return False, f"Socket error: {exc}"
class _C2SResult:
"""Return value from _c2s_sasl_auth."""
def __init__(self, success: bool, stage: str, detail: str):
self.success = success # True = SASL <success/>
self.stage = stage # where we got to: tcp/starttls/tls/features/auth
self.detail = detail # human-readable explanation
def __repr__(self):
return f"<C2SResult success={self.success} stage={self.stage!r} detail={self.detail!r}>"
def _c2s_sasl_auth(
address: str,
port: int,
domain: str,
username: str,
password: str,
verify_cert: bool = True,
timeout: float = 10.0,
) -> _C2SResult:
"""
Mirror the full c2s auth flow a phone XMPP client performs:
1. TCP connect to address:port
2. Open XMPP stream to domain
3. Receive <features> with <starttls>
4. Send <starttls/>, receive <proceed/>
5. ssl.wrap_socket with server_hostname=domain (cert validation if verify_cert)
6. Re-open XMPP stream
7. Receive post-TLS <features> with SASL mechanisms
8. Send SASL PLAIN base64(\x00username\x00password)
9. Return _C2SResult with (True, "auth") on <success/> or (False, "auth") on <failure/>
"""
NS_STREAM = "http://etherx.jabber.org/streams"
NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"
NS_SASL = "urn:ietf:params:xml:ns:xmpp-sasl"
def stream_open(to: str) -> bytes:
return (
"<?xml version='1.0'?>"
f"<stream:stream xmlns='jabber:client' "
f"xmlns:stream='{NS_STREAM}' "
f"to='{to}' version='1.0'>"
).encode()
try:
raw = socket.create_connection((address, port), timeout=timeout)
except ConnectionRefusedError:
return _C2SResult(False, "tcp", f"Connection refused to {address}:{port}")
except (socket.timeout, OSError) as exc:
return _C2SResult(False, "tcp", f"TCP connect failed: {exc}")
try:
raw.settimeout(timeout)
raw.sendall(stream_open(domain))
# --- Receive pre-TLS features (expect <starttls>) ---
buf = _recv_until(
raw, [b"</stream:features>", b"<stream:error"], timeout=timeout
)
text = buf.decode(errors="replace")
if "<stream:error" in text:
return _C2SResult(
False, "starttls", f"Stream error before features: {text[:200]}"
)
if "starttls" not in text.lower():
return _C2SResult(
False, "starttls", f"No <starttls> in pre-TLS features: {text[:300]}"
)
# --- Negotiate STARTTLS ---
raw.sendall(f"<starttls xmlns='{NS_TLS}'/>".encode())
buf2 = _recv_until(raw, [b"<proceed", b"<failure"], timeout=timeout)
text2 = buf2.decode(errors="replace")
if "<proceed" not in text2:
return _C2SResult(
False,
"starttls",
f"No <proceed/> after STARTTLS request: {text2[:200]}",
)
# --- Upgrade to TLS ---
ctx = ssl.create_default_context()
if not verify_cert:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
try:
tls = ctx.wrap_socket(raw, server_hostname=domain)
except ssl.SSLCertVerificationError as exc:
return _C2SResult(
False, "tls", f"TLS cert verification failed for {domain!r}: {exc}"
)
except ssl.SSLError as exc:
return _C2SResult(False, "tls", f"TLS handshake error: {exc}")
tls.settimeout(timeout)
# --- Re-open stream over TLS ---
tls.sendall(stream_open(domain))
buf3 = _recv_until(
tls, [b"</stream:features>", b"<stream:error"], timeout=timeout
)
text3 = buf3.decode(errors="replace")
if "<stream:error" in text3:
return _C2SResult(
False, "features", f"Stream error after TLS: {text3[:200]}"
)
mechanisms = re.findall(r"<mechanism>([^<]+)</mechanism>", text3, re.IGNORECASE)
if not mechanisms:
return _C2SResult(
False,
"features",
f"No SASL mechanisms in post-TLS features: {text3[:300]}",
)
if "PLAIN" not in [m.upper() for m in mechanisms]:
return _C2SResult(
False, "features", f"SASL PLAIN not offered; got: {mechanisms}"
)
# --- SASL PLAIN auth ---
credential = base64.b64encode(f"\x00{username}\x00{password}".encode()).decode()
tls.sendall(
f"<auth xmlns='{NS_SASL}' mechanism='PLAIN'>{credential}</auth>".encode()
)
buf4 = _recv_until(
tls, [b"<success", b"<failure", b"<stream:error"], timeout=timeout
)
text4 = buf4.decode(errors="replace")
if "<success" in text4:
return _C2SResult(True, "auth", "SASL PLAIN accepted")
if "<failure" in text4:
# Extract the failure condition element name (e.g. not-authorized)
m = re.search(r"<failure[^>]*>\s*<([a-z-]+)", text4)
condition = m.group(1) if m else "unknown"
return _C2SResult(
False, "auth", f"SASL PLAIN rejected: {condition}{text4[:200]}"
)
if "<stream:error" in text4:
return _C2SResult(False, "auth", f"Stream error during auth: {text4[:200]}")
return _C2SResult(False, "auth", f"No auth response received: {text4[:200]}")
finally:
try:
raw.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Component tests (XEP-0114)
# ---------------------------------------------------------------------------
@unittest.skipUnless(_xmpp_configured(), "XMPP settings not configured")
class XMPPComponentTests(SimpleTestCase):
def test_component_port_reachable(self):
"""Prosody component port accepts TCP connections."""
addr = _xmpp_address()
port = _xmpp_component_port()
try:
with socket.create_connection((addr, port), timeout=5):
pass
except (ConnectionRefusedError, socket.timeout, OSError) as exc:
self.fail(f"Cannot reach XMPP component port {addr}:{port}: {exc}")
def test_component_handshake_succeeds(self):
"""Prosody accepts the component JID and secret (full XEP-0114 handshake)."""
ok, msg = _component_handshake(
address=_xmpp_address(),
port=_xmpp_component_port(),
jid=str(settings.XMPP_JID),
secret=str(settings.XMPP_SECRET),
)
self.assertTrue(ok, msg)
def test_wrong_secret_rejected(self):
"""Prosody rejects a component connection with an invalid secret."""
ok, msg = _component_handshake(
address=_xmpp_address(),
port=_xmpp_component_port(),
jid=str(settings.XMPP_JID),
secret="definitely-wrong-secret",
)
self.assertFalse(ok, f"Expected rejection but got: {msg}")
# ---------------------------------------------------------------------------
# Auth bridge tests (what Prosody calls to validate user passwords)
# ---------------------------------------------------------------------------
@unittest.skipUnless(_xmpp_configured(), "XMPP settings not configured")
class XMPPAuthBridgeTests(SimpleTestCase):
"""
Tests that probe the Django auth bridge endpoint that Prosody calls when a
phone client attempts to log in. If this endpoint is unreachable, ALL
c2s logins will silently fail with not-authorized regardless of password.
"""
def _parse_endpoint(self):
url = _prosody_auth_endpoint()
parsed = urllib.parse.urlparse(url)
return (
parsed.scheme,
parsed.hostname,
parsed.port or (443 if parsed.scheme == "https" else 80),
parsed.path,
)
def test_auth_endpoint_tcp_reachable(self):
"""Auth bridge port (8090) is listening inside the pod."""
_, host, port, _ = self._parse_endpoint()
try:
with socket.create_connection((host, port), timeout=5):
pass
except (ConnectionRefusedError, socket.timeout, OSError) as exc:
self.fail(
f"Cannot reach auth bridge at {host}:{port}: {exc}\n"
"This means uWSGI is not binding http-socket=127.0.0.1:8090 — "
"ALL c2s logins will fail with not-authorized."
)
def test_auth_endpoint_rejects_bad_secret(self):
"""Auth bridge returns 0 (or error) for a request with a wrong XMPP_SECRET."""
_, host, port, path = self._parse_endpoint()
# isuser command with wrong secret — should be rejected or return 0
query = (
f"?command=isuser%3Anonexistent%3A{urllib.parse.quote(_xmpp_domain())}"
"&secret=wrongsecret"
)
try:
conn = http.client.HTTPConnection(host, port, timeout=5)
conn.request("GET", path + query)
resp = conn.getresponse()
body = resp.read().decode(errors="replace").strip()
conn.close()
except (ConnectionRefusedError, OSError) as exc:
self.fail(f"Could not connect to auth bridge: {exc}")
# Should not return "1" (success) with wrong secret
self.assertNotEqual(
body,
"1",
f"Auth bridge accepted a request with wrong secret (body={body!r})",
)
def test_auth_endpoint_isuser_returns_zero_or_one(self):
"""Auth bridge responds with '0' or '1' for an isuser query (not an error page)."""
secret = getattr(settings, "XMPP_SECRET", "")
_, host, port, path = self._parse_endpoint()
query = (
f"?command=isuser%3Anonexistent%3A{urllib.parse.quote(_xmpp_domain())}"
f"&secret={urllib.parse.quote(secret)}"
)
try:
conn = http.client.HTTPConnection(host, port, timeout=5)
conn.request("GET", path + query)
resp = conn.getresponse()
body = resp.read().decode(errors="replace").strip()
conn.close()
except (ConnectionRefusedError, OSError) as exc:
self.fail(f"Could not connect to auth bridge: {exc}")
self.assertIn(
body,
("0", "1"),
f"Unexpected auth bridge response {body!r} (expected '0' or '1')",
)
# ---------------------------------------------------------------------------
# c2s (client-to-server) tests — mirrors the phone's XMPP connection flow
# ---------------------------------------------------------------------------
@unittest.skipUnless(_xmpp_configured(), "XMPP settings not configured")
class XMPPClientAuthTests(SimpleTestCase):
"""
Full end-to-end XMPP c2s tests that reproduce exactly what a phone client
does when it connects. If the phone cannot authenticate, the
test_c2s_invalid_credentials_rejected test should confirm the error is
not-authorized (not a connection/TLS error), and test_c2s_valid_credentials_accepted
should fail for the same reason as the phone.
"""
def test_c2s_port_reachable(self):
"""Prosody c2s port 5222 accepts TCP connections."""
addr = _xmpp_address()
port = _xmpp_c2s_port()
try:
with socket.create_connection((addr, port), timeout=5):
pass
except (ConnectionRefusedError, socket.timeout, OSError) as exc:
self.fail(f"Cannot reach XMPP c2s port {addr}:{port}: {exc}")
def test_c2s_tls_cert_valid_for_domain(self):
"""
Prosody's TLS certificate is valid for the XMPP domain (what the phone checks).
Failure here means the phone will see a cert error before even trying to log in.
"""
addr = _xmpp_address()
port = _xmpp_c2s_port()
domain = _xmpp_domain()
result = _c2s_sasl_auth(
address=addr,
port=port,
domain=domain,
username="certcheck",
password="certcheck",
verify_cert=True,
timeout=10.0,
)
# We only care that we got past TLS — a SASL failure at stage "auth" is fine.
self.assertNotEqual(
result.stage,
"tls",
f"TLS cert validation failed for domain {domain!r}: {result.detail}\n"
"Phone will see a certificate error — it cannot connect at all.",
)
self.assertNotEqual(
result.stage, "tcp", f"Could not reach c2s port at all: {result.detail}"
)
self.assertNotEqual(
result.stage, "starttls", f"STARTTLS negotiation failed: {result.detail}"
)
def test_c2s_sasl_plain_offered(self):
"""Prosody offers SASL PLAIN after STARTTLS (required for password auth)."""
addr = _xmpp_address()
port = _xmpp_c2s_port()
domain = _xmpp_domain()
result = _c2s_sasl_auth(
address=addr,
port=port,
domain=domain,
username="saslcheck",
password="saslcheck",
verify_cert=False,
timeout=10.0,
)
# We should reach the "auth" stage (SASL PLAIN was offered and we tried it).
# Reaching any earlier stage means SASL PLAIN wasn't offered or something broke.
self.assertIn(
result.stage,
("auth",),
f"Did not reach SASL auth stage — stopped at {result.stage!r}: {result.detail}\n"
"Check that allow_unencrypted_plain_auth = true in prosody config.",
)
def test_c2s_invalid_credentials_rejected(self):
"""
Prosody returns not-authorized for bad credentials — not a connection error.
This is the minimum bar: if this test fails with a connection error
(stage != 'auth'), it means the auth path itself is broken (e.g. the
Django auth bridge endpoint is unreachable). In that case, even valid
credentials would fail, which is exactly what the phone experiences.
"""
addr = _xmpp_address()
port = _xmpp_c2s_port()
domain = _xmpp_domain()
result = _c2s_sasl_auth(
address=addr,
port=port,
domain=domain,
username="nobody_special",
password="definitely-wrong-password-xyz",
verify_cert=False,
timeout=10.0,
)
self.assertFalse(
result.success,
f"Expected auth failure for invalid creds but got success: {result}",
)
self.assertEqual(
result.stage,
"auth",
f"Auth failed at stage {result.stage!r} (expected 'auth' / not-authorized).\n"
f"Detail: {result.detail}\n"
"This means Prosody cannot reach the Django auth bridge — "
"valid credentials would also fail. "
"Check that uWSGI has http-socket=127.0.0.1:8090 and the container is running.",
)
self.assertIn(
"not-authorized",
result.detail,
f"Expected 'not-authorized' failure, got: {result.detail}",
)
@unittest.skipUnless(
bool(__import__("os").environ.get("XMPP_TEST_USER")),
"Set XMPP_TEST_USER and XMPP_TEST_PASSWORD env vars to run live credential test",
)
def test_c2s_valid_credentials_accepted(self):
"""
Real credentials (XMPP_TEST_USER / XMPP_TEST_PASSWORD) are accepted.
Skipped unless env vars are set — run manually to verify end-to-end login.
"""
import os
addr = _xmpp_address()
port = _xmpp_c2s_port()
domain = _xmpp_domain()
username = os.environ["XMPP_TEST_USER"]
password = os.environ.get("XMPP_TEST_PASSWORD", "")
result = _c2s_sasl_auth(
address=addr,
port=port,
domain=domain,
username=username,
password=password,
verify_cert=True,
timeout=10.0,
)
self.assertTrue(
result.success,
f"Login with XMPP_TEST_USER={username!r} failed at stage {result.stage!r}: {result.detail}",
)