"""Views for managing WhatsApp accounts, contacts, chats and device pairing.

Every view talks to the runtime through ``core.clients.transport`` and is
gated by ``SuperUserRequiredMixin``.
"""

import time
from urllib.parse import urlencode

from django.shortcuts import render
from django.urls import reverse
from django.views import View
from mixins.views import ObjectList, ObjectRead

from core.clients import transport
from core.models import PersonIdentifier
from core.util import logs
from core.views.compose import _compose_urls, _service_icon_class
from core.views.manage.permissions import SuperUserRequiredMixin

log = logs.get_logger("whatsapp_view")

# Partial used to render the accounts panel (also named for the Signal UI).
_ACCOUNTS_PARTIAL = "partials/signal-accounts.html"


def _to_int(value, default=0):
    """Return ``int(value or 0)``, or *default* when the value is not numeric.

    Runtime state values are not validated by this module, so a malformed
    entry must not take the whole view down.
    """
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return default


def _account_values(rows):
    """Flatten transport account entries into a list of display strings.

    Dict entries contribute the first truthy value among ``number``, ``id``,
    ``jid`` and ``account``; any other truthy entry is stringified. Falsy
    entries and dicts without a usable key are dropped.
    """
    values = []
    for item in rows or []:
        if isinstance(item, dict):
            value = (
                item.get("number")
                or item.get("id")
                or item.get("jid")
                or item.get("account")
            )
            if value:
                values.append(str(value))
        elif item:
            values.append(str(item))
    return values


def _accounts_base_context():
    """Return the template context shared by every accounts-panel render."""
    return {
        "service": "whatsapp",
        "service_label": "WhatsApp",
        "account_add_url_name": "whatsapp_account_add",
        "account_unlink_url_name": "whatsapp_account_unlink",
        "show_contact_actions": True,
        "contacts_url_name": "whatsapp_contacts",
        "chats_url_name": "whatsapp_chats",
        "service_warning": transport.get_service_warning("whatsapp"),
    }


def _accounts_list_context(list_type):
    """Build the full accounts-panel context for the given list *list_type*.

    The account list is re-read from the transport so the panel reflects the
    state after any unlink that preceded this call.
    """
    rows = _account_values(transport.list_accounts("whatsapp"))
    context = _accounts_base_context()
    context.update(
        {
            "object_list": rows,
            "list_url": reverse("whatsapp_accounts", kwargs={"type": list_type}),
            "type": list_type,
            "context_object_name_singular": "WhatsApp Account",
            "context_object_name": "WhatsApp Accounts",
        }
    )
    return context


def _item_identifier(item):
    """Return the stripped identifier of a runtime contact/group dict.

    Falls back from ``identifier`` to ``jid`` to ``chat``; returns ``""``
    when none of them is set.
    """
    return str(
        item.get("identifier") or item.get("jid") or item.get("chat") or ""
    ).strip()


def _contact_row(identifier, jid, name, person_id, person_name, last_ts=None):
    """Build one row dict for the contacts/chats list templates.

    ``last_ts`` is only added to the row when it is not ``None``, so contact
    rows keep their original key set while chat rows carry a sort key.
    """
    urls = _compose_urls("whatsapp", identifier, person_id)
    match_query = urlencode({"service": "whatsapp", "identifier": identifier})
    row = {
        "identifier": identifier,
        "jid": jid,
        "name": name,
        "service_icon_class": _service_icon_class("whatsapp"),
        "person_name": person_name,
        "compose_page_url": urls["page_url"],
        "compose_widget_url": urls["widget_url"],
        "match_url": f"{reverse('compose_contact_match')}?{match_query}",
    }
    if last_ts is not None:
        row["last_ts"] = last_ts
    return row


class WhatsApp(SuperUserRequiredMixin, View):
    """Landing page for the WhatsApp service; also handles account unlinking."""

    template_name = "pages/signal.html"
    service = "whatsapp"
    page_title = "WhatsApp"
    accounts_url_name = "whatsapp_accounts"

    def get(self, request):
        """Render the service page."""
        return render(
            request,
            self.template_name,
            {
                "service": self.service,
                "service_label": self.page_title,
                "accounts_url_name": self.accounts_url_name,
            },
        )

    def delete(self, request, *args, **kwargs):
        """Unlink an account, then re-render the page or the accounts partial.

        The account comes from the ``account`` query parameter; when that is
        blank, the first account reported by the transport is used. Non-htmx
        requests get the full page back; htmx requests get the refreshed
        accounts partial.
        """
        account = str(request.GET.get("account") or "").strip()
        if not account:
            # Normalise before picking the fallback so dict entries resolve to
            # their number/id/jid instead of the dict's repr.
            account = next(
                (
                    value.strip()
                    for value in _account_values(transport.list_accounts("whatsapp"))
                    if value.strip()
                ),
                "",
            )
        if account:
            transport.unlink_account("whatsapp", account)
        # NOTE(review): ``request.htmx`` is presumably set by htmx middleware.
        if not request.htmx:
            return self.get(request)
        current_url = str(request.headers.get("HX-Current-URL") or "")
        list_type = "widget" if "/widget/" in current_url else "page"
        return render(request, _ACCOUNTS_PARTIAL, _accounts_list_context(list_type))


