diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 0000000..49d94b5 --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,151 @@ +# INSTALL + +This guide covers installation, environment configuration, and operational startup for GIA. + +Use this first. Then read `README.md` for feature and operation-mode details. + +## 1) Prerequisites + +- Linux host with either Podman + podman-compose wrapper or Docker Compose compatibility. +- Git. +- Network access for service images and Python dependencies. + +## 2) Clone and initialize + +```bash +git clone GIA +cd GIA +cp stack.env.example stack.env +``` + +## 3) Configure environment (`stack.env`) + +At minimum, set: + +- `SECRET_KEY` +- `DOMAIN` +- `URL` +- `ALLOWED_HOSTS` +- `CSRF_TRUSTED_ORIGINS` +- `APP_DATABASE_FILE` +- `APP_LOCAL_SETTINGS` +- `STATIC_ROOT` + +Enable transport services as needed: + +- `SIGNAL_NUMBER` +- `SIGNAL_HTTP_URL` +- `WHATSAPP_ENABLED` +- `WHATSAPP_DB_DIR` +- `INSTAGRAM_ENABLED` +- `COMPOSE_WS_ENABLED` + +XMPP bridge settings: + +- `XMPP_ADDRESS` +- `XMPP_JID` +- `XMPP_PORT` +- `XMPP_SECRET` + +For XMPP media upload, configure one of: + +- `XMPP_UPLOAD_SERVICE` +- `XMPP_UPLOAD_JID` + +If omitted, runtime attempts XEP-0363 discovery. + +## 4) Build and start + +```bash +make build +make run +``` + +## 5) Bootstrap database and admin + +```bash +make migrate +make auth +``` + +Optional static token helper: + +```bash +make token +``` + +## 6) Logs and health checks + +Tail logs: + +```bash +make log +``` + +Basic stack status: + +```bash +docker-compose --env-file=stack.env ps +``` + +## 7) Restart conventions + +### Full environment recycle (preferred when dependency state is broken) + +```bash +make stop && make run +``` + +Use the explicit `make stop && make run` command sequence when a full recycle is needed. + +### Single service restart + +```bash +docker-compose --env-file=stack.env restart +``` + +If single-service restart fails due to dependency/pod state, use full recycle above. + +## 8) Runtime code change policy + +After changing UR/runtime code (`core/clients/*`, transport, relay paths), restart runtime services before validating behavior. + +Minimum target: + +```bash +docker-compose --env-file=stack.env restart ur +``` + +If blocked, use full recycle. + +## 9) Service endpoints + +- Home: `/` +- Signal service page: `/services/signal/` +- WhatsApp service page: `/services/whatsapp/` +- Instagram service page: `/services/instagram/` +- Manual compose: `/compose/page/` +- AI workspace: `/ai/workspace/` +- OSINT search: `/search/page/` + +## 10) Common troubleshooting + +### A) Compose restart errors / dependency improper state + +Use: + +```bash +make stop && make run +``` + +### B) XMPP attachment upload issues + +- Confirm upload host TLS and cert chain are valid. +- Confirm `XMPP_UPLOAD_SERVICE`/`XMPP_UPLOAD_JID` is set, or discovery works. +- Check runtime logs for slot request and upload errors. + +### C) Signal or WhatsApp send failures + +- Verify account/link status in service pages. +- Verify `ur` service is running. +- Inspect `ur` logs for transport-specific errors. diff --git a/LLM_CODING_STANDARDS.md b/LLM_CODING_STANDARDS.md new file mode 100644 index 0000000..2cf275f --- /dev/null +++ b/LLM_CODING_STANDARDS.md @@ -0,0 +1,102 @@ +# LLM Coding Standards (GIA) + +This document defines implementation standards for coding LLM agents working in this repository. + +## 1) Repository Context + +GIA is a multi-transport communication workspace with these major domains: + +- Transport runtime and relay (`core/modules/router.py`, `core/clients/*`) +- Shared transport abstraction (`core/clients/transport.py`) +- Manual compose UX and APIs (`core/views/compose.py` + templates) +- AI workspace and mitigation operations (`core/views/workspace.py`) +- OSINT/search UX (`core/views/osint.py`) +- Core identity/history models (`core/models.py`) + +The stack is containerized and commonly managed via `make` commands in `Makefile`. + +## 2) Core Design Principles + +- Fix root causes first; avoid papering over with UI-only patches. +- Keep behavior symmetric across `signal`, `whatsapp`, `xmpp`, `instagram` where protocol permits. +- Centralize shared logic in one place; avoid copy/paste service forks. +- Prefer small reusable helpers over monolithic in-function branching. +- Preserve existing UX unless a task explicitly changes interaction behavior. + +## 3) Runtime / Restart Policy + +- If runtime code changes (`core/clients/*`, router, relay, transport), restart runtime before validating. +- Preferred full stack recycle: `make stop && make run`. +- Single-service restart is acceptable, but if dependency state is improper, use full recycle. + +## 4) Media Pipeline Rules + +- Use one shared attachment prep pipeline (`transport` layer) for outbound service sends. +- Service adapters should only execute provider-specific API calls after shared prep. +- Preserve MIME type and filename semantics; infer when source metadata is missing. +- Never inject internal `/compose/media/blob/...` links as relay body text for attachment-only messages. +- Blob links are fallback for web/history continuity, not preferred transport payloads. + +## 5) XMPP Upload/Share Rules + +- Preferred config: `XMPP_UPLOAD_SERVICE` or `XMPP_UPLOAD_JID`. +- If config absent, use XEP-0363 discovery and normalize discovery to a valid JID. +- Do not pass raw stanza XML where JID is expected. +- Keep one place for upload slot and upload error handling semantics. + +## 6) Transport Symmetry & Performance + +- Improvements for one service’s media path should be mirrored for others when applicable. +- Keep performance tuning entry points centralized in shared transport helpers. +- One TODO location per shared bottleneck (e.g. stream-as-completed optimization), not repeated per service. + +## 7) Logging and Diagnostics + +- Keep production logs high signal: + - lifecycle at info + - warnings/errors for operational failures + - high-volume traces at debug +- Temporary deep diagnostics must be gated (e.g. `WHATSAPP_DEBUG`) and removable in one patch. +- Avoid ad-hoc/unprofessional debug strings. + +## 8) Large File Refactoring Policy + +When touching large files (especially ~2000+ lines): + +- Extract minimal reusable helpers in-place or to shared modules. +- Add short docstrings for shared utility functions. +- Avoid introducing new repeated service-specific branches. + +## 9) Documentation Maintenance Standards + +Maintain docs as follows: + +- `INSTALL.md` owns setup/config/restart/troubleshooting. +- `README.md` owns capability map, operation modes, architecture, and interface workflows. +- Keep README setup instructions short and point to INSTALL first. +- Update both docs when operational commands/env requirements change. +- Reflect significant behavior changes (relay semantics, media fallback rules, runtime handoff) in docs immediately. + +## 10) Validation Workflow + +- Run targeted checks for changed files. +- Validate runtime behavior with real channel tests after restart. +- For media/relay changes, validate both directions: + - XMPP -> service + - service -> XMPP +- Confirm no regression in web compose display semantics. + +## 11) Security & TLS Operational Baseline + +- Keep upload endpoints compatible and modern (`TLSv1.2` + `TLSv1.3`). +- Maintain valid certificate chain/stapling where configured. +- Treat TLS/protocol mismatch as infra blocker before application-layer debugging. + +## 12) Current Operational Priorities (Living Guidance) + +As of current project state: + +- Stabilize live WhatsApp<->XMPP sync behavior. +- Complete media relay parity and offline/backfill consistency. +- Continue reducing transport-specific duplication through shared abstractions. +- Keep compose and workspace indicators clear and non-ambiguous. diff --git a/README.md b/README.md index 53b3070..9c09e72 100644 --- a/README.md +++ b/README.md @@ -1,198 +1,170 @@ # GIA -GIA is a Django-based communications workspace that combines: -- Multi-platform messaging transport (Signal, WhatsApp via Neonize runtime, Instagram gateway/runtime, XMPP bridge). -- AI-assisted drafting, summaries, pattern extraction, and mitigation planning. -- Queueing and approval workflows for controlled outbound messaging. +GIA is a multi-transport communication workspace that unifies Signal, WhatsApp, Instagram, and XMPP into one operator interface with AI-assisted analysis, drafting, and mitigation workflows. -This document covers architecture, setup, runtime behavior, and development workflow. +> Start with `INSTALL.md` first. This README focuses on capabilities and operation modes. -## Architecture +## What GIA Does -### High-level flow -1. Transport clients receive messages (Signal/WhatsApp/Instagram/XMPP). -2. UR (`manage.py ur`) normalizes events and routes them through unified handlers. -3. Message history is persisted in Django models (`Message`, `ChatSession`, identifiers). -4. UI pages (workspace + manual compose) read/write through Django views. -5. AI operations run through configured `AI` provider rows and store results in workspace models. +- Unifies chats from multiple protocols in one interface. +- Keeps conversation history in a shared model (`Person`, `PersonIdentifier`, `ChatSession`, `Message`). +- Supports manual, queue-driven, and AI-assisted outbound messaging. +- Bridges messages across transports (including XMPP) with attachment handling. +- Tracks delivery/read metadata and typing state events. +- Provides AI workspace analytics, mitigation plans, and insight visualizations. -### Core components -- `core/modules/router.py`: - - Unified Router entrypoint. - - Starts transport clients. - - Cross-protocol typing/read events. -- `core/clients/`: - - `signal.py` + `signalapi.py`: Signal transport integration. - - `whatsapp.py`: Neonize-backed WhatsApp runtime client. - - `instagram.py`: Instagram runtime/gateway integration. - - `xmpp.py`: XMPP bridge client. - - `transport.py`: Unified send/typing/QR API used by views/models. -- `core/messaging/`: - - AI execution wrapper. - - Message history persistence and enrichment helpers. -- `core/views/`: - - `workspace.py`: AI workspace, insights, mitigation plans/artifacts. - - `compose.py`: Manual chat mode, AI drafts/summary/engage shortcuts. - - `signal.py`, `whatsapp.py`, `instagram.py`: Service setup/account pages. +## Operation Modes -### Data model highlights -- `Person` + `PersonIdentifier`: - - Identity layer across services (`signal`, `whatsapp`, `instagram`, `xmpp`). -- `ChatSession` + `Message`: - - Conversation storage. - - Delivery/read metadata fields on `Message`. -- `WorkspaceConversation`: - - AI workspace context per person/thread. - - Stability + commitment scores and confidence. -- `WorkspaceMetricSnapshot`: - - Historical metric snapshots for trend charts. -- Pattern mitigation models: - - `PatternMitigationPlan`, `Rule`, `Game`, `Correction`, exports, and auto settings. +## 1) Service Management Mode -## Runtime services +Entry points: -`docker-compose.yml` defines these primary services: -- `app`: Django web process. -- `ur`: Unified Router runtime (transport clients). -- `scheduling`: Scheduled jobs. -- `migration`: Startup migrations. -- `collectstatic`: Static asset collection. -- `redis`: Cache/runtime signaling. -- `signal-cli-rest-api`: Signal backend. +- `/services/signal/` +- `/services/whatsapp/` +- `/services/instagram/` -## Installation +Capabilities: -### Prerequisites -- Podman + Podman Compose (preferred in this repo), or Docker Compose. -- Linux environment with bind-mount access for configured paths. +- Account linking/unlinking. +- Contact and chat discovery views. +- Message list inspection (service-native views). +- Runtime state visibility (connection/linking behavior through logs + UI state). -### 1) Clone and configure -```bash -git clone GIA -cd GIA -cp stack.env.example stack.env -``` +## 2) Manual Compose Mode -Edit `stack.env` with at least: -- `SECRET_KEY` -- `ALLOWED_HOSTS` -- `CSRF_TRUSTED_ORIGINS` -- `SIGNAL_NUMBER` (if Signal enabled) -- `WHATSAPP_ENABLED=true|false` -- `INSTAGRAM_ENABLED=true|false` -- XMPP values if XMPP bridge is enabled +Entry points: -### 2) Build and start -```bash -make build -make run -``` +- `/compose/page/` +- `/compose/widget/` +- `/compose/workspace/` -### 3) Run migrations and create admin user -```bash -make migrate -make auth -``` +Capabilities: -### 4) Follow logs -```bash -make log -``` +- Per-person/per-service chat thread view. +- Outbound send with runtime handoff for relevant services. +- Thread refresh/history sync path. +- Real-time updates via compose websocket path (with fallback behavior). +- Inline metadata for delivery/read timing. +- Latency and block-gap indicators for conversational cadence. -## Local developer commands +Compose operational behaviors: -```bash -make makemigrations -make migrate -make test -``` +- Manual sends are persisted to history and propagated to runtime transports. +- Runtime command-result polling handles queued sends where needed. +- Attachments are normalized through shared transport media prep logic. +- Blob URLs are fallback paths; source/shared media URLs are preferred when available. -Run ad-hoc manage commands: -```bash -docker-compose --env-file=stack.env run --rm app \ - sh -c ". /venv/bin/activate && python manage.py " -``` +## 3) AI Workspace Mode -## Service setup pages +Entry point: -- Signal: `/services/signal/` -- WhatsApp: `/services/whatsapp/` -- Instagram: `/services/instagram/` +- `/ai/workspace/` -### WhatsApp pairing (Neonize) -WhatsApp account linking expects a real Neonize runtime QR: -1. Start stack with `WHATSAPP_ENABLED=true`. -2. Ensure `ur` is running and WhatsApp runtime initialized. -3. Open `Services -> WhatsApp -> Add Account`. -4. Scan QR from WhatsApp Linked Devices. +Capabilities: -If runtime QR is not available yet, UI returns a clear warning instead of a synthetic QR. +- Person-centric workspace widgets and timelines. +- Draft generation, summary generation, quick insights. +- Insight detail pages and graph visualizations. +- Mitigation planning (rules, games, corrections, fundamentals, automation settings). +- Engage flows (preview + send). -## Manual compose mode +## 4) Queue / Approval Mode -- Route: `/compose/page/?service=&identifier=[&person=]` -- Features: - - Chat thread view. - - Send with two-step failsafe (`Arm Send` + `Confirm Intent`). - - AI overlay tools: - - Draft suggestions. - - Conversation summary. - - Quick engage send (shared framing, with its own two-step failsafe). - - Click-outside dismiss behavior for AI overlays. +Entry points: -### Live updates and traffic minimization -- WebSocket endpoint: `/ws/compose/thread/` (ASGI path). -- Compose clients open one persistent WS per panel and receive incremental updates. -- HTTP polling remains as a fallback at a slower interval when WS is unavailable. +- `/queue//` +- API: + - `/api/v1/queue/message/accept//` + - `/api/v1/queue/message/reject//` -This reduces repeated client GET requests compared with short polling loops. +Capabilities: -## AI workspace +- Hold suggested outbound actions. +- Human acceptance/rejection gates. +- Queue-first messaging flows for controlled operations. -Route: `/ai/workspace/` +## 5) OSINT Mode -Key capabilities: -- Summary, draft, pattern extraction. -- Mitigation planning and artifacts (rules/games/corrections). -- Insight metrics: - - Detail pages per metric. - - All-graphs page using Chart.js. - - Scoring help page describing formulas and interpretation. +Entry points: -## Configuration notes +- `/search//` +- `/osint/workspace/` -- `app/local_settings.py` is imported by `app/settings.py`. -- Redis cache is expected at `/var/run/gia-redis.sock` unless overridden. -- Service runtime URLs are read by `core/clients/transport.py`: - - `SIGNAL_HTTP_URL` - - `WHATSAPP_HTTP_URL` - - `INSTAGRAM_HTTP_URL` +Capabilities: -## ASGI / WebSocket note +- Search pages and widgets for scoped entity views. +- Workspace tab widgets and cross-context lookup UX. +- Foundation for auto-search workflows and graph/widget integration. -`app/asgi.py` now multiplexes: -- HTTP to Django ASGI app. -- WebSocket path `/ws/compose/thread/` to compose realtime handler. +## 6) Cross-Protocol Bridge Mode -Use an ASGI-capable server in environments where WebSockets are required. +Core behavior: -## Troubleshooting +- Inbound and outbound events are routed through UR runtime. +- XMPP bridge supports text, attachments, typing, and chat-state paths. +- Signal and WhatsApp media relay paths are normalized via shared transport/media logic. -### “Send failed. Check service account state.” -- Verify service account is linked and runtime is connected. -- Check `ur` and service logs (`make log`). +Key design points: -### WhatsApp QR not available -- Confirm `WHATSAPP_ENABLED=true`. -- Confirm Neonize initializes in `ur` logs. -- Retry Add Account after runtime is ready. +- Prefer shared media preparation over per-service duplicated logic. +- Preserve attachment semantics (image/video/audio/document) by MIME and filename inference. +- Avoid leaking internal `/compose/media/blob/...` links as transport relay text for attachment-only events. -### No compose realtime updates -- Verify ASGI deployment path and websocket upgrade support. -- Fallback polling should still update messages on interval. +## Runtime Architecture -## Security and operational safety +Core components: -- Outbound actions in manual compose/engage are guarded by two-step failsafe toggles. -- Queue workflow exists for additional human approval in managed flows. -- Deploy only with lawful consent and compliant data handling for your jurisdiction. +- `core/modules/router.py`: Unified Router runtime orchestration. +- `core/clients/transport.py`: Shared send/typing/attachment prep/runtime-command layer. +- `core/clients/signal.py`, `core/clients/signalapi.py`: Signal event + REST transport handling. +- `core/clients/whatsapp.py`: Neonize-backed runtime transport. +- `core/clients/xmpp.py`: XMPP component bridge and media upload relay. +- `core/views/compose.py`: Manual compose UX, polling/ws, send pipeline, media blob endpoint. +- `core/views/workspace.py`: AI workspace operations and insight surfaces. +- `core/views/osint.py`: Search/workspace OSINT interactions. + +Services in compose stack: + +- `app` (Django) +- `asgi` (websocket-capable app path) +- `ur` (runtime router) +- `scheduling` +- `redis` +- `signal-cli-rest-api` +- one-shot startup services (`migration`, `collectstatic`) + +## Interface Setup (Inside the App) + +After environment setup from `INSTALL.md`: + +1. Sign in as an admin user. +2. Open service pages and link required accounts. +3. Create/verify `Person` entries and `PersonIdentifier` mappings. +4. Open manual compose and test per-service send/receive. +5. Open AI workspace for analysis/mitigation workflows. +6. Verify queue workflows if approval mode is used. + +Recommended functional smoke test: + +- Signal -> web compose history +- WhatsApp -> web compose history +- XMPP -> service relay and back +- attachment send in both directions where bridge is enabled + +## Important Operational Notes + +- After runtime code changes, restart runtime services before validation. +- Full environment recycle convention: `make stop && make run`. +- If single-service restart fails due to dependency state, use full recycle. + +## Security & Reliability Notes + +- Keep upload endpoints on modern TLS (`TLSv1.2` + `TLSv1.3`) with valid chain. +- Keep logs high-signal in production; use debug-gated diagnostics temporarily. +- Preserve symmetry across transports where protocol capabilities permit. + +## Documentation Map + +- `INSTALL.md`: installation, env, startup, restart, troubleshooting. +- `LLM_CODING_STANDARDS.md`: coding-agent standards and repository practices. +- This `README.md`: capability map, operation modes, architecture, interface workflow. diff --git a/core/clients/signal.py b/core/clients/signal.py index 752c686..09d6f84 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -31,6 +31,23 @@ else: SIGNAL_URL = f"{SIGNAL_HOST}:{SIGNAL_PORT}" +def _is_internal_compose_blob_url(value: str) -> bool: + raw = str(value or "").strip() + if not raw: + return False + if raw.startswith("/compose/media/blob/"): + return True + parsed = urlparse(raw if "://" in raw else f"https://dummy{raw}") + return str(parsed.path or "").startswith("/compose/media/blob/") + + +def _is_compose_blob_only_text(text_value: str) -> bool: + lines = [line.strip() for line in str(text_value or "").splitlines() if line.strip()] + if not lines: + return False + return all(_is_internal_compose_blob_url(line) for line in lines) + + def _get_nested(payload, path): current = payload for key in path: @@ -129,6 +146,41 @@ def _extract_receipt_timestamps(receipt_payload): return [] +def _extract_signal_reaction(envelope): + paths = [ + ("dataMessage", "reaction"), + ("syncMessage", "sentMessage", "message", "reaction"), + ("syncMessage", "sentMessage", "reaction"), + ] + node = None + for path in paths: + candidate = _get_nested(envelope, path) + if isinstance(candidate, dict): + node = candidate + break + if not isinstance(node, dict): + return None + emoji = str(node.get("emoji") or "").strip() + target_ts = node.get("targetSentTimestamp") + if target_ts is None: + target_ts = node.get("targetTimestamp") + try: + target_ts = int(target_ts) + except Exception: + target_ts = 0 + remove = bool(node.get("remove") or node.get("isRemove")) + if not emoji and not remove: + return None + if target_ts <= 0: + return None + return { + "emoji": emoji, + "target_ts": target_ts, + "remove": remove, + "raw": dict(node), + } + + def _typing_started(typing_payload): action = str(typing_payload.get("action") or "").strip().lower() if action in {"started", "start", "typing", "composing"}: @@ -343,6 +395,32 @@ class HandleMessage(Command): ) return + reaction_payload = _extract_signal_reaction(envelope) + if isinstance(reaction_payload, dict): + log.debug( + "reaction-bridge signal-inbound target_ts=%s emoji=%s remove=%s identifiers=%s", + int(reaction_payload.get("target_ts") or 0), + str(reaction_payload.get("emoji") or "") or "-", + bool(reaction_payload.get("remove")), + len(identifiers), + ) + for identifier in identifiers: + try: + await self.ur.xmpp.client.apply_external_reaction( + identifier.user, + identifier, + source_service="signal", + emoji=str(reaction_payload.get("emoji") or ""), + remove=bool(reaction_payload.get("remove")), + upstream_message_id="", + upstream_ts=int(reaction_payload.get("target_ts") or 0), + actor=(source_uuid or source_number or ""), + payload=reaction_payload.get("raw") or {}, + ) + except Exception as exc: + log.warning("Signal reaction relay to XMPP failed: %s", exc) + return + # Handle attachments across multiple Signal payload variants. attachment_list = _extract_attachments(raw) xmpp_attachments = [] @@ -385,8 +463,11 @@ class HandleMessage(Command): f"/compose/media/blob/?key={quote_plus(str(blob_key))}" ) - if (not text) and compose_media_urls: - text = "\n".join(compose_media_urls) + # Keep relay payload text clean for XMPP. Blob URLs are web/history fallback + # only and should not be injected into XMPP body text. + relay_text = text + if attachment_list and _is_compose_blob_only_text(relay_text): + relay_text = "" # Forward incoming Signal messages to XMPP and apply mutate rules. identifier_text_overrides = {} @@ -407,7 +488,7 @@ class HandleMessage(Command): uploaded_urls = [] for manip in mutate_manips: prompt = replies.generate_mutate_reply_prompt( - text, + relay_text, None, manip, None, @@ -423,8 +504,13 @@ class HandleMessage(Command): result, is_outgoing_message, attachments=xmpp_attachments, + source_ref={ + "upstream_message_id": "", + "upstream_author": str(source_uuid or source_number or ""), + "upstream_ts": int(ts or 0), + }, ) - resolved_text = text + resolved_text = relay_text if (not resolved_text) and uploaded_urls: resolved_text = "\n".join(uploaded_urls) elif (not resolved_text) and compose_media_urls: @@ -437,11 +523,16 @@ class HandleMessage(Command): uploaded_urls = await self.ur.xmpp.client.send_from_external( user, identifier, - text, + relay_text, is_outgoing_message, attachments=xmpp_attachments, + source_ref={ + "upstream_message_id": "", + "upstream_author": str(source_uuid or source_number or ""), + "upstream_ts": int(ts or 0), + }, ) - resolved_text = text + resolved_text = relay_text if (not resolved_text) and uploaded_urls: resolved_text = "\n".join(uploaded_urls) elif (not resolved_text) and compose_media_urls: @@ -463,7 +554,7 @@ class HandleMessage(Command): session_cache[session_key] = chat_session sender_key = source_uuid or source_number or identifier_candidates[0] message_key = (chat_session.id, ts, sender_key) - message_text = identifier_text_overrides.get(session_key, text) + message_text = identifier_text_overrides.get(session_key, relay_text) if message_key not in stored_messages: await history.store_message( session=chat_session, diff --git a/core/clients/signalapi.py b/core/clients/signalapi.py index 37c4f71..991f9a1 100644 --- a/core/clients/signalapi.py +++ b/core/clients/signalapi.py @@ -28,7 +28,7 @@ async def stop_typing(uuid): return await response.text() # Optional: Return response content -async def download_and_encode_base64(file_url, filename, content_type): +async def download_and_encode_base64(file_url, filename, content_type, session=None): """ Downloads a file from a given URL asynchronously, converts it to Base64, and returns it in Signal's expected format. @@ -42,10 +42,17 @@ async def download_and_encode_base64(file_url, filename, content_type): str | None: The Base64 encoded attachment string in Signal's expected format, or None on failure. """ try: - async with aiohttp.ClientSession() as session: + if session is not None: async with session.get(file_url, timeout=10) as response: if response.status != 200: - # log.error(f"Failed to download file: {file_url}, status: {response.status}") + return None + file_data = await response.read() + base64_encoded = base64.b64encode(file_data).decode("utf-8") + return f"data:{content_type};filename={filename};base64,{base64_encoded}" + + async with aiohttp.ClientSession() as local_session: + async with local_session.get(file_url, timeout=10) as response: + if response.status != 200: return None file_data = await response.read() @@ -82,19 +89,39 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None): "base64_attachments": [], } - # Asynchronously download and encode all attachments + async def _attachment_to_base64(attachment, session): + row = dict(attachment or {}) + filename = row.get("filename") or "attachment.bin" + content_type = row.get("content_type") or "application/octet-stream" + content = row.get("content") + if isinstance(content, memoryview): + content = content.tobytes() + elif isinstance(content, bytearray): + content = bytes(content) + if isinstance(content, bytes): + encoded = base64.b64encode(content).decode("utf-8") + return f"data:{content_type};filename={filename};base64,{encoded}" + file_url = row.get("url") + if not file_url: + return None + return await download_and_encode_base64(file_url, filename, content_type, session) + + # Asynchronously resolve and encode all attachments attachments = attachments or [] - tasks = [ - download_and_encode_base64(att["url"], att["filename"], att["content_type"]) - for att in attachments - ] - encoded_attachments = await asyncio.gather(*tasks) + async with aiohttp.ClientSession() as session: + tasks = [_attachment_to_base64(att, session) for att in attachments] + encoded_attachments = await asyncio.gather(*tasks) # Filter out failed downloads (None values) data["base64_attachments"] = [att for att in encoded_attachments if att] # Remove the message body if it only contains an attachment link - if text and (text.strip() in [att["url"] for att in attachments]): + attachment_urls = { + str((att or {}).get("url") or "").strip() + for att in attachments + if str((att or {}).get("url") or "").strip() + } + if text and text.strip() in attachment_urls: # log.info("Removing message body since it only contains an attachment link.") text = None # Don't send the link as text @@ -112,6 +139,42 @@ async def send_message_raw(recipient_uuid, text=None, attachments=None): return False +async def send_reaction( + recipient_uuid, + emoji, + target_timestamp=None, + target_author=None, + remove=False, +): + base = getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080").rstrip("/") + sender_number = settings.SIGNAL_NUMBER + if not recipient_uuid or not target_timestamp: + return False + + payload = { + "recipient": recipient_uuid, + "reaction": str(emoji or ""), + "target_author": str(target_author or recipient_uuid), + "timestamp": int(target_timestamp), + "remove": bool(remove), + } + + candidate_urls = [f"{base}/v1/reactions/{sender_number}"] + + timeout = aiohttp.ClientTimeout(total=20) + async with aiohttp.ClientSession(timeout=timeout) as session: + for url in candidate_urls: + for method in ("post",): + try: + request = getattr(session, method) + async with request(url, json=payload) as response: + if 200 <= response.status < 300: + return True + except Exception: + continue + return False + + async def fetch_signal_attachment(attachment_id): """ Asynchronously fetches an attachment from Signal. diff --git a/core/clients/transport.py b/core/clients/transport.py index b0c5913..462deea 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -9,6 +9,7 @@ from urllib.parse import quote_plus import aiohttp import orjson import qrcode +from asgiref.sync import sync_to_async from django.conf import settings from django.core.cache import cache @@ -21,6 +22,7 @@ log = logs.get_logger("transport") _RUNTIME_STATE_TTL = 60 * 60 * 24 _RUNTIME_COMMANDS_TTL = 60 * 15 _RUNTIME_COMMAND_RESULT_TTL = 60 +_BRIDGE_MAP_TTL = 60 * 60 * 24 * 14 _RUNTIME_CLIENTS: dict[str, Any] = {} @@ -48,6 +50,10 @@ def _runtime_command_meta_key(service: str, command_id: str) -> str: return f"gia:service:command-meta:{_service_key(service)}:{command_id}" +def _bridge_map_key(user_id: int, person_id: int, service: str) -> str: + return f"gia:bridge:map:{int(user_id)}:{int(person_id)}:{_service_key(service)}" + + def _gateway_base(service: str) -> str: key = f"{service.upper()}_HTTP_URL" default = f"http://{service}:8080" @@ -69,6 +75,50 @@ def _parse_timestamp(data: Any): return None +def _attachment_has_inline_content(attachment: dict | None) -> bool: + value = (attachment or {}).get("content") + return isinstance(value, (bytes, bytearray, memoryview)) + + +def _normalize_inline_content(attachment: dict) -> dict: + row = dict(attachment or {}) + content = row.get("content") + if isinstance(content, memoryview): + row["content"] = content.tobytes() + elif isinstance(content, bytearray): + row["content"] = bytes(content) + if isinstance(row.get("content"), bytes) and not row.get("size"): + row["size"] = len(row["content"]) + return row + + +async def prepare_outbound_attachments(service: str, attachments: list | None) -> list: + """ + Resolve outbound attachment refs into payloads once, in parallel. + + This is the shared media-prep layer for XMPP -> {Signal, WhatsApp} sends, + so attachment performance improvements live in one place. + + TODO: Stream per-attachment send as each payload resolves (as_completed) + to reduce first-byte latency for large media batches. + """ + rows = [dict(att or {}) for att in (attachments or [])] + if not rows: + return [] + + async def _resolve(row: dict): + if _attachment_has_inline_content(row): + return _normalize_inline_content(row) + fetched = await fetch_attachment(service, row) + if not fetched: + return row + merged = dict(row) + merged.update(dict(fetched or {})) + return _normalize_inline_content(merged) + + return await asyncio.gather(*[_resolve(row) for row in rows]) + + def register_runtime_client(service: str, client: Any): """ Register an in-process runtime client (UR process). @@ -96,6 +146,178 @@ def update_runtime_state(service: str, **updates): return state +def record_bridge_mapping( + *, + user_id: int, + person_id: int, + service: str, + xmpp_message_id: str = "", + xmpp_ts: int | None = None, + upstream_message_id: str = "", + upstream_author: str = "", + upstream_ts: int | None = None, + text_preview: str = "", + local_message_id: str = "", +): + key = _bridge_map_key(user_id, person_id, service) + rows = list(cache.get(key) or []) + now_ts = int(time.time() * 1000) + entry = { + "xmpp_message_id": str(xmpp_message_id or "").strip(), + "xmpp_ts": int(xmpp_ts or 0), + "upstream_message_id": str(upstream_message_id or "").strip(), + "upstream_author": str(upstream_author or "").strip(), + "upstream_ts": int(upstream_ts or 0), + "text_preview": str(text_preview or "").strip()[:1000], + "local_message_id": str(local_message_id or "").strip(), + "updated_at": now_ts, + } + if not entry["xmpp_message_id"] and not entry["upstream_message_id"]: + if entry["upstream_ts"] <= 0 and entry["xmpp_ts"] <= 0: + return None + + deduped = [] + for row in rows: + same_xmpp = bool(entry["xmpp_message_id"]) and ( + str((row or {}).get("xmpp_message_id") or "").strip() + == entry["xmpp_message_id"] + ) + same_upstream = bool(entry["upstream_message_id"]) and ( + str((row or {}).get("upstream_message_id") or "").strip() + == entry["upstream_message_id"] + ) + if same_xmpp or same_upstream: + continue + deduped.append(dict(row or {})) + + deduped.append(entry) + if len(deduped) > 2000: + deduped = deduped[-2000:] + cache.set(key, deduped, timeout=_BRIDGE_MAP_TTL) + log.debug( + "reaction-bridge map-write service=%s user=%s person=%s xmpp_id=%s upstream_id=%s upstream_ts=%s local_id=%s rows=%s", + service, + user_id, + person_id, + entry.get("xmpp_message_id") or "-", + entry.get("upstream_message_id") or "-", + entry.get("upstream_ts") or 0, + entry.get("local_message_id") or "-", + len(deduped), + ) + return entry + + +def resolve_bridge_from_xmpp( + *, user_id: int, person_id: int, service: str, xmpp_message_id: str +): + target_id = str(xmpp_message_id or "").strip() + if not target_id: + return None + key = _bridge_map_key(user_id, person_id, service) + rows = list(cache.get(key) or []) + for row in reversed(rows): + if str((row or {}).get("xmpp_message_id") or "").strip() == target_id: + log.debug( + "reaction-bridge resolve-xmpp-hit service=%s user=%s person=%s xmpp_id=%s upstream_id=%s", + service, + user_id, + person_id, + target_id, + str((row or {}).get("upstream_message_id") or "-").strip(), + ) + return dict(row or {}) + log.debug( + "reaction-bridge resolve-xmpp-miss service=%s user=%s person=%s xmpp_id=%s rows=%s", + service, + user_id, + person_id, + target_id, + len(rows), + ) + return None + + +def resolve_bridge_from_upstream( + *, + user_id: int, + person_id: int, + service: str, + upstream_message_id: str = "", + upstream_ts: int | None = None, +): + key = _bridge_map_key(user_id, person_id, service) + rows = list(cache.get(key) or []) + target_id = str(upstream_message_id or "").strip() + if target_id: + for row in reversed(rows): + if str((row or {}).get("upstream_message_id") or "").strip() == target_id: + log.debug( + "reaction-bridge resolve-upstream-id-hit service=%s user=%s person=%s upstream_id=%s xmpp_id=%s", + service, + user_id, + person_id, + target_id, + str((row or {}).get("xmpp_message_id") or "-").strip(), + ) + return dict(row or {}) + target_ts = int(upstream_ts or 0) + if target_ts > 0: + best = None + best_gap = None + for row in rows: + row_ts = int((row or {}).get("upstream_ts") or 0) + if row_ts <= 0: + continue + gap = abs(row_ts - target_ts) + row_updated = int((row or {}).get("updated_at") or 0) + best_updated = int((best or {}).get("updated_at") or 0) if best else 0 + if ( + best is None + or gap < best_gap + or (gap == best_gap and row_updated > best_updated) + ): + best = dict(row or {}) + best_gap = gap + if best is not None and best_gap is not None and best_gap <= 15_000: + log.debug( + "reaction-bridge resolve-upstream-ts-hit service=%s user=%s person=%s target_ts=%s gap_ms=%s picked_xmpp_id=%s picked_upstream_ts=%s", + service, + user_id, + person_id, + target_ts, + best_gap, + str((best or {}).get("xmpp_message_id") or "-").strip(), + int((best or {}).get("upstream_ts") or 0), + ) + return best + log.debug( + "reaction-bridge resolve-upstream-miss service=%s user=%s person=%s upstream_id=%s upstream_ts=%s rows=%s", + service, + user_id, + person_id, + target_id or "-", + target_ts, + len(rows), + ) + return None + + +def resolve_bridge_from_text_hint( + *, user_id: int, person_id: int, service: str, text_hint: str +): + hint = str(text_hint or "").strip().lower() + if not hint: + return None + key = _bridge_map_key(user_id, person_id, service) + rows = list(cache.get(key) or []) + for row in reversed(rows): + preview = str((row or {}).get("text_preview") or "").strip().lower() + if preview and (preview == hint or hint in preview): + return dict(row or {}) + return None + + def enqueue_runtime_command( service: str, action: str, payload: dict | None = None ) -> str: @@ -478,13 +700,62 @@ async def _gateway_typing(service: str, recipient: str, started: bool): return False -async def send_message_raw(service: str, recipient: str, text=None, attachments=None): +async def send_message_raw( + service: str, + recipient: str, + text=None, + attachments=None, + metadata: dict | None = None, +): """ Unified outbound send path used by models/views/UR. """ service_key = _service_key(service) if service_key == "signal": - return await signalapi.send_message_raw(recipient, text, attachments or []) + prepared_attachments = await prepare_outbound_attachments( + service_key, attachments or [] + ) + result = await signalapi.send_message_raw(recipient, text, prepared_attachments) + meta = dict(metadata or {}) + xmpp_source_id = str(meta.get("xmpp_source_id") or "").strip() + if xmpp_source_id and result: + from core.models import PersonIdentifier + + identifier_row = await sync_to_async( + lambda: PersonIdentifier.objects.filter( + service="signal", + identifier=recipient, + ) + .select_related("user", "person") + .first() + )() + if identifier_row is not None: + record_bridge_mapping( + user_id=identifier_row.user_id, + person_id=identifier_row.person_id, + service="signal", + xmpp_message_id=xmpp_source_id, + xmpp_ts=int(meta.get("xmpp_source_ts") or 0), + upstream_message_id="", + upstream_author=str(meta.get("upstream_author") or ""), + upstream_ts=int(result) if isinstance(result, int) else 0, + text_preview=str(meta.get("xmpp_body") or text or ""), + local_message_id=str(meta.get("legacy_message_id") or ""), + ) + from core.messaging import history + + await history.save_bridge_ref( + user=identifier_row.user, + identifier=identifier_row, + source_service="signal", + local_message_id=str(meta.get("legacy_message_id") or ""), + local_ts=int(meta.get("xmpp_source_ts") or 0), + xmpp_message_id=xmpp_source_id, + upstream_message_id="", + upstream_author=str(meta.get("upstream_author") or ""), + upstream_ts=int(result) if isinstance(result, int) else 0, + ) + return result if service_key == "whatsapp": runtime_client = get_runtime_client(service_key) @@ -493,7 +764,10 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= runtime_result = await runtime_client.send_message_raw( recipient, text=text, - attachments=attachments or [], + attachments=await prepare_outbound_attachments( + service_key, attachments or [] + ), + metadata=dict(metadata or {}), ) if runtime_result is not False and runtime_result is not None: return runtime_result @@ -501,8 +775,11 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= log.warning("%s runtime send failed: %s", service_key, exc) # Web/UI process cannot access UR in-process runtime client directly. # Hand off send to UR via shared cache command queue. + prepared_attachments = await prepare_outbound_attachments( + service_key, attachments or [] + ) command_attachments = [] - for att in attachments or []: + for att in prepared_attachments: row = dict(att or {}) # Keep payload cache-friendly and avoid embedding raw bytes. for key in ("content",): @@ -515,6 +792,7 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= "recipient": recipient, "text": text or "", "attachments": command_attachments, + "metadata": dict(metadata or {}), }, ) command_result = await wait_runtime_command_result( @@ -540,6 +818,7 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= recipient, text=text, attachments=attachments or [], + metadata=dict(metadata or {}), ) if runtime_result is not False and runtime_result is not None: return runtime_result @@ -557,6 +836,85 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= raise NotImplementedError(f"Unsupported service: {service}") +async def send_reaction( + service: str, + recipient: str, + *, + emoji: str, + target_message_id: str = "", + target_timestamp: int | None = None, + target_author: str = "", + remove: bool = False, +): + service_key = _service_key(service) + if not str(emoji or "").strip() and not remove: + return False + + if service_key == "signal": + log.debug( + "reaction-bridge send service=signal recipient=%s target_ts=%s target_author=%s remove=%s", + recipient, + int(target_timestamp or 0), + str(target_author or recipient), + bool(remove), + ) + return await signalapi.send_reaction( + recipient_uuid=recipient, + emoji=str(emoji or ""), + target_timestamp=target_timestamp, + target_author=str(target_author or recipient), + remove=remove, + ) + + if service_key == "whatsapp": + runtime_client = get_runtime_client(service_key) + if runtime_client and hasattr(runtime_client, "send_reaction"): + try: + log.debug( + "reaction-bridge send service=whatsapp runtime recipient=%s target_id=%s target_ts=%s remove=%s", + recipient, + str(target_message_id or "") or "-", + int(target_timestamp or 0), + bool(remove), + ) + result = await runtime_client.send_reaction( + recipient, + emoji=str(emoji or ""), + target_message_id=str(target_message_id or ""), + target_timestamp=(int(target_timestamp) if target_timestamp else 0), + remove=bool(remove), + ) + if result: + return True + except Exception as exc: + log.warning("%s runtime reaction failed: %s", service_key, exc) + + command_id = enqueue_runtime_command( + service_key, + "send_reaction", + { + "recipient": recipient, + "emoji": str(emoji or ""), + "target_message_id": str(target_message_id or ""), + "target_timestamp": int(target_timestamp or 0), + "remove": bool(remove), + }, + ) + command_result = await wait_runtime_command_result( + service_key, + command_id, + timeout=20.0, + ) + log.debug( + "reaction-bridge send service=whatsapp queued-result ok=%s command_id=%s", + bool(isinstance(command_result, dict) and command_result.get("ok")), + command_id, + ) + return bool(isinstance(command_result, dict) and command_result.get("ok")) + + return False + + async def start_typing(service: str, recipient: str): service_key = _service_key(service) if service_key == "signal": diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index 5045f2a..9306e90 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1,6 +1,7 @@ import asyncio import inspect import logging +import mimetypes import os import re import sqlite3 @@ -695,6 +696,7 @@ class WhatsAppClient(ClientBase): recipient = str(payload.get("recipient") or "").strip() text = payload.get("text") attachments = payload.get("attachments") or [] + metadata = dict(payload.get("metadata") or {}) send_timeout_s = 18.0 try: # Include command_id so send_message_raw can observe cancel requests @@ -704,6 +706,7 @@ class WhatsAppClient(ClientBase): text=text, attachments=attachments, command_id=command_id, + metadata=metadata, ), timeout=send_timeout_s, ) @@ -775,6 +778,41 @@ class WhatsAppClient(ClientBase): ) return + if action == "send_reaction": + recipient = str(payload.get("recipient") or "").strip() + emoji = str(payload.get("emoji") or "") + target_message_id = str(payload.get("target_message_id") or "").strip() + target_timestamp = int(payload.get("target_timestamp") or 0) + remove = bool(payload.get("remove")) + try: + ok = await self.send_reaction( + recipient=recipient, + emoji=emoji, + target_message_id=target_message_id, + target_timestamp=target_timestamp, + remove=remove, + ) + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": bool(ok), + "timestamp": int(time.time() * 1000), + "error": "" if ok else "reaction_send_failed", + }, + ) + return + except Exception as exc: + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": str(exc), + }, + ) + return + if action == "force_history_sync": target_identifier = str(payload.get("identifier") or "").strip() try: @@ -2066,6 +2104,50 @@ class WhatsAppClient(ClientBase): return True return False + def _infer_media_content_type(self, message_obj): + if self._pluck(message_obj, "imageMessage") or self._pluck( + message_obj, "image_message" + ): + return "image/jpeg" + if self._pluck(message_obj, "videoMessage") or self._pluck( + message_obj, "video_message" + ): + return "video/mp4" + if self._pluck(message_obj, "audioMessage") or self._pluck( + message_obj, "audio_message" + ): + return "audio/ogg" + if self._pluck(message_obj, "stickerMessage") or self._pluck( + message_obj, "sticker_message" + ): + return "image/webp" + return "application/octet-stream" + + def _extract_reaction_event(self, message_obj): + node = self._pluck(message_obj, "reactionMessage") or self._pluck( + message_obj, "reaction_message" + ) + if not node: + return None + emoji = str( + self._pluck(node, "text") or self._pluck(node, "emoji") or "" + ).strip() + target_msg_id = str( + self._pluck(node, "key", "id") + or self._pluck(node, "key", "ID") + or self._pluck(node, "targetMessageKey", "id") + or self._pluck(node, "target_message_key", "id") + or "" + ).strip() + remove = bool(not emoji) + if not target_msg_id: + return None + return { + "emoji": emoji, + "target_message_id": target_msg_id, + "remove": remove, + } + async def _download_event_media(self, event): if not self._client: return [] @@ -2089,15 +2171,21 @@ class WhatsAppClient(ClientBase): filename = ( self._pluck(msg_obj, "documentMessage", "fileName") or self._pluck(msg_obj, "document_message", "file_name") - or f"wa-{int(time.time())}.bin" ) content_type = ( self._pluck(msg_obj, "documentMessage", "mimetype") or self._pluck(msg_obj, "document_message", "mimetype") or self._pluck(msg_obj, "imageMessage", "mimetype") or self._pluck(msg_obj, "image_message", "mimetype") - or "application/octet-stream" + or self._pluck(msg_obj, "videoMessage", "mimetype") + or self._pluck(msg_obj, "video_message", "mimetype") + or self._pluck(msg_obj, "audioMessage", "mimetype") + or self._pluck(msg_obj, "audio_message", "mimetype") + or self._infer_media_content_type(msg_obj) ) + if not filename: + ext = mimetypes.guess_extension(str(content_type or "").split(";", 1)[0].strip().lower()) + filename = f"wa-{int(time.time())}{ext or '.bin'}" blob_key = media_bridge.put_blob( service="whatsapp", content=bytes(payload), @@ -2119,7 +2207,7 @@ class WhatsAppClient(ClientBase): msg_obj = self._pluck(event, "message") or self._pluck(event, "Message") text = self._message_text(msg_obj, event) if not text: - self.log.info( + self.log.debug( "whatsapp empty-text event shape: msg_keys=%s event_keys=%s type=%s", self._shape_keys(msg_obj), self._shape_keys(event), @@ -2158,6 +2246,54 @@ class WhatsAppClient(ClientBase): or "" ).strip() ts = self._normalize_timestamp(raw_ts) + + reaction_payload = self._extract_reaction_event(msg_obj) + if reaction_payload: + self.log.debug( + "reaction-bridge whatsapp-inbound msg_id=%s target_id=%s emoji=%s remove=%s sender=%s chat=%s", + msg_id or "-", + str(reaction_payload.get("target_message_id") or "") or "-", + str(reaction_payload.get("emoji") or "") or "-", + bool(reaction_payload.get("remove")), + sender or "-", + chat or "-", + ) + identifier_values = self._normalize_identifier_candidates(sender, chat) + if not identifier_values: + self.log.warning( + "reaction-bridge whatsapp-identifiers-miss sender=%s chat=%s", + sender or "-", + chat or "-", + ) + return + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter( + service="whatsapp", + identifier__in=list(identifier_values), + ) + ) + for identifier in identifiers: + try: + await self.ur.xmpp.client.apply_external_reaction( + identifier.user, + identifier, + source_service="whatsapp", + emoji=str(reaction_payload.get("emoji") or ""), + remove=bool(reaction_payload.get("remove")), + upstream_message_id=str( + reaction_payload.get("target_message_id") or "" + ), + upstream_ts=0, + actor=(sender or chat or ""), + payload={ + "event": "reaction", + "message_id": msg_id, + }, + ) + except Exception as exc: + self.log.warning("whatsapp reaction relay to XMPP failed: %s", exc) + return + self._remember_contact( sender or chat, jid=sender, @@ -2206,6 +2342,11 @@ class WhatsAppClient(ClientBase): text, is_outgoing_message=is_from_me, attachments=xmpp_attachments, + source_ref={ + "upstream_message_id": str(msg_id or ""), + "upstream_author": str(sender or chat or ""), + "upstream_ts": int(ts or 0), + }, ) display_text = text if (not display_text) and uploaded_urls: @@ -2440,7 +2581,12 @@ class WhatsAppClient(ClientBase): return None async def send_message_raw( - self, recipient, text=None, attachments=None, command_id: str | None = None + self, + recipient, + text=None, + attachments=None, + command_id: str | None = None, + metadata: dict | None = None, ): self._last_send_error = "" if not self._client: @@ -2500,6 +2646,46 @@ class WhatsAppClient(ClientBase): sent_any = False sent_ts = 0 + metadata = dict(metadata or {}) + xmpp_source_id = str(metadata.get("xmpp_source_id") or "").strip() + legacy_message_id = str(metadata.get("legacy_message_id") or "").strip() + person_identifier = None + if xmpp_source_id: + candidates = list(self._normalize_identifier_candidates(recipient, jid_str)) + if candidates: + person_identifier = await sync_to_async( + lambda: PersonIdentifier.objects.filter( + service="whatsapp", + identifier__in=candidates, + ) + .select_related("user", "person") + .first() + )() + + def _extract_response_message_id(response): + return str( + self._pluck(response, "ID") + or self._pluck(response, "id") + or self._pluck(response, "Info", "ID") + or self._pluck(response, "info", "id") + or "" + ).strip() + + def _record_bridge(response, ts_value, body_hint=""): + if not xmpp_source_id or person_identifier is None: + return + transport.record_bridge_mapping( + user_id=person_identifier.user_id, + person_id=person_identifier.person_id, + service="whatsapp", + xmpp_message_id=xmpp_source_id, + xmpp_ts=int(metadata.get("xmpp_source_ts") or 0), + upstream_message_id=_extract_response_message_id(response), + upstream_ts=int(ts_value or 0), + text_preview=str(body_hint or metadata.get("xmpp_body") or ""), + local_message_id=legacy_message_id, + ) + for attachment in attachments or []: payload = await self._fetch_attachment_payload(attachment) if not payload: @@ -2510,6 +2696,22 @@ class WhatsAppClient(ClientBase): data = payload.get("content") or b"" filename = payload.get("filename") or "attachment.bin" attachment_target = jid_obj if jid_obj is not None else jid + send_method = "document" + if mime.startswith("image/") and hasattr(self._client, "send_image"): + send_method = "image" + elif mime.startswith("video/") and hasattr(self._client, "send_video"): + send_method = "video" + elif mime.startswith("audio/") and hasattr(self._client, "send_audio"): + send_method = "audio" + + if getattr(settings, "WHATSAPP_DEBUG", False): + self.log.debug( + "whatsapp media send prep: method=%s mime=%s filename=%s size=%s", + send_method, + mime, + filename, + len(data) if isinstance(data, (bytes, bytearray)) else 0, + ) try: if mime.startswith("image/") and hasattr(self._client, "send_image"): @@ -2540,7 +2742,15 @@ class WhatsAppClient(ClientBase): sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) + _record_bridge(response, sent_ts, body_hint=filename) sent_any = True + if getattr(settings, "WHATSAPP_DEBUG", False): + self.log.debug( + "whatsapp media send ok: method=%s filename=%s ts=%s", + send_method, + filename, + self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), + ) except Exception as exc: self.log.warning("whatsapp attachment send failed: %s", exc) @@ -2661,6 +2871,7 @@ class WhatsAppClient(ClientBase): sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) + _record_bridge(response, sent_ts, body_hint=str(text or "")) if not sent_any: self._last_send_error = "no_payload_sent" @@ -2730,6 +2941,72 @@ class WhatsAppClient(ClientBase): pass return False + async def send_reaction( + self, + recipient, + *, + emoji, + target_message_id="", + target_timestamp=0, + remove=False, + ): + if not self._client: + return False + jid = self._to_jid(recipient) + if not jid: + return False + target_id = str(target_message_id or "").strip() + if not target_id: + return False + + reaction_emoji = "" if remove else str(emoji or "").strip() + candidate_names = ( + "send_reaction", + "react", + "send_message_reaction", + "reaction", + ) + self.log.debug( + "reaction-bridge whatsapp-send start recipient=%s target_id=%s emoji=%s remove=%s", + recipient, + target_id, + reaction_emoji or "-", + bool(remove), + ) + for method_name in candidate_names: + method = getattr(self._client, method_name, None) + if method is None: + continue + attempts = [ + (jid, target_id, reaction_emoji), + (jid, target_id, reaction_emoji, bool(remove)), + (jid, reaction_emoji, target_id), + ] + for args in attempts: + try: + response = await self._call_client_method(method, *args, timeout=9.0) + if response is not None: + self.log.debug( + "reaction-bridge whatsapp-send ok method=%s args_len=%s", + method_name, + len(args), + ) + return True + except Exception as exc: + self.log.debug( + "reaction-bridge whatsapp-send miss method=%s args_len=%s error=%s", + method_name, + len(args), + exc, + ) + continue + self.log.warning( + "reaction-bridge whatsapp-send failed recipient=%s target_id=%s", + recipient, + target_id, + ) + return False + async def fetch_attachment(self, attachment_ref): blob_key = (attachment_ref or {}).get("blob_key") if blob_key: diff --git a/core/clients/xmpp.py b/core/clients/xmpp.py index 597443e..9db7e48 100644 --- a/core/clients/xmpp.py +++ b/core/clients/xmpp.py @@ -1,5 +1,8 @@ import asyncio +import mimetypes import re +import time +import uuid from urllib.parse import urlsplit import aiohttp @@ -12,7 +15,7 @@ from slixmpp.stanza import Message from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream.stanzabase import ET -from core.clients import ClientBase +from core.clients import ClientBase, transport from core.messaging import ai, history, replies, utils from core.models import ( ChatSession, @@ -30,6 +33,9 @@ from core.models import ( from core.util import logs URL_PATTERN = re.compile(r"https?://[^\s<>'\"\\]+") +EMOJI_ONLY_PATTERN = re.compile( + r"^[\U0001F300-\U0001FAFF\u2600-\u27BF\uFE0F\u200D\u2640-\u2642\u2764]+$" +) def _clean_url(value): @@ -42,6 +48,12 @@ def _filename_from_url(url_value): return name or "attachment" +def _content_type_from_filename_or_url(url_value, default="application/octet-stream"): + filename = _filename_from_url(url_value) + guessed, _ = mimetypes.guess_type(filename) + return guessed or default + + def _extract_xml_attachment_urls(message_stanza): urls = [] @@ -74,6 +86,46 @@ def _extract_xml_attachment_urls(message_stanza): return urls +def _extract_xmpp_reaction(message_stanza): + nodes = message_stanza.xml.findall(".//{urn:xmpp:reactions:0}reactions") + if not nodes: + return None + node = nodes[0] + target_id = str(node.attrib.get("id") or "").strip() + emojis = [] + for child in node.findall("{urn:xmpp:reactions:0}reaction"): + value = str(child.text or "").strip() + if value: + emojis.append(value) + return { + "target_id": target_id, + "emoji": emojis[0] if emojis else "", + "remove": len(emojis) == 0, + } + + +def _extract_xmpp_reply_target_id(message_stanza): + reply = message_stanza.xml.find(".//{urn:xmpp:reply:0}reply") + if reply is None: + return "" + return str(reply.attrib.get("id") or reply.attrib.get("to") or "").strip() + + +def _parse_greentext_reaction(body_text): + lines = [line.strip() for line in str(body_text or "").splitlines() if line.strip()] + if len(lines) != 2: + return None + if not lines[0].startswith(">"): + return None + quoted = lines[0][1:].strip() + emoji = lines[1].strip() + if not quoted or not emoji: + return None + if not EMOJI_ONLY_PATTERN.match(emoji): + return None + return {"quoted_text": quoted, "emoji": emoji} + + class XMPPComponent(ComponentXMPP): """ @@ -82,6 +134,7 @@ class XMPPComponent(ComponentXMPP): def __init__(self, ur, jid, secret, server, port): self.ur = ur + self._upload_config_warned = False self.log = logs.get_logger("XMPP") @@ -130,6 +183,8 @@ class XMPPComponent(ComponentXMPP): self.log.error(f"Failed to enable Carbons: {e}") def get_identifier(self, msg): + xmpp_message_id = str(msg.get("id") or "").strip() + # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) @@ -798,10 +853,43 @@ class XMPPComponent(ComponentXMPP): or getattr(settings, "XMPP_UPLOAD_JID", "") ).strip() if not upload_service_jid: - self.log.error( - "XMPP upload service is not configured. Set XMPP_UPLOAD_SERVICE." - ) - return None + discovered = None + try: + discovered = await self["xep_0363"].find_upload_service() + except Exception as exc: + self.log.debug("XMPP upload service discovery failed: %s", exc) + if discovered: + discovered_jid = "" + try: + discovered_jid = str(getattr(discovered, "jid", "") or "").strip() + except Exception: + discovered_jid = "" + + if not discovered_jid: + raw_discovered = str(discovered or "").strip() + if raw_discovered.startswith("<"): + try: + node = ET.fromstring(raw_discovered) + discovered_jid = str(node.attrib.get("from") or "").strip() + except Exception: + discovered_jid = "" + else: + discovered_jid = raw_discovered + + upload_service_jid = discovered_jid + if upload_service_jid: + self.log.info( + "Discovered XMPP upload service via XEP-0363: %s", + upload_service_jid, + ) + else: + if not self._upload_config_warned: + self.log.warning( + "XMPP upload service not configured/discoverable; skipping attachment upload. " + "Set XMPP_UPLOAD_SERVICE (or XMPP_UPLOAD_JID)." + ) + self._upload_config_warned = True + return None try: slot = await self["xep_0363"].request_slot( @@ -849,6 +937,8 @@ class XMPPComponent(ComponentXMPP): def sym(value): msg.reply(f"[>] {value}").send() + xmpp_message_id = str(msg.get("id") or "").strip() + # Extract sender JID (full format: user@domain/resource) sender_jid = str(msg["from"]) @@ -872,6 +962,9 @@ class XMPPComponent(ComponentXMPP): # Extract message body body = msg["body"] if msg["body"] else "" + parsed_reaction = _extract_xmpp_reaction(msg) + parsed_reply_target = _extract_xmpp_reply_target_id(msg) + greentext_reaction = _parse_greentext_reaction(body) attachments = [] self.log.debug( @@ -898,11 +991,12 @@ class XMPPComponent(ComponentXMPP): url_value = _clean_url(oob.text) if not url_value: continue + guessed_content_type = _content_type_from_filename_or_url(url_value) attachments.append( { "url": url_value, "filename": _filename_from_url(url_value), - "content_type": "application/octet-stream", + "content_type": guessed_content_type, } ) @@ -912,11 +1006,12 @@ class XMPPComponent(ComponentXMPP): for url_value in extracted_urls: if url_value in existing_urls: continue + guessed_content_type = _content_type_from_filename_or_url(url_value) attachments.append( { "url": url_value, "filename": _filename_from_url(url_value), - "content_type": "application/octet-stream", + "content_type": guessed_content_type, } ) @@ -931,6 +1026,17 @@ class XMPPComponent(ComponentXMPP): if attachment_urls: body = "\n".join(attachment_urls) + relay_body = body + attachment_urls_for_body = [ + str(item.get("url") or "").strip() + for item in attachments + if str(item.get("url") or "").strip() + ] + if attachment_urls_for_body: + joined_urls = "\n".join(attachment_urls_for_body).strip() + if str(relay_body or "").strip() == joined_urls: + relay_body = "" + self.log.debug("Extracted %s attachments from XMPP message", len(attachments)) # Log extracted information with variable name annotations log_message = ( @@ -1021,6 +1127,106 @@ class XMPPComponent(ComponentXMPP): # sym(str(person.__dict__)) # sym(f"Service: {recipient_service}") + if parsed_reaction or greentext_reaction: + # TODO(web-ui-react): expose explicit web compose reaction actions + # that call this same bridge path (without text heuristics). + # TODO(edit-sync): extend bridge mapping to include edit message ids + # and reconcile upstream edit capability differences in UI. + # TODO(retract-sync): propagate delete/retract state through this + # same mapping layer for protocol parity. + reaction_payload = parsed_reaction or { + "target_id": parsed_reply_target, + "emoji": str((greentext_reaction or {}).get("emoji") or ""), + "remove": False, + } + if not str(reaction_payload.get("target_id") or "").strip(): + text_hint = str((greentext_reaction or {}).get("quoted_text") or "") + hint_match = transport.resolve_bridge_from_text_hint( + user_id=identifier.user_id, + person_id=identifier.person_id, + service=recipient_service, + text_hint=text_hint, + ) + reaction_payload["target_id"] = str( + (hint_match or {}).get("xmpp_message_id") or "" + ) + + self.log.debug( + "reaction-bridge xmpp-inbound actor=%s service=%s target_xmpp_id=%s emoji=%s remove=%s via=%s", + sender_username, + recipient_service, + str(reaction_payload.get("target_id") or "") or "-", + str(reaction_payload.get("emoji") or "") or "-", + bool(reaction_payload.get("remove")), + "xmpp:reactions" if parsed_reaction else "greentext", + ) + + bridge = transport.resolve_bridge_from_xmpp( + user_id=identifier.user_id, + person_id=identifier.person_id, + service=recipient_service, + xmpp_message_id=str(reaction_payload.get("target_id") or ""), + ) + if not bridge: + bridge = await history.resolve_bridge_ref( + user=identifier.user, + identifier=identifier, + source_service=recipient_service, + xmpp_message_id=str(reaction_payload.get("target_id") or ""), + ) + if not bridge: + self.log.warning( + "reaction-bridge xmpp-resolve-miss actor=%s service=%s target_xmpp_id=%s", + sender_username, + recipient_service, + str(reaction_payload.get("target_id") or "") or "-", + ) + sym("Could not find upstream message for this reaction.") + return + + sent_ok = await transport.send_reaction( + recipient_service, + identifier.identifier, + emoji=str(reaction_payload.get("emoji") or ""), + target_message_id=str((bridge or {}).get("upstream_message_id") or ""), + target_timestamp=int((bridge or {}).get("upstream_ts") or 0), + target_author=str((bridge or {}).get("upstream_author") or ""), + remove=bool(reaction_payload.get("remove")), + ) + if not sent_ok: + self.log.warning( + "reaction-bridge upstream-send-failed actor=%s service=%s recipient=%s target_upstream_id=%s target_upstream_ts=%s", + sender_username, + recipient_service, + identifier.identifier, + str((bridge or {}).get("upstream_message_id") or "") or "-", + int((bridge or {}).get("upstream_ts") or 0), + ) + sym("Upstream protocol did not accept this reaction.") + return + + await history.apply_reaction( + user=identifier.user, + identifier=identifier, + target_message_id=str((bridge or {}).get("local_message_id") or ""), + target_ts=int((bridge or {}).get("upstream_ts") or 0), + emoji=str(reaction_payload.get("emoji") or ""), + source_service="xmpp", + actor=sender_username, + remove=bool(reaction_payload.get("remove")), + payload={ + "target_xmpp_id": str(reaction_payload.get("target_id") or ""), + "xmpp_message_id": xmpp_message_id, + }, + ) + self.log.debug( + "reaction-bridge xmpp-apply-ok actor=%s service=%s local_message_id=%s", + sender_username, + recipient_service, + str((bridge or {}).get("local_message_id") or "") or "-", + ) + return + # tss = await identifier.send(body, attachments=attachments) # AM FIXING https://git.zm.is/XF/GIA/issues/5 session, _ = await sync_to_async(ChatSession.objects.get_or_create)( @@ -1028,7 +1234,7 @@ class XMPPComponent(ComponentXMPP): user=identifier.user, ) self.log.debug("Storing outbound XMPP message in history") - await history.store_message( + local_message = await history.store_message( session=session, sender="XMPP", text=body, @@ -1051,8 +1257,14 @@ class XMPPComponent(ComponentXMPP): payload={"reason": "message_sent"}, ) await identifier.send( - body, + relay_body, attachments, + metadata={ + "xmpp_source_id": xmpp_message_id, + "xmpp_source_ts": int(now().timestamp() * 1000), + "xmpp_body": relay_body, + "legacy_message_id": str(local_message.id), + }, ) self.log.debug("Message sent unaltered") return @@ -1061,7 +1273,7 @@ class XMPPComponent(ComponentXMPP): chat_history = await history.get_chat_history(session) await utils.update_last_interaction(session) prompt = replies.generate_mutate_reply_prompt( - body, + relay_body, identifier.person, manip, chat_history, @@ -1082,6 +1294,12 @@ class XMPPComponent(ComponentXMPP): await identifier.send( result, attachments, + metadata={ + "xmpp_source_id": xmpp_message_id, + "xmpp_source_ts": int(now().timestamp() * 1000), + "xmpp_body": result, + "legacy_message_id": str(local_message.id), + }, ) self.log.debug("Message sent with modifications") @@ -1123,10 +1341,13 @@ class XMPPComponent(ComponentXMPP): ) # Send XMPP message immediately after successful upload - await self.send_xmpp_message( + xmpp_msg_id = await self.send_xmpp_message( recipient_jid, sender_jid, upload_url, attachment_url=upload_url ) - return upload_url + return { + "url": upload_url, + "xmpp_message_id": xmpp_msg_id, + } except Exception as e: self.log.error(f"Error uploading {att['filename']} to XMPP: {e}") @@ -1137,6 +1358,9 @@ class XMPPComponent(ComponentXMPP): ): """Sends an XMPP message with either text or an attachment URL.""" msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + if not msg.get("id"): + msg["id"] = uuid.uuid4().hex + msg_id = str(msg.get("id") or "").strip() msg["body"] = body_text # Body must contain only text or the URL if attachment_url: @@ -1148,6 +1372,127 @@ class XMPPComponent(ComponentXMPP): self.log.debug("Sending XMPP message: %s", msg.xml) msg.send() + return msg_id + + async def send_xmpp_reaction( + self, + recipient_jid, + sender_jid, + *, + target_xmpp_id: str, + emoji: str, + remove: bool = False, + ): + msg = self.make_message(mto=recipient_jid, mfrom=sender_jid, mtype="chat") + if not msg.get("id"): + msg["id"] = uuid.uuid4().hex + msg["body"] = "" + reactions_node = ET.Element( + "{urn:xmpp:reactions:0}reactions", + {"id": str(target_xmpp_id or "").strip()}, + ) + if not remove and str(emoji or "").strip(): + reaction_node = ET.SubElement( + reactions_node, + "{urn:xmpp:reactions:0}reaction", + ) + reaction_node.text = str(emoji) + msg.xml.append(reactions_node) + msg.send() + return str(msg.get("id") or "").strip() + + async def apply_external_reaction( + self, + user, + person_identifier, + *, + source_service, + emoji, + remove, + upstream_message_id="", + upstream_ts=0, + actor="", + payload=None, + ): + self.log.debug( + "reaction-bridge external-in source=%s user=%s person=%s upstream_id=%s upstream_ts=%s emoji=%s remove=%s", + source_service, + user.id, + person_identifier.person_id, + str(upstream_message_id or "") or "-", + int(upstream_ts or 0), + str(emoji or "") or "-", + bool(remove), + ) + bridge = transport.resolve_bridge_from_upstream( + user_id=user.id, + person_id=person_identifier.person_id, + service=source_service, + upstream_message_id=str(upstream_message_id or ""), + upstream_ts=int(upstream_ts or 0), + ) + if not bridge: + bridge = await history.resolve_bridge_ref( + user=user, + identifier=person_identifier, + source_service=source_service, + upstream_message_id=str(upstream_message_id or ""), + upstream_author=str(actor or ""), + upstream_ts=int(upstream_ts or 0), + ) + if not bridge: + self.log.warning( + "reaction-bridge external-resolve-miss source=%s user=%s person=%s upstream_id=%s upstream_ts=%s", + source_service, + user.id, + person_identifier.person_id, + str(upstream_message_id or "") or "-", + int(upstream_ts or 0), + ) + return False + + target_xmpp_id = str((bridge or {}).get("xmpp_message_id") or "").strip() + if not target_xmpp_id: + self.log.warning( + "reaction-bridge external-target-missing source=%s user=%s person=%s", + source_service, + user.id, + person_identifier.person_id, + ) + return False + + sender_jid = ( + f"{person_identifier.person.name.lower()}|" + f"{person_identifier.service}@{settings.XMPP_JID}" + ) + recipient_jid = f"{user.username}@{settings.XMPP_ADDRESS}" + await self.send_xmpp_reaction( + recipient_jid, + sender_jid, + target_xmpp_id=target_xmpp_id, + emoji=str(emoji or ""), + remove=bool(remove), + ) + await history.apply_reaction( + user=user, + identifier=person_identifier, + target_message_id=str((bridge or {}).get("local_message_id") or ""), + target_ts=int((bridge or {}).get("upstream_ts") or 0), + emoji=str(emoji or ""), + source_service=source_service, + actor=str(actor or person_identifier.identifier), + remove=bool(remove), + payload=dict(payload or {}), + ) + self.log.debug( + "reaction-bridge external-apply-ok source=%s user=%s person=%s xmpp_id=%s local_message_id=%s", + source_service, + user.id, + person_identifier.person_id, + target_xmpp_id, + str((bridge or {}).get("local_message_id") or "") or "-", + ) + return True async def send_chat_state(self, recipient_jid, sender_jid, started): """Send XMPP chat-state update to the client.""" @@ -1173,18 +1518,74 @@ class XMPPComponent(ComponentXMPP): await self.send_chat_state(recipient_jid, sender_jid, started) async def send_from_external( - self, user, person_identifier, text, is_outgoing_message, attachments=[] + self, + user, + person_identifier, + text, + is_outgoing_message, + attachments=[], + source_ref=None, ): """Handles sending XMPP messages with text and attachments.""" sender_jid = f"{person_identifier.person.name.lower()}|{person_identifier.service}@{settings.XMPP_JID}" recipient_jid = f"{person_identifier.user.username}@{settings.XMPP_ADDRESS}" if is_outgoing_message: - await self.send_xmpp_message(recipient_jid, sender_jid, f"YOU: {text}") + xmpp_id = await self.send_xmpp_message( + recipient_jid, + sender_jid, + f"YOU: {text}", + ) + transport.record_bridge_mapping( + user_id=user.id, + person_id=person_identifier.person_id, + service=person_identifier.service, + xmpp_message_id=xmpp_id, + xmpp_ts=int(time.time() * 1000), + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + text_preview=str(text or ""), + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + ) + await history.save_bridge_ref( + user=user, + identifier=person_identifier, + source_service=person_identifier.service, + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + local_ts=int((source_ref or {}).get("xmpp_source_ts") or int(time.time() * 1000)), + xmpp_message_id=xmpp_id, + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + ) # Step 1: Send text message separately elif text: - await self.send_xmpp_message(recipient_jid, sender_jid, text) + xmpp_id = await self.send_xmpp_message(recipient_jid, sender_jid, text) + transport.record_bridge_mapping( + user_id=user.id, + person_id=person_identifier.person_id, + service=person_identifier.service, + xmpp_message_id=xmpp_id, + xmpp_ts=int(time.time() * 1000), + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + text_preview=str(text or ""), + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + ) + await history.save_bridge_ref( + user=user, + identifier=person_identifier, + source_service=person_identifier.service, + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + local_ts=int((source_ref or {}).get("xmpp_source_ts") or int(time.time() * 1000)), + xmpp_message_id=xmpp_id, + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + ) if not attachments: return [] # No attachments to process @@ -1193,7 +1594,7 @@ class XMPPComponent(ComponentXMPP): valid_uploads = await self.request_upload_slots(recipient_jid, attachments) self.log.debug("Got upload slots") if not valid_uploads: - self.log.warning("No valid upload slots obtained.") + self.log.debug("No valid upload slots obtained; attachment relay skipped") return [] # Step 3: Upload each file and send its message immediately after upload @@ -1201,8 +1602,33 @@ class XMPPComponent(ComponentXMPP): self.upload_and_send(att, slot, recipient_jid, sender_jid) for att, slot in valid_uploads ] - uploaded_urls = await asyncio.gather(*upload_tasks) # Upload files concurrently - return [url for url in uploaded_urls if url] + uploaded_rows = await asyncio.gather(*upload_tasks) # Upload files concurrently + normalized_rows = [dict(row or {}) for row in uploaded_rows if row] + for row in normalized_rows: + transport.record_bridge_mapping( + user_id=user.id, + person_id=person_identifier.person_id, + service=person_identifier.service, + xmpp_message_id=str(row.get("xmpp_message_id") or "").strip(), + xmpp_ts=int(time.time() * 1000), + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + text_preview=str(row.get("url") or text or ""), + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + ) + await history.save_bridge_ref( + user=user, + identifier=person_identifier, + source_service=person_identifier.service, + local_message_id=str((source_ref or {}).get("legacy_message_id") or ""), + local_ts=int((source_ref or {}).get("xmpp_source_ts") or int(time.time() * 1000)), + xmpp_message_id=str(row.get("xmpp_message_id") or "").strip(), + upstream_message_id=str((source_ref or {}).get("upstream_message_id") or ""), + upstream_author=str((source_ref or {}).get("upstream_author") or ""), + upstream_ts=int((source_ref or {}).get("upstream_ts") or 0), + ) + return [str(row.get("url") or "").strip() for row in normalized_rows if str(row.get("url") or "").strip()] class XMPPClient(ClientBase): diff --git a/core/messaging/history.py b/core/messaging/history.py index 8f4c7f3..b60de0e 100644 --- a/core/messaging/history.py +++ b/core/messaging/history.py @@ -243,3 +243,309 @@ async def apply_read_receipts( await sync_to_async(message.save)(update_fields=dirty) updated += 1 return updated + + +async def apply_reaction( + user, + identifier, + *, + target_message_id="", + target_ts=0, + emoji="", + source_service="", + actor="", + remove=False, + payload=None, +): + log.debug( + "reaction-bridge history-apply start user=%s person_identifier=%s target_message_id=%s target_ts=%s source=%s actor=%s remove=%s emoji=%s", + getattr(user, "id", "-"), + getattr(identifier, "id", "-"), + str(target_message_id or "") or "-", + int(target_ts or 0), + str(source_service or "") or "-", + str(actor or "") or "-", + bool(remove), + str(emoji or "") or "-", + ) + queryset = Message.objects.filter( + user=user, + session__identifier=identifier, + ).select_related("session") + + target = None + target_uuid = str(target_message_id or "").strip() + if target_uuid: + target = await sync_to_async( + lambda: queryset.filter(id=target_uuid).order_by("-ts").first() + )() + + if target is None: + try: + ts_value = int(target_ts or 0) + except Exception: + ts_value = 0 + if ts_value > 0: + lower = ts_value - 10_000 + upper = ts_value + 10_000 + window_rows = await sync_to_async(list)( + queryset.filter(ts__gte=lower, ts__lte=upper).order_by("ts")[:200] + ) + if window_rows: + target = min( + window_rows, + key=lambda row: ( + abs(int(row.ts or 0) - ts_value), + -int(row.ts or 0), + ), + ) + log.debug( + "reaction-bridge history-apply ts-match target_ts=%s picked_message_id=%s picked_ts=%s candidates=%s", + ts_value, + str(target.id), + int(target.ts or 0), + len(window_rows), + ) + + if target is None: + log.warning( + "reaction-bridge history-apply miss user=%s person_identifier=%s target_message_id=%s target_ts=%s", + getattr(user, "id", "-"), + getattr(identifier, "id", "-"), + str(target_message_id or "") or "-", + int(target_ts or 0), + ) + return None + + reactions = list((target.receipt_payload or {}).get("reactions") or []) + reaction_key = ( + str(source_service or "").strip().lower(), + str(actor or "").strip(), + str(emoji or "").strip(), + ) + + merged = [] + replaced = False + for item in reactions: + row = dict(item or {}) + row_key = ( + str(row.get("source_service") or "").strip().lower(), + str(row.get("actor") or "").strip(), + str(row.get("emoji") or "").strip(), + ) + if row_key == reaction_key: + row["removed"] = bool(remove) + row["updated_at"] = int(target_ts or target.ts or 0) + row["payload"] = dict(payload or {}) + merged.append(row) + replaced = True + continue + merged.append(row) + + if not replaced: + merged.append( + { + "emoji": str(emoji or ""), + "source_service": str(source_service or ""), + "actor": str(actor or ""), + "removed": bool(remove), + "updated_at": int(target_ts or target.ts or 0), + "payload": dict(payload or {}), + } + ) + + receipt_payload = dict(target.receipt_payload or {}) + receipt_payload["reactions"] = merged + target.receipt_payload = receipt_payload + await sync_to_async(target.save)(update_fields=["receipt_payload"]) + log.debug( + "reaction-bridge history-apply ok message_id=%s reactions=%s", + str(target.id), + len(merged), + ) + return target + + +def _iter_bridge_refs(receipt_payload, source_service): + payload = dict(receipt_payload or {}) + refs = payload.get("bridge_refs") or {} + rows = refs.get(str(source_service or "").strip().lower()) or [] + return [dict(row or {}) for row in rows if isinstance(row, dict)] + + +def _set_bridge_refs(receipt_payload, source_service, rows): + payload = dict(receipt_payload or {}) + refs = dict(payload.get("bridge_refs") or {}) + refs[str(source_service or "").strip().lower()] = list(rows or []) + payload["bridge_refs"] = refs + return payload + + +async def save_bridge_ref( + user, + identifier, + *, + source_service, + local_message_id="", + local_ts=0, + xmpp_message_id="", + upstream_message_id="", + upstream_author="", + upstream_ts=0, +): + # TODO(edit-sync): persist upstream edit identifiers/version vectors here so + # edit operations can target exact upstream message revisions. + # TODO(delete-sync): persist upstream deletion tombstone metadata here and + # keep bridge refs resolvable even after local message redaction. + source_key = str(source_service or "").strip().lower() + if not source_key: + return None + + queryset = Message.objects.filter( + user=user, + session__identifier=identifier, + ).select_related("session") + + target = None + message_id = str(local_message_id or "").strip() + if message_id: + target = await sync_to_async( + lambda: queryset.filter(id=message_id).order_by("-ts").first() + )() + + if target is None: + try: + ts_value = int(local_ts or 0) + except Exception: + ts_value = 0 + if ts_value > 0: + lower = ts_value - 10_000 + upper = ts_value + 10_000 + rows = await sync_to_async(list)( + queryset.filter(ts__gte=lower, ts__lte=upper).order_by("-ts")[:200] + ) + if rows: + target = min( + rows, + key=lambda row: ( + abs(int(row.ts or 0) - ts_value), + -int(row.ts or 0), + ), + ) + + if target is None: + return None + + row = { + "xmpp_message_id": str(xmpp_message_id or "").strip(), + "upstream_message_id": str(upstream_message_id or "").strip(), + "upstream_author": str(upstream_author or "").strip(), + "upstream_ts": int(upstream_ts or 0), + "updated_at": int(local_ts or target.ts or 0), + } + + existing = _iter_bridge_refs(target.receipt_payload or {}, source_key) + merged = [] + for item in existing: + same_xmpp = row["xmpp_message_id"] and ( + str(item.get("xmpp_message_id") or "").strip() == row["xmpp_message_id"] + ) + same_upstream = row["upstream_message_id"] and ( + str(item.get("upstream_message_id") or "").strip() + == row["upstream_message_id"] + ) + if same_xmpp or same_upstream: + continue + merged.append(item) + merged.append(row) + if len(merged) > 100: + merged = merged[-100:] + + target.receipt_payload = _set_bridge_refs( + target.receipt_payload or {}, + source_key, + merged, + ) + await sync_to_async(target.save)(update_fields=["receipt_payload"]) + return { + "local_message_id": str(target.id), + "local_ts": int(target.ts or 0), + **row, + } + + +async def resolve_bridge_ref( + user, + identifier, + *, + source_service, + xmpp_message_id="", + upstream_message_id="", + upstream_author="", + upstream_ts=0, +): + source_key = str(source_service or "").strip().lower() + if not source_key: + return None + + rows = await sync_to_async(list)( + Message.objects.filter( + user=user, + session__identifier=identifier, + ) + .order_by("-ts") + .only("id", "ts", "receipt_payload")[:500] + ) + + xmpp_id = str(xmpp_message_id or "").strip() + upstream_id = str(upstream_message_id or "").strip() + author = str(upstream_author or "").strip() + try: + target_ts = int(upstream_ts or 0) + except Exception: + target_ts = 0 + + # 1) exact IDs first + for message in rows: + refs = _iter_bridge_refs(message.receipt_payload or {}, source_key) + for ref in refs: + if xmpp_id and str(ref.get("xmpp_message_id") or "").strip() == xmpp_id: + return { + "local_message_id": str(message.id), + "local_ts": int(message.ts or 0), + **dict(ref or {}), + } + if upstream_id and ( + str(ref.get("upstream_message_id") or "").strip() == upstream_id + ): + return { + "local_message_id": str(message.id), + "local_ts": int(message.ts or 0), + **dict(ref or {}), + } + + # 2) timestamp proximity with optional author tie-break + best = None + best_key = None + if target_ts > 0: + for message in rows: + refs = _iter_bridge_refs(message.receipt_payload or {}, source_key) + for ref in refs: + row_ts = int(ref.get("upstream_ts") or 0) + if row_ts <= 0: + continue + gap = abs(row_ts - target_ts) + if gap > 15_000: + continue + row_author = str(ref.get("upstream_author") or "").strip() + author_penalty = 0 if (not author or author == row_author) else 1 + freshness = int(ref.get("updated_at") or 0) + key = (gap, author_penalty, -freshness) + if best is None or key < best_key: + best = { + "local_message_id": str(message.id), + "local_ts": int(message.ts or 0), + **dict(ref or {}), + } + best_key = key + return best diff --git a/core/models.py b/core/models.py index afd9a14..cbfba14 100644 --- a/core/models.py +++ b/core/models.py @@ -157,7 +157,7 @@ class PersonIdentifier(models.Model): def __str__(self): return f"{self.person} ({self.service})" - async def send(self, text, attachments=None): + async def send(self, text, attachments=None, metadata=None): """ Send this contact a text. """ @@ -166,6 +166,7 @@ class PersonIdentifier(models.Model): self.identifier, text=text, attachments=attachments or [], + metadata=dict(metadata or {}), ) diff --git a/core/realtime/compose_ws.py b/core/realtime/compose_ws.py index 440174f..cf53eac 100644 --- a/core/realtime/compose_ws.py +++ b/core/realtime/compose_ws.py @@ -162,6 +162,15 @@ async def compose_ws_application(scope, receive, send): return await send({"type": "websocket.accept"}) + + # TODO(reactions): stream incremental reaction add/remove events over WS + # instead of relying on message row refresh polling windows. + # TODO(edits): add edit event envelopes so compose can update message text + # in place when upstream supports edits. + # TODO(retractions): add retract/delete event envelopes and tombstone UI. + # TODO(capability): surface per-service capability notices (e.g. "edited + # locally but upstream protocol does not support edits"). + last_ts = 0 limit = 100 last_typing_key = "" diff --git a/core/templates/partials/compose-panel.html b/core/templates/partials/compose-panel.html index b7eaeee..273f5ba 100644 --- a/core/templates/partials/compose-panel.html +++ b/core/templates/partials/compose-panel.html @@ -293,6 +293,17 @@ {% else %} {% endif %} + {% if msg.reactions %} +
+ {% for reaction in msg.reactions %} + + {{ reaction.emoji }} + + {% endfor %} +
+ {% endif %}

