From 3d32834ccf0938ef3b4f63a0cfacd84f15c8dd53 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Mon, 16 Feb 2026 00:39:16 +0000 Subject: [PATCH] Continue implementing WhatsApp --- app/urls.py | 5 + core/clients/transport.py | 107 ++++++ core/clients/whatsapp.py | 346 +++++++++++++++++- core/templates/mixins/wm/widget.html | 54 --- .../pages/compose-contact-match.html | 31 +- .../compose-workspace-contacts-widget.html | 8 +- .../partials/nav-contacts-dropdown.html | 3 +- core/templates/partials/signal-accounts.html | 32 +- .../partials/whatsapp-account-add.html | 20 +- core/views/compose.py | 114 +++++- core/views/whatsapp.py | 191 +++++++++- docker-compose.yml | 5 + 12 files changed, 796 insertions(+), 120 deletions(-) diff --git a/app/urls.py b/app/urls.py index 9e551d1..5ed01e5 100644 --- a/app/urls.py +++ b/app/urls.py @@ -119,6 +119,11 @@ urlpatterns = [ whatsapp.WhatsAppAccountAdd.as_view(), name="whatsapp_account_add", ), + path( + "services/whatsapp//unlink//", + whatsapp.WhatsAppAccountUnlink.as_view(), + name="whatsapp_account_unlink", + ), path( "services/instagram//add/", instagram.InstagramAccountAdd.as_view(), diff --git a/core/clients/transport.py b/core/clients/transport.py index 4e058bf..1071814 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -3,6 +3,7 @@ import base64 import io import secrets import time +from urllib.parse import quote_plus from typing import Any import aiohttp @@ -99,11 +100,94 @@ def list_accounts(service: str): state = get_runtime_state(service_key) accounts = state.get("accounts") or [] + if service_key == "whatsapp" and not accounts: + contacts = state.get("contacts") or [] + recovered = [] + seen = set() + for row in contacts: + if not isinstance(row, dict): + continue + candidate = str(row.get("identifier") or row.get("jid") or "").strip() + if not candidate or candidate in seen: + continue + seen.add(candidate) + recovered.append(candidate) + if recovered: + accounts = recovered + update_runtime_state(service_key, accounts=recovered) if isinstance(accounts, list): return accounts return [] +def _account_key(value: str) -> str: + raw = str(value or "").strip().lower() + if "@" in raw: + raw = raw.split("@", 1)[0] + return raw + + +def unlink_account(service: str, account: str) -> bool: + service_key = _service_key(service) + account_value = str(account or "").strip() + if not account_value: + return False + + if service_key == "signal": + import requests + + base = str(getattr(settings, "SIGNAL_HTTP_URL", "http://signal:8080")).rstrip("/") + target = quote_plus(account_value) + for path in (f"/v1/accounts/{target}", f"/v1/account/{target}"): + try: + response = requests.delete(f"{base}{path}", timeout=20) + if response.ok: + return True + except Exception: + continue + return False + + if service_key in {"whatsapp", "instagram"}: + state = get_runtime_state(service_key) + key = _account_key(account_value) + + raw_accounts = state.get("accounts") or [] + accounts = [] + for row in raw_accounts: + value = str(row or "").strip() + if not value: + continue + if _account_key(value) == key: + continue + accounts.append(value) + + raw_contacts = state.get("contacts") or [] + contacts = [] + for row in raw_contacts: + if not isinstance(row, dict): + continue + identifier = str(row.get("identifier") or "").strip() + jid = str(row.get("jid") or "").strip() + if _account_key(identifier) == key or _account_key(jid) == key: + continue + contacts.append(row) + + update_runtime_state( + service_key, + accounts=accounts, + contacts=contacts, + connected=bool(accounts), + pair_status=("connected" if accounts else ""), + pair_qr="", + warning=("" if accounts else "Account unlinked. Add account to link again."), + last_event="account_unlinked", + last_error="", + ) + return True + + return False + + def get_service_warning(service: str) -> str: service_key = _service_key(service) if service_key == "signal": @@ -131,6 +215,18 @@ def request_pairing(service: str, device_name: str = ""): service_key = _service_key(service) if service_key not in {"whatsapp", "instagram"}: return + state = get_runtime_state(service_key) + existing_accounts = state.get("accounts") or [] + is_connected = bool(state.get("connected")) + pair_status = str(state.get("pair_status") or "").strip().lower() + if existing_accounts and (is_connected or pair_status == "connected"): + update_runtime_state( + service_key, + warning="Account already linked.", + pair_status="connected", + pair_qr="", + ) + return device = str(device_name or "GIA Device").strip() or "GIA Device" update_runtime_state( service_key, @@ -454,6 +550,17 @@ def get_link_qr(service: str, device_name: str): return cached if service_key == "whatsapp": + state = get_runtime_state(service_key) + existing_accounts = state.get("accounts") or [] + pair_status = str(state.get("pair_status") or "").strip().lower() + if existing_accounts and ( + bool(state.get("connected")) or pair_status == "connected" + ): + raise RuntimeError( + "WhatsApp account already linked in this runtime. " + "Only one active linked device is supported. " + "Unlink the current account first, then add a new one." + ) raise RuntimeError( "Neonize has not provided a pairing QR yet. " "Ensure UR is running with WHATSAPP_ENABLED=true and retry." diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index 9e35cb4..1d6eb28 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -1,6 +1,7 @@ import asyncio import os import re +import sqlite3 import time from urllib.parse import quote_plus @@ -51,9 +52,21 @@ class WhatsAppClient(ClientBase): getattr(settings, "WHATSAPP_DATABASE_URL", "") ).strip() safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp" - self.session_db = self.database_url or f"/tmp/{safe_name}.db" + # Use a persistent default path (under project mount) instead of /tmp so + # link state and contact cache survive container restarts. + default_db_dir = str( + getattr(settings, "WHATSAPP_DB_DIR", "/var/tmp/whatsapp") + ).strip() + self.session_db = self.database_url or os.path.join( + default_db_dir, + f"{safe_name}_neonize_v2.db", + ) transport.register_runtime_client(self.service, self) + prior_state = transport.get_runtime_state(self.service) + prior_accounts = prior_state.get("accounts") + if not isinstance(prior_accounts, list): + prior_accounts = [] self._publish_state( connected=False, warning=( @@ -61,7 +74,7 @@ class WhatsAppClient(ClientBase): if not self.enabled else "" ), - accounts=[], + accounts=prior_accounts, last_event="init", session_db=self.session_db, ) @@ -171,11 +184,15 @@ class WhatsAppClient(ClientBase): # Keep task alive so state/callbacks remain active. next_heartbeat_at = 0.0 + next_contacts_sync_at = 0.0 while not self._stopping: now = time.time() if now >= next_heartbeat_at: self._publish_state(runtime_seen_at=int(now)) next_heartbeat_at = now + 5.0 + if now >= next_contacts_sync_at: + await self._sync_contacts_from_client() + next_contacts_sync_at = now + 30.0 self._mark_qr_wait_timeout() await self._sync_pair_request() await self._probe_pending_qr(now) @@ -279,6 +296,9 @@ class WhatsAppClient(ClientBase): last_error=str(exc), ) + if self._connected: + await self._sync_contacts_from_client() + # Neonize does not always emit QR callbacks after reconnect. Try explicit # QR-link fetch when available to surface pair data to the UI. try: @@ -427,6 +447,8 @@ class WhatsAppClient(ClientBase): presence_ev = getattr(wa_events, "PresenceEv", None) pair_ev = getattr(wa_events, "PairStatusEv", None) qr_ev = getattr(wa_events, "QREv", None) + history_sync_ev = getattr(wa_events, "HistorySyncEv", None) + offline_sync_completed_ev = getattr(wa_events, "OfflineSyncCompletedEv", None) self._register_qr_handler() support = { @@ -435,6 +457,8 @@ class WhatsAppClient(ClientBase): "qr_ev": bool(qr_ev), "message_ev": bool(message_ev), "receipt_ev": bool(receipt_ev), + "history_sync_ev": bool(history_sync_ev), + "offline_sync_completed_ev": bool(offline_sync_completed_ev), } self._publish_state( event_hook_callable=bool(getattr(self._client, "event", None)), @@ -447,12 +471,6 @@ class WhatsAppClient(ClientBase): async def on_connected(client, event: connected_ev): self._connected = True account = await self._resolve_account_identifier() - if account: - self._remember_contact( - account, - jid=account, - name="Linked Account", - ) self._publish_state( connected=True, warning="", @@ -463,6 +481,7 @@ class WhatsAppClient(ClientBase): connected_at=int(time.time()), last_error="", ) + await self._sync_contacts_from_client() self._register_event(connected_ev, on_connected) @@ -513,12 +532,6 @@ class WhatsAppClient(ClientBase): status_text = str(status_raw or "").strip().lower() if status_text in {"2", "success"}: account = await self._resolve_account_identifier() - if account: - self._remember_contact( - account, - jid=account, - name="Linked Account", - ) self._connected = True self._publish_state( connected=True, @@ -530,6 +543,7 @@ class WhatsAppClient(ClientBase): connected_at=int(time.time()), last_error="", ) + await self._sync_contacts_from_client() elif status_text in {"1", "error"}: error_text = str(self._pluck(event, "Error") or "").strip() self._publish_state( @@ -544,6 +558,11 @@ class WhatsAppClient(ClientBase): if qr_ev is not None: async def on_qr_event(client, event: qr_ev): + # Once connected, ignore late/stale QR emissions so runtime state + # does not regress from connected -> qr_ready. + state = transport.get_runtime_state(self.service) + if self._connected or bool(state.get("connected")): + return qr_payload = self._extract_pair_qr(event) if not qr_payload: return @@ -561,6 +580,21 @@ class WhatsAppClient(ClientBase): self._register_event(qr_ev, on_qr_event) + if history_sync_ev is not None: + + async def on_history_sync(client, event: history_sync_ev): + await self._handle_history_sync_event(event) + + self._register_event(history_sync_ev, on_history_sync) + + if offline_sync_completed_ev is not None: + + async def on_offline_sync_completed(client, event: offline_sync_completed_ev): + self._publish_state(last_event="offline_sync_completed") + await self._sync_contacts_from_client() + + self._register_event(offline_sync_completed_ev, on_offline_sync_completed) + def _mark_qr_wait_timeout(self): state = transport.get_runtime_state(self.service) if str(state.get("pair_status") or "").strip().lower() != "pending": @@ -613,6 +647,15 @@ class WhatsAppClient(ClientBase): value = self._pluck(me, *path) if value: return str(value) + if hasattr(self._client, "get_all_devices"): + try: + devices = await self._maybe_await(self._client.get_all_devices()) + if devices: + jid = self._jid_to_identifier(self._pluck(devices[0], "JID")) + if jid: + return jid + except Exception: + pass return self.client_name def _pluck(self, obj, *path): @@ -657,8 +700,283 @@ class WhatsAppClient(ClientBase): out.add(f"+{digits}") return out + async def _sync_contacts_from_client(self): + if self._client is None: + return + connected_now = await self._is_contact_sync_ready() + if not connected_now: + self._publish_state( + last_event="contacts_sync_skipped_disconnected", + contacts_source="disconnected", + ) + return + # NOTE: Neonize get_all_contacts has crashed some runtime builds with a Go panic. + # Read contact-like rows directly from the session sqlite DB instead. + contacts, source = await self._sync_contacts_from_sqlite() + if not contacts: + self.log.info("whatsapp contacts sync empty (%s)", source or "unknown") + self._publish_state( + last_event="contacts_sync_empty", + contacts_source=source or "unknown", + ) + return + self.log.info( + "whatsapp contacts synced: count=%s source=%s", + len(contacts), + source or "unknown", + ) + self._publish_state( + contacts=contacts, + contacts_synced_at=int(time.time()), + contacts_sync_count=len(contacts), + last_event="contacts_synced", + contacts_source=source or "unknown", + last_error="", + ) + + async def _sync_contacts_from_sqlite(self): + def _extract(): + if not self.session_db or not os.path.exists(self.session_db): + return [], "sqlite_missing" + try: + conn = sqlite3.connect(self.session_db) + conn.row_factory = sqlite3.Row + except Exception: + return [], "sqlite_open_failed" + try: + cur = conn.cursor() + table_rows = cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + table_names = [str(row[0]) for row in table_rows if row and row[0]] + account_keys = { + str(value or "").strip().split("@", 1)[0].lower() + for value in ( + transport.get_runtime_state(self.service).get("accounts") or [] + ) + if str(value or "").strip() + } + seen = set() + out = [] + for table in table_names: + try: + columns = [ + str(row[1] or "") + for row in cur.execute( + f'PRAGMA table_info("{table}")' + ).fetchall() + ] + except Exception: + continue + if not columns: + continue + jid_cols = [ + col + for col in columns + if any( + token in col.lower() + for token in ( + "jid", + "phone", + "chat", + "sender", + "recipient", + "participant", + "from", + "to", + "user", + ) + ) + ] + name_cols = [ + col + for col in columns + if "name" in col.lower() or "push" in col.lower() + ] + if not jid_cols: + continue + select_cols = list(dict.fromkeys(jid_cols + name_cols))[:6] + quoted = ", ".join(f'"{col}"' for col in select_cols) + try: + rows = cur.execute( + f'SELECT {quoted} FROM "{table}" LIMIT 1000' + ).fetchall() + except Exception: + continue + for row in rows: + row_map = {col: row[idx] for idx, col in enumerate(select_cols)} + jid_value = "" + for col in jid_cols: + raw = str(row_map.get(col) or "").strip() + if "@s.whatsapp.net" in raw: + m = re.search(r"([0-9]{6,20})@s\.whatsapp\.net", raw) + jid_value = ( + f"{m.group(1)}@s.whatsapp.net" + if m + else raw.split()[0] + ) + break + if raw.endswith("@lid"): + jid_value = raw + break + digits = re.sub(r"[^0-9]", "", raw) + if digits: + jid_value = f"{digits}@s.whatsapp.net" + break + if not jid_value: + continue + identifier = jid_value.split("@", 1)[0].strip() + if not identifier: + continue + if identifier.lower() in account_keys: + continue + if identifier in seen: + continue + seen.add(identifier) + display_name = "" + for col in name_cols: + candidate = str(row_map.get(col) or "").strip() + if candidate and candidate not in {"~", "-", "_"}: + display_name = candidate + break + out.append( + { + "identifier": identifier, + "jid": jid_value, + "name": display_name, + "chat": "", + "seen_at": int(time.time()), + } + ) + if len(out) >= 500: + return out, "sqlite_tables" + return out, "sqlite_tables" + finally: + conn.close() + + return await asyncio.to_thread(_extract) + + async def _is_contact_sync_ready(self) -> bool: + if self._client is None: + return False + if self._connected: + return True + state = transport.get_runtime_state(self.service) + if bool(state.get("connected")): + return True + pair_status = str(state.get("pair_status") or "").strip().lower() + if pair_status == "connected": + return True + check_connected = getattr(self._client, "is_connected", None) + if check_connected is None: + return False + try: + value = ( + await self._maybe_await(check_connected()) + if callable(check_connected) + else await self._maybe_await(check_connected) + ) + except Exception: + return False + if value: + self._connected = True + self._publish_state(connected=True, warning="", pair_status="connected") + return True + if hasattr(self._client, "get_me"): + try: + me = await self._maybe_await(self._client.get_me()) + if me: + self._connected = True + self._publish_state(connected=True, warning="", pair_status="connected") + return True + except Exception: + pass + return False + + async def _handle_history_sync_event(self, event): + data = self._pluck(event, "Data") or self._pluck(event, "data") + if data is None: + return + + pushname_rows = ( + self._pluck(data, "pushnames") + or self._pluck(data, "Pushnames") + or [] + ) + pushname_map = {} + for row in pushname_rows: + raw_id = ( + self._jid_to_identifier(self._pluck(row, "ID")) + or self._jid_to_identifier(self._pluck(row, "id")) + ) + if not raw_id: + continue + pushname = str( + self._pluck(row, "pushname") + or self._pluck(row, "Pushname") + or "" + ).strip() + if not pushname: + continue + pushname_map[raw_id] = pushname + pushname_map[raw_id.split("@", 1)[0]] = pushname + + conversation_rows = ( + self._pluck(data, "conversations") + or self._pluck(data, "Conversations") + or [] + ) + found = 0 + for row in conversation_rows: + jid = "" + for candidate in ( + self._pluck(row, "ID"), + self._pluck(row, "id"), + self._pluck(row, "pnJID"), + self._pluck(row, "pnJid"), + self._pluck(row, "newJID"), + self._pluck(row, "newJid"), + self._pluck(row, "oldJID"), + self._pluck(row, "oldJid"), + ): + parsed = self._jid_to_identifier(candidate) + if parsed: + jid = parsed + break + if not jid or "@g.us" in jid or "@broadcast" in jid: + continue + identifier = jid.split("@", 1)[0].strip() + if not identifier or not re.search(r"[0-9]{6,}", identifier): + continue + name = str( + self._pluck(row, "displayName") + or self._pluck(row, "DisplayName") + or self._pluck(row, "name") + or self._pluck(row, "Name") + or self._pluck(row, "username") + or self._pluck(row, "Username") + or pushname_map.get(jid) + or pushname_map.get(identifier) + or "" + ).strip() + self._remember_contact(identifier, jid=jid, name=name) + found += 1 + + if found: + state = transport.get_runtime_state(self.service) + current_count = int(state.get("contacts_sync_count") or 0) + self._publish_state( + contacts_source="history_sync", + contacts_synced_at=int(time.time()), + contacts_sync_count=max(current_count, found), + last_event="history_sync_contacts", + last_error="", + ) + def _remember_contact(self, identifier, *, jid="", name="", chat=""): cleaned = str(identifier or "").strip() + if "@" in cleaned: + cleaned = cleaned.split("@", 1)[0] if not cleaned: return state = transport.get_runtime_state(self.service) diff --git a/core/templates/mixins/wm/widget.html b/core/templates/mixins/wm/widget.html index 2d089fc..8cd36bc 100644 --- a/core/templates/mixins/wm/widget.html +++ b/core/templates/mixins/wm/widget.html @@ -1,13 +1,8 @@
- -
- - {% block custom_end %} {% endblock %} diff --git a/core/templates/pages/compose-contact-match.html b/core/templates/pages/compose-contact-match.html index 6c0ded7..2ea4072 100644 --- a/core/templates/pages/compose-contact-match.html +++ b/core/templates/pages/compose-contact-match.html @@ -83,9 +83,11 @@ - + + + @@ -93,12 +95,35 @@ {% for row in candidates %} - + + +
ContactPersonDetected Name Service IdentifierSuggested Match Status
{{ row.person_name }}{{ row.linked_person_name|default:"-" }}{{ row.detected_name|default:"-" }} - {{ row.service|title }} {{ row.identifier }} + {% if not row.linked_person and row.suggestions %} +
+ {% for suggestion in row.suggestions %} +
+ {% csrf_token %} + + + + +
+ {% endfor %} +
+ {% else %} + - + {% endif %} +
{% if row.linked_person %} linked diff --git a/core/templates/partials/compose-workspace-contacts-widget.html b/core/templates/partials/compose-workspace-contacts-widget.html index 9812854..d04db71 100644 --- a/core/templates/partials/compose-workspace-contacts-widget.html +++ b/core/templates/partials/compose-workspace-contacts-widget.html @@ -61,11 +61,6 @@ margin: 0; white-space: nowrap; "> - - - - - + {{ item.person_name }} · {{ item.service|title }} {% if not item.linked_person %} · unlinked diff --git a/core/templates/partials/signal-accounts.html b/core/templates/partials/signal-accounts.html index 7c5f2e1..17f2259 100644 --- a/core/templates/partials/signal-accounts.html +++ b/core/templates/partials/signal-accounts.html @@ -1,6 +1,5 @@ -{% load cache %} {% include 'mixins/partials/notify.html' %} -{% cache 600 objects_signal_accounts request.user.id object_list type service %} +
{% if service_warning %}
{{ service_warning }} @@ -8,10 +7,10 @@ {% endif %} @@ -24,12 +23,17 @@
diff --git a/core/templates/partials/whatsapp-account-add.html b/core/templates/partials/whatsapp-account-add.html index bfdca33..af1052a 100644 --- a/core/templates/partials/whatsapp-account-add.html +++ b/core/templates/partials/whatsapp-account-add.html @@ -1,6 +1,15 @@
{{ service_label|default:"Service" }} account