import asyncio import base64 import io import secrets import time from urllib.parse import quote_plus from typing import Any import aiohttp import orjson import qrcode from django.conf import settings from django.core.cache import cache from core.clients import signalapi from core.messaging import media_bridge from core.util import logs log = logs.get_logger("transport") _RUNTIME_STATE_TTL = 60 * 60 * 24 _RUNTIME_COMMANDS_TTL = 60 * 15 _RUNTIME_COMMAND_RESULT_TTL = 60 _RUNTIME_CLIENTS: dict[str, Any] = {} def _service_key(service: str) -> str: return str(service or "").strip().lower() def _runtime_key(service: str) -> str: return f"gia:service:runtime:{_service_key(service)}" def _runtime_commands_key(service: str) -> str: return f"gia:service:commands:{_service_key(service)}" def _runtime_command_result_key(service: str, command_id: str) -> str: return f"gia:service:command-result:{_service_key(service)}:{command_id}" def _gateway_base(service: str) -> str: key = f"{service.upper()}_HTTP_URL" default = f"http://{service}:8080" return str(getattr(settings, key, default)).rstrip("/") def _as_qr_png(data: str) -> bytes: image = qrcode.make(data) stream = io.BytesIO() image.save(stream, format="PNG") return stream.getvalue() def _parse_timestamp(data: Any): if isinstance(data, dict): ts = data.get("timestamp") if ts: return ts return None def register_runtime_client(service: str, client: Any): """ Register an in-process runtime client (UR process). """ _RUNTIME_CLIENTS[_service_key(service)] = client def get_runtime_client(service: str): return _RUNTIME_CLIENTS.get(_service_key(service)) def get_runtime_state(service: str) -> dict[str, Any]: return dict(cache.get(_runtime_key(service)) or {}) def update_runtime_state(service: str, **updates): """ Persist runtime state to shared cache so web/UI process can read it. """ key = _runtime_key(service) state = dict(cache.get(key) or {}) state.update(updates) state["updated_at"] = int(time.time()) cache.set(key, state, timeout=_RUNTIME_STATE_TTL) return state def enqueue_runtime_command(service: str, action: str, payload: dict | None = None) -> str: service_key = _service_key(service) command_id = secrets.token_hex(12) command = { "id": command_id, "action": str(action or "").strip(), "payload": dict(payload or {}), "created_at": int(time.time()), } key = _runtime_commands_key(service_key) queued = list(cache.get(key) or []) queued.append(command) # Keep queue bounded to avoid unbounded growth. if len(queued) > 200: queued = queued[-200:] cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL) return command_id def pop_runtime_command(service: str) -> dict[str, Any] | None: service_key = _service_key(service) key = _runtime_commands_key(service_key) queued = list(cache.get(key) or []) if not queued: return None command = dict(queued.pop(0) or {}) cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL) return command def set_runtime_command_result(service: str, command_id: str, result: dict | None = None): service_key = _service_key(service) result_key = _runtime_command_result_key(service_key, command_id) payload = dict(result or {}) payload.setdefault("completed_at", int(time.time())) cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) async def wait_runtime_command_result(service: str, command_id: str, timeout: float = 20.0): service_key = _service_key(service) result_key = _runtime_command_result_key(service_key, command_id) deadline = time.monotonic() + max(0.1, float(timeout or 0.0)) while time.monotonic() < deadline: payload = cache.get(result_key) if payload is not None: cache.delete(result_key) if isinstance(payload, dict): return dict(payload) return {} await asyncio.sleep(0.2) return None def list_accounts(service: str): """ Return account identifiers for service UI list. """ service_key = _service_key(service) if service_key == "signal": import requests base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") try: response = requests.get(f"{base}/v1/accounts", timeout=20) if not response.ok: return [] payload = orjson.loads(response.text or "[]") if isinstance(payload, list): return payload except Exception: return [] return [] state = get_runtime_state(service_key) accounts = state.get("accounts") or [] if service_key == "whatsapp" and not accounts: contacts = state.get("contacts") or [] recovered = [] seen = set() for row in contacts: if not isinstance(row, dict): continue candidate = str(row.get("identifier") or row.get("jid") or "").strip() if not candidate or candidate in seen: continue seen.add(candidate) recovered.append(candidate) if recovered: accounts = recovered update_runtime_state(service_key, accounts=recovered) if isinstance(accounts, list): return accounts return [] def _account_key(value: str) -> str: raw = str(value or "").strip().lower() if "@" in raw: raw = raw.split("@", 1)[0] return raw def unlink_account(service: str, account: str) -> bool: service_key = _service_key(service) account_value = str(account or "").strip() if not account_value: return False if service_key == "signal": import requests base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") target = quote_plus(account_value) for path in (f"/v1/accounts/{target}", f"/v1/account/{target}"): try: response = requests.delete(f"{base}{path}", timeout=20) if response.ok: return True except Exception: continue return False if service_key in {"whatsapp", "instagram"}: state = get_runtime_state(service_key) key = _account_key(account_value) raw_accounts = state.get("accounts") or [] accounts = [] for row in raw_accounts: value = str(row or "").strip() if not value: continue if _account_key(value) == key: continue accounts.append(value) raw_contacts = state.get("contacts") or [] contacts = [] for row in raw_contacts: if not isinstance(row, dict): continue identifier = str(row.get("identifier") or "").strip() jid = str(row.get("jid") or "").strip() if _account_key(identifier) == key or _account_key(jid) == key: continue contacts.append(row) update_runtime_state( service_key, accounts=accounts, contacts=contacts, connected=bool(accounts), pair_status=("connected" if accounts else ""), pair_qr="", warning=("" if accounts else "Account unlinked. Add account to link again."), last_event="account_unlinked", last_error="", ) return True return False def get_service_warning(service: str) -> str: service_key = _service_key(service) if service_key == "signal": return "" state = get_runtime_state(service_key) warning = str(state.get("warning") or "").strip() if warning: return warning if not state.get("connected"): return ( f"{service_key.title()} runtime is not connected yet. " "Start UR with the service enabled, open Services -> " f"{service_key.title()} -> Add Account, then scan the QR from " "WhatsApp Linked Devices." ) return "" def request_pairing(service: str, device_name: str = ""): """ Mark a runtime pairing request so UR clients can refresh QR/pair state. """ service_key = _service_key(service) if service_key not in {"whatsapp", "instagram"}: return state = get_runtime_state(service_key) existing_accounts = state.get("accounts") or [] is_connected = bool(state.get("connected")) pair_status = str(state.get("pair_status") or "").strip().lower() if existing_accounts and (is_connected or pair_status == "connected"): update_runtime_state( service_key, warning="Account already linked.", pair_status="connected", pair_qr="", ) return device = str(device_name or "GIA Device").strip() or "GIA Device" update_runtime_state( service_key, pair_device=device, pair_requested_at=int(time.time()), pair_status="pending", pair_qr="", pair_request_source="web", ) async def _gateway_json(method: str, url: str, payload=None): timeout = aiohttp.ClientTimeout(total=20) async with aiohttp.ClientSession(timeout=timeout) as session: request = getattr(session, method.lower()) async with request(url, json=payload) as response: body = await response.read() if not body: return response.status, None try: return response.status, orjson.loads(body) except Exception: return response.status, None async def _normalize_gateway_attachment(service: str, row: dict, session): normalized = dict(row or {}) content = normalized.get("content") if isinstance(content, memoryview): content = content.tobytes() if isinstance(content, bytes): blob_key = media_bridge.put_blob( service=service, content=content, filename=normalized.get("filename") or "attachment.bin", content_type=normalized.get("content_type") or "application/octet-stream", ) return { "blob_key": blob_key, "filename": normalized.get("filename") or "attachment.bin", "content_type": normalized.get("content_type") or "application/octet-stream", "size": normalized.get("size") or len(content), } if normalized.get("blob_key"): return normalized source_url = normalized.get("url") if source_url: try: async with session.get(source_url) as response: if response.status == 200: payload = await response.read() blob_key = media_bridge.put_blob( service=service, content=payload, filename=normalized.get("filename") or source_url.rstrip("/").split("/")[-1] or "attachment.bin", content_type=normalized.get("content_type") or response.headers.get( "Content-Type", "application/octet-stream" ), ) return { "blob_key": blob_key, "filename": normalized.get("filename") or source_url.rstrip("/").split("/")[-1] or "attachment.bin", "content_type": normalized.get("content_type") or response.headers.get( "Content-Type", "application/octet-stream" ), "size": normalized.get("size") or len(payload), } except Exception: log.warning("%s attachment fetch failed for %s", service, source_url) return normalized async def _gateway_send(service: str, recipient: str, text=None, attachments=None): base = _gateway_base(service) url = f"{base}/v1/send" timeout = aiohttp.ClientTimeout(total=20) async with aiohttp.ClientSession(timeout=timeout) as media_session: normalized_attachments = await asyncio.gather( *[ _normalize_gateway_attachment(service, dict(att or {}), media_session) for att in (attachments or []) ] ) data = { "recipient": recipient, "text": text or "", "attachments": normalized_attachments, } status, payload = await _gateway_json("post", url, data) if 200 <= status < 300: ts = _parse_timestamp(payload) return ts if ts else True log.warning("%s gateway send failed (%s): %s", service, status, payload) return False async def _gateway_typing(service: str, recipient: str, started: bool): base = _gateway_base(service) action = "start" if started else "stop" url = f"{base}/v1/typing/{action}" payload = {"recipient": recipient} status, _ = await _gateway_json("post", url, payload) if 200 <= status < 300: return True return False async def send_message_raw(service: str, recipient: str, text=None, attachments=None): """ Unified outbound send path used by models/views/UR. """ service_key = _service_key(service) if service_key == "signal": return await signalapi.send_message_raw(recipient, text, attachments or []) if service_key == "whatsapp": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "send_message_raw"): try: runtime_result = await runtime_client.send_message_raw( recipient, text=text, attachments=attachments or [], ) if runtime_result is not False and runtime_result is not None: return runtime_result except Exception as exc: log.warning("%s runtime send failed: %s", service_key, exc) # Web/UI process cannot access UR in-process runtime client directly. # Hand off send to UR via shared cache command queue. command_attachments = [] for att in attachments or []: row = dict(att or {}) # Keep payload cache-friendly and avoid embedding raw bytes. for key in ("content",): row.pop(key, None) command_attachments.append(row) command_id = enqueue_runtime_command( service_key, "send_message_raw", { "recipient": recipient, "text": text or "", "attachments": command_attachments, }, ) command_result = await wait_runtime_command_result( service_key, command_id, timeout=20.0, ) if isinstance(command_result, dict): if command_result.get("ok"): ts = _parse_timestamp(command_result) return ts if ts else True err = str(command_result.get("error") or "").strip() log.warning("whatsapp queued send failed: %s", err or "unknown") return False log.warning("whatsapp queued send timed out waiting for runtime result") return False if service_key == "instagram": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "send_message_raw"): try: runtime_result = await runtime_client.send_message_raw( recipient, text=text, attachments=attachments or [], ) if runtime_result is not False and runtime_result is not None: return runtime_result except Exception as exc: log.warning("%s runtime send failed: %s", service_key, exc) return await _gateway_send( service_key, recipient, text=text, attachments=attachments or [], ) if service_key == "xmpp": raise NotImplementedError("Direct XMPP send is handled by the XMPP client.") raise NotImplementedError(f"Unsupported service: {service}") async def start_typing(service: str, recipient: str): service_key = _service_key(service) if service_key == "signal": await signalapi.start_typing(recipient) return True if service_key == "whatsapp": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "start_typing"): try: result = await runtime_client.start_typing(recipient) if result: return True except Exception as exc: log.warning("%s runtime start_typing failed: %s", service_key, exc) return False if service_key == "instagram": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "start_typing"): try: result = await runtime_client.start_typing(recipient) if result: return True except Exception as exc: log.warning("%s runtime start_typing failed: %s", service_key, exc) return await _gateway_typing(service_key, recipient, started=True) return False async def stop_typing(service: str, recipient: str): service_key = _service_key(service) if service_key == "signal": await signalapi.stop_typing(recipient) return True if service_key == "whatsapp": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "stop_typing"): try: result = await runtime_client.stop_typing(recipient) if result: return True except Exception as exc: log.warning("%s runtime stop_typing failed: %s", service_key, exc) return False if service_key == "instagram": runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "stop_typing"): try: result = await runtime_client.stop_typing(recipient) if result: return True except Exception as exc: log.warning("%s runtime stop_typing failed: %s", service_key, exc) return await _gateway_typing(service_key, recipient, started=False) return False async def fetch_attachment(service: str, attachment_ref: dict): """ Fetch attachment bytes from a source service or URL. """ service_key = _service_key(service) if service_key == "signal": attachment_id = attachment_ref.get("id") or attachment_ref.get("attachment_id") if not attachment_id: return None return await signalapi.fetch_signal_attachment(attachment_id) runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "fetch_attachment"): try: from_runtime = await runtime_client.fetch_attachment(attachment_ref) if from_runtime: return from_runtime except Exception as exc: log.warning("%s runtime attachment fetch failed: %s", service_key, exc) direct_url = attachment_ref.get("url") blob_key = attachment_ref.get("blob_key") if blob_key: return media_bridge.get_blob(blob_key) if direct_url: timeout = aiohttp.ClientTimeout(total=20) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(direct_url) as response: if response.status != 200: return None content = await response.read() return { "content": content, "content_type": response.headers.get( "Content-Type", attachment_ref.get("content_type", "application/octet-stream"), ), "filename": attachment_ref.get("filename") or direct_url.rstrip("/").split("/")[-1] or "attachment.bin", "size": len(content), } return None def _qr_from_runtime_state(service: str) -> bytes | None: state = get_runtime_state(service) qr_payload = str(state.get("pair_qr") or "").strip() if not qr_payload: return None if qr_payload.startswith("data:image/") and "," in qr_payload: _, b64_data = qr_payload.split(",", 1) try: return base64.b64decode(b64_data) except Exception: return None return _as_qr_png(qr_payload) def get_link_qr(service: str, device_name: str): """ Returns PNG bytes for account-linking QR. - Signal: uses signal-cli REST endpoint. - WhatsApp/Instagram: runtime QR from shared state when available. Falls back to local pairing token QR in development. """ service_key = _service_key(service) device = (device_name or "GIA Device").strip() if service_key == "signal": import requests base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") response = requests.get( f"{base}/v1/qrcodelink", params={"device_name": device}, timeout=20, ) response.raise_for_status() return response.content if service_key in {"whatsapp", "instagram"}: runtime_client = get_runtime_client(service_key) if runtime_client and hasattr(runtime_client, "get_link_qr_png"): try: image_bytes = runtime_client.get_link_qr_png(device) if image_bytes: return image_bytes except Exception: pass cached = _qr_from_runtime_state(service_key) if cached: return cached if service_key == "whatsapp": state = get_runtime_state(service_key) existing_accounts = state.get("accounts") or [] pair_status = str(state.get("pair_status") or "").strip().lower() if existing_accounts and ( bool(state.get("connected")) or pair_status == "connected" ): raise RuntimeError( "WhatsApp account already linked in this runtime. " "Only one active linked device is supported. " "Unlink the current account first, then add a new one." ) raise RuntimeError( "Neonize has not provided a pairing QR yet. " "Ensure UR is running with WHATSAPP_ENABLED=true and retry." ) token = secrets.token_urlsafe(24) uri = f"gia://{service_key}/link?device={device}&token={token}" update_runtime_state( service_key, pair_device=device, pair_requested_at=int(time.time()), warning=( "Waiting for runtime pairing QR. " "If this persists, check UR logs and Neonize session state." ), ) return _as_qr_png(uri) raise NotImplementedError(f"Unsupported service for QR linking: {service}") def image_bytes_to_base64(image_bytes: bytes) -> str: return base64.b64encode(image_bytes).decode("utf-8")