class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
    """List of linked WhatsApp accounts."""

    list_template = _ACCOUNTS_PARTIAL
    service = "whatsapp"

    context_object_name_singular = "WhatsApp Account"
    context_object_name = "WhatsApp Accounts"
    list_url_name = "whatsapp_accounts"
    list_url_args = ["type"]

    def _normalize_accounts(self, rows):
        """Return *rows* flattened to a list of account strings."""
        return _account_values(rows)

    def get_queryset(self, **kwargs):
        """Set the panel's extra context and return the account strings."""
        self.extra_context = _accounts_base_context()
        return self._normalize_accounts(transport.list_accounts("whatsapp"))


class WhatsAppAccountUnlink(SuperUserRequiredMixin, View):
    """Unlink one account and return the refreshed accounts partial."""

    def delete(self, request, *args, **kwargs):
        """Unlink ``kwargs["account"]`` and render the updated accounts list.

        ``kwargs["type"]`` must be present; it selects the list URL variant.
        """
        account = str(kwargs.get("account") or "").strip()
        transport.unlink_account("whatsapp", account)
        return render(
            request,
            _ACCOUNTS_PARTIAL,
            _accounts_list_context(kwargs["type"]),
        )


class WhatsAppContactsList(SuperUserRequiredMixin, ObjectList):
    """Runtime-discovered WhatsApp contacts merged with already-linked ones."""

    list_template = "partials/whatsapp-contacts-list.html"

    context_object_name_singular = "WhatsApp Contact"
    context_object_name = "WhatsApp Contacts"
    list_url_name = "whatsapp_contacts"
    list_url_args = ["type", "pk"]

    def _linked_identifier(self, identifier: str, jid: str):
        """Return the user's ``PersonIdentifier`` matching this contact, if any.

        Candidates are tried in order: *identifier*, *jid*, then the part of
        *jid* before ``@``. The first match wins; ``None`` when nothing matches.
        """
        candidates = [str(identifier or "").strip(), str(jid or "").strip()]
        if candidates[1] and "@" in candidates[1]:
            candidates.append(candidates[1].split("@", 1)[0])
        for candidate in candidates:
            if not candidate:
                continue
            linked = (
                PersonIdentifier.objects.filter(
                    user=self.request.user,
                    service="whatsapp",
                    identifier=candidate,
                )
                .select_related("person")
                .first()
            )
            if linked:
                return linked
        return None

    def get_queryset(self, *args, **kwargs):
        """Return contact rows: runtime contacts first, then linked-only ones.

        Rows are de-duplicated by identifier; a runtime contact takes
        precedence over a stored identifier with the same value.
        """
        state = transport.get_runtime_state("whatsapp") or {}
        runtime_contacts = state.get("contacts") or []
        rows = []
        seen = set()
        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = _item_identifier(item)
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            jid = str(item.get("jid") or "").strip()
            linked = self._linked_identifier(identifier, jid)
            rows.append(
                _contact_row(
                    identifier,
                    jid,
                    str(item.get("name") or item.get("chat") or ""),
                    linked.person_id if linked else None,
                    linked.person.name if linked else "",
                )
            )

        # Include already-linked WhatsApp contacts not yet discovered by runtime.
        linked_rows = (
            PersonIdentifier.objects.filter(
                user=self.request.user,
                service="whatsapp",
            )
            .select_related("person")
            .order_by("person__name", "identifier")
        )
        for row in linked_rows:
            identifier = str(row.identifier or "").strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            rows.append(
                _contact_row(
                    identifier,
                    "",
                    row.person.name,
                    row.person_id,
                    row.person.name,
                )
            )
        return rows


