"""
Shared transport helpers for messaging services.

Holds the cache-backed runtime state, the runtime command queue, the
XMPP <-> upstream bridge map, and the unified outbound send / reaction /
typing / attachment / QR-linking entry points.
"""

import asyncio
import base64
import io
import secrets
import time
from typing import Any
from urllib.parse import quote_plus

import aiohttp
import orjson
import qrcode
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.cache import cache

from core.clients import signalapi
from core.messaging import media_bridge
from core.util import logs

log = logs.get_logger("transport")

# Cache timeouts, in seconds.
_RUNTIME_STATE_TTL = 60 * 60 * 24
_RUNTIME_COMMANDS_TTL = 60 * 15
_RUNTIME_COMMAND_RESULT_TTL = 60
_BRIDGE_MAP_TTL = 60 * 60 * 24 * 14

# In-process runtime clients, keyed by normalized service name.
_RUNTIME_CLIENTS: dict[str, Any] = {}


def _service_key(service: str) -> str:
    """Return the stripped, lowercased service name ("" for falsy input)."""
    return str(service or "").strip().lower()


def _runtime_key(service: str) -> str:
    """Cache key holding the runtime state dict for a service."""
    return f"gia:service:runtime:{_service_key(service)}"


def _runtime_commands_key(service: str) -> str:
    """Cache key holding the queued runtime commands for a service."""
    return f"gia:service:commands:{_service_key(service)}"


def _runtime_command_result_key(service: str, command_id: str) -> str:
    """Cache key holding the result of one runtime command."""
    return f"gia:service:command-result:{_service_key(service)}:{command_id}"


def _runtime_command_cancel_key(service: str, command_id: str) -> str:
    """Cache key holding the cancellation flag of one runtime command."""
    return f"gia:service:command-cancel:{_service_key(service)}:{command_id}"


def _runtime_command_meta_key(service: str, command_id: str) -> str:
    """Cache key holding the metadata (created_at, action) of one command."""
    return f"gia:service:command-meta:{_service_key(service)}:{command_id}"


def _bridge_map_key(user_id: int, person_id: int, service: str) -> str:
    """Cache key holding the bridge map rows for a user/person/service."""
    return f"gia:bridge:map:{int(user_id)}:{int(person_id)}:{_service_key(service)}"


def _gateway_base(service: str) -> str:
    """
    Return the HTTP gateway base URL for a service, without trailing slash.

    Reads ``settings.<SERVICE>_HTTP_URL`` and defaults to
    ``http://<service>:8080``.
    """
    key = f"{service.upper()}_HTTP_URL"
    default = f"http://{service}:8080"
    return str(getattr(settings, key, default)).rstrip("/")


def _as_qr_png(data: str) -> bytes:
    """Render ``data`` as a QR code and return the PNG bytes."""
    image = qrcode.make(data)
    stream = io.BytesIO()
    image.save(stream, format="PNG")
    return stream.getvalue()


def _parse_timestamp(data: Any):
    """Return ``data["timestamp"]`` when ``data`` is a dict with a truthy value, else None."""
    if isinstance(data, dict):
        ts = data.get("timestamp")
        if ts:
            return ts
    return None


def _attachment_has_inline_content(attachment: dict | None) -> bool:
    """Return True when the attachment carries bytes-like ``content``."""
    value = (attachment or {}).get("content")
    return isinstance(value, (bytes, bytearray, memoryview))


def _normalize_inline_content(attachment: dict) -> dict:
    """
    Return a copy of the attachment with ``content`` coerced to ``bytes``.

    ``size`` is filled in from the content length when it is missing or falsy.
    """
    row = dict(attachment or {})
    content = row.get("content")
    if isinstance(content, memoryview):
        row["content"] = content.tobytes()
    elif isinstance(content, bytearray):
        row["content"] = bytes(content)
    if isinstance(row.get("content"), bytes) and not row.get("size"):
        row["size"] = len(row["content"])
    return row


async def prepare_outbound_attachments(service: str, attachments: list | None) -> list:
    """
    Resolve outbound attachment refs into payloads once, in parallel.

    This is the shared media-prep layer for XMPP -> {Signal, WhatsApp} sends,
    so attachment performance improvements live in one place.

    Rows that already carry inline content are only normalized; other rows
    are resolved through ``fetch_attachment`` and merged with the fetched
    payload. A row whose fetch returns nothing is passed through unchanged.

    TODO: Stream per-attachment send as each payload resolves (as_completed)
    to reduce first-byte latency for large media batches.
    """
    rows = [dict(att or {}) for att in (attachments or [])]
    if not rows:
        return []

    async def _resolve(row: dict):
        if _attachment_has_inline_content(row):
            return _normalize_inline_content(row)
        fetched = await fetch_attachment(service, row)
        if not fetched:
            return row
        merged = dict(row)
        merged.update(dict(fetched or {}))
        return _normalize_inline_content(merged)

    return await asyncio.gather(*[_resolve(row) for row in rows])


def register_runtime_client(service: str, client: Any):
    """
    Register an in-process runtime client (UR process).
    """
    _RUNTIME_CLIENTS[_service_key(service)] = client


