1144 lines
41 KiB
Python
1144 lines
41 KiB
Python
import asyncio
|
|
import os
|
|
import re
|
|
import time
|
|
from urllib.parse import quote_plus
|
|
|
|
import aiohttp
|
|
from asgiref.sync import sync_to_async
|
|
from django.conf import settings
|
|
|
|
from core.clients import ClientBase, transport
|
|
from core.messaging import history, media_bridge
|
|
from core.models import PersonIdentifier
|
|
|
|
|
|
class WhatsAppClient(ClientBase):
|
|
"""
|
|
Async WhatsApp transport backed by Neonize.
|
|
|
|
Design notes:
|
|
- Runs in UR process.
|
|
- Publishes runtime state to shared cache via transport.
|
|
- Degrades gracefully when Neonize/session is unavailable.
|
|
"""
|
|
|
|
def __init__(self, ur, loop, service="whatsapp"):
|
|
super().__init__(ur, loop, service)
|
|
self._task = None
|
|
self._stopping = False
|
|
self._client = None
|
|
self._build_jid = None
|
|
self._connected = False
|
|
self._last_qr_payload = ""
|
|
self._accounts = []
|
|
self._chat_presence = None
|
|
self._chat_presence_media = None
|
|
self._last_pair_request = 0
|
|
self._next_qr_probe_at = 0.0
|
|
self._qr_handler_registered = False
|
|
self._qr_handler_supported = False
|
|
self._event_hook_callable = False
|
|
|
|
self.enabled = bool(
|
|
str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower()
|
|
in {"1", "true", "yes", "on"}
|
|
)
|
|
self.client_name = str(
|
|
getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")
|
|
).strip() or "gia_whatsapp"
|
|
self.database_url = str(
|
|
getattr(settings, "WHATSAPP_DATABASE_URL", "")
|
|
).strip()
|
|
safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp"
|
|
self.session_db = self.database_url or f"/tmp/{safe_name}.db"
|
|
|
|
transport.register_runtime_client(self.service, self)
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=(
|
|
"WhatsApp runtime is disabled by settings."
|
|
if not self.enabled
|
|
else ""
|
|
),
|
|
accounts=[],
|
|
last_event="init",
|
|
session_db=self.session_db,
|
|
)
|
|
|
|
def _publish_state(self, **updates):
|
|
state = transport.update_runtime_state(self.service, **updates)
|
|
accounts = state.get("accounts")
|
|
if isinstance(accounts, list):
|
|
self._accounts = accounts
|
|
|
|
def start(self):
|
|
if not self.enabled:
|
|
self.log.info("whatsapp client disabled by settings")
|
|
return
|
|
if self._task is None:
|
|
self.log.info("whatsapp neonize client starting")
|
|
self._task = self.loop.create_task(self._run())
|
|
|
|
async def _run(self):
|
|
try:
|
|
import neonize.aioze.client as wa_client_mod
|
|
from neonize.aioze.client import NewAClient
|
|
from neonize.aioze import events as wa_events
|
|
try:
|
|
from neonize.utils.enum import ChatPresence, ChatPresenceMedia
|
|
except Exception:
|
|
ChatPresence = None
|
|
ChatPresenceMedia = None
|
|
try:
|
|
from neonize.utils import build_jid as wa_build_jid
|
|
except Exception:
|
|
wa_build_jid = None
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"Neonize not available: {exc}",
|
|
accounts=[],
|
|
last_event="neonize_import_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp neonize import failed: %s", exc)
|
|
return
|
|
|
|
# Neonize async module ships with its own global event loop object.
|
|
# In this runtime we already have a live asyncio loop; bind Neonize's
|
|
# globals to it so QR/pair callbacks and connect tasks actually execute.
|
|
try:
|
|
wa_events.event_global_loop = self.loop
|
|
wa_client_mod.event_global_loop = self.loop
|
|
self._publish_state(
|
|
neonize_loop_bound=True,
|
|
neonize_loop_type=str(type(self.loop).__name__),
|
|
last_event="neonize_loop_bound",
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
neonize_loop_bound=False,
|
|
last_event="neonize_loop_bind_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("failed binding neonize loop: %s", exc)
|
|
|
|
self._build_jid = wa_build_jid
|
|
self._chat_presence = ChatPresence
|
|
self._chat_presence_media = ChatPresenceMedia
|
|
try:
|
|
db_dir = os.path.dirname(self.session_db)
|
|
if db_dir:
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp DB path setup failed: {exc}",
|
|
last_event="db_path_setup_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp db path setup failed: %s", exc)
|
|
return
|
|
self._client = self._build_client(NewAClient)
|
|
if self._client is None:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning="Failed to initialize Neonize client.",
|
|
accounts=[],
|
|
last_event="client_init_failed",
|
|
last_error="client_none",
|
|
)
|
|
return
|
|
|
|
self._register_event_handlers(wa_events)
|
|
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
await self._after_connect_probe()
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp connect failed: {exc}",
|
|
accounts=[],
|
|
last_event="connect_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp connect failed: %s", exc)
|
|
return
|
|
|
|
# Keep task alive so state/callbacks remain active.
|
|
next_heartbeat_at = 0.0
|
|
while not self._stopping:
|
|
now = time.time()
|
|
if now >= next_heartbeat_at:
|
|
self._publish_state(runtime_seen_at=int(now))
|
|
next_heartbeat_at = now + 5.0
|
|
self._mark_qr_wait_timeout()
|
|
await self._sync_pair_request()
|
|
await self._probe_pending_qr(now)
|
|
await asyncio.sleep(1)
|
|
|
|
async def _sync_pair_request(self):
|
|
state = transport.get_runtime_state(self.service)
|
|
requested_at = int(state.get("pair_requested_at") or 0)
|
|
if requested_at <= 0 or requested_at <= self._last_pair_request:
|
|
return
|
|
self._last_pair_request = requested_at
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr="",
|
|
warning="Waiting for WhatsApp QR from Neonize.",
|
|
last_event="pair_request_seen",
|
|
pair_status="pending",
|
|
last_error="",
|
|
pair_reconnect_attempted_at=int(time.time()),
|
|
)
|
|
self._next_qr_probe_at = time.time()
|
|
|
|
if self._client is None:
|
|
return
|
|
|
|
try:
|
|
if hasattr(self._client, "disconnect"):
|
|
await self._maybe_await(self._client.disconnect())
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="pair_disconnect_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp disconnect before pairing failed: %s", exc)
|
|
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
self._publish_state(
|
|
last_event="pair_refresh_connected",
|
|
pair_refresh_connected_at=int(time.time()),
|
|
)
|
|
await self._after_connect_probe()
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp pairing refresh failed: {exc}",
|
|
last_event="pair_refresh_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp pairing refresh failed: %s", exc)
|
|
|
|
async def _probe_pending_qr(self, now_ts: float):
|
|
state = transport.get_runtime_state(self.service)
|
|
status = str(state.get("pair_status") or "").strip().lower()
|
|
if status != "pending":
|
|
return
|
|
if str(state.get("pair_qr") or "").strip():
|
|
return
|
|
if now_ts < float(self._next_qr_probe_at or 0.0):
|
|
return
|
|
await self._after_connect_probe()
|
|
latest = transport.get_runtime_state(self.service)
|
|
if str(latest.get("pair_qr") or "").strip():
|
|
self._next_qr_probe_at = now_ts + 30.0
|
|
return
|
|
error_text = str(latest.get("last_error") or "").strip().lower()
|
|
# Neonize may report "client is nil" for a few seconds after connect.
|
|
if "client is nil" in error_text:
|
|
self._next_qr_probe_at = now_ts + 2.0
|
|
return
|
|
self._next_qr_probe_at = now_ts + 5.0
|
|
|
|
async def _after_connect_probe(self):
|
|
if self._client is None:
|
|
return
|
|
now_ts = int(time.time())
|
|
try:
|
|
check_connected = getattr(self._client, "is_connected", None)
|
|
if check_connected is not None:
|
|
connected_value = (
|
|
await self._maybe_await(check_connected())
|
|
if callable(check_connected)
|
|
else await self._maybe_await(check_connected)
|
|
)
|
|
if connected_value:
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True,
|
|
pair_status="connected",
|
|
last_event="connected_probe",
|
|
connected_at=now_ts,
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="connected_probe_failed",
|
|
last_error=str(exc),
|
|
)
|
|
|
|
# Neonize does not always emit QR callbacks after reconnect. Try explicit
|
|
# QR-link fetch when available to surface pair data to the UI.
|
|
try:
|
|
if hasattr(self._client, "get_contact_qr_link"):
|
|
qr_link = await self._maybe_await(self._client.get_contact_qr_link())
|
|
qr_payload = self._decode_qr_payload(qr_link)
|
|
self._publish_state(last_qr_probe_at=now_ts)
|
|
if qr_payload:
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_probe_success",
|
|
pair_status="qr_ready",
|
|
qr_received_at=now_ts,
|
|
qr_probe_result="ok",
|
|
last_error="",
|
|
)
|
|
else:
|
|
self._publish_state(
|
|
last_event="qr_probe_empty",
|
|
qr_probe_result="empty",
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="qr_probe_failed",
|
|
qr_probe_result="error",
|
|
last_error=str(exc),
|
|
last_qr_probe_at=now_ts,
|
|
)
|
|
|
|
def _register_event(self, event_cls, callback):
|
|
if event_cls is None:
|
|
return False
|
|
if self._client is None:
|
|
return False
|
|
event_hook = getattr(self._client, "event", None)
|
|
if not callable(event_hook):
|
|
self._event_hook_callable = False
|
|
return False
|
|
self._event_hook_callable = True
|
|
try:
|
|
decorator = event_hook(event_cls)
|
|
decorator(callback)
|
|
return True
|
|
except Exception as exc:
|
|
self.log.warning(
|
|
"whatsapp event registration failed (%s): %s",
|
|
getattr(event_cls, "__name__", str(event_cls)),
|
|
exc,
|
|
)
|
|
self._publish_state(
|
|
last_event="event_registration_failed",
|
|
last_error=str(exc),
|
|
)
|
|
return False
|
|
|
|
def _register_qr_handler(self):
|
|
if self._client is None:
|
|
self._qr_handler_supported = False
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=False,
|
|
qr_handler_registered=False,
|
|
)
|
|
return
|
|
if not hasattr(self._client, "qr"):
|
|
self._qr_handler_supported = False
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=False,
|
|
qr_handler_registered=False,
|
|
last_event="qr_api_missing",
|
|
)
|
|
return
|
|
self._qr_handler_supported = True
|
|
|
|
async def on_qr(client, raw_payload):
|
|
qr_payload = self._decode_qr_payload(raw_payload)
|
|
if not qr_payload:
|
|
return
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_handler",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
|
|
try:
|
|
self._client.qr(on_qr)
|
|
self._qr_handler_registered = True
|
|
self._publish_state(
|
|
qr_handler_supported=True,
|
|
qr_handler_registered=True,
|
|
last_event="qr_handler_registered",
|
|
)
|
|
except Exception as exc:
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=True,
|
|
qr_handler_registered=False,
|
|
last_event="qr_handler_registration_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp qr handler registration failed: %s", exc)
|
|
|
|
def _decode_qr_payload(self, raw_payload):
|
|
if raw_payload is None:
|
|
return ""
|
|
if isinstance(raw_payload, memoryview):
|
|
raw_payload = raw_payload.tobytes()
|
|
if isinstance(raw_payload, bytes):
|
|
return raw_payload.decode("utf-8", errors="ignore").strip()
|
|
if isinstance(raw_payload, (list, tuple)):
|
|
for item in raw_payload:
|
|
candidate = self._decode_qr_payload(item)
|
|
if candidate:
|
|
return candidate
|
|
return ""
|
|
return str(raw_payload).strip()
|
|
|
|
def _build_client(self, cls):
|
|
# NewAClient first arg is the SQLite filename / DB string.
|
|
try:
|
|
return cls(self.session_db)
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp client init failed (%s): %s", self.session_db, exc)
|
|
self._publish_state(
|
|
last_event="client_init_exception",
|
|
last_error=str(exc),
|
|
session_db=self.session_db,
|
|
)
|
|
return None
|
|
|
|
def _register_event_handlers(self, wa_events):
|
|
connected_ev = getattr(wa_events, "ConnectedEv", None)
|
|
message_ev = getattr(wa_events, "MessageEv", None)
|
|
receipt_ev = getattr(wa_events, "ReceiptEv", None)
|
|
chat_presence_ev = getattr(wa_events, "ChatPresenceEv", None)
|
|
presence_ev = getattr(wa_events, "PresenceEv", None)
|
|
pair_ev = getattr(wa_events, "PairStatusEv", None)
|
|
qr_ev = getattr(wa_events, "QREv", None)
|
|
|
|
self._register_qr_handler()
|
|
support = {
|
|
"connected_ev": bool(connected_ev),
|
|
"pair_ev": bool(pair_ev),
|
|
"qr_ev": bool(qr_ev),
|
|
"message_ev": bool(message_ev),
|
|
"receipt_ev": bool(receipt_ev),
|
|
}
|
|
self._publish_state(
|
|
event_hook_callable=bool(getattr(self._client, "event", None)),
|
|
event_support=support,
|
|
last_event="event_handlers_scanned",
|
|
)
|
|
|
|
if connected_ev is not None:
|
|
|
|
async def on_connected(client, event: connected_ev):
|
|
self._connected = True
|
|
account = await self._resolve_account_identifier()
|
|
self._publish_state(
|
|
connected=True,
|
|
warning="",
|
|
accounts=[account] if account else [self.client_name],
|
|
pair_qr="",
|
|
last_event="connected",
|
|
pair_status="connected",
|
|
connected_at=int(time.time()),
|
|
last_error="",
|
|
)
|
|
|
|
self._register_event(connected_ev, on_connected)
|
|
|
|
if message_ev is not None:
|
|
|
|
async def on_message(client, event: message_ev):
|
|
await self._handle_message_event(event)
|
|
|
|
self._register_event(message_ev, on_message)
|
|
|
|
if receipt_ev is not None:
|
|
|
|
async def on_receipt(client, event: receipt_ev):
|
|
await self._handle_receipt_event(event)
|
|
|
|
self._register_event(receipt_ev, on_receipt)
|
|
|
|
if chat_presence_ev is not None:
|
|
|
|
async def on_chat_presence(client, event: chat_presence_ev):
|
|
await self._handle_chat_presence_event(event)
|
|
|
|
self._register_event(chat_presence_ev, on_chat_presence)
|
|
|
|
if presence_ev is not None:
|
|
|
|
async def on_presence(client, event: presence_ev):
|
|
await self._handle_presence_event(event)
|
|
|
|
self._register_event(presence_ev, on_presence)
|
|
|
|
if pair_ev is not None:
|
|
|
|
async def on_pair_status(client, event: pair_ev):
|
|
qr_payload = self._extract_pair_qr(event)
|
|
if qr_payload:
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="pair_status_qr",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
status_raw = self._pluck(event, "Status")
|
|
status_text = str(status_raw or "").strip().lower()
|
|
if status_text in {"2", "success"}:
|
|
account = await self._resolve_account_identifier()
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True,
|
|
warning="",
|
|
accounts=[account] if account else [self.client_name],
|
|
pair_qr="",
|
|
last_event="pair_status_success",
|
|
pair_status="connected",
|
|
connected_at=int(time.time()),
|
|
last_error="",
|
|
)
|
|
elif status_text in {"1", "error"}:
|
|
error_text = str(self._pluck(event, "Error") or "").strip()
|
|
self._publish_state(
|
|
warning=error_text or "WhatsApp pairing failed. Retry scan.",
|
|
last_event="pair_status_error",
|
|
pair_status="error",
|
|
last_error=error_text or "unknown_pair_error",
|
|
)
|
|
|
|
self._register_event(pair_ev, on_pair_status)
|
|
|
|
if qr_ev is not None:
|
|
|
|
async def on_qr_event(client, event: qr_ev):
|
|
qr_payload = self._extract_pair_qr(event)
|
|
if not qr_payload:
|
|
return
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_event",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
|
|
self._register_event(qr_ev, on_qr_event)
|
|
|
|
def _mark_qr_wait_timeout(self):
|
|
state = transport.get_runtime_state(self.service)
|
|
if str(state.get("pair_status") or "").strip().lower() != "pending":
|
|
return
|
|
requested_at = int(state.get("pair_requested_at") or 0)
|
|
qr_received_at = int(state.get("qr_received_at") or 0)
|
|
if requested_at <= 0 or qr_received_at > 0:
|
|
return
|
|
now = int(time.time())
|
|
age = now - requested_at
|
|
# Avoid spamming writes while still surfacing a clear timeout state.
|
|
if age < 15 or (age % 10) != 0:
|
|
return
|
|
self._publish_state(
|
|
last_event="pair_waiting_no_qr",
|
|
warning=(
|
|
"Waiting for WhatsApp QR from Neonize. "
|
|
"No QR callback received yet."
|
|
),
|
|
)
|
|
|
|
async def _maybe_await(self, value):
|
|
if asyncio.iscoroutine(value):
|
|
return await value
|
|
return value
|
|
|
|
async def _resolve_account_identifier(self):
|
|
if self._client is None:
|
|
return ""
|
|
if not hasattr(self._client, "get_me"):
|
|
return self.client_name
|
|
try:
|
|
me = await self._maybe_await(self._client.get_me())
|
|
except Exception:
|
|
return self.client_name
|
|
# Support both dict-like and object-like payloads.
|
|
for path in (
|
|
("JID",),
|
|
("jid",),
|
|
):
|
|
value = self._jid_to_identifier(self._pluck(me, *path))
|
|
if value:
|
|
return value
|
|
for path in (
|
|
("JID", "User"),
|
|
("jid",),
|
|
("user",),
|
|
("ID",),
|
|
):
|
|
value = self._pluck(me, *path)
|
|
if value:
|
|
return str(value)
|
|
return self.client_name
|
|
|
|
def _pluck(self, obj, *path):
|
|
current = obj
|
|
for key in path:
|
|
if current is None:
|
|
return None
|
|
if isinstance(current, dict):
|
|
current = current.get(key)
|
|
continue
|
|
if hasattr(current, key):
|
|
current = getattr(current, key)
|
|
continue
|
|
return None
|
|
return current
|
|
|
|
def _normalize_timestamp(self, raw_value):
|
|
if raw_value is None:
|
|
return int(time.time() * 1000)
|
|
try:
|
|
value = int(raw_value)
|
|
except Exception:
|
|
return int(time.time() * 1000)
|
|
# WhatsApp libs often emit seconds. Promote to ms.
|
|
if value < 10**12:
|
|
return value * 1000
|
|
return value
|
|
|
|
def _normalize_identifier_candidates(self, *values):
|
|
out = set()
|
|
for value in values:
|
|
raw = self._jid_to_identifier(value)
|
|
if not raw:
|
|
continue
|
|
out.add(raw)
|
|
if "@" in raw:
|
|
out.add(raw.split("@", 1)[0])
|
|
digits = re.sub(r"[^0-9]", "", raw)
|
|
if digits:
|
|
out.add(digits)
|
|
if not digits.startswith("+"):
|
|
out.add(f"+{digits}")
|
|
return out
|
|
|
|
def _jid_to_identifier(self, value):
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return ""
|
|
if not isinstance(value, str):
|
|
user = self._pluck(value, "User") or self._pluck(value, "user")
|
|
server = self._pluck(value, "Server") or self._pluck(value, "server")
|
|
if user and server:
|
|
return f"{user}@{server}"
|
|
if user:
|
|
return str(user)
|
|
return raw
|
|
|
|
def _is_media_message(self, message_obj):
|
|
media_fields = (
|
|
"imageMessage",
|
|
"videoMessage",
|
|
"audioMessage",
|
|
"documentMessage",
|
|
"stickerMessage",
|
|
"image_message",
|
|
"video_message",
|
|
"audio_message",
|
|
"document_message",
|
|
"sticker_message",
|
|
)
|
|
for field in media_fields:
|
|
value = self._pluck(message_obj, field)
|
|
if value:
|
|
return True
|
|
return False
|
|
|
|
async def _download_event_media(self, event):
|
|
if not self._client:
|
|
return []
|
|
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
|
|
if msg_obj is None or not self._is_media_message(msg_obj):
|
|
return []
|
|
if not hasattr(self._client, "download_any"):
|
|
return []
|
|
|
|
try:
|
|
payload = await self._maybe_await(self._client.download_any(msg_obj))
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp media download failed: %s", exc)
|
|
return []
|
|
|
|
if isinstance(payload, memoryview):
|
|
payload = payload.tobytes()
|
|
if not isinstance(payload, (bytes, bytearray)):
|
|
return []
|
|
|
|
filename = (
|
|
self._pluck(msg_obj, "documentMessage", "fileName")
|
|
or self._pluck(msg_obj, "document_message", "file_name")
|
|
or f"wa-{int(time.time())}.bin"
|
|
)
|
|
content_type = (
|
|
self._pluck(msg_obj, "documentMessage", "mimetype")
|
|
or self._pluck(msg_obj, "document_message", "mimetype")
|
|
or self._pluck(msg_obj, "imageMessage", "mimetype")
|
|
or self._pluck(msg_obj, "image_message", "mimetype")
|
|
or "application/octet-stream"
|
|
)
|
|
blob_key = media_bridge.put_blob(
|
|
service="whatsapp",
|
|
content=bytes(payload),
|
|
filename=filename,
|
|
content_type=content_type,
|
|
)
|
|
if not blob_key:
|
|
return []
|
|
return [
|
|
{
|
|
"blob_key": blob_key,
|
|
"filename": filename,
|
|
"content_type": content_type,
|
|
"size": len(payload),
|
|
}
|
|
]
|
|
|
|
async def _handle_message_event(self, event):
|
|
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
|
|
text = (
|
|
self._pluck(msg_obj, "conversation")
|
|
or self._pluck(msg_obj, "Conversation")
|
|
or self._pluck(msg_obj, "extendedTextMessage", "text")
|
|
or self._pluck(msg_obj, "ExtendedTextMessage", "Text")
|
|
or self._pluck(msg_obj, "extended_text_message", "text")
|
|
or ""
|
|
)
|
|
source = (
|
|
self._pluck(event, "Info", "MessageSource")
|
|
or self._pluck(event, "info", "message_source")
|
|
or self._pluck(event, "info", "messageSource")
|
|
)
|
|
is_from_me = bool(
|
|
self._pluck(source, "IsFromMe")
|
|
or self._pluck(source, "isFromMe")
|
|
)
|
|
if is_from_me:
|
|
return
|
|
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender")
|
|
or self._pluck(source, "sender")
|
|
or self._pluck(source, "SenderAlt")
|
|
or self._pluck(source, "senderAlt")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat")
|
|
or self._pluck(source, "chat")
|
|
)
|
|
raw_ts = (
|
|
self._pluck(event, "Info", "Timestamp")
|
|
or self._pluck(event, "info", "timestamp")
|
|
or self._pluck(event, "info", "message_timestamp")
|
|
or self._pluck(event, "Timestamp")
|
|
or self._pluck(event, "timestamp")
|
|
)
|
|
ts = self._normalize_timestamp(raw_ts)
|
|
|
|
identifier_values = self._normalize_identifier_candidates(sender, chat)
|
|
if not identifier_values:
|
|
return
|
|
|
|
identifiers = await sync_to_async(list)(
|
|
PersonIdentifier.objects.filter(
|
|
service="whatsapp",
|
|
identifier__in=list(identifier_values),
|
|
)
|
|
)
|
|
if not identifiers:
|
|
return
|
|
|
|
attachments = await self._download_event_media(event)
|
|
xmpp_attachments = []
|
|
if attachments:
|
|
fetched = await asyncio.gather(
|
|
*[transport.fetch_attachment(self.service, att) for att in attachments]
|
|
)
|
|
xmpp_attachments = [row for row in fetched if row]
|
|
|
|
payload = {
|
|
"sender": str(sender or ""),
|
|
"chat": str(chat or ""),
|
|
"raw_event": str(type(event).__name__),
|
|
}
|
|
|
|
for identifier in identifiers:
|
|
uploaded_urls = await self.ur.xmpp.client.send_from_external(
|
|
identifier.user,
|
|
identifier,
|
|
text,
|
|
is_outgoing_message=False,
|
|
attachments=xmpp_attachments,
|
|
)
|
|
display_text = text
|
|
if (not display_text) and uploaded_urls:
|
|
display_text = "\n".join(uploaded_urls)
|
|
if (not display_text) and attachments:
|
|
media_urls = [
|
|
self._blob_key_to_compose_url((att or {}).get("blob_key"))
|
|
for att in attachments
|
|
]
|
|
media_urls = [url for url in media_urls if url]
|
|
if media_urls:
|
|
display_text = "\n".join(media_urls)
|
|
|
|
session = await history.get_chat_session(identifier.user, identifier)
|
|
await history.store_message(
|
|
session=session,
|
|
sender=str(sender or chat or ""),
|
|
text=display_text,
|
|
ts=ts,
|
|
outgoing=False,
|
|
)
|
|
await self.ur.message_received(
|
|
self.service,
|
|
identifier=identifier,
|
|
text=display_text,
|
|
ts=ts,
|
|
payload=payload,
|
|
)
|
|
|
|
async def _handle_receipt_event(self, event):
|
|
source = (
|
|
self._pluck(event, "MessageSource")
|
|
or self._pluck(event, "info", "message_source")
|
|
or self._pluck(event, "info", "messageSource")
|
|
)
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender")
|
|
or self._pluck(source, "sender")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat") or self._pluck(source, "chat")
|
|
)
|
|
timestamps = []
|
|
raw_ids = self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or []
|
|
if isinstance(raw_ids, (list, tuple, set)):
|
|
for item in raw_ids:
|
|
try:
|
|
value = int(item)
|
|
timestamps.append(value * 1000 if value < 10**12 else value)
|
|
except Exception:
|
|
continue
|
|
read_ts = self._normalize_timestamp(
|
|
self._pluck(event, "Timestamp")
|
|
or self._pluck(event, "timestamp")
|
|
or int(time.time() * 1000)
|
|
)
|
|
receipt_type = str(self._pluck(event, "Type") or "").strip()
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender, chat):
|
|
await self.ur.message_read(
|
|
self.service,
|
|
identifier=candidate,
|
|
message_timestamps=timestamps,
|
|
read_ts=read_ts,
|
|
read_by=sender or chat,
|
|
payload={
|
|
"event": "receipt",
|
|
"type": receipt_type,
|
|
"sender": str(sender),
|
|
"chat": str(chat),
|
|
},
|
|
)
|
|
|
|
async def _handle_chat_presence_event(self, event):
|
|
source = (
|
|
self._pluck(event, "MessageSource")
|
|
or self._pluck(event, "message_source")
|
|
or {}
|
|
)
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender")
|
|
or self._pluck(source, "sender")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat") or self._pluck(source, "chat")
|
|
)
|
|
state = self._pluck(event, "State") or self._pluck(event, "state")
|
|
state_text = str(state or "").strip().lower()
|
|
is_typing = state_text in {"1", "composing", "chat_presence_composing"}
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender, chat):
|
|
if is_typing:
|
|
await self.ur.started_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={"presence": state_text, "sender": str(sender), "chat": str(chat)},
|
|
)
|
|
else:
|
|
await self.ur.stopped_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={"presence": state_text, "sender": str(sender), "chat": str(chat)},
|
|
)
|
|
|
|
async def _handle_presence_event(self, event):
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(event, "From", "User")
|
|
or self._pluck(event, "from", "user")
|
|
)
|
|
is_unavailable = bool(
|
|
self._pluck(event, "Unavailable") or self._pluck(event, "unavailable")
|
|
)
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender):
|
|
if is_unavailable:
|
|
await self.ur.stopped_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={"presence": "offline", "sender": str(sender)},
|
|
)
|
|
|
|
def _extract_pair_qr(self, event):
|
|
codes = self._pluck(event, "Codes") or self._pluck(event, "codes") or []
|
|
decoded_codes = self._decode_qr_payload(codes)
|
|
if decoded_codes:
|
|
return decoded_codes
|
|
|
|
for path in (
|
|
("qr",),
|
|
("qr_code",),
|
|
("code",),
|
|
("pair_code",),
|
|
("pairCode",),
|
|
("url",),
|
|
):
|
|
value = self._pluck(event, *path)
|
|
if value:
|
|
return str(value)
|
|
return ""
|
|
|
|
def _to_jid(self, recipient):
|
|
raw = str(recipient or "").strip()
|
|
if not raw:
|
|
return ""
|
|
if self._build_jid is not None:
|
|
try:
|
|
return self._build_jid(raw)
|
|
except Exception:
|
|
pass
|
|
if "@" in raw:
|
|
return raw
|
|
digits = re.sub(r"[^0-9]", "", raw)
|
|
if digits:
|
|
return f"{digits}@s.whatsapp.net"
|
|
return raw
|
|
|
|
def _blob_key_to_compose_url(self, blob_key):
|
|
key = str(blob_key or "").strip()
|
|
if not key:
|
|
return ""
|
|
return f"/compose/media/blob/?key={quote_plus(key)}"
|
|
|
|
async def _fetch_attachment_payload(self, attachment):
|
|
blob_key = (attachment or {}).get("blob_key")
|
|
if blob_key:
|
|
row = media_bridge.get_blob(blob_key)
|
|
if row:
|
|
return row
|
|
|
|
content = (attachment or {}).get("content")
|
|
if isinstance(content, memoryview):
|
|
content = content.tobytes()
|
|
if isinstance(content, bytes):
|
|
return {
|
|
"content": content,
|
|
"filename": (attachment or {}).get("filename") or "attachment.bin",
|
|
"content_type": (attachment or {}).get("content_type")
|
|
or "application/octet-stream",
|
|
"size": len(content),
|
|
}
|
|
|
|
url = (attachment or {}).get("url")
|
|
if url:
|
|
timeout = aiohttp.ClientTimeout(total=20)
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
async with session.get(url) as response:
|
|
if response.status != 200:
|
|
return None
|
|
payload = await response.read()
|
|
return {
|
|
"content": payload,
|
|
"filename": (attachment or {}).get("filename")
|
|
or url.rstrip("/").split("/")[-1]
|
|
or "attachment.bin",
|
|
"content_type": (attachment or {}).get("content_type")
|
|
or response.headers.get(
|
|
"Content-Type", "application/octet-stream"
|
|
),
|
|
"size": len(payload),
|
|
}
|
|
return None
|
|
|
|
async def send_message_raw(self, recipient, text=None, attachments=None):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(recipient)
|
|
if not jid:
|
|
return False
|
|
|
|
sent_any = False
|
|
sent_ts = 0
|
|
for attachment in attachments or []:
|
|
payload = await self._fetch_attachment_payload(attachment)
|
|
if not payload:
|
|
continue
|
|
mime = str(payload.get("content_type") or "application/octet-stream").lower()
|
|
data = payload.get("content") or b""
|
|
filename = payload.get("filename") or "attachment.bin"
|
|
|
|
try:
|
|
if mime.startswith("image/") and hasattr(self._client, "send_image"):
|
|
response = await self._maybe_await(
|
|
self._client.send_image(jid, data, caption="")
|
|
)
|
|
elif mime.startswith("video/") and hasattr(self._client, "send_video"):
|
|
response = await self._maybe_await(
|
|
self._client.send_video(jid, data, caption="")
|
|
)
|
|
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
|
|
response = await self._maybe_await(self._client.send_audio(jid, data))
|
|
elif hasattr(self._client, "send_document"):
|
|
response = await self._maybe_await(
|
|
self._client.send_document(
|
|
jid,
|
|
data,
|
|
filename=filename,
|
|
mimetype=mime,
|
|
caption="",
|
|
)
|
|
)
|
|
else:
|
|
response = None
|
|
sent_ts = max(
|
|
sent_ts,
|
|
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
|
|
)
|
|
sent_any = True
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp attachment send failed: %s", exc)
|
|
|
|
if text:
|
|
try:
|
|
response = await self._maybe_await(self._client.send_message(jid, text))
|
|
sent_any = True
|
|
except TypeError:
|
|
response = await self._maybe_await(
|
|
self._client.send_message(jid, message=text)
|
|
)
|
|
sent_any = True
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp text send failed: %s", exc)
|
|
return False
|
|
sent_ts = max(
|
|
sent_ts,
|
|
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
|
|
)
|
|
|
|
if not sent_any:
|
|
return False
|
|
return sent_ts or int(time.time() * 1000)
|
|
|
|
async def start_typing(self, identifier):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(identifier)
|
|
if not jid:
|
|
return False
|
|
if (
|
|
hasattr(self._client, "send_chat_presence")
|
|
and self._chat_presence is not None
|
|
and self._chat_presence_media is not None
|
|
):
|
|
try:
|
|
await self._maybe_await(
|
|
self._client.send_chat_presence(
|
|
jid,
|
|
self._chat_presence.CHAT_PRESENCE_COMPOSING,
|
|
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
|
|
)
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
if hasattr(self._client, "set_chat_presence"):
|
|
try:
|
|
await self._maybe_await(self._client.set_chat_presence(jid, "composing"))
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def stop_typing(self, identifier):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(identifier)
|
|
if not jid:
|
|
return False
|
|
if (
|
|
hasattr(self._client, "send_chat_presence")
|
|
and self._chat_presence is not None
|
|
and self._chat_presence_media is not None
|
|
):
|
|
try:
|
|
await self._maybe_await(
|
|
self._client.send_chat_presence(
|
|
jid,
|
|
self._chat_presence.CHAT_PRESENCE_PAUSED,
|
|
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
|
|
)
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
if hasattr(self._client, "set_chat_presence"):
|
|
try:
|
|
await self._maybe_await(self._client.set_chat_presence(jid, "paused"))
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def fetch_attachment(self, attachment_ref):
|
|
blob_key = (attachment_ref or {}).get("blob_key")
|
|
if blob_key:
|
|
return media_bridge.get_blob(blob_key)
|
|
return None
|
|
|
|
def get_link_qr_png(self, device_name):
|
|
_ = (device_name or "").strip()
|
|
if not self._last_qr_payload:
|
|
return None
|
|
try:
|
|
return transport._as_qr_png(self._last_qr_payload)
|
|
except Exception:
|
|
return None
|