{{ msg.display_ts }}{% if msg.author %} · {{ msg.author }}{% endif %} {% if msg.read_ts %} @@ -602,6 +613,25 @@ white-space: pre-wrap; word-break: break-word; } + #{{ panel_id }} .compose-reactions { + display: flex; + flex-wrap: wrap; + gap: 0.26rem; + margin: 0 0 0.28rem 0; + } + #{{ panel_id }} .compose-reaction-chip { + display: inline-flex; + align-items: center; + justify-content: center; + min-width: 1.55rem; + height: 1.35rem; + padding: 0 0.38rem; + border-radius: 0.8rem; + border: 1px solid rgba(0, 0, 0, 0.12); + background: rgba(255, 255, 255, 0.7); + font-size: 0.86rem; + line-height: 1; + } #{{ panel_id }} .compose-msg-meta, #{{ panel_id }} .compose-meta-line { color: #616161; diff --git a/core/views/compose.py b/core/views/compose.py index b5e00b2..c96e8d1 100644 --- a/core/views/compose.py +++ b/core/views/compose.py @@ -402,6 +402,28 @@ def _serialize_message(msg: Message) -> dict: receipt_payload = msg.receipt_payload or {} read_source_service = str(msg.read_source_service or "").strip() read_by_identifier = str(msg.read_by_identifier or "").strip() + reaction_rows = [] + seen_reactions = set() + for row in list(receipt_payload.get("reactions") or []): + item = dict(row or {}) + if bool(item.get("removed")): + continue + emoji = str(item.get("emoji") or "").strip() + if not emoji: + continue + actor = str(item.get("actor") or "").strip() + source = str(item.get("source_service") or "").strip().lower() + key = (emoji, actor, source) + if key in seen_reactions: + continue + seen_reactions.add(key) + reaction_rows.append( + { + "emoji": emoji, + "actor": actor, + "source_service": source, + } + ) return { "id": str(msg.id), @@ -427,6 +449,7 @@ def _serialize_message(msg: Message) -> dict: "receipt_payload": receipt_payload, "read_source_service": read_source_service, "read_by_identifier": read_by_identifier, + "reactions": reaction_rows, } diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini index 93980ab..9ca4dda 100644 --- a/docker/uwsgi.ini +++ b/docker/uwsgi.ini @@ -23,7 +23,7 @@ log-level=debug # Autoreload on code changes (graceful reload) py-autoreload=1 -# In the container the repository is mounted at /code, not /code/GIA +# In the container the repository is mounted at /code # point autoreload at the actual in-container paths py-autoreload-on-edit=/code/core py-autoreload-on-edit=/code/app \ No newline at end of file