def get_runtime_client(service: str):
    """Return the in-process runtime client for a service, or None."""
    return _RUNTIME_CLIENTS.get(_service_key(service))


def get_runtime_state(service: str) -> dict[str, Any]:
    """Return a copy of the cached runtime state for a service ({} if absent)."""
    return dict(cache.get(_runtime_key(service)) or {})


def update_runtime_state(service: str, **updates):
    """
    Persist runtime state to shared cache so web/UI process can read it.

    Merges ``updates`` into the cached state, stamps ``updated_at`` with the
    current epoch seconds and returns the merged state.
    """
    key = _runtime_key(service)
    state = dict(cache.get(key) or {})
    state.update(updates)
    state["updated_at"] = int(time.time())
    cache.set(key, state, timeout=_RUNTIME_STATE_TTL)
    return state


def record_bridge_mapping(
    *,
    user_id: int,
    person_id: int,
    service: str,
    xmpp_message_id: str = "",
    xmpp_ts: int | None = None,
    upstream_message_id: str = "",
    upstream_author: str = "",
    upstream_ts: int | None = None,
    text_preview: str = "",
    local_message_id: str = "",
):
    """
    Append an XMPP <-> upstream message mapping to the cached bridge map.

    Existing rows with the same XMPP or upstream message id are replaced, and
    the map is capped at the 2000 most recent rows.

    Returns the stored entry, or None when the entry has neither a message id
    nor a positive timestamp to match on.
    """
    key = _bridge_map_key(user_id, person_id, service)
    rows = list(cache.get(key) or [])
    now_ts = int(time.time() * 1000)
    entry = {
        "xmpp_message_id": str(xmpp_message_id or "").strip(),
        "xmpp_ts": int(xmpp_ts or 0),
        "upstream_message_id": str(upstream_message_id or "").strip(),
        "upstream_author": str(upstream_author or "").strip(),
        "upstream_ts": int(upstream_ts or 0),
        "text_preview": str(text_preview or "").strip()[:1000],
        "local_message_id": str(local_message_id or "").strip(),
        "updated_at": now_ts,
    }
    # Nothing to resolve against later: no ids and no usable timestamps.
    if not entry["xmpp_message_id"] and not entry["upstream_message_id"]:
        if entry["upstream_ts"] <= 0 and entry["xmpp_ts"] <= 0:
            return None

    deduped = []
    for row in rows:
        same_xmpp = bool(entry["xmpp_message_id"]) and (
            str((row or {}).get("xmpp_message_id") or "").strip()
            == entry["xmpp_message_id"]
        )
        same_upstream = bool(entry["upstream_message_id"]) and (
            str((row or {}).get("upstream_message_id") or "").strip()
            == entry["upstream_message_id"]
        )
        if same_xmpp or same_upstream:
            continue
        deduped.append(dict(row or {}))
    deduped.append(entry)
    if len(deduped) > 2000:
        deduped = deduped[-2000:]
    cache.set(key, deduped, timeout=_BRIDGE_MAP_TTL)
    log.debug(
        "reaction-bridge map-write service=%s user=%s person=%s xmpp_id=%s upstream_id=%s upstream_ts=%s local_id=%s rows=%s",
        service,
        user_id,
        person_id,
        entry.get("xmpp_message_id") or "-",
        entry.get("upstream_message_id") or "-",
        entry.get("upstream_ts") or 0,
        entry.get("local_message_id") or "-",
        len(deduped),
    )
    return entry


def resolve_bridge_from_xmpp(
    *, user_id: int, person_id: int, service: str, xmpp_message_id: str
):
    """
    Return the newest bridge map row with the given XMPP message id.

    Returns None when the id is empty or no row matches.
    """
    target_id = str(xmpp_message_id or "").strip()
    if not target_id:
        return None
    key = _bridge_map_key(user_id, person_id, service)
    rows = list(cache.get(key) or [])
    # Rows are appended in write order, so scan from the newest.
    for row in reversed(rows):
        if str((row or {}).get("xmpp_message_id") or "").strip() == target_id:
            log.debug(
                "reaction-bridge resolve-xmpp-hit service=%s user=%s person=%s xmpp_id=%s upstream_id=%s",
                service,
                user_id,
                person_id,
                target_id,
                str((row or {}).get("upstream_message_id") or "-").strip(),
            )
            return dict(row or {})
    log.debug(
        "reaction-bridge resolve-xmpp-miss service=%s user=%s person=%s xmpp_id=%s rows=%s",
        service,
        user_id,
        person_id,
        target_id,
        len(rows),
    )
    return None


