Fully implement WhatsApp, Signal and XMPP multiplexing
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from urllib.parse import quote_plus, urlparse
|
||||
|
||||
import aiohttp
|
||||
@@ -472,6 +473,14 @@ class HandleMessage(Command):
|
||||
outgoing=is_from_bot,
|
||||
)
|
||||
stored_messages.add(message_key)
|
||||
# Notify unified router to ensure service context is preserved
|
||||
await self.ur.message_received(
|
||||
self.service,
|
||||
identifier=identifier,
|
||||
text=message_text,
|
||||
ts=ts,
|
||||
payload=msg,
|
||||
)
|
||||
|
||||
# TODO: Permission checks
|
||||
manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True))
|
||||
@@ -595,6 +604,7 @@ class HandleMessage(Command):
|
||||
class SignalClient(ClientBase):
|
||||
def __init__(self, ur, *args, **kwargs):
|
||||
super().__init__(ur, *args, **kwargs)
|
||||
self._stopping = False
|
||||
signal_number = str(getattr(settings, "SIGNAL_NUMBER", "")).strip()
|
||||
self.client = NewSignalBot(
|
||||
ur,
|
||||
@@ -606,9 +616,121 @@ class SignalClient(ClientBase):
|
||||
)
|
||||
|
||||
self.client.register(HandleMessage(self.ur, self.service))
|
||||
self._command_task = None
|
||||
|
||||
async def _drain_runtime_commands(self):
|
||||
"""Process queued runtime commands (e.g., web UI sends via composite router)."""
|
||||
from core.clients import transport
|
||||
# Process a small burst each loop to keep sends responsive.
|
||||
for _ in range(5):
|
||||
command = transport.pop_runtime_command(self.service)
|
||||
if not command:
|
||||
return
|
||||
await self._execute_runtime_command(command)
|
||||
|
||||
async def _execute_runtime_command(self, command):
|
||||
"""Execute a single runtime command like send_message_raw."""
|
||||
from core.clients import transport
|
||||
command_id = str((command or {}).get("id") or "").strip()
|
||||
action = str((command or {}).get("action") or "").strip()
|
||||
payload = dict((command or {}).get("payload") or {})
|
||||
if not command_id:
|
||||
return
|
||||
|
||||
if action == "send_message_raw":
|
||||
recipient = str(payload.get("recipient") or "").strip()
|
||||
text = payload.get("text")
|
||||
attachments = payload.get("attachments") or []
|
||||
try:
|
||||
result = await signalapi.send_message_raw(
|
||||
recipient_uuid=recipient,
|
||||
text=text,
|
||||
attachments=attachments,
|
||||
)
|
||||
if result is False or result is None:
|
||||
raise RuntimeError("signal_send_failed")
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{
|
||||
"ok": True,
|
||||
"timestamp": int(result)
|
||||
if isinstance(result, int)
|
||||
else int(time.time() * 1000),
|
||||
},
|
||||
)
|
||||
except Exception as exc:
|
||||
self.log.error(f"send_message_raw failed: {exc}", exc_info=True)
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{
|
||||
"ok": False,
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
if action == "notify_xmpp_sent":
|
||||
person_identifier_id = str(payload.get("person_identifier_id") or "").strip()
|
||||
text = str(payload.get("text") or "")
|
||||
if not person_identifier_id:
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{"ok": False, "error": "missing_person_identifier_id"},
|
||||
)
|
||||
return
|
||||
try:
|
||||
identifier = await sync_to_async(
|
||||
lambda: PersonIdentifier.objects.filter(id=person_identifier_id).select_related("user", "person").first()
|
||||
)()
|
||||
if identifier is None:
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{"ok": False, "error": "person_identifier_not_found"},
|
||||
)
|
||||
return
|
||||
await self.ur.xmpp.client.send_from_external(
|
||||
identifier.user,
|
||||
identifier,
|
||||
text,
|
||||
True,
|
||||
attachments=[],
|
||||
)
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{"ok": True, "timestamp": int(time.time() * 1000)},
|
||||
)
|
||||
except Exception as exc:
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{"ok": False, "error": str(exc)},
|
||||
)
|
||||
return
|
||||
|
||||
transport.set_runtime_command_result(
|
||||
self.service,
|
||||
command_id,
|
||||
{"ok": False, "error": f"unsupported_action:{action or '-'}"},
|
||||
)
|
||||
|
||||
async def _command_loop(self):
|
||||
"""Background task to periodically drain queued commands."""
|
||||
while not self._stopping:
|
||||
try:
|
||||
await self._drain_runtime_commands()
|
||||
except Exception as exc:
|
||||
self.log.warning(f"Command loop error: {exc}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def start(self):
|
||||
self.log.info("Signal client starting...")
|
||||
self.client._event_loop = self.loop
|
||||
|
||||
# Start background command processing loop
|
||||
if not self._command_task or self._command_task.done():
|
||||
self._command_task = self.loop.create_task(self._command_loop())
|
||||
self.client.start()
|
||||
|
||||
Reference in New Issue
Block a user