503 lines
20 KiB
Python
503 lines
20 KiB
Python
import time
|
|
from urllib.parse import urlencode
|
|
|
|
from django.shortcuts import render
|
|
from django.urls import reverse
|
|
from django.views import View
|
|
from mixins.views import ObjectList, ObjectRead
|
|
|
|
from core.clients import transport
|
|
from core.models import PersonIdentifier
|
|
from core.util import logs
|
|
from core.views.compose import _compose_urls, _service_icon_class
|
|
from core.views.manage.permissions import SuperUserRequiredMixin
|
|
|
|
# Module-level logger for this file, obtained from core.util.logs under the name "whatsapp_view".
log = logs.get_logger("whatsapp_view")
|
|
|
|
|
|
class WhatsApp(SuperUserRequiredMixin, View):
    """Landing view for the WhatsApp service.

    ``get`` renders the service page. ``delete`` unlinks one account and, when
    ``request.htmx`` is truthy, re-renders the accounts partial; otherwise it
    falls back to rendering the page like ``get``. Access control is delegated
    to ``SuperUserRequiredMixin``.
    """

    template_name = "pages/signal.html"
    service = "whatsapp"
    page_title = "WhatsApp"
    accounts_url_name = "whatsapp_accounts"

    # Keys probed, in priority order, when an account entry is a dict.
    _ACCOUNT_ID_KEYS = ("number", "id", "jid", "account")

    @classmethod
    def _account_ids(cls, entries):
        """Return identifier strings for raw ``transport.list_accounts`` entries.

        Dict entries contribute the first truthy value among
        ``_ACCOUNT_ID_KEYS``; any other truthy entry contributes ``str(entry)``.
        Entries that yield nothing are skipped.
        """
        ids = []
        for entry in entries or []:
            if isinstance(entry, dict):
                value = None
                for key in cls._ACCOUNT_ID_KEYS:
                    value = entry.get(key)
                    if value:
                        break
            else:
                value = entry
            if value:
                ids.append(str(value))
        return ids

    def get(self, request):
        """Render the service page with the service name, label and accounts URL name."""
        return render(
            request,
            self.template_name,
            {
                "service": self.service,
                "service_label": self.page_title,
                "accounts_url_name": self.accounts_url_name,
            },
        )

    def delete(self, request, *args, **kwargs):
        """Unlink a WhatsApp account and render the refreshed view.

        The account comes from the ``account`` query parameter. When that is
        missing or blank, the first non-blank listed account is used instead.
        If no account can be determined, nothing is unlinked.
        """
        account = str(request.GET.get("account") or "").strip()
        if not account:
            # Normalise entries first so a dict-shaped account resolves to its
            # identifier rather than to the repr of the whole dict.
            for listed in self._account_ids(transport.list_accounts("whatsapp")):
                listed = listed.strip()
                if listed:
                    account = listed
                    break
        if account:
            transport.unlink_account("whatsapp", account)
        if not request.htmx:
            return self.get(request)

        # The originating URL decides whether the partial is rendered as a
        # widget or as a page.
        current_url = str(request.headers.get("HX-Current-URL") or "")
        list_type = "widget" if "/widget/" in current_url else "page"

        # Listed again on purpose: this reflects the state after the unlink.
        rows = self._account_ids(transport.list_accounts("whatsapp"))
        context = {
            "service": "whatsapp",
            "service_label": "WhatsApp",
            "account_add_url_name": "whatsapp_account_add",
            "account_unlink_url_name": "whatsapp_account_unlink",
            "show_contact_actions": True,
            "contacts_url_name": "whatsapp_contacts",
            "chats_url_name": "whatsapp_chats",
            "service_warning": transport.get_service_warning("whatsapp"),
            "object_list": rows,
            "list_url": reverse("whatsapp_accounts", kwargs={"type": list_type}),
            "type": list_type,
            "context_object_name_singular": "WhatsApp Account",
            "context_object_name": "WhatsApp Accounts",
        }
        return render(request, "partials/signal-accounts.html", context)
