Files
GIA/core/views/whatsapp.py

417 lines
16 KiB
Python

from django.shortcuts import render
from django.urls import reverse
from urllib.parse import urlencode
from django.views import View
from mixins.views import ObjectList, ObjectRead
from core.clients import transport
from core.models import ChatSession, Message, PersonIdentifier
from core.views.compose import _compose_urls, _service_icon_class
from core.views.manage.permissions import SuperUserRequiredMixin
from core.util import logs
import time
log = logs.get_logger("whatsapp_view")
class WhatsApp(SuperUserRequiredMixin, View):
template_name = "pages/signal.html"
service = "whatsapp"
page_title = "WhatsApp"
accounts_url_name = "whatsapp_accounts"
def get(self, request):
return render(
request,
self.template_name,
{
"service": self.service,
"service_label": self.page_title,
"accounts_url_name": self.accounts_url_name,
},
)
def delete(self, request, *args, **kwargs):
account = (
str(request.GET.get("account") or "").strip()
or next(
(
str(item or "").strip()
for item in transport.list_accounts("whatsapp")
if str(item or "").strip()
),
"",
)
)
if account:
transport.unlink_account("whatsapp", account)
if not request.htmx:
return self.get(request)
current_url = str(request.headers.get("HX-Current-URL") or "")
list_type = "widget" if "/widget/" in current_url else "page"
rows = []
for item in transport.list_accounts("whatsapp"):
if isinstance(item, dict):
value = (
item.get("number")
or item.get("id")
or item.get("jid")
or item.get("account")
)
if value:
rows.append(str(value))
elif item:
rows.append(str(item))
context = {
"service": "whatsapp",
"service_label": "WhatsApp",
"account_add_url_name": "whatsapp_account_add",
"account_unlink_url_name": "whatsapp_account_unlink",
"show_contact_actions": True,
"contacts_url_name": "whatsapp_contacts",
"chats_url_name": "whatsapp_chats",
"service_warning": transport.get_service_warning("whatsapp"),
"object_list": rows,
"list_url": reverse("whatsapp_accounts", kwargs={"type": list_type}),
"type": list_type,
"context_object_name_singular": "WhatsApp Account",
"context_object_name": "WhatsApp Accounts",
}
return render(request, "partials/signal-accounts.html", context)
class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
list_template = "partials/signal-accounts.html"
service = "whatsapp"
context_object_name_singular = "WhatsApp Account"
context_object_name = "WhatsApp Accounts"
list_url_name = "whatsapp_accounts"
list_url_args = ["type"]
def _normalize_accounts(self, rows):
out = []
for item in rows or []:
if isinstance(item, dict):
value = (
item.get("number")
or item.get("id")
or item.get("jid")
or item.get("account")
)
if value:
out.append(str(value))
elif item:
out.append(str(item))
return out
def get_queryset(self, **kwargs):
self.extra_context = {
"service": "whatsapp",
"service_label": "WhatsApp",
"account_add_url_name": "whatsapp_account_add",
"account_unlink_url_name": "whatsapp_account_unlink",
"show_contact_actions": True,
"contacts_url_name": "whatsapp_contacts",
"chats_url_name": "whatsapp_chats",
"service_warning": transport.get_service_warning("whatsapp"),
}
return self._normalize_accounts(transport.list_accounts("whatsapp"))
class WhatsAppAccountUnlink(SuperUserRequiredMixin, View):
def delete(self, request, *args, **kwargs):
account = str(kwargs.get("account") or "").strip()
_ = transport.unlink_account("whatsapp", account)
rows = []
for item in transport.list_accounts("whatsapp"):
if isinstance(item, dict):
value = (
item.get("number")
or item.get("id")
or item.get("jid")
or item.get("account")
)
if value:
rows.append(str(value))
elif item:
rows.append(str(item))
context = {
"service": "whatsapp",
"service_label": "WhatsApp",
"account_add_url_name": "whatsapp_account_add",
"account_unlink_url_name": "whatsapp_account_unlink",
"show_contact_actions": True,
"contacts_url_name": "whatsapp_contacts",
"chats_url_name": "whatsapp_chats",
"service_warning": transport.get_service_warning("whatsapp"),
"object_list": rows,
"list_url": reverse("whatsapp_accounts", kwargs={"type": kwargs["type"]}),
"type": kwargs["type"],
"context_object_name_singular": "WhatsApp Account",
"context_object_name": "WhatsApp Accounts",
}
return render(request, "partials/signal-accounts.html", context)
class WhatsAppContactsList(SuperUserRequiredMixin, ObjectList):
list_template = "partials/whatsapp-contacts-list.html"
context_object_name_singular = "WhatsApp Contact"
context_object_name = "WhatsApp Contacts"
list_url_name = "whatsapp_contacts"
list_url_args = ["type", "pk"]
def _linked_identifier(self, identifier: str, jid: str):
candidates = [str(identifier or "").strip(), str(jid or "").strip()]
if candidates[1] and "@" in candidates[1]:
candidates.append(candidates[1].split("@", 1)[0])
for candidate in candidates:
if not candidate:
continue
linked = (
PersonIdentifier.objects.filter(
user=self.request.user,
service="whatsapp",
identifier=candidate,
)
.select_related("person")
.first()
)
if linked:
return linked
return None
def get_queryset(self, *args, **kwargs):
state = transport.get_runtime_state("whatsapp")
runtime_contacts = state.get("contacts") or []
rows = []
seen = set()
for item in runtime_contacts:
if not isinstance(item, dict):
continue
identifier = str(
item.get("identifier") or item.get("jid") or item.get("chat") or ""
).strip()
if not identifier or identifier in seen:
continue
seen.add(identifier)
jid = str(item.get("jid") or "").strip()
linked = self._linked_identifier(identifier, jid)
urls = _compose_urls(
"whatsapp",
identifier,
linked.person_id if linked else None,
)
rows.append(
{
"identifier": identifier,
"jid": jid,
"name": str(item.get("name") or item.get("chat") or ""),
"service_icon_class": _service_icon_class("whatsapp"),
"person_name": linked.person.name if linked else "",
"compose_page_url": urls["page_url"],
"compose_widget_url": urls["widget_url"],
"match_url": (
f"{reverse('compose_contact_match')}?"
f"{urlencode({'service': 'whatsapp', 'identifier': identifier})}"
),
}
)
# Include already-linked WhatsApp contacts not yet discovered by runtime.
linked_rows = (
PersonIdentifier.objects.filter(
user=self.request.user,
service="whatsapp",
)
.select_related("person")
.order_by("person__name", "identifier")
)
for row in linked_rows:
identifier = str(row.identifier or "").strip()
if not identifier or identifier in seen:
continue
seen.add(identifier)
urls = _compose_urls("whatsapp", identifier, row.person_id)
rows.append(
{
"identifier": identifier,
"jid": "",
"name": row.person.name,
"service_icon_class": _service_icon_class("whatsapp"),
"person_name": row.person.name,
"compose_page_url": urls["page_url"],
"compose_widget_url": urls["widget_url"],
"match_url": (
f"{reverse('compose_contact_match')}?"
f"{urlencode({'service': 'whatsapp', 'identifier': identifier})}"
),
}
)
return rows
class WhatsAppChatsList(WhatsAppContactsList):
list_template = "partials/whatsapp-chats-list.html"
context_object_name_singular = "WhatsApp Chat"
context_object_name = "WhatsApp Chats"
list_url_name = "whatsapp_chats"
def get_queryset(self, *args, **kwargs):
rows = []
sessions = (
ChatSession.objects.filter(
user=self.request.user,
identifier__service="whatsapp",
)
.select_related("identifier", "identifier__person")
.order_by("-last_interaction", "-id")
)
for session in sessions:
identifier = str(session.identifier.identifier or "").strip()
if not identifier:
continue
latest = (
Message.objects.filter(user=self.request.user, session=session)
.order_by("-ts")
.first()
)
urls = _compose_urls("whatsapp", identifier, session.identifier.person_id)
preview = str((latest.text if latest else "") or "").strip()
if len(preview) > 80:
preview = f"{preview[:77]}..."
rows.append(
{
"identifier": identifier,
"jid": identifier,
"name": (
preview
or session.identifier.person.name
or "WhatsApp Chat"
),
"service_icon_class": _service_icon_class("whatsapp"),
"person_name": session.identifier.person.name,
"compose_page_url": urls["page_url"],
"compose_widget_url": urls["widget_url"],
"match_url": (
f"{reverse('compose_contact_match')}?"
f"{urlencode({'service': 'whatsapp', 'identifier': identifier})}"
),
"last_ts": int(latest.ts or 0) if latest else 0,
}
)
if rows:
rows.sort(key=lambda row: row.get("last_ts", 0), reverse=True)
return rows
return super().get_queryset(*args, **kwargs)
class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
detail_template = "partials/whatsapp-account-add.html"
service = "whatsapp"
extra_context = {
"widget_options": 'gs-w="6" gs-h="13" gs-x="0" gs-y="0" gs-min-w="4"',
}
context_object_name_singular = "Add Account"
context_object_name = "Add Account"
detail_url_name = "whatsapp_account_add"
detail_url_args = ["type", "device"]
def _device_name(self) -> str:
form_args = self.request.POST.dict()
return form_args.get("device", "GIA Device")
def _refresh_only(self) -> bool:
form_args = self.request.POST.dict()
return str(form_args.get("refresh") or "") == "1"
def _detail_context(self, kwargs, obj):
detail_url_args = {
arg: kwargs[arg]
for arg in self.detail_url_args
if arg in kwargs
}
return {
"object": obj,
"detail_url": reverse(self.detail_url_name, kwargs=detail_url_args),
}
def _debug_state(self):
state = transport.get_runtime_state(self.service)
now = int(time.time())
def _age(key: str) -> str:
try:
value = int(state.get(key) or 0)
except Exception:
value = 0
if value <= 0:
return "n/a"
return f"{max(0, now - value)}s ago"
qr_value = str(state.get("pair_qr") or "")
contacts = state.get("contacts") or []
return [
f"connected={bool(state.get('connected'))}",
f"runtime_seen={_age('runtime_seen_at')}",
f"pair_requested={_age('pair_requested_at')}",
f"qr_received={_age('qr_received_at')}",
f"pair_status={state.get('pair_status') or '-'}",
f"qr_probe_result={state.get('qr_probe_result') or '-'}",
f"qr_handler_supported={state.get('qr_handler_supported')}",
f"qr_handler_registered={state.get('qr_handler_registered')}",
f"last_event={state.get('last_event') or '-'}",
f"last_error={state.get('last_error') or '-'}",
f"contacts_source={state.get('contacts_source') or '-'}",
f"contacts_count={len(contacts) if isinstance(contacts, list) else 0}",
f"contacts_sync_count={state.get('contacts_sync_count') or 0}",
f"contacts_synced={_age('contacts_synced_at')}",
f"pair_qr_present={bool(qr_value)}",
f"session_db={state.get('session_db') or '-'}",
]
def post(self, request, *args, **kwargs):
self.request = request
if self._refresh_only() and request.htmx:
obj = self.get_object(**kwargs)
return render(
request,
self.detail_template,
self._detail_context(kwargs, obj),
)
return super().get(request, *args, **kwargs)
def get_object(self, **kwargs):
device_name = self._device_name()
if not self._refresh_only():
transport.request_pairing(self.service, device_name)
debug_lines = self._debug_state()
log.info(
"whatsapp add-account runtime debug [%s]: %s",
("refresh" if self._refresh_only() else "request"),
" | ".join(debug_lines),
)
try:
image_bytes = transport.get_link_qr(self.service, device_name)
return {
"ok": True,
"image_b64": transport.image_bytes_to_base64(image_bytes),
"warning": transport.get_service_warning(self.service),
"debug_lines": debug_lines,
}
except Exception as exc:
error_text = str(exc)
log.warning("whatsapp add-account get_link_qr failed: %s", error_text)
return {
"ok": False,
"pending": "pairing qr" in error_text.lower(),
"device": device_name,
"error": error_text,
"warning": transport.get_service_warning(self.service),
"debug_lines": debug_lines,
}