def resolve_bridge_from_upstream(
    *,
    user_id: int,
    person_id: int,
    service: str,
    upstream_message_id: str = "",
    upstream_ts: int | None = None,
):
    """
    Return the bridge map row matching an upstream message.

    An exact ``upstream_message_id`` match (newest row first) wins. Otherwise
    the row whose ``upstream_ts`` is closest to the target is returned, as
    long as the gap is at most 15_000; ties go to the row with the larger
    ``updated_at``. Returns None when nothing matches.
    """
    key = _bridge_map_key(user_id, person_id, service)
    rows = list(cache.get(key) or [])
    target_id = str(upstream_message_id or "").strip()
    if target_id:
        for row in reversed(rows):
            if str((row or {}).get("upstream_message_id") or "").strip() == target_id:
                log.debug(
                    "reaction-bridge resolve-upstream-id-hit service=%s user=%s person=%s upstream_id=%s xmpp_id=%s",
                    service,
                    user_id,
                    person_id,
                    target_id,
                    str((row or {}).get("xmpp_message_id") or "-").strip(),
                )
                return dict(row or {})

    target_ts = int(upstream_ts or 0)
    if target_ts > 0:
        best = None
        best_gap = None
        for row in rows:
            row_ts = int((row or {}).get("upstream_ts") or 0)
            if row_ts <= 0:
                continue
            gap = abs(row_ts - target_ts)
            row_updated = int((row or {}).get("updated_at") or 0)
            best_updated = int((best or {}).get("updated_at") or 0) if best else 0
            if (
                best is None
                or gap < best_gap
                or (gap == best_gap and row_updated > best_updated)
            ):
                best = dict(row or {})
                best_gap = gap
        if best is not None and best_gap is not None and best_gap <= 15_000:
            log.debug(
                "reaction-bridge resolve-upstream-ts-hit service=%s user=%s person=%s target_ts=%s gap_ms=%s picked_xmpp_id=%s picked_upstream_ts=%s",
                service,
                user_id,
                person_id,
                target_ts,
                best_gap,
                str((best or {}).get("xmpp_message_id") or "-").strip(),
                int((best or {}).get("upstream_ts") or 0),
            )
            return best
    log.debug(
        "reaction-bridge resolve-upstream-miss service=%s user=%s person=%s upstream_id=%s upstream_ts=%s rows=%s",
        service,
        user_id,
        person_id,
        target_id or "-",
        target_ts,
        len(rows),
    )
    return None


def resolve_bridge_from_text_hint(
    *, user_id: int, person_id: int, service: str, text_hint: str
):
    """
    Return the newest bridge map row whose text preview contains the hint.

    Matching is case-insensitive. Returns None for an empty hint or no match.
    """
    hint = str(text_hint or "").strip().lower()
    if not hint:
        return None
    key = _bridge_map_key(user_id, person_id, service)
    rows = list(cache.get(key) or [])
    for row in reversed(rows):
        preview = str((row or {}).get("text_preview") or "").strip().lower()
        if preview and (preview == hint or hint in preview):
            return dict(row or {})
    return None


def enqueue_runtime_command(
    service: str, action: str, payload: dict | None = None
) -> str:
    """
    Append a command to the service's cached command queue.

    Also stores a metadata record (created_at, action) for the command.
    Returns the generated command id.
    """
    service_key = _service_key(service)
    command_id = secrets.token_hex(12)
    command = {
        "id": command_id,
        "action": str(action or "").strip(),
        "payload": dict(payload or {}),
        "created_at": int(time.time()),
    }
    key = _runtime_commands_key(service_key)
    queued = list(cache.get(key) or [])
    queued.append(command)
    # Keep queue bounded to avoid unbounded growth.
    if len(queued) > 200:
        queued = queued[-200:]
    cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL)
    cache.set(
        _runtime_command_meta_key(service_key, command_id),
        {
            "created_at": int(command.get("created_at") or int(time.time())),
            "action": str(command.get("action") or ""),
        },
        timeout=_RUNTIME_COMMANDS_TTL,
    )
    return command_id


def pop_runtime_command(service: str) -> dict[str, Any] | None:
    """Remove and return the oldest queued command, or None if the queue is empty."""
    service_key = _service_key(service)
    key = _runtime_commands_key(service_key)
    queued = list(cache.get(key) or [])
    if not queued:
        return None
    command = dict(queued.pop(0) or {})
    cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL)
    return command


def set_runtime_command_result(
    service: str, command_id: str, result: dict | None = None
):
    """
    Store the result of a runtime command and drop its metadata record.

    ``completed_at`` is added to the result when the caller did not set it.
    """
    service_key = _service_key(service)
    result_key = _runtime_command_result_key(service_key, command_id)
    payload = dict(result or {})
    payload.setdefault("completed_at", int(time.time()))
    cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL)
    cache.delete(_runtime_command_meta_key(service_key, command_id))


def cancel_runtime_command(service: str, command_id: str):
    """Mark a runtime command as cancelled and set a result so waiters are released."""
    service_key = _service_key(service)
    result_key = _runtime_command_result_key(service_key, command_id)
    cancel_key = _runtime_command_cancel_key(service_key, command_id)
    payload = {"ok": False, "error": "cancelled", "completed_at": int(time.time())}
    cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL)
    cache.set(cancel_key, True, timeout=60)
    cache.delete(_runtime_command_meta_key(service_key, command_id))
    return True