|
|
|
|
|
|
class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
    """List view over the accounts that ``transport`` reports for WhatsApp.

    Rendering and URL handling come from ``ObjectList``; this class supplies
    the rows and the extra template context.
    """

    list_template = "partials/signal-accounts.html"
    service = "whatsapp"

    context_object_name_singular = "WhatsApp Account"
    context_object_name = "WhatsApp Accounts"
    list_url_name = "whatsapp_accounts"
    list_url_args = ["type"]

    def _normalize_accounts(self, rows):
        """Flatten raw account entries into a list of identifier strings.

        A dict entry contributes the first truthy value among its ``number``,
        ``id``, ``jid`` and ``account`` keys. Any other truthy entry
        contributes ``str(entry)``. Entries yielding nothing are dropped.
        """
        normalized = []
        for entry in rows or []:
            if not isinstance(entry, dict):
                if entry:
                    normalized.append(str(entry))
                continue
            chosen = None
            for key in ("number", "id", "jid", "account"):
                chosen = entry.get(key)
                if chosen:
                    break
            if chosen:
                normalized.append(str(chosen))
        return normalized

    def get_queryset(self, **kwargs):
        """Set ``extra_context`` for the template and return the account rows."""
        template_context = {
            "service": "whatsapp",
            "service_label": "WhatsApp",
            "account_add_url_name": "whatsapp_account_add",
            "account_unlink_url_name": "whatsapp_account_unlink",
            "show_contact_actions": True,
            "contacts_url_name": "whatsapp_contacts",
            "chats_url_name": "whatsapp_chats",
        }
        template_context["service_warning"] = transport.get_service_warning(
            "whatsapp"
        )
        self.extra_context = template_context

        raw_accounts = transport.list_accounts("whatsapp")
        return self._normalize_accounts(raw_accounts)
|
|
|
|
|
|
class WhatsAppAccountUnlink(SuperUserRequiredMixin, View):
    """Unlink one WhatsApp account and re-render the accounts partial."""

    def delete(self, request, *args, **kwargs):
        """Unlink the account named by the ``account`` URL kwarg.

        After unlinking, the remaining accounts are listed again and rendered
        with ``partials/signal-accounts.html``. The ``type`` URL kwarg is
        required; it is used for the list URL and passed to the template.
        """
        target = str(kwargs.get("account") or "").strip()
        transport.unlink_account("whatsapp", target)

        remaining = []
        for entry in transport.list_accounts("whatsapp"):
            if isinstance(entry, dict):
                # First truthy value among the known identifier keys.
                label = None
                for key in ("number", "id", "jid", "account"):
                    label = entry.get(key)
                    if label:
                        break
            else:
                label = entry
            if label:
                remaining.append(str(label))

        context = {
            "service": "whatsapp",
            "service_label": "WhatsApp",
            "account_add_url_name": "whatsapp_account_add",
            "account_unlink_url_name": "whatsapp_account_unlink",
            "show_contact_actions": True,
            "contacts_url_name": "whatsapp_contacts",
            "chats_url_name": "whatsapp_chats",
        }
        context["service_warning"] = transport.get_service_warning("whatsapp")
        context["object_list"] = remaining
        context["list_url"] = reverse(
            "whatsapp_accounts", kwargs={"type": kwargs["type"]}
        )
        context["type"] = kwargs["type"]
        context["context_object_name_singular"] = "WhatsApp Account"
        context["context_object_name"] = "WhatsApp Accounts"
        return render(request, "partials/signal-accounts.html", context)
|
|
|
|
|
|
class WhatsAppContactsList(SuperUserRequiredMixin, ObjectList):
    """List view of WhatsApp contacts for the requesting user.

    Rows come from two sources: the ``contacts`` entry of the WhatsApp runtime
    state, followed by the user's stored ``PersonIdentifier`` rows for
    WhatsApp that the runtime did not already yield.
    """

    list_template = "partials/whatsapp-contacts-list.html"

    context_object_name_singular = "WhatsApp Contact"
    context_object_name = "WhatsApp Contacts"

    list_url_name = "whatsapp_contacts"
    list_url_args = ["type", "pk"]

    def _linked_identifier(self, identifier: str, jid: str):
        """Return the user's ``PersonIdentifier`` matching the contact, or ``None``.

        Candidates are tried in order: the stripped ``identifier``, the
        stripped ``jid``, and, when the jid contains ``@``, the part of the
        jid before the first ``@``. The first candidate with a stored row wins.
        """
        raw_identifier = str(identifier or "").strip()
        raw_jid = str(jid or "").strip()
        candidates = [raw_identifier, raw_jid]
        if raw_jid and "@" in raw_jid:
            candidates.append(raw_jid.split("@", 1)[0])

        # dict.fromkeys removes repeats while keeping order, so an identifier
        # equal to its jid costs one query instead of two.
        for candidate in dict.fromkeys(candidates):
            if not candidate:
                continue
            linked = (
                PersonIdentifier.objects.filter(
                    user=self.request.user,
                    service="whatsapp",
                    identifier=candidate,
                )
                .select_related("person")
                .first()
            )
            if linked:
                return linked
        return None

    def _build_contact_row(self, identifier, jid, name, linked):
        """Build one template row for a contact.

        ``linked`` is the matching ``PersonIdentifier`` or ``None``; it
        supplies the person id for the compose URLs and the person name.
        """
        person_id = linked.person_id if linked is not None else None
        urls = _compose_urls("whatsapp", identifier, person_id)
        match_query = urlencode({"service": "whatsapp", "identifier": identifier})
        return {
            "identifier": identifier,
            "jid": jid,
            "name": name,
            "service_icon_class": _service_icon_class("whatsapp"),
            "person_name": linked.person.name if linked is not None else "",
            "compose_page_url": urls["page_url"],
            "compose_widget_url": urls["widget_url"],
            "match_url": f"{reverse('compose_contact_match')}?{match_query}",
        }

    def get_queryset(self, *args, **kwargs):
        """Return the contact rows: runtime contacts first, then stored links."""
        state = transport.get_runtime_state("whatsapp")
        runtime_contacts = state.get("contacts") or []
        rows = []
        seen = set()

        for item in runtime_contacts:
            if not isinstance(item, dict):
                continue
            identifier = str(
                item.get("identifier") or item.get("jid") or item.get("chat") or ""
            ).strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            jid = str(item.get("jid") or "").strip()
            linked = self._linked_identifier(identifier, jid)
            rows.append(
                self._build_contact_row(
                    identifier,
                    jid,
                    str(item.get("name") or item.get("chat") or ""),
                    linked,
                )
            )

        # Include already-linked WhatsApp contacts not yet discovered by runtime.
        linked_rows = (
            PersonIdentifier.objects.filter(
                user=self.request.user,
                service="whatsapp",
            )
            .select_related("person")
            .order_by("person__name", "identifier")
        )
        for row in linked_rows:
            identifier = str(row.identifier or "").strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            rows.append(
                self._build_contact_row(identifier, "", row.person.name, row)
            )
        return rows