class WhatsAppChatsList(WhatsAppContactsList):
    """Chats derived from history anchors, falling back to runtime contacts."""

    list_template = "partials/whatsapp-chats-list.html"
    context_object_name_singular = "WhatsApp Chat"
    context_object_name = "WhatsApp Chats"
    list_url_name = "whatsapp_chats"

    def get_queryset(self, *args, **kwargs):
        """Return chat rows, most recent first.

        One row is built per entry of the runtime ``history_anchors`` mapping
        and the rows are sorted by ``last_ts`` descending. When that yields no
        rows, the runtime contacts and groups are listed instead with
        ``last_ts`` set to 0.
        """
        rows = []
        seen = set()
        state = transport.get_runtime_state("whatsapp") or {}

        # Iterate the two sources separately rather than concatenating them, so
        # a list/tuple mismatch between them cannot raise.
        combined_contacts = []
        for source in (state.get("contacts") or [], state.get("groups") or []):
            for item in source:
                if isinstance(item, dict):
                    combined_contacts.append(item)

        # Index contacts under every spelling of their id (with and without the
        # "@..." suffix) so anchors can be matched whichever form they use.
        contact_index = {}
        for item in combined_contacts:
            raw_identifier = _item_identifier(item)
            jid = str(item.get("jid") or "").strip()
            name = str(item.get("name") or item.get("chat") or "").strip()
            base_id = raw_identifier.split("@", 1)[0].strip()
            jid_base = jid.split("@", 1)[0].strip()
            for key in {raw_identifier, base_id, jid, jid_base}:
                if key:
                    contact_index[key] = {"name": name, "jid": jid}

        history_anchors = state.get("history_anchors") or {}
        if not isinstance(history_anchors, dict):
            history_anchors = {}
        for key, anchor in history_anchors.items():
            identifier = str(key or "").strip()
            if not identifier:
                continue
            identifier = identifier.split("@", 1)[0].strip() or identifier
            if identifier in seen:
                continue
            seen.add(identifier)
            # A non-dict anchor carries no usable metadata; treat it as empty
            # instead of failing on ``.get``.
            if not isinstance(anchor, dict):
                anchor = {}
            anchor_jid = str(anchor.get("chat_jid") or "").strip()
            contact = (
                contact_index.get(identifier) or contact_index.get(anchor_jid) or {}
            )
            jid = contact.get("jid") or anchor_jid or identifier
            linked = self._linked_identifier(identifier, jid)
            person_name = linked.person.name if linked else ""
            name = (
                contact.get("name")
                or person_name
                or jid
                or identifier
                or "WhatsApp Chat"
            )
            rows.append(
                _contact_row(
                    identifier,
                    jid,
                    name,
                    linked.person_id if linked else None,
                    person_name,
                    last_ts=_to_int(anchor.get("ts") or anchor.get("updated_at")),
                )
            )
        if rows:
            rows.sort(key=lambda row: row.get("last_ts", 0), reverse=True)
            return rows

        # Fallback: if no anchors yet, surface the runtime contacts (best effort live state)
        for item in combined_contacts:
            identifier = _item_identifier(item)
            if not identifier:
                continue
            identifier = identifier.split("@", 1)[0].strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            jid = str(item.get("jid") or "").strip()
            linked = self._linked_identifier(identifier, jid)
            person_name = linked.person.name if linked else ""
            name = (
                str(item.get("name") or item.get("chat") or "").strip()
                or person_name
                or jid
                or identifier
                or "WhatsApp Chat"
            )
            rows.append(
                _contact_row(
                    identifier,
                    jid or identifier,
                    name,
                    linked.person_id if linked else None,
                    person_name,
                    last_ts=0,
                )
            )
        return rows


