166 lines
6.3 KiB
Python
166 lines
6.3 KiB
Python
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from django.test import TestCase, override_settings
|
|
from slixmpp.plugins.xep_0356.permissions import MessagePermission
|
|
|
|
from core.clients.xmpp import XMPPComponent
|
|
from core.models import Person, PersonIdentifier, User
|
|
|
|
|
|
@override_settings(
|
|
XMPP_JID="component.example.test",
|
|
XMPP_USER_DOMAIN="example.test",
|
|
)
|
|
class XMPPCarbonTests(TestCase):
|
|
def _component(self):
|
|
component = XMPPComponent(
|
|
ur=MagicMock(),
|
|
jid="component.example.test",
|
|
secret="secret",
|
|
server="localhost",
|
|
port=5347,
|
|
)
|
|
component.log = MagicMock()
|
|
return component
|
|
|
|
def test_build_privileged_outbound_message_targets_contact(self):
|
|
component = self._component()
|
|
|
|
msg = component._build_privileged_outbound_message(
|
|
user_jid="user@example.test",
|
|
contact_jid="contact|signal@component.example.test",
|
|
body_text="hello from signal",
|
|
attachment_url="https://files.example.test/demo.png",
|
|
)
|
|
|
|
self.assertEqual("user@example.test", str(msg["from"]))
|
|
self.assertEqual("contact|signal@component.example.test", str(msg["to"]))
|
|
body = next(
|
|
(child for child in msg.xml if str(child.tag).endswith("body")),
|
|
None,
|
|
)
|
|
self.assertIsNotNone(body)
|
|
self.assertEqual("hello from signal", body.text)
|
|
oob = msg.xml.find(".//{jabber:x:oob}url")
|
|
self.assertIsNotNone(oob)
|
|
self.assertEqual("https://files.example.test/demo.png", oob.text)
|
|
|
|
def test_send_sent_carbon_copy_requires_outgoing_privilege(self):
|
|
component = self._component()
|
|
plugin = SimpleNamespace(
|
|
granted_privileges={"example.test": SimpleNamespace(message="none")},
|
|
send_privileged_message=MagicMock(),
|
|
_make_privileged_message=MagicMock(),
|
|
)
|
|
with (
|
|
patch.object(component.plugin, "get", return_value=plugin),
|
|
patch.object(component, "_user_xmpp_domain", return_value="other.example.test"),
|
|
):
|
|
sent = async_to_sync(component.send_sent_carbon_copy)(
|
|
user_jid="user@example.test",
|
|
contact_jid="contact|signal@component.example.test",
|
|
body_text="hello",
|
|
)
|
|
|
|
self.assertFalse(sent)
|
|
plugin.send_privileged_message.assert_not_called()
|
|
plugin._make_privileged_message.assert_not_called()
|
|
|
|
def test_send_sent_carbon_copy_sends_privileged_message_when_allowed(self):
|
|
component = self._component()
|
|
plugin = SimpleNamespace(
|
|
granted_privileges={
|
|
"example.test": SimpleNamespace(message=MessagePermission.OUTGOING)
|
|
},
|
|
send_privileged_message=MagicMock(),
|
|
)
|
|
with patch.object(component.plugin, "get", return_value=plugin):
|
|
sent = async_to_sync(component.send_sent_carbon_copy)(
|
|
user_jid="user@example.test",
|
|
contact_jid="contact|signal@component.example.test",
|
|
body_text="hello",
|
|
)
|
|
|
|
self.assertTrue(sent)
|
|
plugin.send_privileged_message.assert_called_once()
|
|
sent_message = plugin.send_privileged_message.call_args.args[0]
|
|
self.assertEqual("contact|signal@component.example.test", str(sent_message["to"]))
|
|
self.assertEqual("user@example.test", str(sent_message["from"]))
|
|
self.assertIsNotNone(
|
|
next(
|
|
(child for child in sent_message.xml if str(child.tag).endswith("body")),
|
|
None,
|
|
)
|
|
)
|
|
|
|
def test_send_sent_carbon_copy_uses_configured_domain_without_advertisement(self):
|
|
component = self._component()
|
|
wrapped = MagicMock()
|
|
plugin = SimpleNamespace(
|
|
granted_privileges={},
|
|
send_privileged_message=MagicMock(),
|
|
_make_privileged_message=MagicMock(return_value=wrapped),
|
|
)
|
|
with patch.object(component.plugin, "get", return_value=plugin):
|
|
sent = async_to_sync(component.send_sent_carbon_copy)(
|
|
user_jid="user@example.test",
|
|
contact_jid="contact|signal@component.example.test",
|
|
body_text="hello",
|
|
)
|
|
|
|
self.assertTrue(sent)
|
|
plugin.send_privileged_message.assert_not_called()
|
|
plugin._make_privileged_message.assert_called_once()
|
|
wrapped.send.assert_called_once()
|
|
|
|
def test_outgoing_relay_keeps_you_prefix_while_attempting_carbon_copy(self):
|
|
component = self._component()
|
|
user = User.objects.create_user(username="user", password="pw")
|
|
person = Person.objects.create(user=user, name="Contact")
|
|
identifier = PersonIdentifier.objects.create(
|
|
user=user,
|
|
person=person,
|
|
service="signal",
|
|
identifier="+15550000000",
|
|
)
|
|
call_order = []
|
|
|
|
async def send_xmpp_message(*args, **kwargs):
|
|
call_order.append("fallback")
|
|
return "xmpp-message-id"
|
|
|
|
async def send_sent_carbon_copy(*args, **kwargs):
|
|
call_order.append("carbon")
|
|
return True
|
|
|
|
component.send_sent_carbon_copy = AsyncMock(side_effect=send_sent_carbon_copy)
|
|
component.send_xmpp_message = AsyncMock(side_effect=send_xmpp_message)
|
|
|
|
with (
|
|
patch("core.clients.xmpp.transport.record_bridge_mapping"),
|
|
patch("core.clients.xmpp.history.save_bridge_ref", new=AsyncMock()),
|
|
):
|
|
async_to_sync(component.send_from_external)(
|
|
user,
|
|
identifier,
|
|
"hello",
|
|
True,
|
|
attachments=[],
|
|
source_ref={},
|
|
)
|
|
|
|
component.send_sent_carbon_copy.assert_awaited_once_with(
|
|
user_jid="user@example.test",
|
|
contact_jid="contact|signal@component.example.test",
|
|
body_text="hello",
|
|
)
|
|
component.send_xmpp_message.assert_awaited_once_with(
|
|
"user@example.test",
|
|
"contact|signal@component.example.test",
|
|
"YOU: hello",
|
|
use_omemo_encryption=True,
|
|
)
|
|
self.assertEqual(["fallback", "carbon"], call_order)
|