"""Superuser-only views for managing the WhatsApp transport.

Covers the landing page, the linked-accounts list, account unlinking, the
contact and chat lists, and the QR-based "add account" pairing flow.
"""

import time
from urllib.parse import urlencode

from django.shortcuts import render
from django.urls import reverse
from django.views import View
from mixins.views import ObjectList, ObjectRead

from core.clients import transport
from core.models import ChatSession, Message, PersonIdentifier
from core.util import logs
from core.views.compose import _compose_urls, _service_icon_class
from core.views.manage.permissions import SuperUserRequiredMixin

log = logs.get_logger("whatsapp_view")


def _safe_int(value, default: int = 0) -> int:
    """Return ``int(value or 0)``, or *default* when the value is not numeric."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return default


def _normalize_accounts(rows) -> list:
    """Flatten account entries into a list of strings.

    Dict entries contribute the first truthy value among the ``number``,
    ``id``, ``jid`` and ``account`` keys; any other truthy entry is
    stringified. Falsy entries are dropped.
    """
    out = []
    for item in rows or []:
        if isinstance(item, dict):
            value = (
                item.get("number")
                or item.get("id")
                or item.get("jid")
                or item.get("account")
            )
            if value:
                out.append(str(value))
        elif item:
            out.append(str(item))
    return out


def _accounts_base_context() -> dict:
    """Return the template context shared by every accounts-list rendering."""
    return {
        "service": "whatsapp",
        "service_label": "WhatsApp",
        "account_add_url_name": "whatsapp_account_add",
        "account_unlink_url_name": "whatsapp_account_unlink",
        "show_contact_actions": True,
        "contacts_url_name": "whatsapp_contacts",
        "chats_url_name": "whatsapp_chats",
        "service_warning": transport.get_service_warning("whatsapp"),
    }


def _render_accounts_partial(request, list_type):
    """Render the accounts partial with the current list of accounts.

    ``list_type`` is passed through as the ``type`` URL kwarg of the
    ``whatsapp_accounts`` route and as the ``type`` context value.
    """
    rows = _normalize_accounts(transport.list_accounts("whatsapp"))
    context = _accounts_base_context()
    context.update(
        {
            "object_list": rows,
            "list_url": reverse("whatsapp_accounts", kwargs={"type": list_type}),
            "type": list_type,
            "context_object_name_singular": "WhatsApp Account",
            "context_object_name": "WhatsApp Accounts",
        }
    )
    return render(request, "partials/signal-accounts.html", context)


def _match_url(identifier: str) -> str:
    """Build the contact-match URL for a WhatsApp identifier."""
    query = urlencode({"service": "whatsapp", "identifier": identifier})
    return f"{reverse('compose_contact_match')}?{query}"


def _contact_row(identifier, jid, name, person_name, person_id, **extra) -> dict:
    """Build one row dict for the contact/chat list templates.

    ``extra`` entries (for example ``last_ts``) are merged into the row.
    """
    urls = _compose_urls("whatsapp", identifier, person_id)
    row = {
        "identifier": identifier,
        "jid": jid,
        "name": name,
        "service_icon_class": _service_icon_class("whatsapp"),
        "person_name": person_name,
        "compose_page_url": urls["page_url"],
        "compose_widget_url": urls["widget_url"],
        "match_url": _match_url(identifier),
    }
    row.update(extra)
    return row


class WhatsApp(SuperUserRequiredMixin, View):
    """Landing page for the WhatsApp service; also handles account unlinking."""

    template_name = "pages/signal.html"
    service = "whatsapp"
    page_title = "WhatsApp"
    accounts_url_name = "whatsapp_accounts"

    def get(self, request):
        """Render the service page."""
        return render(
            request,
            self.template_name,
            {
                "service": self.service,
                "service_label": self.page_title,
                "accounts_url_name": self.accounts_url_name,
            },
        )

    def delete(self, request, *args, **kwargs):
        """Unlink an account and re-render the page or the accounts partial.

        The account comes from the ``account`` query parameter; when that is
        absent, the first listed account is used. Nothing is unlinked when no
        account can be determined.
        """
        account = str(request.GET.get("account") or "").strip()
        if not account:
            # Go through the shared normaliser so dict-shaped entries yield
            # their number/id/jid/account value instead of the dict's repr.
            listed = _normalize_accounts(transport.list_accounts("whatsapp"))
            account = next(
                (value.strip() for value in listed if value.strip()),
                "",
            )
        if account:
            transport.unlink_account("whatsapp", account)
        if not request.htmx:
            return self.get(request)
        current_url = str(request.headers.get("HX-Current-URL") or "")
        list_type = "widget" if "/widget/" in current_url else "page"
        return _render_accounts_partial(request, list_type)


class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
    """List of WhatsApp accounts reported by the transport."""

    list_template = "partials/signal-accounts.html"
    service = "whatsapp"
    context_object_name_singular = "WhatsApp Account"
    context_object_name = "WhatsApp Accounts"
    list_url_name = "whatsapp_accounts"
    list_url_args = ["type"]

    def _normalize_accounts(self, rows):
        """Flatten account entries into strings (see module-level helper)."""
        return _normalize_accounts(rows)

    def get_queryset(self, **kwargs):
        """Set the extra template context and return the account strings."""
        self.extra_context = _accounts_base_context()
        return self._normalize_accounts(transport.list_accounts("whatsapp"))


class WhatsAppAccountUnlink(SuperUserRequiredMixin, View):
    """Unlink the account named in the URL and re-render the accounts partial."""

    def delete(self, request, *args, **kwargs):
        """Unlink ``kwargs["account"]`` and render the refreshed accounts list.

        Raises ``KeyError`` when the ``type`` URL kwarg is missing.
        """
        account = str(kwargs.get("account") or "").strip()
        transport.unlink_account("whatsapp", account)
        return _render_accounts_partial(request, kwargs["type"])


class WhatsAppContactsList(SuperUserRequiredMixin, ObjectList):
    """Runtime contacts merged with the user's linked WhatsApp identifiers."""

    list_template = "partials/whatsapp-contacts-list.html"
    context_object_name_singular = "WhatsApp Contact"
    context_object_name = "WhatsApp Contacts"
    list_url_name = "whatsapp_contacts"
    list_url_args = ["type", "pk"]

    def _linked_identifier(self, identifier: str, jid: str):
        """Return the user's matching ``PersonIdentifier``, or ``None``.

        Tries the identifier, then the JID, then the JID's part before ``@``.
        """
        candidates = [str(identifier or "").strip(), str(jid or "").strip()]
        if candidates[1] and "@" in candidates[1]:
            candidates.append(candidates[1].split("@", 1)[0])
        for candidate in candidates:
            if not candidate:
                continue
            linked = (
                PersonIdentifier.objects.filter(
                    user=self.request.user,
                    service="whatsapp",
                    identifier=candidate,
                )
                .select_related("person")
                .first()
            )
            if linked:
                return linked
        return None

    def get_queryset(self, *args, **kwargs):
        """Return contact rows: runtime contacts first, then linked-only ones."""
        state = transport.get_runtime_state("whatsapp")
        runtime_contacts = state.get("contacts") or []
        rows = []
        seen = set()
        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = str(
                item.get("identifier") or item.get("jid") or item.get("chat") or ""
            ).strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            jid = str(item.get("jid") or "").strip()
            linked = self._linked_identifier(identifier, jid)
            rows.append(
                _contact_row(
                    identifier,
                    jid,
                    str(item.get("name") or item.get("chat") or ""),
                    linked.person.name if linked else "",
                    linked.person_id if linked else None,
                )
            )

        # Include already-linked WhatsApp contacts not yet discovered by runtime.
        linked_rows = (
            PersonIdentifier.objects.filter(
                user=self.request.user,
                service="whatsapp",
            )
            .select_related("person")
            .order_by("person__name", "identifier")
        )
        for row in linked_rows:
            identifier = str(row.identifier or "").strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            rows.append(
                _contact_row(
                    identifier,
                    "",
                    row.person.name,
                    row.person.name,
                    row.person_id,
                )
            )
        return rows