def runtime_command_age_seconds(service: str, command_id: str) -> float | None:
    """
    Return how many seconds ago the command was enqueued.

    Returns None when no metadata record exists or it has no valid
    ``created_at`` value.
    """
    service_key = _service_key(service)
    meta = cache.get(_runtime_command_meta_key(service_key, command_id))
    if not isinstance(meta, dict):
        return None
    try:
        created_at = int(meta.get("created_at") or 0)
    except Exception:
        created_at = 0
    if created_at <= 0:
        return None
    return max(0.0, time.time() - created_at)


def cancel_runtime_commands_for_recipient(service: str, recipient: str) -> list[str]:
    """Cancel any queued runtime commands for the given recipient and return their ids."""
    service_key = _service_key(service)
    key = _runtime_commands_key(service_key)
    queued = list(cache.get(key) or [])
    target = str(recipient or "").strip()
    cancelled = []
    for cmd in queued:
        payload = dict(cmd.get("payload") or {})
        if str(payload.get("recipient") or "").strip() != target:
            continue
        cmd_id = str(cmd.get("id") or "").strip()
        if cmd_id:
            cancel_runtime_command(service_key, cmd_id)
            cancelled.append(cmd_id)
    return cancelled


async def wait_runtime_command_result(
    service: str, command_id: str, timeout: float = 20.0
):
    """
    Poll the cache until the command result appears or the timeout elapses.

    The result is deleted from the cache once read. Returns the result dict
    ({} when the stored value is not a dict), or None on timeout.
    """
    service_key = _service_key(service)
    result_key = _runtime_command_result_key(service_key, command_id)
    deadline = time.monotonic() + max(0.1, float(timeout or 0.0))
    while time.monotonic() < deadline:
        payload = cache.get(result_key)
        if payload is not None:
            cache.delete(result_key)
            if isinstance(payload, dict):
                return dict(payload)
            return {}
        await asyncio.sleep(0.2)
    return None


def list_accounts(service: str):
    """
    Return account identifiers for service UI list.

    Signal accounts come from the HTTP API (``/v1/accounts``); any failure
    yields an empty list. Other services read ``accounts`` from the runtime
    state. For WhatsApp with no accounts, identifiers are recovered from the
    cached contacts and written back to the runtime state.
    """
    service_key = _service_key(service)
    if service_key == "signal":
        import requests

        base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip(
            "/"
        )
        try:
            response = requests.get(f"{base}/v1/accounts", timeout=20)
            if not response.ok:
                return []
            payload = orjson.loads(response.text or "[]")
            if isinstance(payload, list):
                return payload
        except Exception:
            return []
        return []

    state = get_runtime_state(service_key)
    accounts = state.get("accounts") or []
    if service_key == "whatsapp" and not accounts:
        contacts = state.get("contacts") or []
        recovered = []
        seen = set()
        for row in contacts:
            if not isinstance(row, dict):
                continue
            candidate = str(row.get("identifier") or row.get("jid") or "").strip()
            if not candidate or candidate in seen:
                continue
            seen.add(candidate)
            recovered.append(candidate)
        if recovered:
            accounts = recovered
            update_runtime_state(service_key, accounts=recovered)
    if isinstance(accounts, list):
        return accounts
    return []


def _account_key(value: str) -> str:
    """Return the lowercased account id with anything from the first "@" removed."""
    raw = str(value or "").strip().lower()
    if "@" in raw:
        raw = raw.split("@", 1)[0]
    return raw


def unlink_account(service: str, account: str) -> bool:
    """
    Unlink an account from a service.

    Signal: issues DELETE against the HTTP API and returns True on the first
    successful response. WhatsApp/Instagram: removes the account and its
    contacts from the cached runtime state and returns True.

    Returns False for an empty account or an unsupported service.
    """
    service_key = _service_key(service)
    account_value = str(account or "").strip()
    if not account_value:
        return False

    if service_key == "signal":
        import requests

        base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip(
            "/"
        )
        target = quote_plus(account_value)
        # Try both endpoint spellings; the first OK response wins.
        for path in (f"/v1/accounts/{target}", f"/v1/account/{target}"):
            try:
                response = requests.delete(f"{base}{path}", timeout=20)
                if response.ok:
                    return True
            except Exception:
                continue
        return False

    if service_key in {"whatsapp", "instagram"}:
        state = get_runtime_state(service_key)
        key = _account_key(account_value)

        raw_accounts = state.get("accounts") or []
        accounts = []
        for row in raw_accounts:
            value = str(row or "").strip()
            if not value:
                continue
            if _account_key(value) == key:
                continue
            accounts.append(value)

        raw_contacts = state.get("contacts") or []
        contacts = []
        for row in raw_contacts:
            if not isinstance(row, dict):
                continue
            identifier = str(row.get("identifier") or "").strip()
            jid = str(row.get("jid") or "").strip()
            if _account_key(identifier) == key or _account_key(jid) == key:
                continue
            contacts.append(row)

        update_runtime_state(
            service_key,
            accounts=accounts,
            contacts=contacts,
            connected=bool(accounts),
            pair_status=("connected" if accounts else ""),
            pair_qr="",
            warning=(
                "" if accounts else "Account unlinked. Add account to link again."
            ),
            last_event="account_unlinked",
            last_error="",
        )
        return True

    return False


