160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
from django.shortcuts import render
|
|
from django.urls import reverse
|
|
from django.views import View
|
|
from mixins.views import ObjectList, ObjectRead
|
|
|
|
from core.clients import transport
|
|
from core.views.manage.permissions import SuperUserRequiredMixin
|
|
import time
|
|
|
|
|
|
class WhatsApp(SuperUserRequiredMixin, View):
    """Landing page view for the WhatsApp service.

    Access control comes from ``SuperUserRequiredMixin``. A GET request
    renders ``template_name`` with the service identifier, its display label
    and the URL name of the accounts listing.
    """

    # NOTE(review): this is the Signal page template — presumably it is
    # shared between services; confirm.
    template_name = "pages/signal.html"
    service = "whatsapp"
    page_title = "WhatsApp"
    accounts_url_name = "whatsapp_accounts"

    def get(self, request):
        """Render the service page for ``request``."""
        context = {
            "service": self.service,
            "service_label": self.page_title,
            "accounts_url_name": self.accounts_url_name,
        }
        return render(request, self.template_name, context)
|
|
|
|
|
|
class WhatsAppAccounts(SuperUserRequiredMixin, ObjectList):
    """List view of the accounts reported by the transport for WhatsApp.

    The "queryset" is not a database queryset: it is a plain list of account
    identifier strings built from ``transport.list_accounts``.
    """

    # NOTE(review): this is the Signal accounts partial — presumably it is
    # shared between services; confirm.
    list_template = "partials/signal-accounts.html"
    service = "whatsapp"

    context_object_name_singular = "WhatsApp Account"
    context_object_name = "WhatsApp Accounts"
    list_url_name = "whatsapp_accounts"
    list_url_args = ["type"]

    # Keys probed, in priority order, when an account row is a dict.
    _ACCOUNT_ID_KEYS = ("number", "id", "jid", "account")

    def _normalize_accounts(self, rows):
        """Flatten transport account rows into a list of identifier strings.

        Args:
            rows: Iterable of account rows, or a falsy value for "no rows".
                A dict row contributes its first truthy value among
                ``_ACCOUNT_ID_KEYS``; any other truthy row contributes
                ``str(row)``.

        Returns:
            A list of strings. Falsy rows and dict rows without a truthy
            identifier are skipped.
        """
        out = []
        for item in rows or []:
            if isinstance(item, dict):
                for key in self._ACCOUNT_ID_KEYS:
                    value = item.get(key)
                    if value:
                        out.append(str(value))
                        break
            elif item:
                out.append(str(item))
        return out

    def get_queryset(self, **kwargs):
        """Set the template's extra context and return the account list.

        Side effect: assigns ``self.extra_context``, including the current
        service warning fetched from the transport.

        Returns:
            The normalized list of account identifier strings.
        """
        # Read the service from the class attribute rather than repeating
        # the literal, so the identifier is defined in exactly one place.
        service = self.service
        self.extra_context = {
            "service": service,
            "service_label": "WhatsApp",
            "account_add_url_name": "whatsapp_account_add",
            "show_contact_actions": False,
            "service_warning": transport.get_service_warning(service),
        }
        return self._normalize_accounts(transport.list_accounts(service))