class WhatsAppChatsList(WhatsAppContactsList):
    """Chat list built from stored sessions, padded with runtime contacts."""

    list_template = "partials/whatsapp-chats-list.html"
    context_object_name_singular = "WhatsApp Chat"
    context_object_name = "WhatsApp Chats"
    list_url_name = "whatsapp_chats"

    def get_queryset(self, *args, **kwargs):
        """Return chat rows sorted by ``last_ts`` descending.

        Falls back to the parent's contact rows when no chat rows are found.
        """
        rows = []
        seen = set()
        state = transport.get_runtime_state("whatsapp")
        runtime_contacts = state.get("contacts") or []

        runtime_name_map = {}
        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = str(item.get("identifier") or "").strip()
            if not identifier:
                continue
            runtime_name_map[identifier] = str(item.get("name") or "").strip()

        sessions = (
            ChatSession.objects.filter(
                user=self.request.user,
                identifier__service="whatsapp",
            )
            .select_related("identifier", "identifier__person")
            .order_by("-last_interaction", "-id")
        )
        for session in sessions:
            identifier = str(session.identifier.identifier or "").strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            # NOTE(review): this issues one Message query per session; consider
            # a single annotated query if the number of sessions grows.
            latest = (
                Message.objects.filter(user=self.request.user, session=session)
                .order_by("-ts")
                .first()
            )
            preview = str((latest.text if latest else "") or "").strip()
            if len(preview) > 80:
                preview = f"{preview[:77]}..."
            person_name = session.identifier.person.name
            display_name = (
                preview
                or runtime_name_map.get(identifier)
                or person_name
                or "WhatsApp Chat"
            )
            rows.append(
                _contact_row(
                    identifier,
                    identifier,
                    display_name,
                    person_name,
                    session.identifier.person_id,
                    last_ts=int(latest.ts or 0) if latest else 0,
                )
            )

        # Fallback: show synced WhatsApp contacts as chat entries even when no
        # local message history exists yet.
        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = str(item.get("identifier") or item.get("jid") or "").strip()
            if not identifier:
                continue
            identifier = identifier.split("@", 1)[0].strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            linked = self._linked_identifier(identifier, str(item.get("jid") or ""))
            rows.append(
                _contact_row(
                    identifier,
                    str(item.get("jid") or identifier).strip(),
                    str(item.get("name") or "WhatsApp Chat").strip()
                    or "WhatsApp Chat",
                    linked.person.name if linked else "",
                    linked.person_id if linked else None,
                    last_ts=0,
                )
            )

        if rows:
            rows.sort(key=lambda row: row.get("last_ts", 0), reverse=True)
            return rows
        return super().get_queryset(*args, **kwargs)