def get_service_warning(service: str) -> str:
    """
    Return the warning text to display for a service, or "" when there is none.

    Signal never produces a warning here. Other services return the
    ``warning`` stored in the runtime state, or a "not connected" hint when
    the state is not marked connected.
    """
    service_key = _service_key(service)
    if service_key == "signal":
        return ""
    state = get_runtime_state(service_key)
    warning = str(state.get("warning") or "").strip()
    if warning:
        return warning
    if not state.get("connected"):
        return (
            f"{service_key.title()} runtime is not connected yet. "
            "Start UR with the service enabled, open Services -> "
            f"{service_key.title()} -> Add Account, then scan the QR from "
            "WhatsApp Linked Devices."
        )
    return ""


def request_pairing(service: str, device_name: str = ""):
    """
    Mark a runtime pairing request so UR clients can refresh QR/pair state.

    Only applies to WhatsApp and Instagram. When an account is already linked
    and connected, the state is updated with a warning instead of a new
    pending request.
    """
    service_key = _service_key(service)
    if service_key not in {"whatsapp", "instagram"}:
        return
    state = get_runtime_state(service_key)
    existing_accounts = state.get("accounts") or []
    is_connected = bool(state.get("connected"))
    pair_status = str(state.get("pair_status") or "").strip().lower()
    if existing_accounts and (is_connected or pair_status == "connected"):
        update_runtime_state(
            service_key,
            warning="Account already linked.",
            pair_status="connected",
            pair_qr="",
        )
        return
    device = str(device_name or "GIA Device").strip() or "GIA Device"
    update_runtime_state(
        service_key,
        pair_device=device,
        pair_requested_at=int(time.time()),
        pair_status="pending",
        pair_qr="",
        pair_request_source="web",
    )


async def _gateway_json(method: str, url: str, payload=None):
    """
    Send a JSON request to a gateway and return ``(status, parsed_body)``.

    ``parsed_body`` is None when the response body is empty or is not valid
    JSON.
    """
    timeout = aiohttp.ClientTimeout(total=20)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        request = getattr(session, method.lower())
        async with request(url, json=payload) as response:
            body = await response.read()
            if not body:
                return response.status, None
            try:
                return response.status, orjson.loads(body)
            except Exception:
                return response.status, None


async def _normalize_gateway_attachment(service: str, row: dict, session):
    """
    Convert one attachment into a gateway-ready row.

    Inline content (bytes, bytearray or memoryview) is stored through
    ``media_bridge.put_blob`` and replaced by its ``blob_key``. Rows that
    already have a ``blob_key`` are returned as-is. Rows with a ``url`` are
    downloaded with ``session`` and stored the same way. If the download
    fails or does not return 200, the row is returned unchanged.
    """
    normalized = dict(row or {})
    content = normalized.get("content")
    if isinstance(content, memoryview):
        content = content.tobytes()
    elif isinstance(content, bytearray):
        # Treated as inline content elsewhere in this module; handle it here too.
        content = bytes(content)
    if isinstance(content, bytes):
        blob_key = media_bridge.put_blob(
            service=service,
            content=content,
            filename=normalized.get("filename") or "attachment.bin",
            content_type=normalized.get("content_type") or "application/octet-stream",
        )
        return {
            "blob_key": blob_key,
            "filename": normalized.get("filename") or "attachment.bin",
            "content_type": normalized.get("content_type")
            or "application/octet-stream",
            "size": normalized.get("size") or len(content),
        }

    if normalized.get("blob_key"):
        return normalized

    source_url = normalized.get("url")
    if source_url:
        try:
            async with session.get(source_url) as response:
                if response.status == 200:
                    payload = await response.read()
                    blob_key = media_bridge.put_blob(
                        service=service,
                        content=payload,
                        filename=normalized.get("filename")
                        or source_url.rstrip("/").split("/")[-1]
                        or "attachment.bin",
                        content_type=normalized.get("content_type")
                        or response.headers.get(
                            "Content-Type", "application/octet-stream"
                        ),
                    )
                    return {
                        "blob_key": blob_key,
                        "filename": normalized.get("filename")
                        or source_url.rstrip("/").split("/")[-1]
                        or "attachment.bin",
                        "content_type": normalized.get("content_type")
                        or response.headers.get(
                            "Content-Type", "application/octet-stream"
                        ),
                        "size": normalized.get("size") or len(payload),
                    }
        except Exception as exc:
            # Best effort: fall through and hand the original row to the gateway.
            log.warning(
                "%s attachment fetch failed for %s: %s", service, source_url, exc
            )
    return normalized


async def _gateway_send(service: str, recipient: str, text=None, attachments=None):
    """
    Send a message through the service's HTTP gateway (``/v1/send``).

    Returns the timestamp from the response when present, True on any other
    2xx response, and False otherwise.
    """
    base = _gateway_base(service)
    url = f"{base}/v1/send"
    timeout = aiohttp.ClientTimeout(total=20)
    async with aiohttp.ClientSession(timeout=timeout) as media_session:
        normalized_attachments = await asyncio.gather(
            *[
                _normalize_gateway_attachment(service, dict(att or {}), media_session)
                for att in (attachments or [])
            ]
        )
    data = {
        "recipient": recipient,
        "text": text or "",
        "attachments": normalized_attachments,
    }
    status, payload = await _gateway_json("post", url, data)
    if 200 <= status < 300:
        ts = _parse_timestamp(payload)
        return ts if ts else True
    log.warning("%s gateway send failed (%s): %s", service, status, payload)
    return False


async def _gateway_typing(service: str, recipient: str, started: bool):
    """POST a typing start/stop to the gateway; return True on a 2xx response."""
    base = _gateway_base(service)
    action = "start" if started else "stop"
    url = f"{base}/v1/typing/{action}"
    payload = {"recipient": recipient}
    status, _ = await _gateway_json("post", url, payload)
    return 200 <= status < 300


async def send_message_raw(
    service: str,
    recipient: str,
    text=None,
    attachments=None,
    metadata: dict | None = None,
):
    """
    Unified outbound send path used by models/views/UR.

    - Signal: sends through ``signalapi`` and, when ``metadata`` carries an
      ``xmpp_source_id``, records the bridge mapping for the sent message.
    - WhatsApp: uses the in-process runtime client when available, otherwise
      queues a ``send_message_raw`` runtime command and waits for its result.
    - Instagram: uses the in-process runtime client when available, otherwise
      the HTTP gateway.

    Raises NotImplementedError for XMPP and for unknown services.
    """
    service_key = _service_key(service)

    if service_key == "signal":
        prepared_attachments = await prepare_outbound_attachments(
            service_key, attachments or []
        )
        result = await signalapi.send_message_raw(recipient, text, prepared_attachments)
        meta = dict(metadata or {})
        xmpp_source_id = str(meta.get("xmpp_source_id") or "").strip()
        if xmpp_source_id and result:
            from core.models import PersonIdentifier

            identifier_row = await sync_to_async(
                lambda: PersonIdentifier.objects.filter(
                    service="signal",
                    identifier=recipient,
                )
                .select_related("user", "person")
                .first()
            )()
            if identifier_row is not None:
                record_bridge_mapping(
                    user_id=identifier_row.user_id,
                    person_id=identifier_row.person_id,
                    service="signal",
                    xmpp_message_id=xmpp_source_id,
                    xmpp_ts=int(meta.get("xmpp_source_ts") or 0),
                    upstream_message_id="",
                    upstream_author=str(meta.get("upstream_author") or ""),
                    upstream_ts=int(result) if isinstance(result, int) else 0,
                    text_preview=str(meta.get("xmpp_body") or text or ""),
                    local_message_id=str(meta.get("legacy_message_id") or ""),
                )
                from core.messaging import history

                await history.save_bridge_ref(
                    user=identifier_row.user,
                    identifier=identifier_row,
                    source_service="signal",
                    local_message_id=str(meta.get("legacy_message_id") or ""),
                    local_ts=int(meta.get("xmpp_source_ts") or 0),
                    xmpp_message_id=xmpp_source_id,
                    upstream_message_id="",
                    upstream_author=str(meta.get("upstream_author") or ""),
                    upstream_ts=int(result) if isinstance(result, int) else 0,
                )
        return result

    if service_key == "whatsapp":
        # Resolve attachments once and reuse them for both the in-process
        # client and the queued fallback, so media is never fetched twice.
        prepared_attachments = await prepare_outbound_attachments(
            service_key, attachments or []
        )
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "send_message_raw"):
            try:
                runtime_result = await runtime_client.send_message_raw(
                    recipient,
                    text=text,
                    # Shallow copies keep the fallback rows independent of the client.
                    attachments=[dict(att or {}) for att in prepared_attachments],
                    metadata=dict(metadata or {}),
                )
                if runtime_result is not False and runtime_result is not None:
                    return runtime_result
            except Exception as exc:
                log.warning("%s runtime send failed: %s", service_key, exc)

        # Web/UI process cannot access UR in-process runtime client directly.
        # Hand off send to UR via shared cache command queue.
        command_attachments = []
        for att in prepared_attachments:
            row = dict(att or {})
            # Keep payload cache-friendly and avoid embedding raw bytes.
            for key in ("content",):
                row.pop(key, None)
            command_attachments.append(row)
        command_id = enqueue_runtime_command(
            service_key,
            "send_message_raw",
            {
                "recipient": recipient,
                "text": text or "",
                "attachments": command_attachments,
                "metadata": dict(metadata or {}),
            },
        )
        command_result = await wait_runtime_command_result(
            service_key,
            command_id,
            timeout=20.0,
        )
        if isinstance(command_result, dict):
            if command_result.get("ok"):
                ts = _parse_timestamp(command_result)
                return ts if ts else True
            err = str(command_result.get("error") or "").strip()
            log.warning("whatsapp queued send failed: %s", err or "unknown")
            return False
        log.warning("whatsapp queued send timed out waiting for runtime result")
        return False

    if service_key == "instagram":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "send_message_raw"):
            try:
                runtime_result = await runtime_client.send_message_raw(
                    recipient,
                    text=text,
                    attachments=attachments or [],
                    metadata=dict(metadata or {}),
                )
                if runtime_result is not False and runtime_result is not None:
                    return runtime_result
            except Exception as exc:
                log.warning("%s runtime send failed: %s", service_key, exc)
        return await _gateway_send(
            service_key,
            recipient,
            text=text,
            attachments=attachments or [],
        )

    if service_key == "xmpp":
        raise NotImplementedError("Direct XMPP send is handled by the XMPP client.")
    raise NotImplementedError(f"Unsupported service: {service}")