|
|
|
|
|
|
class WhatsAppChatsList(WhatsAppContactsList):
    """List view of WhatsApp chats built from the runtime state.

    Chats are taken from the ``history_anchors`` entry of the runtime state
    and sorted newest first. When there are no usable anchors, the runtime
    ``contacts`` and ``groups`` entries are listed instead.
    """

    list_template = "partials/whatsapp-chats-list.html"
    context_object_name_singular = "WhatsApp Chat"
    context_object_name = "WhatsApp Chats"
    list_url_name = "whatsapp_chats"

    @staticmethod
    def _as_int(value) -> int:
        """Return ``int(value)``, or 0 when the value is falsy or not numeric."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _chat_row(self, identifier, jid, preferred_name, last_ts):
        """Build one template row for a chat.

        The display name is the first non-empty value among
        ``preferred_name``, the linked person's name, ``jid``, ``identifier``
        and the literal ``"WhatsApp Chat"``. The row's ``jid`` falls back to
        ``identifier`` when ``jid`` is empty.
        """
        linked = self._linked_identifier(identifier, jid)
        urls = _compose_urls(
            "whatsapp",
            identifier,
            linked.person_id if linked else None,
        )
        name = (
            preferred_name
            or (linked.person.name if linked else "")
            or jid
            or identifier
            or "WhatsApp Chat"
        )
        match_query = urlencode({"service": "whatsapp", "identifier": identifier})
        return {
            "identifier": identifier,
            "jid": jid or identifier,
            "name": name,
            "service_icon_class": _service_icon_class("whatsapp"),
            "person_name": linked.person.name if linked else "",
            "compose_page_url": urls["page_url"],
            "compose_widget_url": urls["widget_url"],
            "match_url": f"{reverse('compose_contact_match')}?{match_query}",
            "last_ts": last_ts,
        }

    def get_queryset(self, *args, **kwargs):
        """Return chat rows from history anchors, or from contacts as a fallback."""
        rows = []
        seen = set()
        state = transport.get_runtime_state("whatsapp")

        # Contacts and groups are pooled. Sources that are not a list or tuple,
        # and entries that are not dicts, are ignored rather than raising.
        combined_contacts = []
        for source in (state.get("contacts"), state.get("groups")):
            if not isinstance(source, (list, tuple)):
                continue
            combined_contacts.extend(
                item for item in source if isinstance(item, dict)
            )

        # Index each contact under its full identifier, its jid, and the part
        # of each before the first "@", so anchors can match any of those.
        contact_index = {}
        for item in combined_contacts:
            raw_identifier = str(
                item.get("identifier") or item.get("jid") or item.get("chat") or ""
            ).strip()
            jid = str(item.get("jid") or "").strip()
            name = str(item.get("name") or item.get("chat") or "").strip()
            base_id = raw_identifier.split("@", 1)[0].strip()
            jid_base = jid.split("@", 1)[0].strip()
            for key in {raw_identifier, base_id, jid, jid_base}:
                if key:
                    contact_index[key] = {"name": name, "jid": jid}

        history_anchors = state.get("history_anchors") or {}
        if not isinstance(history_anchors, dict):
            history_anchors = {}
        for key, anchor in history_anchors.items():
            # A non-dict anchor carries no usable jid or timestamp.
            if not isinstance(anchor, dict):
                anchor = {}
            identifier = str(key or "").strip()
            if not identifier:
                continue
            identifier = identifier.split("@", 1)[0].strip() or identifier
            if identifier in seen:
                continue
            seen.add(identifier)
            anchor_jid = str(anchor.get("chat_jid") or "").strip()
            contact = contact_index.get(identifier) or contact_index.get(anchor_jid)
            jid = (contact or {}).get("jid") or anchor_jid or identifier
            rows.append(
                self._chat_row(
                    identifier,
                    jid,
                    (contact or {}).get("name"),
                    self._as_int(anchor.get("ts") or anchor.get("updated_at")),
                )
            )

        if rows:
            rows.sort(key=lambda row: row.get("last_ts", 0), reverse=True)
            return rows

        # Fallback: if no anchors yet, surface the runtime contacts (best effort live state)
        for item in combined_contacts:
            identifier = str(
                item.get("identifier") or item.get("jid") or item.get("chat") or ""
            ).strip()
            if not identifier:
                continue
            identifier = identifier.split("@", 1)[0].strip()
            if not identifier or identifier in seen:
                continue
            seen.add(identifier)
            jid = str(item.get("jid") or "").strip()
            rows.append(
                self._chat_row(
                    identifier,
                    jid,
                    str(item.get("name") or item.get("chat") or "").strip(),
                    0,
                )
            )
        return rows
|
|
|
|
|
|
class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
    """Detail view that requests WhatsApp pairing and shows the link QR code.

    ``get_object`` asks ``transport`` to start pairing (unless the request is
    a refresh), then fetches the QR image. On failure it returns an error
    payload instead of raising.
    """

    detail_template = "partials/whatsapp-account-add.html"
    service = "whatsapp"
    extra_context = {
        "widget_options": 'gs-w="6" gs-h="13" gs-x="0" gs-y="0" gs-min-w="4"',
    }
    context_object_name_singular = "Add Account"
    context_object_name = "Add Account"
    detail_url_name = "whatsapp_account_add"
    detail_url_args = ["type", "device"]

    def _device_name(self) -> str:
        """Return the posted ``device`` value, or ``"GIA Device"`` if absent or empty."""
        form_args = self.request.POST.dict()
        # ``or`` rather than a ``.get`` default: a form that submits a blank
        # field posts "", which should also fall back to the default name.
        return form_args.get("device") or "GIA Device"

    def _refresh_only(self) -> bool:
        """Return True when the posted ``refresh`` field equals ``"1"``."""
        form_args = self.request.POST.dict()
        return str(form_args.get("refresh") or "") == "1"

    def _detail_context(self, kwargs, obj):
        """Build the template context: the object plus this view's detail URL.

        Only the URL kwargs named in ``detail_url_args`` that are present in
        ``kwargs`` are passed to ``reverse``.
        """
        detail_url_args = {
            arg: kwargs[arg] for arg in self.detail_url_args if arg in kwargs
        }
        return {
            "object": obj,
            "detail_url": reverse(self.detail_url_name, kwargs=detail_url_args),
        }

    def _debug_state(self):
        """Return ``key=value`` strings describing the service's runtime state.

        Values that are not numeric are reported as 0 (counters) or ``n/a``
        (ages), so this debug output never raises on a malformed state value.
        """
        state = transport.get_runtime_state(self.service)
        now = int(time.time())

        def _int(key: str) -> int:
            try:
                return int(state.get(key) or 0)
            except (TypeError, ValueError, OverflowError):
                return 0

        def _age(key: str) -> str:
            # The stored value is subtracted from time.time(), i.e. it is
            # treated as seconds since the epoch.
            value = _int(key)
            if value <= 0:
                return "n/a"
            return f"{max(0, now - value)}s ago"

        qr_value = str(state.get("pair_qr") or "")
        contacts = state.get("contacts") or []
        history_imported = _int("history_imported_messages")
        sqlite_imported = _int("history_sqlite_imported")
        sqlite_scanned = _int("history_sqlite_scanned")
        on_demand_requested = bool(state.get("history_on_demand_requested"))
        on_demand_error = str(state.get("history_on_demand_error") or "").strip() or "-"
        on_demand_anchor = (
            str(state.get("history_on_demand_anchor") or "").strip() or "-"
        )
        history_running = bool(state.get("history_sync_running"))
        return [
            f"connected={bool(state.get('connected'))}",
            f"runtime_updated={_age('updated_at')}",
            f"runtime_seen={_age('runtime_seen_at')}",
            f"pair_requested={_age('pair_requested_at')}",
            f"qr_received={_age('qr_received_at')}",
            f"pair_status={state.get('pair_status') or '-'}",
            f"qr_probe_result={state.get('qr_probe_result') or '-'}",
            f"qr_handler_supported={state.get('qr_handler_supported')}",
            f"qr_handler_registered={state.get('qr_handler_registered')}",
            f"last_event={state.get('last_event') or '-'}",
            f"last_error={state.get('last_error') or '-'}",
            f"contacts_source={state.get('contacts_source') or '-'}",
            f"contacts_count={len(contacts) if isinstance(contacts, list) else 0}",
            f"contacts_sync_count={state.get('contacts_sync_count') or 0}",
            f"contacts_synced={_age('contacts_synced_at')}",
            f"history_sync_running={history_running}",
            f"history_started={_age('history_sync_started_at')}",
            f"history_finished={_age('history_sync_finished_at')}",
            f"history_duration_ms={state.get('history_sync_duration_ms') or 0}",
            f"history_imported_messages={history_imported}",
            f"history_sqlite_imported={sqlite_imported}",
            f"history_sqlite_scanned={sqlite_scanned}",
            f"history_sqlite_rows={state.get('history_sqlite_rows') or 0}",
            f"history_sqlite_table={state.get('history_sqlite_table') or '-'}",
            f"history_sqlite_error={state.get('history_sqlite_error') or '-'}",
            f"history_sqlite_ts={_age('history_sqlite_ts')}",
            f"history_on_demand_requested={on_demand_requested}",
            f"history_on_demand_at={_age('history_on_demand_at')}",
            f"history_on_demand_anchor={on_demand_anchor}",
            f"history_on_demand_error={on_demand_error}",
            f"pair_qr_present={bool(qr_value)}",
            f"session_db={state.get('session_db') or '-'}",
        ]

    def post(self, request, *args, **kwargs):
        """Handle a pairing request or a refresh.

        A refresh with ``request.htmx`` truthy renders the detail template
        directly; every other POST is handled by the parent's ``get``.
        """
        self.request = request
        if self._refresh_only() and request.htmx:
            obj = self.get_object(**kwargs)
            return render(
                request,
                self.detail_template,
                self._detail_context(kwargs, obj),
            )
        return super().get(request, *args, **kwargs)

    def get_object(self, **kwargs):
        """Return the payload dict rendered by the detail template.

        On success the dict has ``ok=True`` and the QR image as base64. On any
        exception from fetching or encoding the QR it has ``ok=False``, the
        error text, and ``pending=True`` when that text mentions
        "pairing qr" (case-insensitive).
        """
        device_name = self._device_name()
        refresh_only = self._refresh_only()
        if not refresh_only:
            transport.request_pairing(self.service, device_name)
        debug_lines = self._debug_state()
        log.info(
            "whatsapp add-account runtime debug [%s]: %s",
            ("refresh" if refresh_only else "request"),
            " | ".join(debug_lines),
        )
        try:
            image_bytes = transport.get_link_qr(self.service, device_name)
            return {
                "ok": True,
                "image_b64": transport.image_bytes_to_base64(image_bytes),
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }
        except Exception as exc:
            # Deliberately broad: any failure becomes an error payload for the
            # template instead of propagating out of the view.
            error_text = str(exc)
            log.warning("whatsapp add-account get_link_qr failed: %s", error_text)
            return {
                "ok": False,
                "pending": "pairing qr" in error_text.lower(),
                "device": device_name,
                "error": error_text,
                "warning": transport.get_service_warning(self.service),
                "debug_lines": debug_lines,
            }
|