|
|
|
|
|
|
class WhatsAppAccountAdd(SuperUserRequiredMixin, ObjectRead):
    """Detail view that requests WhatsApp pairing and shows the link QR code.

    A POST requests pairing for the submitted device name (unless the form
    carries ``refresh=1``) and then tries to fetch the link QR image. The
    resulting "object" is a plain dict consumed by ``detail_template``.
    """

    detail_template = "partials/whatsapp-account-add.html"
    service = "whatsapp"
    context_object_name_singular = "Add Account"
    context_object_name = "Add Account"
    detail_url_name = "whatsapp_account_add"
    detail_url_args = ["type", "device"]

    # Device name used when the form does not supply a usable one.
    _DEFAULT_DEVICE_NAME = "GIA Device"

    def _device_name(self) -> str:
        """Return the device name from the POST data, or the default.

        A missing, empty or whitespace-only ``device`` field falls back to
        the default so pairing is never requested with a blank name.
        """
        submitted = self.request.POST.get("device")
        name = str(submitted or "").strip()
        return name or self._DEFAULT_DEVICE_NAME

    def _refresh_only(self) -> bool:
        """Return True when the POST data carries ``refresh`` equal to "1"."""
        return str(self.request.POST.get("refresh") or "") == "1"

    def _detail_context(self, kwargs, obj):
        """Build the template context used for a refresh-only render.

        Args:
            kwargs: URL keyword arguments of the current request.
            obj: The dict produced by ``get_object``.

        Returns:
            A dict with ``object`` and ``detail_url``. The URL is reversed
            from ``detail_url_name`` using only those ``detail_url_args``
            that are present in ``kwargs``.
        """
        url_kwargs = {
            arg: kwargs[arg] for arg in self.detail_url_args if arg in kwargs
        }
        return {
            "object": obj,
            "detail_url": reverse(self.detail_url_name, kwargs=url_kwargs),
        }

    def _debug_state(self):
        """Return the transport runtime state as ``key=value`` debug lines.

        Timestamp fields are shown as an age in seconds relative to now, or
        ``"n/a"`` when missing, non-numeric or not positive.
        """
        # Fall back to an empty mapping so a missing state yields
        # placeholder lines instead of an AttributeError.
        state = transport.get_runtime_state(self.service) or {}
        now = int(time.time())

        def _age(key: str) -> str:
            try:
                value = int(state.get(key) or 0)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric timestamp: treat it as absent.
                value = 0
            if value <= 0:
                return "n/a"
            return f"{max(0, now - value)}s ago"

        qr_value = str(state.get("pair_qr") or "")
        return [
            f"connected={bool(state.get('connected'))}",
            f"runtime_updated={_age('updated_at')}",
            f"runtime_seen={_age('runtime_seen_at')}",
            f"pair_requested={_age('pair_requested_at')}",
            f"qr_received={_age('qr_received_at')}",
            f"last_qr_probe={_age('last_qr_probe_at')}",
            f"pair_status={state.get('pair_status') or '-'}",
            f"pair_request_source={state.get('pair_request_source') or '-'}",
            f"qr_probe_result={state.get('qr_probe_result') or '-'}",
            f"qr_handler_supported={state.get('qr_handler_supported')}",
            f"qr_handler_registered={state.get('qr_handler_registered')}",
            f"event_hook_callable={state.get('event_hook_callable')}",
            f"event_support={state.get('event_support') or {}}",
            f"last_event={state.get('last_event') or '-'}",
            f"last_error={state.get('last_error') or '-'}",
            f"pair_qr_present={bool(qr_value)} len={len(qr_value)}",
            f"accounts={state.get('accounts') or []}",
            f"warning={state.get('warning') or '-'}",
        ]

    def post(self, request, *args, **kwargs):
        """Handle the add-account form submission.

        A refresh-only request that is also flagged by ``request.htmx``
        re-renders just ``detail_template``; every other POST is delegated
        to the parent class's ``get``.
        """
        self.request = request
        # ``request.htmx`` is read defensively: a request without that
        # attribute simply takes the normal path.
        if self._refresh_only() and getattr(request, "htmx", False):
            obj = self.get_object(**kwargs)
            return render(
                request,
                self.detail_template,
                self._detail_context(kwargs, obj),
            )
        return super().get(request, *args, **kwargs)

    def get_object(self, **kwargs):
        """Request pairing if needed and return the QR result dict.

        Returns:
            On success: ``ok=True`` plus ``image_b64``. On failure:
            ``ok=False`` plus ``pending`` (True when the error text contains
            "pairing qr", case-insensitively), ``device`` and ``error``.
            Both shapes also carry ``warning`` and ``debug_lines``.
        """
        device_name = self._device_name()
        if not self._refresh_only():
            # NOTE(review): a failure here propagates to the caller, unlike
            # QR retrieval errors below — confirm that is intended.
            transport.request_pairing(self.service, device_name)

        try:
            image_bytes = transport.get_link_qr(self.service, device_name)
            result = {
                "ok": True,
                "image_b64": transport.image_bytes_to_base64(image_bytes),
            }
        except Exception as exc:
            # The transport's exception types are not visible here, so any
            # failure is reported in the rendered partial rather than raised.
            error_text = str(exc)
            result = {
                "ok": False,
                "pending": "pairing qr" in error_text.lower(),
                "device": device_name,
                "error": error_text,
            }

        # Fields common to both outcomes.
        result["warning"] = transport.get_service_warning(self.service)
        result["debug_lines"] = self._debug_state()
        return result
|