class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
    """Pairing view: requests pairing and shows the link QR plus debug state."""

    detail_template = "partials/whatsapp-account-add.html"
    service = "whatsapp"
    extra_context = {
        "widget_options": 'gs-w="6" gs-h="13" gs-x="0" gs-y="0" gs-min-w="4"',
    }
    context_object_name_singular = "Add Account"
    context_object_name = "Add Account"
    detail_url_name = "whatsapp_account_add"
    detail_url_args = ["type", "device"]

    def _device_name(self) -> str:
        """Return the posted ``device`` value, or ``"GIA Device"`` if absent."""
        form_args = self.request.POST.dict()
        return form_args.get("device", "GIA Device")

    def _refresh_only(self) -> bool:
        """Return True when the form was posted with ``refresh`` equal to ``1``."""
        form_args = self.request.POST.dict()
        return str(form_args.get("refresh") or "") == "1"

    def _detail_context(self, kwargs, obj):
        """Build the context used when rendering the detail template directly."""
        detail_url_args = {
            arg: kwargs[arg] for arg in self.detail_url_args if arg in kwargs
        }
        return {
            "object": obj,
            "detail_url": reverse(self.detail_url_name, kwargs=detail_url_args),
        }

    def _debug_state(self):
        """Return the runtime state formatted as a list of ``key=value`` lines."""
        state = transport.get_runtime_state(self.service)
        now = int(time.time())

        def _age(key: str) -> str:
            # Values are treated as epoch seconds; non-positive or
            # non-numeric values are reported as "n/a".
            value = _safe_int(state.get(key))
            if value <= 0:
                return "n/a"
            return f"{max(0, now - value)}s ago"

        qr_value = str(state.get("pair_qr") or "")
        contacts = state.get("contacts") or []
        # Guarded conversions: a malformed counter must not break the
        # pairing view just because of its debug output.
        history_imported = _safe_int(state.get("history_imported_messages"))
        sqlite_imported = _safe_int(state.get("history_sqlite_imported"))
        sqlite_scanned = _safe_int(state.get("history_sqlite_scanned"))
        on_demand_requested = bool(state.get("history_on_demand_requested"))
        on_demand_error = str(state.get("history_on_demand_error") or "").strip() or "-"
        on_demand_anchor = (
            str(state.get("history_on_demand_anchor") or "").strip() or "-"
        )
        history_running = bool(state.get("history_sync_running"))
        return [
            f"connected={bool(state.get('connected'))}",
            f"runtime_updated={_age('updated_at')}",
            f"runtime_seen={_age('runtime_seen_at')}",
            f"pair_requested={_age('pair_requested_at')}",
            f"qr_received={_age('qr_received_at')}",
            f"pair_status={state.get('pair_status') or '-'}",
            f"qr_probe_result={state.get('qr_probe_result') or '-'}",
            f"qr_handler_supported={state.get('qr_handler_supported')}",
            f"qr_handler_registered={state.get('qr_handler_registered')}",
            f"last_event={state.get('last_event') or '-'}",
            f"last_error={state.get('last_error') or '-'}",
            f"contacts_source={state.get('contacts_source') or '-'}",
            f"contacts_count={len(contacts) if isinstance(contacts, list) else 0}",
            f"contacts_sync_count={state.get('contacts_sync_count') or 0}",
            f"contacts_synced={_age('contacts_synced_at')}",
            f"history_sync_running={history_running}",
            f"history_started={_age('history_sync_started_at')}",
            f"history_finished={_age('history_sync_finished_at')}",
            f"history_duration_ms={state.get('history_sync_duration_ms') or 0}",
            f"history_imported_messages={history_imported}",
            f"history_sqlite_imported={sqlite_imported}",
            f"history_sqlite_scanned={sqlite_scanned}",
            f"history_sqlite_rows={state.get('history_sqlite_rows') or 0}",
            f"history_sqlite_table={state.get('history_sqlite_table') or '-'}",
            f"history_sqlite_error={state.get('history_sqlite_error') or '-'}",
            f"history_sqlite_ts={_age('history_sqlite_ts')}",
            f"history_on_demand_requested={on_demand_requested}",
            f"history_on_demand_at={_age('history_on_demand_at')}",
            f"history_on_demand_anchor={on_demand_anchor}",
            f"history_on_demand_error={on_demand_error}",
            f"pair_qr_present={bool(qr_value)}",
            f"session_db={state.get('session_db') or '-'}",
        ]

    def post(self, request, *args, **kwargs):
        """Handle the pairing form.

        An htmx refresh request renders the detail template directly; any
        other POST is delegated to the parent's ``get``.
        """
        self.request = request
        if self._refresh_only() and request.htmx:
            obj = self.get_object(**kwargs)
            return render(
                request,
                self.detail_template,
                self._detail_context(kwargs, obj),
            )
        return super().get(request, *args, **kwargs)

    def get_object(self, **kwargs):
        """Request pairing (unless refreshing) and return the QR payload dict.

        On success the dict has ``ok=True`` and the base64-encoded QR image.
        On failure it has ``ok=False``, the error text, and a ``pending``
        flag that is true when the error text mentions "pairing qr".
        """
        device_name = self._device_name()
        refresh_only = self._refresh_only()
        if not refresh_only:
            transport.request_pairing(self.service, device_name)
        debug_lines = self._debug_state()
        log.info(
            "whatsapp add-account runtime debug [%s]: %s",
            ("refresh" if refresh_only else "request"),
            " | ".join(debug_lines),
        )
        try:
            image_bytes = transport.get_link_qr(self.service, device_name)
            return {
                "ok": True,
                "image_b64": transport.image_bytes_to_base64(image_bytes),
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }
        except Exception as exc:
            # Deliberately broad: any failure is shown in the template
            # rather than raised.
            error_text = str(exc)
            log.warning("whatsapp add-account get_link_qr failed: %s", error_text)
            return {
                "ok": False,
                "pending": "pairing qr" in error_text.lower(),
                "device": device_name,
                "error": error_text,
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }