Files
GIA/core/views/whatsapp.py

252 lines
9.0 KiB
Python

from django.shortcuts import render
from django.urls import reverse
from urllib.parse import urlencode
from django.views import View
from mixins.views import ObjectList, ObjectRead
from core.clients import transport
from core.models import PersonIdentifier
from core.views.compose import _compose_urls, _service_icon_class
from core.views.manage.permissions import SuperUserRequiredMixin
import time
class WhatsApp(SuperUserRequiredMixin, View):
    """Landing page view for the WhatsApp service.

    Renders ``template_name`` with the service key, its display label and the
    URL name of the accounts listing.
    """

    template_name = "pages/signal.html"
    service = "whatsapp"
    page_title = "WhatsApp"
    accounts_url_name = "whatsapp_accounts"

    def get(self, request):
        """Render the service page for a GET request."""
        page_context = {
            "service": self.service,
            "service_label": self.page_title,
            "accounts_url_name": self.accounts_url_name,
        }
        return render(request, self.template_name, page_context)
class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
    """List the WhatsApp accounts reported by the transport layer."""

    list_template = "partials/signal-accounts.html"
    service = "whatsapp"

    context_object_name_singular = "WhatsApp Account"
    context_object_name = "WhatsApp Accounts"

    list_url_name = "whatsapp_accounts"
    list_url_args = ["type"]

    def _normalize_accounts(self, rows):
        """Flatten raw account entries into a list of strings.

        Dict entries contribute the first truthy value among ``number``,
        ``id``, ``jid`` and ``account`` (in that order); dicts with none of
        them are dropped. Any other truthy entry is stringified as-is, and
        falsy entries are skipped. ``rows`` may be ``None``.
        """
        account_keys = ("number", "id", "jid", "account")
        normalized = []
        for entry in rows or []:
            if not isinstance(entry, dict):
                if entry:
                    normalized.append(str(entry))
                continue
            # Take the first key that carries a usable value.
            for key in account_keys:
                candidate = entry.get(key)
                if candidate:
                    normalized.append(str(candidate))
                    break
        return normalized

    def get_queryset(self, **kwargs):
        """Set the template's extra context and return the account list."""
        template_context = {
            "service": "whatsapp",
            "service_label": "WhatsApp",
            "account_add_url_name": "whatsapp_account_add",
            "show_contact_actions": True,
            "contacts_url_name": "whatsapp_contacts",
            "chats_url_name": "whatsapp_chats",
            "service_warning": transport.get_service_warning("whatsapp"),
        }
        self.extra_context = template_context
        raw_accounts = transport.list_accounts("whatsapp")
        return self._normalize_accounts(raw_accounts)
class WhatsAppContactsList(SuperUserRequiredMixin, ObjectList):
    """List WhatsApp contacts for the current user.

    Rows come from two sources, de-duplicated by identifier: contacts found
    in the transport runtime state, followed by ``PersonIdentifier`` records
    the user has already linked but the runtime has not reported.
    """

    list_template = "partials/whatsapp-contacts-list.html"
    context_object_name_singular = "WhatsApp Contact"
    context_object_name = "WhatsApp Contacts"
    list_url_name = "whatsapp_contacts"
    list_url_args = ["type", "pk"]

    @staticmethod
    def _contact_row(identifier, jid, name, linked):
        """Build the template row dict for one contact.

        ``linked`` is the matching ``PersonIdentifier`` (with ``person``
        loaded) or ``None`` when the identifier is not linked to a person.
        """
        urls = _compose_urls(
            "whatsapp",
            identifier,
            linked.person_id if linked else None,
        )
        match_query = urlencode({"service": "whatsapp", "identifier": identifier})
        return {
            "identifier": identifier,
            "jid": jid,
            "name": name,
            "service_icon_class": _service_icon_class("whatsapp"),
            "person_name": linked.person.name if linked else "",
            "compose_page_url": urls["page_url"],
            "compose_widget_url": urls["widget_url"],
            "match_url": f"{reverse('compose_contact_match')}?{match_query}",
        }

    def get_queryset(self, *args, **kwargs):
        """Return the list of contact row dicts rendered by the template."""
        state = transport.get_runtime_state("whatsapp")
        runtime_contacts = state.get("contacts") or []

        # Load the user's linked identifiers once. They are all needed for
        # the second pass anyway, so indexing them here replaces one query
        # per runtime contact with a single query.
        linked_rows = list(
            PersonIdentifier.objects.filter(
                user=self.request.user,
                service="whatsapp",
            )
            .select_related("person")
            .order_by("person__name", "identifier")
        )
        # NOTE(review): when several records share an identifier, the lowest
        # primary key wins, matching QuerySet.first() on an unordered
        # queryset. Confirm PersonIdentifier declares no Meta.ordering and
        # that the database compares identifiers case-sensitively.
        linked_by_identifier = {}
        for row in linked_rows:
            current = linked_by_identifier.get(row.identifier)
            if current is None or row.pk < current.pk:
                linked_by_identifier[row.identifier] = row

        rows = []
        seen = set()
        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = str(
                item.get("identifier") or item.get("jid") or item.get("chat") or ""
            ).strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            rows.append(
                self._contact_row(
                    identifier,
                    jid=str(item.get("jid") or ""),
                    name=str(item.get("name") or item.get("chat") or ""),
                    linked=linked_by_identifier.get(identifier),
                )
            )

        # Include already-linked WhatsApp contacts not yet discovered by runtime.
        for row in linked_rows:
            identifier = str(row.identifier or "").strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            rows.append(
                self._contact_row(
                    identifier,
                    jid="",
                    name=row.person.name,
                    linked=row,
                )
            )
        return rows
class WhatsAppChatsList(WhatsAppContactsList):
    """Chats listing: same rows as the contacts list, different template and labels."""

    list_url_name = "whatsapp_chats"
    list_template = "partials/whatsapp-chats-list.html"

    context_object_name = "WhatsApp Chats"
    context_object_name_singular = "WhatsApp Chat"
class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
    """Start (or refresh) WhatsApp device pairing and show the link QR code.

    The rendered object is a dict: on success it carries the QR image as
    base64; on failure it carries the error text and a ``pending`` flag.
    Both shapes include the service warning and runtime debug lines.
    """

    detail_template = "partials/whatsapp-account-add.html"
    service = "whatsapp"
    context_object_name_singular = "Add Account"
    context_object_name = "Add Account"
    detail_url_name = "whatsapp_account_add"
    detail_url_args = ["type", "device"]

    def _device_name(self) -> str:
        """Return the submitted device name, defaulting to ``"GIA Device"``.

        A blank or whitespace-only ``device`` field is treated like a missing
        one, so pairing is never requested with an empty device name.
        """
        submitted = str(self.request.POST.get("device") or "").strip()
        return submitted or "GIA Device"

    def _refresh_only(self) -> bool:
        """Return True when the form asks only to refresh (``refresh=1``)."""
        return str(self.request.POST.get("refresh") or "") == "1"

    def _detail_context(self, kwargs, obj):
        """Build the template context for a partial re-render.

        Only the URL kwargs named in ``detail_url_args`` that are actually
        present are used to reverse the detail URL.
        """
        detail_url_args = {
            arg: kwargs[arg]
            for arg in self.detail_url_args
            if arg in kwargs
        }
        return {
            "object": obj,
            "detail_url": reverse(self.detail_url_name, kwargs=detail_url_args),
        }

    def _debug_state(self):
        """Return human-readable ``key=value`` lines describing runtime state."""
        state = transport.get_runtime_state(self.service)
        now = int(time.time())

        def _age(key: str) -> str:
            """Format the timestamp stored under ``key`` as seconds elapsed."""
            try:
                value = int(state.get(key) or 0)
            except (TypeError, ValueError, OverflowError):
                # Unparseable timestamps are shown as "n/a" rather than
                # breaking the diagnostic output.
                value = 0
            if value <= 0:
                return "n/a"
            return f"{max(0, now - value)}s ago"

        qr_value = str(state.get("pair_qr") or "")
        return [
            f"connected={bool(state.get('connected'))}",
            f"runtime_seen={_age('runtime_seen_at')}",
            f"pair_requested={_age('pair_requested_at')}",
            f"qr_received={_age('qr_received_at')}",
            f"pair_status={state.get('pair_status') or '-'}",
            f"qr_probe_result={state.get('qr_probe_result') or '-'}",
            f"qr_handler_supported={state.get('qr_handler_supported')}",
            f"qr_handler_registered={state.get('qr_handler_registered')}",
            f"last_event={state.get('last_event') or '-'}",
            f"last_error={state.get('last_error') or '-'}",
            f"pair_qr_present={bool(qr_value)}",
            f"session_db={state.get('session_db') or '-'}",
        ]

    def post(self, request, *args, **kwargs):
        """Handle the pairing form.

        An HTMX refresh request re-renders only the detail partial; every
        other POST is delegated to the parent class's ``get`` handler.
        """
        self.request = request
        if self._refresh_only() and request.htmx:
            obj = self.get_object(**kwargs)
            return render(
                request,
                self.detail_template,
                self._detail_context(kwargs, obj),
            )
        return super().get(request, *args, **kwargs)

    def get_object(self, **kwargs):
        """Request pairing if needed and return the QR result dict.

        Pairing is requested unless this is a refresh-only call. Any error
        while fetching or encoding the QR code is reported in the returned
        dict instead of being raised; ``pending`` is set when the error text
        mentions "pairing qr".
        """
        device_name = self._device_name()
        if not self._refresh_only():
            transport.request_pairing(self.service, device_name)
        try:
            image_bytes = transport.get_link_qr(self.service, device_name)
            return {
                "ok": True,
                "image_b64": transport.image_bytes_to_base64(image_bytes),
                "warning": transport.get_service_warning(self.service),
                "debug_lines": self._debug_state(),
            }
        except Exception as exc:
            # View boundary: surface the failure to the template rather than
            # returning a server error while the QR code is still pending.
            error_text = str(exc)
            return {
                "ok": False,
                "pending": "pairing qr" in error_text.lower(),
                "device": device_name,
                "error": error_text,
                "warning": transport.get_service_warning(self.service),
                "debug_lines": self._debug_state(),
            }