async def send_reaction(
    service: str,
    recipient: str,
    *,
    emoji: str,
    target_message_id: str = "",
    target_timestamp: int | None = None,
    target_author: str = "",
    remove: bool = False,
):
    """
    Send (or remove) a reaction on an upstream message.

    Signal goes through ``signalapi``. WhatsApp uses the in-process runtime
    client when available, otherwise a queued ``send_reaction`` runtime
    command. Returns False for an empty emoji that is not a removal, and for
    any other service.
    """
    service_key = _service_key(service)
    if not str(emoji or "").strip() and not remove:
        return False

    if service_key == "signal":
        log.debug(
            "reaction-bridge send service=signal recipient=%s target_ts=%s target_author=%s remove=%s",
            recipient,
            int(target_timestamp or 0),
            str(target_author or recipient),
            bool(remove),
        )
        return await signalapi.send_reaction(
            recipient_uuid=recipient,
            emoji=str(emoji or ""),
            target_timestamp=target_timestamp,
            target_author=str(target_author or recipient),
            remove=remove,
        )

    if service_key == "whatsapp":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "send_reaction"):
            try:
                log.debug(
                    "reaction-bridge send service=whatsapp runtime recipient=%s target_id=%s target_ts=%s remove=%s",
                    recipient,
                    str(target_message_id or "") or "-",
                    int(target_timestamp or 0),
                    bool(remove),
                )
                result = await runtime_client.send_reaction(
                    recipient,
                    emoji=str(emoji or ""),
                    target_message_id=str(target_message_id or ""),
                    target_timestamp=(int(target_timestamp) if target_timestamp else 0),
                    remove=bool(remove),
                )
                if result:
                    return True
            except Exception as exc:
                log.warning("%s runtime reaction failed: %s", service_key, exc)

        # No usable in-process client: hand the reaction to the command queue.
        command_id = enqueue_runtime_command(
            service_key,
            "send_reaction",
            {
                "recipient": recipient,
                "emoji": str(emoji or ""),
                "target_message_id": str(target_message_id or ""),
                "target_timestamp": int(target_timestamp or 0),
                "remove": bool(remove),
            },
        )
        command_result = await wait_runtime_command_result(
            service_key,
            command_id,
            timeout=20.0,
        )
        log.debug(
            "reaction-bridge send service=whatsapp queued-result ok=%s command_id=%s",
            bool(isinstance(command_result, dict) and command_result.get("ok")),
            command_id,
        )
        return bool(isinstance(command_result, dict) and command_result.get("ok"))

    return False


async def start_typing(service: str, recipient: str):
    """
    Signal a typing-started indicator to the recipient.

    Returns True on success and False when the service is unsupported or the
    runtime client is unavailable or fails. Instagram falls back to the
    HTTP gateway.
    """
    service_key = _service_key(service)
    if service_key == "signal":
        await signalapi.start_typing(recipient)
        return True

    if service_key == "whatsapp":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "start_typing"):
            try:
                result = await runtime_client.start_typing(recipient)
                if result:
                    return True
            except Exception as exc:
                log.warning("%s runtime start_typing failed: %s", service_key, exc)
        return False

    if service_key == "instagram":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "start_typing"):
            try:
                result = await runtime_client.start_typing(recipient)
                if result:
                    return True
            except Exception as exc:
                log.warning("%s runtime start_typing failed: %s", service_key, exc)
        return await _gateway_typing(service_key, recipient, started=True)

    return False


async def stop_typing(service: str, recipient: str):
    """
    Signal a typing-stopped indicator to the recipient.

    Returns True on success and False when the service is unsupported or the
    runtime client is unavailable or fails. Instagram falls back to the
    HTTP gateway.
    """
    service_key = _service_key(service)
    if service_key == "signal":
        await signalapi.stop_typing(recipient)
        return True

    if service_key == "whatsapp":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "stop_typing"):
            try:
                result = await runtime_client.stop_typing(recipient)
                if result:
                    return True
            except Exception as exc:
                log.warning("%s runtime stop_typing failed: %s", service_key, exc)
        return False

    if service_key == "instagram":
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "stop_typing"):
            try:
                result = await runtime_client.stop_typing(recipient)
                if result:
                    return True
            except Exception as exc:
                log.warning("%s runtime stop_typing failed: %s", service_key, exc)
        return await _gateway_typing(service_key, recipient, started=False)

    return False


async def fetch_attachment(service: str, attachment_ref: dict):
    """
    Fetch attachment bytes from a source service or URL.

    Resolution order: Signal attachment id (Signal only), the in-process
    runtime client, a ``blob_key`` in the media bridge, then a direct ``url``
    download. Returns None when nothing could be resolved or the download
    does not return 200.
    """
    service_key = _service_key(service)
    if service_key == "signal":
        attachment_id = attachment_ref.get("id") or attachment_ref.get("attachment_id")
        if not attachment_id:
            return None
        return await signalapi.fetch_signal_attachment(attachment_id)

    runtime_client = get_runtime_client(service_key)
    if runtime_client and hasattr(runtime_client, "fetch_attachment"):
        try:
            from_runtime = await runtime_client.fetch_attachment(attachment_ref)
            if from_runtime:
                return from_runtime
        except Exception as exc:
            log.warning("%s runtime attachment fetch failed: %s", service_key, exc)

    direct_url = attachment_ref.get("url")
    blob_key = attachment_ref.get("blob_key")
    if blob_key:
        return media_bridge.get_blob(blob_key)
    if direct_url:
        timeout = aiohttp.ClientTimeout(total=20)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(direct_url) as response:
                if response.status != 200:
                    return None
                content = await response.read()
                return {
                    "content": content,
                    "content_type": response.headers.get(
                        "Content-Type",
                        attachment_ref.get("content_type", "application/octet-stream"),
                    ),
                    "filename": attachment_ref.get("filename")
                    or direct_url.rstrip("/").split("/")[-1]
                    or "attachment.bin",
                    "size": len(content),
                }
    return None


def _qr_from_runtime_state(service: str) -> bytes | None:
    """
    Return PNG bytes for the ``pair_qr`` value stored in the runtime state.

    A ``data:image/...`` URI is base64-decoded; any other non-empty value is
    rendered as a QR code. Returns None when no value is stored or decoding
    fails.
    """
    state = get_runtime_state(service)
    qr_payload = str(state.get("pair_qr") or "").strip()
    if not qr_payload:
        return None
    if qr_payload.startswith("data:image/") and "," in qr_payload:
        _, b64_data = qr_payload.split(",", 1)
        try:
            return base64.b64decode(b64_data)
        except Exception:
            return None
    return _as_qr_png(qr_payload)


def get_link_qr(service: str, device_name: str):
    """
    Returns PNG bytes for account-linking QR.

    - Signal: uses signal-cli REST endpoint.
    - WhatsApp/Instagram: runtime QR from shared state when available.
      Falls back to local pairing token QR in development.

    Raises RuntimeError for WhatsApp when no runtime QR is available, and
    NotImplementedError for unsupported services.
    """
    service_key = _service_key(service)
    # Fall back to the default name for blank input, as request_pairing does.
    device = (device_name or "GIA Device").strip() or "GIA Device"

    if service_key == "signal":
        import requests

        base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip(
            "/"
        )
        response = requests.get(
            f"{base}/v1/qrcodelink",
            params={"device_name": device},
            timeout=20,
        )
        response.raise_for_status()
        return response.content

    if service_key in {"whatsapp", "instagram"}:
        runtime_client = get_runtime_client(service_key)
        if runtime_client and hasattr(runtime_client, "get_link_qr_png"):
            try:
                image_bytes = runtime_client.get_link_qr_png(device)
                if image_bytes:
                    return image_bytes
            except Exception as exc:
                # Best effort: fall through to the cached QR, but leave a trace.
                log.warning("%s runtime QR fetch failed: %s", service_key, exc)

        cached = _qr_from_runtime_state(service_key)
        if cached:
            return cached

        if service_key == "whatsapp":
            state = get_runtime_state(service_key)
            existing_accounts = state.get("accounts") or []
            pair_status = str(state.get("pair_status") or "").strip().lower()
            if existing_accounts and (
                bool(state.get("connected")) or pair_status == "connected"
            ):
                raise RuntimeError(
                    "WhatsApp account already linked in this runtime. "
                    "Only one active linked device is supported. "
                    "Unlink the current account first, then add a new one."
                )
            raise RuntimeError(
                "Neonize has not provided a pairing QR yet. "
                "Ensure UR is running with WHATSAPP_ENABLED=true and retry."
            )

        token = secrets.token_urlsafe(24)
        # URL-encode the device name so spaces or "&" cannot break the query.
        uri = f"gia://{service_key}/link?device={quote_plus(device)}&token={token}"
        update_runtime_state(
            service_key,
            pair_device=device,
            pair_requested_at=int(time.time()),
            warning=(
                "Waiting for runtime pairing QR. "
                "If this persists, check UR logs and Neonize session state."
            ),
        )
        return _as_qr_png(uri)

    raise NotImplementedError(f"Unsupported service for QR linking: {service}")


def image_bytes_to_base64(image_bytes: bytes) -> str:
    """Return the base64 text encoding of ``image_bytes``."""
    return base64.b64encode(image_bytes).decode("utf-8")