class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
    """Pairing view: requests a link QR code and shows runtime debug state."""

    detail_template = "partials/whatsapp-account-add.html"
    service = "whatsapp"
    extra_context = {
        "widget_options": 'gs-w="6" gs-h="13" gs-x="0" gs-y="0" gs-min-w="4"',
    }
    context_object_name_singular = "Add Account"
    context_object_name = "Add Account"
    detail_url_name = "whatsapp_account_add"
    detail_url_args = ["type", "device"]

    def _device_name(self) -> str:
        """Return the posted ``device`` value, or "GIA Device" if missing/blank."""
        form_args = self.request.POST.dict()
        # ``or`` rather than a ``.get`` default: a blank submitted field must
        # also fall back to the default name.
        return form_args.get("device") or "GIA Device"

    def _refresh_only(self) -> bool:
        """Return True when the form posted ``refresh=1``."""
        form_args = self.request.POST.dict()
        return str(form_args.get("refresh") or "") == "1"

    def _detail_context(self, kwargs, obj):
        """Return the template context for *obj*, including its detail URL.

        Only the URL kwargs named in ``detail_url_args`` are forwarded to
        ``reverse``.
        """
        detail_url_args = {
            arg: kwargs[arg] for arg in self.detail_url_args if arg in kwargs
        }
        return {
            "object": obj,
            "detail_url": reverse(self.detail_url_name, kwargs=detail_url_args),
        }

    def _debug_state(self):
        """Return ``key=value`` strings summarising the service runtime state.

        Timestamp keys are rendered as an age relative to now ("12s ago"), or
        "n/a" when unset or not numeric.
        """
        state = transport.get_runtime_state(self.service) or {}
        now = int(time.time())

        def _age(key: str) -> str:
            value = _to_int(state.get(key))
            if value <= 0:
                return "n/a"
            return f"{max(0, now - value)}s ago"

        qr_value = str(state.get("pair_qr") or "")
        contacts = state.get("contacts") or []
        # Counters go through the same guarded conversion as the timestamps so
        # a malformed value cannot break the pairing page.
        history_imported = _to_int(state.get("history_imported_messages"))
        sqlite_imported = _to_int(state.get("history_sqlite_imported"))
        sqlite_scanned = _to_int(state.get("history_sqlite_scanned"))
        on_demand_requested = bool(state.get("history_on_demand_requested"))
        on_demand_error = str(state.get("history_on_demand_error") or "").strip() or "-"
        on_demand_anchor = (
            str(state.get("history_on_demand_anchor") or "").strip() or "-"
        )
        history_running = bool(state.get("history_sync_running"))
        return [
            f"connected={bool(state.get('connected'))}",
            f"runtime_updated={_age('updated_at')}",
            f"runtime_seen={_age('runtime_seen_at')}",
            f"pair_requested={_age('pair_requested_at')}",
            f"qr_received={_age('qr_received_at')}",
            f"pair_status={state.get('pair_status') or '-'}",
            f"qr_probe_result={state.get('qr_probe_result') or '-'}",
            f"qr_handler_supported={state.get('qr_handler_supported')}",
            f"qr_handler_registered={state.get('qr_handler_registered')}",
            f"last_event={state.get('last_event') or '-'}",
            f"last_error={state.get('last_error') or '-'}",
            f"contacts_source={state.get('contacts_source') or '-'}",
            f"contacts_count={len(contacts) if isinstance(contacts, list) else 0}",
            f"contacts_sync_count={state.get('contacts_sync_count') or 0}",
            f"contacts_synced={_age('contacts_synced_at')}",
            f"history_sync_running={history_running}",
            f"history_started={_age('history_sync_started_at')}",
            f"history_finished={_age('history_sync_finished_at')}",
            f"history_duration_ms={state.get('history_sync_duration_ms') or 0}",
            f"history_imported_messages={history_imported}",
            f"history_sqlite_imported={sqlite_imported}",
            f"history_sqlite_scanned={sqlite_scanned}",
            f"history_sqlite_rows={state.get('history_sqlite_rows') or 0}",
            f"history_sqlite_table={state.get('history_sqlite_table') or '-'}",
            f"history_sqlite_error={state.get('history_sqlite_error') or '-'}",
            f"history_sqlite_ts={_age('history_sqlite_ts')}",
            f"history_on_demand_requested={on_demand_requested}",
            f"history_on_demand_at={_age('history_on_demand_at')}",
            f"history_on_demand_anchor={on_demand_anchor}",
            f"history_on_demand_error={on_demand_error}",
            f"pair_qr_present={bool(qr_value)}",
            f"session_db={state.get('session_db') or '-'}",
        ]

    def post(self, request, *args, **kwargs):
        """Handle the pairing form.

        An htmx refresh (``refresh=1``) re-renders only the detail partial;
        anything else is delegated to the parent class's ``get``.
        """
        self.request = request
        if self._refresh_only() and request.htmx:
            obj = self.get_object(**kwargs)
            return render(
                request,
                self.detail_template,
                self._detail_context(kwargs, obj),
            )
        return super().get(request, *args, **kwargs)

    def get_object(self, **kwargs):
        """Request pairing (unless refreshing) and return the QR payload.

        Returns a dict with ``ok=True`` and the base64-encoded QR image on
        success. If fetching the QR raises, returns ``ok=False`` with the error
        text; ``pending`` is True when that text mentions "pairing qr".
        """
        device_name = self._device_name()
        refresh_only = self._refresh_only()
        if not refresh_only:
            transport.request_pairing(self.service, device_name)
        debug_lines = self._debug_state()
        log.info(
            "whatsapp add-account runtime debug [%s]: %s",
            ("refresh" if refresh_only else "request"),
            " | ".join(debug_lines),
        )
        try:
            image_bytes = transport.get_link_qr(self.service, device_name)
            return {
                "ok": True,
                "image_b64": transport.image_bytes_to_base64(image_bytes),
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }
        except Exception as exc:
            # Deliberately broad: any transport failure is shown to the user in
            # the pairing panel rather than surfacing as a server error.
            error_text = str(exc)
            log.warning("whatsapp add-account get_link_qr failed: %s", error_text)
            return {
                "ok": False,
                "pending": "pairing qr" in error_text.lower(),
                "device": device_name,
                "error": error_text,
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }