Files
GIA/core/clients/whatsapp.py
2026-02-19 17:13:34 +00:00

3212 lines
124 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import inspect
import logging
import mimetypes
import os
import re
import sqlite3
import time
from urllib.parse import quote_plus
import aiohttp
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.cache import cache
from core.clients import ClientBase, transport
from core.messaging import history, media_bridge
from core.models import Message, PersonIdentifier, PlatformChatLink
class WhatsAppClient(ClientBase):
"""
Async WhatsApp transport backed by Neonize.
Design notes:
- Runs in UR process.
- Publishes runtime state to shared cache via transport.
- Degrades gracefully when Neonize/session is unavailable.
"""
def __init__(self, ur, loop, service="whatsapp"):
super().__init__(ur, loop, service)
self._task = None
self._stopping = False
self._client = None
self._build_jid = None
self._connected = False
self._last_qr_payload = ""
self._accounts = []
self._chat_presence = None
self._chat_presence_media = None
self._last_pair_request = 0
self._next_qr_probe_at = 0.0
self._qr_handler_registered = False
self._qr_handler_supported = False
self._event_hook_callable = False
self._last_send_error = ""
self.enabled = bool(
str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower()
in {"1", "true", "yes", "on"}
)
self.client_name = (
str(getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")).strip()
or "gia_whatsapp"
)
self.database_url = str(getattr(settings, "WHATSAPP_DATABASE_URL", "")).strip()
safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp"
# Use a persistent default path (under project mount) instead of /tmp so
# link state and contact cache survive container restarts.
default_db_dir = str(
getattr(settings, "WHATSAPP_DB_DIR", "/var/tmp/whatsapp")
).strip()
self.session_db = self.database_url or os.path.join(
default_db_dir,
f"{safe_name}_neonize_v2.db",
)
transport.register_runtime_client(self.service, self)
prior_state = transport.get_runtime_state(self.service)
prior_accounts = prior_state.get("accounts")
if not isinstance(prior_accounts, list):
prior_accounts = []
self._publish_state(
connected=False,
warning=(
"WhatsApp runtime is disabled by settings." if not self.enabled else ""
),
accounts=prior_accounts,
last_event="init",
session_db=self.session_db,
)
# Reduce third-party library logging noise (neonize/grpc/protobuf).
try:
# Common noisy libraries used by Neonize/WhatsApp stacks
logging.getLogger("neonize").setLevel(logging.WARNING)
logging.getLogger("grpc").setLevel(logging.WARNING)
logging.getLogger("google").setLevel(logging.WARNING)
logging.getLogger("protobuf").setLevel(logging.WARNING)
logging.getLogger("whatsmeow").setLevel(logging.WARNING)
logging.getLogger("whatsmeow.Client").setLevel(logging.WARNING)
logging.getLogger("aiohttp.access").setLevel(logging.WARNING)
except Exception:
pass
def _publish_state(self, **updates):
state = transport.update_runtime_state(self.service, **updates)
accounts = state.get("accounts")
if isinstance(accounts, list):
self._accounts = accounts
def start(self):
if not self.enabled:
self.log.info("whatsapp client disabled by settings")
return
if self._task is None:
self.log.info("whatsapp neonize client starting")
self._task = self.loop.create_task(self._run())
async def _run(self):
try:
import neonize.aioze.client as wa_client_mod
from neonize.aioze import events as wa_events
from neonize.aioze.client import NewAClient
try:
from neonize.utils.enum import ChatPresence, ChatPresenceMedia
except Exception:
ChatPresence = None
ChatPresenceMedia = None
try:
from neonize.utils import build_jid as wa_build_jid
except Exception:
wa_build_jid = None
except Exception as exc:
self._publish_state(
connected=False,
warning=f"Neonize not available: {exc}",
accounts=[],
last_event="neonize_import_failed",
last_error=str(exc),
)
self.log.warning("whatsapp neonize import failed: %s", exc)
return
# Neonize async module ships with its own global event loop object.
# In this runtime we already have a live asyncio loop; bind Neonize's
# globals to it so QR/pair callbacks and connect tasks actually execute.
try:
wa_events.event_global_loop = self.loop
wa_client_mod.event_global_loop = self.loop
self._publish_state(
neonize_loop_bound=True,
neonize_loop_type=str(type(self.loop).__name__),
last_event="neonize_loop_bound",
)
except Exception as exc:
self._publish_state(
neonize_loop_bound=False,
last_event="neonize_loop_bind_failed",
last_error=str(exc),
)
self.log.warning("failed binding neonize loop: %s", exc)
self._build_jid = wa_build_jid
self._chat_presence = ChatPresence
self._chat_presence_media = ChatPresenceMedia
try:
db_dir = os.path.dirname(self.session_db)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
if db_dir and not os.access(db_dir, os.W_OK):
raise PermissionError(
f"session db directory is not writable: {db_dir}"
)
except Exception as exc:
self._publish_state(
connected=False,
warning=f"WhatsApp DB path setup failed: {exc}",
last_event="db_path_setup_failed",
last_error=str(exc),
)
self.log.warning("whatsapp db path setup failed: %s", exc)
return
self._client = self._build_client(NewAClient)
if self._client is None:
self._publish_state(
connected=False,
warning="Failed to initialize Neonize client.",
accounts=[],
last_event="client_init_failed",
last_error="client_none",
)
return
if not self._session_has_device():
self._publish_state(
connected=False,
warning="No linked WhatsApp device. Waiting for QR pairing.",
accounts=[],
last_event="no_linked_device",
pair_status="needs_pairing",
)
self.log.info(
"whatsapp session db has no linked device — connect will "
"start QR pairing flow"
)
self._register_event_handlers(wa_events)
try:
await self._maybe_await(self._client.connect())
await self._after_connect_probe()
except asyncio.CancelledError:
raise
except Exception as exc:
self._publish_state(
connected=False,
warning=f"WhatsApp connect failed: {exc}",
accounts=[],
last_event="connect_failed",
last_error=str(exc),
)
self.log.warning("whatsapp connect failed: %s", exc)
return
# Keep task alive so state/callbacks remain active.
next_heartbeat_at = 0.0
next_contacts_sync_at = 0.0
while not self._stopping:
now = time.time()
if now >= next_heartbeat_at:
self._publish_state(runtime_seen_at=int(now))
next_heartbeat_at = now + 5.0
await self._drain_runtime_commands()
if now >= next_contacts_sync_at:
await self._sync_contacts_from_client()
next_contacts_sync_at = now + 30.0
self._mark_qr_wait_timeout()
await self._sync_pair_request()
await self._probe_pending_qr(now)
await asyncio.sleep(1)
async def _sync_pair_request(self):
state = transport.get_runtime_state(self.service)
requested_at = int(state.get("pair_requested_at") or 0)
if requested_at <= 0 or requested_at <= self._last_pair_request:
return
self._last_pair_request = requested_at
self._publish_state(
connected=False,
pair_qr="",
warning="Waiting for WhatsApp QR from Neonize.",
last_event="pair_request_seen",
pair_status="pending",
last_error="",
pair_reconnect_attempted_at=int(time.time()),
)
self._next_qr_probe_at = time.time()
if self._client is None:
return
try:
if hasattr(self._client, "disconnect"):
await self._maybe_await(self._client.disconnect())
except Exception as exc:
self._publish_state(
last_event="pair_disconnect_failed",
last_error=str(exc),
)
self.log.warning("whatsapp disconnect before pairing failed: %s", exc)
try:
await self._maybe_await(self._client.connect())
self._publish_state(
last_event="pair_refresh_connected",
pair_refresh_connected_at=int(time.time()),
)
await self._after_connect_probe()
except Exception as exc:
self._publish_state(
connected=False,
warning=f"WhatsApp pairing refresh failed: {exc}",
last_event="pair_refresh_failed",
last_error=str(exc),
)
self.log.warning("whatsapp pairing refresh failed: %s", exc)
async def _probe_pending_qr(self, now_ts: float):
state = transport.get_runtime_state(self.service)
status = str(state.get("pair_status") or "").strip().lower()
if status != "pending":
return
if str(state.get("pair_qr") or "").strip():
return
if now_ts < float(self._next_qr_probe_at or 0.0):
return
await self._after_connect_probe()
latest = transport.get_runtime_state(self.service)
if str(latest.get("pair_qr") or "").strip():
self._next_qr_probe_at = now_ts + 30.0
return
error_text = str(latest.get("last_error") or "").strip().lower()
# Neonize may report "client is nil" for a few seconds after connect.
if "client is nil" in error_text:
self._next_qr_probe_at = now_ts + 2.0
return
self._next_qr_probe_at = now_ts + 5.0
async def _after_connect_probe(self):
if self._client is None:
return
now_ts = int(time.time())
try:
check_connected = getattr(self._client, "is_connected", None)
if check_connected is not None:
connected_value = (
await self._maybe_await(check_connected())
if callable(check_connected)
else await self._maybe_await(check_connected)
)
if connected_value:
self._connected = True
account = await self._resolve_account_identifier()
account_list = [account] if account else list(self._accounts or [])
if not account_list:
account_list = [self.client_name]
self._publish_state(
connected=True,
accounts=account_list,
pair_status="connected",
last_event="connected_probe",
connected_at=now_ts,
)
except Exception as exc:
self._publish_state(
last_event="connected_probe_failed",
last_error=str(exc),
)
if self._connected:
await self._sync_contacts_from_client()
# Neonize does not always emit QR callbacks after reconnect. Try explicit
# QR-link fetch when available to surface pair data to the UI.
try:
if hasattr(self._client, "get_contact_qr_link"):
qr_link = await self._maybe_await(self._client.get_contact_qr_link())
qr_payload = self._decode_qr_payload(qr_link)
self._publish_state(last_qr_probe_at=now_ts)
if qr_payload:
self._last_qr_payload = qr_payload
self._publish_state(
connected=False,
pair_qr=qr_payload,
warning="Scan QR in WhatsApp Linked Devices.",
last_event="qr_probe_success",
pair_status="qr_ready",
qr_received_at=now_ts,
qr_probe_result="ok",
last_error="",
)
else:
self._publish_state(
last_event="qr_probe_empty",
qr_probe_result="empty",
)
except Exception as exc:
self._publish_state(
last_event="qr_probe_failed",
qr_probe_result="error",
last_error=str(exc),
last_qr_probe_at=now_ts,
)
def _register_event(self, event_cls, callback):
if event_cls is None:
return False
if self._client is None:
return False
event_hook = getattr(self._client, "event", None)
if not callable(event_hook):
self._event_hook_callable = False
return False
self._event_hook_callable = True
try:
decorator = event_hook(event_cls)
decorator(callback)
return True
except Exception as exc:
self.log.warning(
"whatsapp event registration failed (%s): %s",
getattr(event_cls, "__name__", str(event_cls)),
exc,
)
self._publish_state(
last_event="event_registration_failed",
last_error=str(exc),
)
return False
def _register_qr_handler(self):
if self._client is None:
self._qr_handler_supported = False
self._qr_handler_registered = False
self._publish_state(
qr_handler_supported=False,
qr_handler_registered=False,
)
return
if not hasattr(self._client, "qr"):
self._qr_handler_supported = False
self._qr_handler_registered = False
self._publish_state(
qr_handler_supported=False,
qr_handler_registered=False,
last_event="qr_api_missing",
)
return
self._qr_handler_supported = True
async def on_qr(client, raw_payload):
qr_payload = self._decode_qr_payload(raw_payload)
if not qr_payload:
return
self._last_qr_payload = qr_payload
self._publish_state(
connected=False,
pair_qr=qr_payload,
warning="Scan QR in WhatsApp Linked Devices.",
last_event="qr_handler",
pair_status="qr_ready",
qr_received_at=int(time.time()),
qr_probe_result="event",
last_error="",
)
try:
self._client.qr(on_qr)
self._qr_handler_registered = True
self._publish_state(
qr_handler_supported=True,
qr_handler_registered=True,
last_event="qr_handler_registered",
)
except Exception as exc:
self._qr_handler_registered = False
self._publish_state(
qr_handler_supported=True,
qr_handler_registered=False,
last_event="qr_handler_registration_failed",
last_error=str(exc),
)
self.log.warning("whatsapp qr handler registration failed: %s", exc)
def _decode_qr_payload(self, raw_payload):
if raw_payload is None:
return ""
if isinstance(raw_payload, memoryview):
raw_payload = raw_payload.tobytes()
if isinstance(raw_payload, bytes):
return raw_payload.decode("utf-8", errors="ignore").strip()
if isinstance(raw_payload, (list, tuple)):
for item in raw_payload:
candidate = self._decode_qr_payload(item)
if candidate:
return candidate
return ""
return str(raw_payload).strip()
def _build_client(self, cls):
# NewAClient first arg is the SQLite filename / DB string.
try:
return cls(self.session_db)
except Exception as exc:
self.log.warning(
"whatsapp client init failed (%s): %s", self.session_db, exc
)
self._publish_state(
last_event="client_init_exception",
last_error=str(exc),
session_db=self.session_db,
)
return None
def _session_has_device(self):
"""Check whatsmeow_device table for a linked device row.
Neonize's Go layer panics (SIGSEGV in GetMe) when connect() is called
on a session with no linked device. Querying the SQLite schema directly
lets us skip connect() and avoid crashing the entire UR process.
"""
if not os.path.exists(self.session_db):
return False
try:
conn = sqlite3.connect(self.session_db)
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM sqlite_master "
"WHERE type='table' AND name='whatsmeow_device'"
)
if cursor.fetchone()[0] == 0:
conn.close()
return False
cursor.execute("SELECT COUNT(*) FROM whatsmeow_device")
count = cursor.fetchone()[0]
conn.close()
return count > 0
except Exception:
return False
def _register_event_handlers(self, wa_events):
connected_ev = getattr(wa_events, "ConnectedEv", None)
message_ev = getattr(wa_events, "MessageEv", None)
receipt_ev = getattr(wa_events, "ReceiptEv", None)
chat_presence_ev = getattr(wa_events, "ChatPresenceEv", None)
presence_ev = getattr(wa_events, "PresenceEv", None)
pair_ev = getattr(wa_events, "PairStatusEv", None)
qr_ev = getattr(wa_events, "QREv", None)
history_sync_ev = getattr(wa_events, "HistorySyncEv", None)
offline_sync_completed_ev = getattr(wa_events, "OfflineSyncCompletedEv", None)
self._register_qr_handler()
support = {
"connected_ev": bool(connected_ev),
"pair_ev": bool(pair_ev),
"qr_ev": bool(qr_ev),
"message_ev": bool(message_ev),
"receipt_ev": bool(receipt_ev),
"history_sync_ev": bool(history_sync_ev),
"offline_sync_completed_ev": bool(offline_sync_completed_ev),
}
event_names = [name for name in dir(wa_events) if name.endswith("Ev")]
message_like_names = [
name
for name in event_names
if "message" in name.lower() or name.lower().startswith("msg")
]
self._publish_state(
event_hook_callable=bool(getattr(self._client, "event", None)),
event_support=support,
event_message_candidates=message_like_names[:20],
last_event="event_handlers_scanned",
)
if connected_ev is not None:
async def on_connected(client, event: connected_ev):
self._connected = True
account = await self._resolve_account_identifier()
self._publish_state(
connected=True,
warning="",
accounts=[account] if account else [self.client_name],
pair_qr="",
last_event="connected",
pair_status="connected",
connected_at=int(time.time()),
last_error="",
)
await self._sync_contacts_from_client()
self._register_event(connected_ev, on_connected)
async def on_message_any(client, event):
await self._handle_message_event(event)
registered_message_events = []
if message_ev is not None:
if self._register_event(message_ev, on_message_any):
registered_message_events.append(
getattr(message_ev, "__name__", str(message_ev))
)
for attr in message_like_names:
candidate = getattr(wa_events, attr, None)
if candidate is None or candidate is message_ev:
continue
if self._register_event(candidate, on_message_any):
registered_message_events.append(attr)
if registered_message_events:
self._publish_state(
registered_message_events=registered_message_events[:20],
last_event="message_events_registered",
)
if receipt_ev is not None:
async def on_receipt(client, event: receipt_ev):
await self._handle_receipt_event(event)
self._register_event(receipt_ev, on_receipt)
if chat_presence_ev is not None:
async def on_chat_presence(client, event: chat_presence_ev):
await self._handle_chat_presence_event(event)
self._register_event(chat_presence_ev, on_chat_presence)
if presence_ev is not None:
async def on_presence(client, event: presence_ev):
await self._handle_presence_event(event)
self._register_event(presence_ev, on_presence)
if pair_ev is not None:
async def on_pair_status(client, event: pair_ev):
qr_payload = self._extract_pair_qr(event)
if qr_payload:
self._last_qr_payload = qr_payload
self._publish_state(
pair_qr=qr_payload,
warning="Scan QR in WhatsApp Linked Devices.",
last_event="pair_status_qr",
pair_status="qr_ready",
qr_received_at=int(time.time()),
qr_probe_result="event",
last_error="",
)
status_raw = self._pluck(event, "Status")
status_text = str(status_raw or "").strip().lower()
if status_text in {"2", "success"}:
account = await self._resolve_account_identifier()
self._connected = True
self._publish_state(
connected=True,
warning="",
accounts=[account] if account else [self.client_name],
pair_qr="",
last_event="pair_status_success",
pair_status="connected",
connected_at=int(time.time()),
last_error="",
)
await self._sync_contacts_from_client()
elif status_text in {"1", "error"}:
error_text = str(self._pluck(event, "Error") or "").strip()
self._publish_state(
warning=error_text or "WhatsApp pairing failed. Retry scan.",
last_event="pair_status_error",
pair_status="error",
last_error=error_text or "unknown_pair_error",
)
self._register_event(pair_ev, on_pair_status)
if qr_ev is not None:
async def on_qr_event(client, event: qr_ev):
# Once connected, ignore late/stale QR emissions so runtime state
# does not regress from connected -> qr_ready.
state = transport.get_runtime_state(self.service)
if self._connected or bool(state.get("connected")):
return
qr_payload = self._extract_pair_qr(event)
if not qr_payload:
return
self._last_qr_payload = qr_payload
self._publish_state(
connected=False,
pair_qr=qr_payload,
warning="Scan QR in WhatsApp Linked Devices.",
last_event="qr_event",
pair_status="qr_ready",
qr_received_at=int(time.time()),
qr_probe_result="event",
last_error="",
)
self._register_event(qr_ev, on_qr_event)
if history_sync_ev is not None:
async def on_history_sync(client, event: history_sync_ev):
await self._handle_history_sync_event(event)
self._register_event(history_sync_ev, on_history_sync)
if offline_sync_completed_ev is not None:
async def on_offline_sync_completed(
client, event: offline_sync_completed_ev
):
self._publish_state(last_event="offline_sync_completed")
await self._sync_contacts_from_client()
self._register_event(offline_sync_completed_ev, on_offline_sync_completed)
def _mark_qr_wait_timeout(self):
state = transport.get_runtime_state(self.service)
if str(state.get("pair_status") or "").strip().lower() != "pending":
return
requested_at = int(state.get("pair_requested_at") or 0)
qr_received_at = int(state.get("qr_received_at") or 0)
if requested_at <= 0 or qr_received_at > 0:
return
now = int(time.time())
age = now - requested_at
# Avoid spamming writes while still surfacing a clear timeout state.
if age < 15 or (age % 10) != 0:
return
self._publish_state(
last_event="pair_waiting_no_qr",
warning=(
"Waiting for WhatsApp QR from Neonize. " "No QR callback received yet."
),
)
async def _maybe_await(self, value):
if asyncio.iscoroutine(value):
return await value
return value
async def _call_client_method(self, method, *args, timeout: float | None = None):
if method is None:
return None
if inspect.iscoroutinefunction(method):
coro = method(*args)
else:
coro = asyncio.to_thread(method, *args)
if timeout and timeout > 0:
return await asyncio.wait_for(coro, timeout=timeout)
return await coro
async def _drain_runtime_commands(self):
# Process a small burst each loop to keep sends responsive but avoid starvation.
for _ in range(5):
command = transport.pop_runtime_command(self.service)
if not command:
return
await self._execute_runtime_command(command)
async def _execute_runtime_command(self, command):
command_id = str((command or {}).get("id") or "").strip()
action = str((command or {}).get("action") or "").strip()
payload = dict((command or {}).get("payload") or {})
if not command_id:
return
self.log.debug(
"whatsapp runtime command start: id=%s action=%s",
command_id,
action,
)
if action == "send_message_raw":
recipient = str(payload.get("recipient") or "").strip()
text = payload.get("text")
attachments = payload.get("attachments") or []
metadata = dict(payload.get("metadata") or {})
send_timeout_s = 18.0
try:
# Include command_id so send_message_raw can observe cancel requests
result = await asyncio.wait_for(
self.send_message_raw(
recipient=recipient,
text=text,
attachments=attachments,
command_id=command_id,
metadata=metadata,
),
timeout=send_timeout_s,
)
if result is not False and result is not None:
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": True,
"timestamp": int(result)
if isinstance(result, int)
else int(time.time() * 1000),
},
)
self.log.debug(
"whatsapp runtime command ok: id=%s action=%s",
command_id,
action,
)
return
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": str(
getattr(self, "_last_send_error", "")
or "runtime_send_failed"
),
},
)
self.log.warning(
"whatsapp runtime command failed: id=%s action=%s error=%s",
command_id,
action,
str(getattr(self, "_last_send_error", "") or "runtime_send_failed"),
)
return
except asyncio.TimeoutError:
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": f"runtime_send_timeout:{int(send_timeout_s)}s",
},
)
self.log.warning(
"whatsapp runtime command timeout: id=%s action=%s timeout=%ss",
command_id,
action,
int(send_timeout_s),
)
return
except Exception as exc:
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": str(exc),
},
)
self.log.warning(
"whatsapp runtime command exception: id=%s action=%s error=%s",
command_id,
action,
exc,
)
return
if action == "send_reaction":
recipient = str(payload.get("recipient") or "").strip()
emoji = str(payload.get("emoji") or "")
target_message_id = str(payload.get("target_message_id") or "").strip()
target_timestamp = int(payload.get("target_timestamp") or 0)
remove = bool(payload.get("remove"))
try:
ok = await self.send_reaction(
recipient=recipient,
emoji=emoji,
target_message_id=target_message_id,
target_timestamp=target_timestamp,
remove=remove,
)
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": bool(ok),
"timestamp": int(time.time() * 1000),
"error": "" if ok else "reaction_send_failed",
},
)
return
except Exception as exc:
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": str(exc),
},
)
return
if action == "force_history_sync":
target_identifier = str(payload.get("identifier") or "").strip()
try:
result = await self._force_history_sync(target_identifier)
except Exception as exc:
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": str(exc),
},
)
return
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": True,
**dict(result or {}),
},
)
return
if action == "notify_xmpp_sent":
person_identifier_id = str(
payload.get("person_identifier_id") or ""
).strip()
text = str(payload.get("text") or "")
if not person_identifier_id:
transport.set_runtime_command_result(
self.service,
command_id,
{"ok": False, "error": "missing_person_identifier_id"},
)
return
try:
identifier = await sync_to_async(
lambda: PersonIdentifier.objects.filter(id=person_identifier_id)
.select_related("user", "person")
.first()
)()
if identifier is None:
transport.set_runtime_command_result(
self.service,
command_id,
{"ok": False, "error": "person_identifier_not_found"},
)
return
await self.ur.xmpp.client.send_from_external(
identifier.user,
identifier,
text,
True,
attachments=[],
)
transport.set_runtime_command_result(
self.service,
command_id,
{"ok": True, "timestamp": int(time.time() * 1000)},
)
return
except Exception as exc:
transport.set_runtime_command_result(
self.service,
command_id,
{"ok": False, "error": str(exc)},
)
return
transport.set_runtime_command_result(
self.service,
command_id,
{
"ok": False,
"error": f"unsupported_action:{action or '-'}",
},
)
async def _force_history_sync(self, identifier: str = ""):
started_at = int(time.time())
self._publish_state(
last_event="manual_history_sync_started",
history_sync_running=True,
history_sync_started_at=started_at,
history_sync_target=identifier or "",
)
try:
await self._sync_contacts_from_client()
history_request = await self._request_on_demand_history(
identifier=identifier,
count=120,
)
self._publish_state(
history_on_demand_requested=bool(history_request.get("requested")),
history_on_demand_error=str(history_request.get("error") or ""),
history_on_demand_anchor=str(history_request.get("anchor_key") or ""),
history_on_demand_at=int(time.time()),
)
if history_request.get("requested"):
# Give on-demand history chunks a brief window to arrive before command returns.
await asyncio.sleep(3)
# Best-effort probe: reconnect state and QR/connection probes can unlock
# pending history sync callbacks in some runtime sessions.
await self._after_connect_probe()
sqlite_sync = await self._sync_history_from_sqlite(identifier=identifier)
self._publish_state(
history_sqlite_imported=int(sqlite_sync.get("imported", 0)),
history_sqlite_scanned=int(sqlite_sync.get("scanned", 0)),
history_sqlite_rows=int(sqlite_sync.get("rows", 0)),
history_sqlite_table=str(sqlite_sync.get("table") or ""),
history_sqlite_ts=int(time.time()),
)
finally:
finished_at = int(time.time())
self._publish_state(
history_sync_running=False,
history_sync_finished_at=finished_at,
history_sync_duration_ms=int((finished_at - started_at) * 1000),
last_event="manual_history_sync_finished",
)
state = transport.get_runtime_state(self.service)
return {
"started_at": started_at,
"finished_at": int(
state.get("history_sync_finished_at") or int(time.time())
),
"duration_ms": int(state.get("history_sync_duration_ms") or 0),
"contacts_sync_count": int(state.get("contacts_sync_count") or 0),
"history_imported_messages": int(
state.get("history_imported_messages") or 0
),
"sqlite_imported_messages": int(state.get("history_sqlite_imported") or 0),
"sqlite_scanned_messages": int(state.get("history_sqlite_scanned") or 0),
"sqlite_table": str(state.get("history_sqlite_table") or ""),
"sqlite_error": str(state.get("history_sqlite_error") or ""),
"on_demand_requested": bool(state.get("history_on_demand_requested")),
"on_demand_error": str(state.get("history_on_demand_error") or ""),
"last_event": str(state.get("last_event") or ""),
}
async def _request_on_demand_history(self, identifier: str, count: int = 120):
if not self._client:
return {"requested": False, "error": "client_missing"}
normalized = str(identifier or "").strip()
if not normalized:
return {"requested": False, "error": "identifier_missing"}
state = transport.get_runtime_state(self.service)
anchors = state.get("history_anchors") or {}
anchor = None
anchor_key = ""
for candidate in self._normalize_identifier_candidates(normalized):
row = anchors.get(str(candidate))
if isinstance(row, dict) and row.get("msg_id"):
anchor = row
anchor_key = str(candidate)
break
if not anchor:
anchor = await self._load_history_anchor_from_sqlite(identifier=normalized)
if isinstance(anchor, dict) and anchor.get("msg_id"):
anchor_key = str(anchor.get("anchor_key") or "")
else:
return {"requested": False, "error": "anchor_missing"}
try:
from neonize.builder import build_history_sync_request
from neonize.proto.Neonize_pb2 import MessageInfo, MessageSource
except Exception as exc:
return {"requested": False, "error": f"history_builder_unavailable:{exc}"}
try:
chat_raw = str(anchor.get("chat_jid") or normalized).strip()
sender_raw = str(anchor.get("sender_jid") or chat_raw).strip()
if not chat_raw:
return {"requested": False, "error": "anchor_chat_missing"}
chat_jid = self._to_jid(chat_raw)
sender_jid = self._to_jid(sender_raw)
info = MessageInfo(
MessageSource=MessageSource(
Chat=chat_jid,
Sender=sender_jid,
IsFromMe=bool(anchor.get("from_me")),
IsGroup=False,
),
ID=str(anchor.get("msg_id") or ""),
Timestamp=int(anchor.get("ts") or int(time.time() * 1000)),
)
request_msg = build_history_sync_request(
info, max(10, min(int(count or 120), 500))
)
await self._maybe_await(self._client.send_message(chat_jid, request_msg))
self._publish_state(
last_event="history_on_demand_requested",
last_error="",
)
return {"requested": True, "anchor_key": anchor_key}
except Exception as exc:
self._publish_state(
last_event="history_on_demand_request_failed",
last_error=str(exc),
)
return {"requested": False, "error": str(exc), "anchor_key": anchor_key}
async def _load_history_anchor_from_sqlite(self, identifier: str):
def _read_anchor():
if not self.session_db or not os.path.exists(self.session_db):
return {}
try:
conn = sqlite3.connect(self.session_db)
conn.row_factory = sqlite3.Row
except Exception:
return {}
try:
cur = conn.cursor()
table_rows = cur.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
table_names = [str(row[0]) for row in table_rows if row and row[0]]
if "whatsmeow_message_secrets" not in table_names:
return {}
candidates = self._normalize_identifier_candidates(identifier)
local_values = set()
for value in candidates:
local = str(value or "").strip().split("@", 1)[0]
if local:
local_values.add(local)
if not local_values:
return {}
placeholders = ",".join("?" for _ in local_values)
query = (
"SELECT our_jid, chat_jid, sender_jid, message_id "
'FROM "whatsmeow_message_secrets" '
f"WHERE substr(chat_jid, 1, instr(chat_jid, '@') - 1) IN ({placeholders}) "
"ORDER BY rowid DESC LIMIT 1"
)
row = cur.execute(query, tuple(local_values)).fetchone()
if not row:
return {}
our_jid = str(row["our_jid"] or "")
own_user = our_jid.split("@", 1)[0].split(":", 1)[0].strip()
sender_jid = str(row["sender_jid"] or "")
sender_user = sender_jid.split("@", 1)[0].split(":", 1)[0].strip()
from_me = bool(own_user and sender_user and own_user == sender_user)
chat_jid = str(row["chat_jid"] or "")
msg_id = str(row["message_id"] or "")
if not chat_jid or not msg_id:
return {}
anchor_key = chat_jid.split("@", 1)[0]
return {
"chat_jid": chat_jid,
"sender_jid": sender_jid,
"msg_id": msg_id,
"from_me": from_me,
"ts": int(time.time() * 1000),
"anchor_key": anchor_key,
}
except Exception:
return {}
finally:
conn.close()
return await asyncio.to_thread(_read_anchor)
async def _sync_history_from_sqlite(self, identifier: str = ""):
def _extract_rows():
if not self.session_db or not os.path.exists(self.session_db):
return {"rows": [], "table": "", "error": "sqlite_missing"}
try:
conn = sqlite3.connect(self.session_db)
conn.row_factory = sqlite3.Row
except Exception as exc:
return {"rows": [], "table": "", "error": f"sqlite_open_failed:{exc}"}
try:
cur = conn.cursor()
table_rows = cur.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
table_names = [str(row[0]) for row in table_rows if row and row[0]]
preferred = [
"whatsmeow_messages",
"messages",
"message",
]
selected = ""
for candidate in preferred:
if candidate in table_names:
selected = candidate
break
if not selected:
for name in table_names:
lowered = name.lower()
if "message" not in lowered:
continue
if any(
token in lowered
for token in (
"secret",
"contacts",
"contact",
"chat_settings",
"event_buffer",
"identity",
"pre_keys",
"sender_keys",
"session",
"version",
"privacy",
"lid_map",
"device",
"app_state",
)
):
continue
if "contact" not in lowered:
selected = name
break
if not selected:
return {
"rows": [],
"table": "",
"error": "messages_table_not_found",
}
columns = [
str(row[1] or "")
for row in cur.execute(
f'PRAGMA table_info("{selected}")'
).fetchall()
]
if not columns:
return {"rows": [], "table": selected, "error": "no_columns"}
def pick(options):
lowered = {col.lower(): col for col in columns}
for option in options:
if option in lowered:
return lowered[option]
for option in options:
for col in columns:
if option in col.lower():
return col
return ""
text_col = pick(
[
"text",
"conversation",
"body",
"caption",
"content",
"message",
"msg",
]
)
ts_col = pick(
[
"message_timestamp",
"messagetimestamp",
"timestamp",
"ts",
"time",
]
)
from_me_col = pick(
[
"from_me",
"fromme",
"is_from_me",
"isfromme",
"outgoing",
]
)
sender_col = pick(
[
"sender_jid",
"sender",
"participant",
"from_jid",
"from",
]
)
chat_col = pick(
[
"chat_jid",
"remote_jid",
"their_jid",
"jid",
"chat",
]
)
if not (text_col and ts_col and (sender_col or chat_col)):
return {
"rows": [],
"table": selected,
"error": "required_message_columns_missing",
}
select_cols = [
col
for col in [text_col, ts_col, from_me_col, sender_col, chat_col]
if col
]
quoted = ", ".join(f'"{col}"' for col in select_cols)
order_expr = f'"{ts_col}" DESC' if ts_col else "ROWID DESC"
sql = f'SELECT {quoted} FROM "{selected}" ORDER BY {order_expr} LIMIT 12000'
try:
rows = cur.execute(sql).fetchall()
except Exception as exc:
return {
"rows": [],
"table": selected,
"error": f"messages_query_failed:{exc}",
}
parsed = []
for row in rows:
row_map = {col: row[idx] for idx, col in enumerate(select_cols)}
text = str(row_map.get(text_col) or "").strip()
if not text:
continue
raw_sender = (
str(row_map.get(sender_col) or "").strip() if sender_col else ""
)
raw_chat = (
str(row_map.get(chat_col) or "").strip() if chat_col else ""
)
raw_from_me = row_map.get(from_me_col) if from_me_col else None
parsed.append(
{
"text": text,
"ts": row_map.get(ts_col),
"from_me": raw_from_me,
"sender": raw_sender,
"chat": raw_chat,
}
)
return {"rows": parsed, "table": selected, "error": ""}
finally:
conn.close()
extracted = await asyncio.to_thread(_extract_rows)
rows = list(extracted.get("rows") or [])
table_name = str(extracted.get("table") or "")
error_text = str(extracted.get("error") or "").strip()
if error_text:
self._publish_state(
last_event="sqlite_history_scan_failed",
history_sqlite_error=error_text,
)
return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name}
if not rows:
return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name}
target_candidates = self._normalize_identifier_candidates(identifier)
target_local = str(identifier or "").strip().split("@", 1)[0]
if target_local:
target_candidates.update(
self._normalize_identifier_candidates(target_local)
)
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(service="whatsapp")
)
if not identifiers:
return {"imported": 0, "scanned": 0, "rows": len(rows), "table": table_name}
by_candidate = {}
for row in identifiers:
values = self._normalize_identifier_candidates(row.identifier)
for candidate in values:
if not candidate:
continue
by_candidate.setdefault(candidate, []).append(row)
imported = 0
scanned = 0
session_cache = {}
for row in rows:
sender = self._jid_to_identifier(row.get("sender"))
chat = self._jid_to_identifier(row.get("chat"))
row_candidates = self._normalize_identifier_candidates(sender, chat)
if not row_candidates:
continue
if target_candidates and not (row_candidates & target_candidates):
continue
matched_identifiers = {}
for candidate in row_candidates:
for pi in by_candidate.get(candidate, []):
matched_identifiers[int(pi.id)] = pi
if not matched_identifiers:
continue
scanned += 1
ts = self._normalize_timestamp(row.get("ts"))
text = str(row.get("text") or "").strip()
if not text:
continue
from_me_raw = row.get("from_me")
from_me_text = str(from_me_raw or "").strip().lower()
from_me = from_me_text in {"1", "true", "t", "yes", "y"}
sender_uuid = "" if from_me else str(sender or chat or "")
custom_author = "BOT" if from_me else None
for pi in matched_identifiers.values():
session = session_cache.get(int(pi.id))
if session is None:
session = await history.get_chat_session(pi.user, pi)
session_cache[int(pi.id)] = session
exists = await sync_to_async(
Message.objects.filter(
user=pi.user,
session=session,
ts=ts,
sender_uuid=sender_uuid,
text=text,
custom_author=custom_author,
).exists
)()
if exists:
continue
await sync_to_async(Message.objects.create)(
user=pi.user,
session=session,
ts=ts,
sender_uuid=sender_uuid,
text=text,
custom_author=custom_author,
delivered_ts=ts,
)
imported += 1
self.log.info(
"whatsapp sqlite history sync: table=%s scanned=%s imported=%s rows=%s target=%s",
table_name or "-",
scanned,
imported,
len(rows),
identifier or "-",
)
return {
"imported": imported,
"scanned": scanned,
"rows": len(rows),
"table": table_name,
}
async def _resolve_account_identifier(self):
if self._client is None:
return ""
if not hasattr(self._client, "get_me"):
return self.client_name
# Guard: Neonize Go GetMe() panics (SIGSEGV) when cli.LID is nil
# on unlinked sessions. Only safe to call after pairing completes
# and the device row exists in the whatsmeow_device table.
if not self._session_has_device():
return self.client_name
try:
me = await self._maybe_await(self._client.get_me())
except Exception:
return self.client_name
# Support both dict-like and object-like payloads.
for path in (
("JID",),
("jid",),
):
value = self._jid_to_identifier(self._pluck(me, *path))
if value:
return value
for path in (
("JID", "User"),
("jid",),
("user",),
("ID",),
):
value = self._pluck(me, *path)
if value:
return str(value)
if hasattr(self._client, "get_all_devices"):
try:
devices = await self._maybe_await(self._client.get_all_devices())
if devices:
jid = self._jid_to_identifier(self._pluck(devices[0], "JID"))
if jid:
return jid
except Exception:
pass
return self.client_name
def _pluck(self, obj, *path):
current = obj
for key in path:
if current is None:
return None
if isinstance(current, dict):
current = current.get(key)
continue
if hasattr(current, key):
current = getattr(current, key)
continue
return None
return current
def _shape_keys(self, obj):
if obj is None:
return []
if isinstance(obj, dict):
return sorted(str(key) for key in obj.keys())
if hasattr(obj, "__dict__"):
return sorted(str(key) for key in vars(obj).keys())
return []
def _normalize_timestamp(self, raw_value):
if raw_value is None:
return int(time.time() * 1000)
try:
value = int(raw_value)
except Exception:
return int(time.time() * 1000)
# WhatsApp libs often emit seconds. Promote to ms.
if value < 10**12:
return value * 1000
return value
def _normalize_identifier_candidates(self, *values):
out = set()
state = transport.get_runtime_state(self.service)
lid_map = state.get("lid_map") or {}
for value in values:
raw = self._jid_to_identifier(value)
if not raw:
continue
out.add(raw)
if "@" in raw:
out.add(raw.split("@", 1)[0])
digits = re.sub(r"[^0-9]", "", raw)
if digits:
out.add(digits)
if not digits.startswith("+"):
out.add(f"+{digits}")
if "@lid" in raw:
mapped = re.sub(r"[^0-9]", "", str(lid_map.get(digits) or ""))
if mapped:
out.add(mapped)
out.add(f"+{mapped}")
out.add(f"{mapped}@s.whatsapp.net")
return out
async def _sync_contacts_from_client(self):
if self._client is None:
return
connected_now = await self._is_contact_sync_ready()
if not connected_now:
self._publish_state(
last_event="contacts_sync_skipped_disconnected",
contacts_source="disconnected",
)
return
# NOTE: Neonize get_all_contacts has crashed some runtime builds with a Go panic.
# Read contact-like rows directly from the session sqlite DB instead.
contacts, source, lid_map = await self._sync_contacts_from_sqlite()
groups, groups_source = await self._sync_groups_from_client()
await self._upsert_groups(groups)
now_ts = int(time.time())
if contacts:
self.log.debug(
"whatsapp contacts synced: count=%s source=%s",
len(contacts),
source or "unknown",
)
self._publish_state(
contacts=contacts,
lid_map=lid_map,
contacts_synced_at=now_ts,
contacts_sync_count=len(contacts),
last_event="contacts_synced",
contacts_source=source or "unknown",
last_error="",
)
else:
self.log.debug("whatsapp contacts sync empty (%s)", source or "unknown")
self._publish_state(
last_event="contacts_sync_empty",
contacts_source=source or "unknown",
)
if groups_source:
event_name = "groups_synced" if groups else "groups_sync_empty"
self._publish_state(
groups=groups,
groups_source=groups_source,
groups_sync_count=len(groups),
groups_synced_at=now_ts,
last_event=event_name,
last_error="" if groups else "",
)
async def _sync_contacts_from_sqlite(self):
def _extract():
if not self.session_db or not os.path.exists(self.session_db):
return [], "sqlite_missing", {}
try:
conn = sqlite3.connect(self.session_db)
conn.row_factory = sqlite3.Row
except Exception:
return [], "sqlite_open_failed", {}
try:
cur = conn.cursor()
table_rows = cur.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
table_names = [str(row[0]) for row in table_rows if row and row[0]]
lid_map = {}
if "whatsmeow_lid_map" in table_names:
try:
lid_rows = cur.execute(
'SELECT "lid", "pn" FROM "whatsmeow_lid_map" LIMIT 10000'
).fetchall()
except Exception:
lid_rows = []
for lid, pn in lid_rows:
lid_key = re.sub(r"[^0-9]", "", str(lid or "").strip())
pn_value = re.sub(r"[^0-9]", "", str(pn or "").strip())
if lid_key and pn_value:
lid_map[lid_key] = pn_value
# Prefer the canonical contacts table when available.
if "whatsmeow_contacts" in table_names:
own_ids = set()
for value in (
transport.get_runtime_state(self.service).get("accounts") or []
):
base = str(value or "").strip().split("@", 1)[0]
base = base.split(":", 1)[0]
if base:
own_ids.add(base.lower())
try:
for row in cur.execute(
'SELECT DISTINCT "our_jid" FROM "whatsmeow_contacts"'
).fetchall():
base = (
str((row or [None])[0] or "").strip().split("@", 1)[0]
)
base = base.split(":", 1)[0]
if base:
own_ids.add(base.lower())
except Exception:
pass
out = []
seen = set()
try:
rows = cur.execute(
'SELECT "their_jid", "first_name", "full_name", "push_name", "business_name" '
'FROM "whatsmeow_contacts" LIMIT 5000'
).fetchall()
except Exception:
rows = []
for (
their_jid,
first_name,
full_name,
push_name,
business_name,
) in rows:
jid_value = str(their_jid or "").strip()
if (
"@s.whatsapp.net" not in jid_value
and "@lid" not in jid_value
):
continue
identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0]
if "@lid" in jid_value:
mapped = lid_map.get(re.sub(r"[^0-9]", "", identifier))
if mapped:
identifier = mapped
jid_value = f"{mapped}@s.whatsapp.net"
if not identifier:
continue
if identifier.lower() in own_ids:
continue
if identifier in seen:
continue
seen.add(identifier)
display_name = (
str(push_name or "").strip()
or str(full_name or "").strip()
or str(business_name or "").strip()
or str(first_name or "").strip()
)
out.append(
{
"identifier": identifier,
"jid": jid_value,
"name": display_name,
"chat": "",
"seen_at": int(time.time()),
}
)
if len(out) >= 500:
break
if out:
return out, "sqlite_contacts", lid_map
account_keys = {
str(value or "").strip().split("@", 1)[0].lower()
for value in (
transport.get_runtime_state(self.service).get("accounts") or []
)
if str(value or "").strip()
}
seen = set()
out = []
for table in table_names:
try:
columns = [
str(row[1] or "")
for row in cur.execute(
f'PRAGMA table_info("{table}")'
).fetchall()
]
except Exception:
continue
if not columns:
continue
jid_cols = [
col
for col in columns
if any(
token in col.lower()
for token in (
"jid",
"phone",
"chat",
"sender",
"recipient",
"participant",
"from",
"to",
"user",
)
)
]
name_cols = [
col
for col in columns
if "name" in col.lower() or "push" in col.lower()
]
if not jid_cols:
continue
select_cols = list(dict.fromkeys(jid_cols + name_cols))[:6]
quoted = ", ".join(f'"{col}"' for col in select_cols)
try:
rows = cur.execute(
f'SELECT {quoted} FROM "{table}" LIMIT 1000'
).fetchall()
except Exception:
continue
for row in rows:
row_map = {col: row[idx] for idx, col in enumerate(select_cols)}
jid_value = ""
for col in jid_cols:
raw = str(row_map.get(col) or "").strip()
if "@s.whatsapp.net" in raw:
m = re.search(r"([0-9]{6,20})@s\.whatsapp\.net", raw)
jid_value = (
f"{m.group(1)}@s.whatsapp.net"
if m
else raw.split()[0]
)
break
if raw.endswith("@lid"):
jid_value = raw
break
digits = re.sub(r"[^0-9]", "", raw)
if digits:
jid_value = f"{digits}@s.whatsapp.net"
break
if not jid_value:
continue
identifier = jid_value.split("@", 1)[0].strip()
if not identifier:
continue
if identifier.lower() in account_keys:
continue
if identifier in seen:
continue
seen.add(identifier)
display_name = ""
for col in name_cols:
candidate = str(row_map.get(col) or "").strip()
if candidate and candidate not in {"~", "-", "_"}:
display_name = candidate
break
out.append(
{
"identifier": identifier,
"jid": jid_value,
"name": display_name,
"chat": "",
"seen_at": int(time.time()),
}
)
if len(out) >= 500:
return out, "sqlite_tables", lid_map
return out, "sqlite_tables", lid_map
finally:
conn.close()
return await asyncio.to_thread(_extract)
async def _upsert_groups(self, groups: list) -> None:
if not groups:
self.log.debug("[WA] _upsert_groups: no groups to persist")
return
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(service="whatsapp").select_related("user")
)
seen_user_ids: set = set()
users = []
for pi in identifiers:
if pi.user_id not in seen_user_ids:
seen_user_ids.add(pi.user_id)
users.append(pi.user)
if not users:
self.log.debug("[WA] _upsert_groups: no PersonIdentifiers found — skipping")
return
upserted = 0
for user in users:
for group in groups:
identifier = group.get("identifier") or ""
name = group.get("name") or identifier
jid = group.get("jid") or ""
if "@newsletter" in jid or "@newsletter" in identifier:
continue
await sync_to_async(PlatformChatLink.objects.update_or_create)(
user=user,
service="whatsapp",
chat_identifier=identifier,
defaults={
"person": None,
"person_identifier": None,
"is_group": True,
"chat_name": name,
"chat_jid": jid,
},
)
upserted += 1
self.log.info(
"[WA] upserted %d group rows (%d groups × %d users)",
upserted,
len(groups),
len(users),
)
async def _sync_groups_from_client(self):
if self._client is None:
return [], "client_missing"
getter = getattr(self._client, "get_joined_groups", None)
if getter is None:
return [], "get_joined_groups_missing"
try:
group_rows = await self._maybe_await(getter())
except Exception as exc:
self._publish_state(
last_event="groups_sync_failed",
last_error=str(exc),
)
return [], "get_joined_groups_failed"
out = []
now_ts = int(time.time())
for group in group_rows or []:
jid_value = self._jid_to_identifier(
self._pluck(group, "JID") or self._pluck(group, "jid")
)
identifier = (
jid_value.split("@", 1)[0].strip() if jid_value else ""
)
if not identifier:
continue
name = (
str(self._pluck(group, "GroupName", "Name") or "").strip()
or str(self._pluck(group, "GroupTopic", "Topic") or "").strip()
or identifier
)
out.append(
{
"identifier": identifier,
"jid": jid_value or f"{identifier}@g.us",
"name": name,
"chat": name,
"type": "group",
"seen_at": now_ts,
}
)
if len(out) >= 500:
break
return out, "get_joined_groups"
async def _is_contact_sync_ready(self) -> bool:
if self._client is None:
return False
if self._connected:
return True
state = transport.get_runtime_state(self.service)
if bool(state.get("connected")):
return True
pair_status = str(state.get("pair_status") or "").strip().lower()
if pair_status == "connected":
return True
check_connected = getattr(self._client, "is_connected", None)
if check_connected is None:
return False
try:
value = (
await self._maybe_await(check_connected())
if callable(check_connected)
else await self._maybe_await(check_connected)
)
except Exception:
return False
if value:
self._connected = True
self._publish_state(connected=True, warning="", pair_status="connected")
return True
# Guard: Neonize Go GetMe() panics (SIGSEGV) on unlinked sessions.
if hasattr(self._client, "get_me") and self._session_has_device():
try:
me = await self._maybe_await(self._client.get_me())
if me:
self._connected = True
self._publish_state(
connected=True, warning="", pair_status="connected"
)
return True
except Exception:
pass
return False
async def _handle_history_sync_event(self, event):
started_at = time.time()
self._publish_state(
history_sync_running=True,
history_sync_started_at=int(started_at),
last_event="history_sync_started",
)
data = self._pluck(event, "Data") or self._pluck(event, "data")
if data is None:
self._publish_state(
history_sync_running=False,
history_sync_duration_ms=int((time.time() - started_at) * 1000),
)
return
pushname_rows = (
self._pluck(data, "pushnames") or self._pluck(data, "Pushnames") or []
)
pushname_map = {}
for row in pushname_rows:
raw_id = self._jid_to_identifier(
self._pluck(row, "ID")
) or self._jid_to_identifier(self._pluck(row, "id"))
if not raw_id:
continue
pushname = str(
self._pluck(row, "pushname") or self._pluck(row, "Pushname") or ""
).strip()
if not pushname:
continue
pushname_map[raw_id] = pushname
pushname_map[raw_id.split("@", 1)[0]] = pushname
conversation_rows = (
self._pluck(data, "conversations")
or self._pluck(data, "Conversations")
or []
)
found = 0
imported_messages = 0
for row in conversation_rows:
jid = ""
for candidate in (
self._pluck(row, "ID"),
self._pluck(row, "id"),
self._pluck(row, "pnJID"),
self._pluck(row, "pnJid"),
self._pluck(row, "newJID"),
self._pluck(row, "newJid"),
self._pluck(row, "oldJID"),
self._pluck(row, "oldJid"),
):
parsed = self._jid_to_identifier(candidate)
if parsed:
jid = parsed
break
if not jid or "@g.us" in jid or "@broadcast" in jid:
continue
identifier = jid.split("@", 1)[0].strip()
if not identifier or not re.search(r"[0-9]{6,}", identifier):
continue
name = str(
self._pluck(row, "displayName")
or self._pluck(row, "DisplayName")
or self._pluck(row, "name")
or self._pluck(row, "Name")
or self._pluck(row, "username")
or self._pluck(row, "Username")
or pushname_map.get(jid)
or pushname_map.get(identifier)
or ""
).strip()
self._remember_contact(identifier, jid=jid, name=name)
found += 1
imported_messages += await self._import_history_messages_for_conversation(
row=row,
chat_jid=jid,
)
if found:
state = transport.get_runtime_state(self.service)
current_count = int(state.get("contacts_sync_count") or 0)
self._publish_state(
contacts_source="history_sync",
contacts_synced_at=int(time.time()),
contacts_sync_count=max(current_count, found),
last_event="history_sync_contacts",
last_error="",
)
self._publish_state(
history_sync_running=False,
history_sync_finished_at=int(time.time()),
history_sync_duration_ms=int((time.time() - started_at) * 1000),
history_imported_messages=imported_messages,
last_event="history_sync_completed",
)
def _history_message_rows(self, conversation_row):
candidates = (
self._pluck(conversation_row, "messages"),
self._pluck(conversation_row, "Messages"),
self._pluck(conversation_row, "msgs"),
self._pluck(conversation_row, "Msgs"),
)
for value in candidates:
if isinstance(value, (list, tuple)):
return list(value)
return []
def _history_message_text(self, msg):
return str(
self._pluck(msg, "message", "conversation")
or self._pluck(msg, "message", "Conversation")
or self._pluck(msg, "message", "extendedTextMessage", "text")
or self._pluck(msg, "message", "ExtendedTextMessage", "Text")
or self._pluck(msg, "message", "imageMessage", "caption")
or self._pluck(msg, "message", "videoMessage", "caption")
or self._pluck(msg, "message", "documentMessage", "caption")
or self._pluck(msg, "Message", "conversation")
or self._pluck(msg, "Message", "Conversation")
or self._pluck(msg, "Message", "extendedTextMessage", "text")
or self._pluck(msg, "Message", "ExtendedTextMessage", "Text")
or self._pluck(msg, "conversation")
or self._pluck(msg, "Conversation")
or self._pluck(msg, "text")
or self._pluck(msg, "Text")
or ""
).strip()
def _message_text(self, msg_obj, event_obj=None):
"""
Extract user-visible text from diverse WhatsApp message payload shapes.
"""
candidates = (
self._pluck(msg_obj, "conversation"),
self._pluck(msg_obj, "Conversation"),
self._pluck(msg_obj, "extendedTextMessage", "text"),
self._pluck(msg_obj, "ExtendedTextMessage", "Text"),
self._pluck(msg_obj, "extended_text_message", "text"),
self._pluck(msg_obj, "imageMessage", "caption"),
self._pluck(msg_obj, "videoMessage", "caption"),
self._pluck(msg_obj, "documentMessage", "caption"),
self._pluck(msg_obj, "ephemeralMessage", "message", "conversation"),
self._pluck(
msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"
),
self._pluck(msg_obj, "viewOnceMessage", "message", "conversation"),
self._pluck(
msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"
),
self._pluck(msg_obj, "viewOnceMessageV2", "message", "conversation"),
self._pluck(
msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"
),
self._pluck(
msg_obj, "viewOnceMessageV2Extension", "message", "conversation"
),
self._pluck(
msg_obj,
"viewOnceMessageV2Extension",
"message",
"extendedTextMessage",
"text",
),
self._pluck(event_obj, "message", "conversation"),
self._pluck(event_obj, "message", "extendedTextMessage", "text"),
self._pluck(event_obj, "Message", "conversation"),
self._pluck(event_obj, "Message", "extendedTextMessage", "text"),
self._pluck(event_obj, "conversation"),
self._pluck(event_obj, "text"),
)
for value in candidates:
text = str(value or "").strip()
if text:
return text
return ""
def _history_message_ts(self, msg):
raw_ts = (
self._pluck(msg, "messageTimestamp")
or self._pluck(msg, "MessageTimestamp")
or self._pluck(msg, "timestamp")
or self._pluck(msg, "Timestamp")
or self._pluck(msg, "message", "messageTimestamp")
or self._pluck(msg, "Message", "messageTimestamp")
or int(time.time() * 1000)
)
return self._normalize_timestamp(raw_ts)
def _history_message_from_me(self, msg):
return bool(
self._pluck(msg, "key", "fromMe")
or self._pluck(msg, "Key", "FromMe")
or self._pluck(msg, "messageKey", "fromMe")
or self._pluck(msg, "MessageKey", "FromMe")
or self._pluck(msg, "fromMe")
or self._pluck(msg, "FromMe")
)
def _history_message_sender_jid(self, msg, fallback_chat_jid: str):
sender = self._jid_to_identifier(
self._pluck(msg, "key", "participant")
or self._pluck(msg, "Key", "Participant")
or self._pluck(msg, "messageKey", "participant")
or self._pluck(msg, "MessageKey", "Participant")
or self._pluck(msg, "participant")
or self._pluck(msg, "Participant")
or fallback_chat_jid
)
return str(sender or fallback_chat_jid or "").strip()
async def _import_history_messages_for_conversation(
self, row, chat_jid: str
) -> int:
imported = 0
msg_rows = self._history_message_rows(row)
if not msg_rows:
return imported
chat_identifier = str(chat_jid or "").split("@", 1)[0].strip()
if not chat_identifier:
return imported
candidate_values = self._normalize_identifier_candidates(
chat_jid, chat_identifier
)
if not candidate_values:
return imported
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(
service="whatsapp",
identifier__in=list(candidate_values),
)
)
if not identifiers:
return imported
for msg in msg_rows:
ts = self._history_message_ts(msg)
text = self._history_message_text(msg)
from_me = self._history_message_from_me(msg)
sender_jid = self._history_message_sender_jid(msg, chat_jid)
for identifier in identifiers:
session = await history.get_chat_session(identifier.user, identifier)
exists = await sync_to_async(
Message.objects.filter(
user=identifier.user,
session=session,
ts=ts,
sender_uuid=(sender_jid if not from_me else ""),
text=text,
custom_author=("BOT" if from_me else None),
).exists
)()
if exists:
continue
await sync_to_async(Message.objects.create)(
user=identifier.user,
session=session,
ts=ts,
sender_uuid=(sender_jid if not from_me else ""),
text=text,
custom_author=("BOT" if from_me else None),
delivered_ts=ts,
)
imported += 1
return imported
def _remember_contact(self, identifier, *, jid="", name="", chat=""):
cleaned = str(identifier or "").strip()
if "@" in cleaned:
cleaned = cleaned.split("@", 1)[0]
if not cleaned:
return
state = transport.get_runtime_state(self.service)
existing = state.get("contacts") or []
rows = [item for item in existing if isinstance(item, dict)]
merged = []
seen = set()
now_ts = int(time.time())
row = {
"identifier": cleaned,
"jid": str(jid or "").strip(),
"name": str(name or "").strip(),
"chat": str(chat or "").strip(),
"seen_at": now_ts,
}
merged.append(row)
seen.add(cleaned)
for item in rows:
candidate = str(item.get("identifier") or item.get("jid") or "").strip()
if not candidate or candidate in seen:
continue
seen.add(candidate)
merged.append(item)
if len(merged) >= 500:
break
self._publish_state(contacts=merged, last_contact_seen_at=now_ts)
def _remember_history_anchor(
self,
*,
identifier: str,
chat_jid: str,
msg_id: str,
ts: int,
from_me: bool,
sender_jid: str = "",
):
if not msg_id:
return
candidate_keys = self._normalize_identifier_candidates(identifier, chat_jid)
if not candidate_keys:
return
state = transport.get_runtime_state(self.service)
anchors = dict(state.get("history_anchors") or {})
row = {
"msg_id": str(msg_id),
"ts": int(ts or int(time.time() * 1000)),
"from_me": bool(from_me),
"chat_jid": str(chat_jid or ""),
"sender_jid": str(sender_jid or ""),
"updated_at": int(time.time()),
}
for key in candidate_keys:
anchors[str(key)] = row
if len(anchors) > 400:
recent = sorted(
anchors.items(),
key=lambda item: int((item[1] or {}).get("updated_at") or 0),
reverse=True,
)[:300]
anchors = {k: v for k, v in recent}
self._publish_state(history_anchors=anchors)
def _jid_to_identifier(self, value):
raw = str(value or "").strip()
if not raw:
return ""
if not isinstance(value, str):
user = self._pluck(value, "User") or self._pluck(value, "user")
server = self._pluck(value, "Server") or self._pluck(value, "server")
if user and server:
return f"{user}@{server}"
if user:
return str(user)
return raw
def _is_media_message(self, message_obj):
media_fields = (
"imageMessage",
"videoMessage",
"audioMessage",
"documentMessage",
"stickerMessage",
"image_message",
"video_message",
"audio_message",
"document_message",
"sticker_message",
)
for field in media_fields:
value = self._pluck(message_obj, field)
if value:
return True
return False
def _infer_media_content_type(self, message_obj):
if self._pluck(message_obj, "imageMessage") or self._pluck(
message_obj, "image_message"
):
return "image/jpeg"
if self._pluck(message_obj, "videoMessage") or self._pluck(
message_obj, "video_message"
):
return "video/mp4"
if self._pluck(message_obj, "audioMessage") or self._pluck(
message_obj, "audio_message"
):
return "audio/ogg"
if self._pluck(message_obj, "stickerMessage") or self._pluck(
message_obj, "sticker_message"
):
return "image/webp"
return "application/octet-stream"
def _extract_reaction_event(self, message_obj):
node = self._pluck(message_obj, "reactionMessage") or self._pluck(
message_obj, "reaction_message"
)
if not node:
return None
emoji = str(
self._pluck(node, "text") or self._pluck(node, "emoji") or ""
).strip()
target_msg_id = str(
self._pluck(node, "key", "id")
or self._pluck(node, "key", "ID")
or self._pluck(node, "targetMessageKey", "id")
or self._pluck(node, "target_message_key", "id")
or ""
).strip()
remove = bool(not emoji)
if not target_msg_id:
return None
return {
"emoji": emoji,
"target_message_id": target_msg_id,
"remove": remove,
}
async def _download_event_media(self, event):
if not self._client:
return []
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
if msg_obj is None or not self._is_media_message(msg_obj):
return []
if not hasattr(self._client, "download_any"):
return []
try:
payload = await self._maybe_await(self._client.download_any(msg_obj))
except Exception as exc:
self.log.warning("whatsapp media download failed: %s", exc)
return []
if isinstance(payload, memoryview):
payload = payload.tobytes()
if not isinstance(payload, (bytes, bytearray)):
return []
filename = self._pluck(msg_obj, "documentMessage", "fileName") or self._pluck(
msg_obj, "document_message", "file_name"
)
content_type = (
self._pluck(msg_obj, "documentMessage", "mimetype")
or self._pluck(msg_obj, "document_message", "mimetype")
or self._pluck(msg_obj, "imageMessage", "mimetype")
or self._pluck(msg_obj, "image_message", "mimetype")
or self._pluck(msg_obj, "videoMessage", "mimetype")
or self._pluck(msg_obj, "video_message", "mimetype")
or self._pluck(msg_obj, "audioMessage", "mimetype")
or self._pluck(msg_obj, "audio_message", "mimetype")
or self._infer_media_content_type(msg_obj)
)
if not filename:
ext = mimetypes.guess_extension(
str(content_type or "").split(";", 1)[0].strip().lower()
)
filename = f"wa-{int(time.time())}{ext or '.bin'}"
blob_key = media_bridge.put_blob(
service="whatsapp",
content=bytes(payload),
filename=filename,
content_type=content_type,
)
if not blob_key:
return []
return [
{
"blob_key": blob_key,
"filename": filename,
"content_type": content_type,
"size": len(payload),
}
]
async def _handle_message_event(self, event):
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
text = self._message_text(msg_obj, event)
if not text:
self.log.debug(
"whatsapp empty-text event shape: msg_keys=%s event_keys=%s type=%s",
self._shape_keys(msg_obj),
self._shape_keys(event),
str(type(event).__name__),
)
source = (
self._pluck(event, "Info", "MessageSource")
or self._pluck(event, "info", "message_source")
or self._pluck(event, "info", "messageSource")
)
is_from_me = bool(
self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe")
)
sender = self._jid_to_identifier(
self._pluck(source, "Sender")
or self._pluck(source, "sender")
or self._pluck(source, "SenderAlt")
or self._pluck(source, "senderAlt")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat") or self._pluck(source, "chat")
)
raw_ts = (
self._pluck(event, "Info", "Timestamp")
or self._pluck(event, "info", "timestamp")
or self._pluck(event, "info", "message_timestamp")
or self._pluck(event, "Timestamp")
or self._pluck(event, "timestamp")
)
msg_id = str(
self._pluck(event, "Info", "ID")
or self._pluck(event, "info", "id")
or self._pluck(event, "ID")
or self._pluck(event, "id")
or ""
).strip()
ts = self._normalize_timestamp(raw_ts)
reaction_payload = self._extract_reaction_event(msg_obj)
if reaction_payload:
self.log.debug(
"reaction-bridge whatsapp-inbound msg_id=%s target_id=%s emoji=%s remove=%s sender=%s chat=%s",
msg_id or "-",
str(reaction_payload.get("target_message_id") or "") or "-",
str(reaction_payload.get("emoji") or "") or "-",
bool(reaction_payload.get("remove")),
sender or "-",
chat or "-",
)
identifier_values = self._normalize_identifier_candidates(sender, chat)
if not identifier_values:
self.log.warning(
"reaction-bridge whatsapp-identifiers-miss sender=%s chat=%s",
sender or "-",
chat or "-",
)
return
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(
service="whatsapp",
identifier__in=list(identifier_values),
)
)
for identifier in identifiers:
try:
await self.ur.xmpp.client.apply_external_reaction(
identifier.user,
identifier,
source_service="whatsapp",
emoji=str(reaction_payload.get("emoji") or ""),
remove=bool(reaction_payload.get("remove")),
upstream_message_id=str(
reaction_payload.get("target_message_id") or ""
),
upstream_ts=0,
actor=(sender or chat or ""),
payload={
"event": "reaction",
"message_id": msg_id,
},
)
except Exception as exc:
self.log.warning("whatsapp reaction relay to XMPP failed: %s", exc)
return
self._remember_contact(
sender or chat,
jid=sender,
chat=chat,
)
self._remember_history_anchor(
identifier=(chat or sender or ""),
chat_jid=str(chat or sender or ""),
msg_id=msg_id,
ts=ts,
from_me=is_from_me,
sender_jid=str(sender or ""),
)
identifier_values = self._normalize_identifier_candidates(sender, chat)
if not identifier_values:
return
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(
service="whatsapp",
identifier__in=list(identifier_values),
)
)
if not identifiers:
return
attachments = await self._download_event_media(event)
xmpp_attachments = []
if attachments:
fetched = await asyncio.gather(
*[transport.fetch_attachment(self.service, att) for att in attachments]
)
xmpp_attachments = [row for row in fetched if row]
payload = {
"sender": str(sender or ""),
"chat": str(chat or ""),
"raw_event": str(type(event).__name__),
}
for identifier in identifiers:
uploaded_urls = await self.ur.xmpp.client.send_from_external(
identifier.user,
identifier,
text,
is_outgoing_message=is_from_me,
attachments=xmpp_attachments,
source_ref={
"upstream_message_id": str(msg_id or ""),
"upstream_author": str(sender or chat or ""),
"upstream_ts": int(ts or 0),
},
)
display_text = text
if (not display_text) and uploaded_urls:
display_text = "\n".join(uploaded_urls)
if (not display_text) and attachments:
media_urls = [
self._blob_key_to_compose_url((att or {}).get("blob_key"))
for att in attachments
]
media_urls = [url for url in media_urls if url]
if media_urls:
display_text = "\n".join(media_urls)
session = await history.get_chat_session(identifier.user, identifier)
duplicate_exists = await sync_to_async(
Message.objects.filter(
user=identifier.user,
session=session,
ts=ts,
sender_uuid=str(sender or chat or ""),
text=display_text,
).exists
)()
if duplicate_exists:
continue
await history.store_message(
session=session,
sender=str(sender or chat or ""),
text=display_text,
ts=ts,
outgoing=is_from_me,
)
await self.ur.message_received(
self.service,
identifier=identifier,
text=display_text,
ts=ts,
payload=payload,
)
async def _handle_receipt_event(self, event):
source = (
self._pluck(event, "MessageSource")
or self._pluck(event, "info", "message_source")
or self._pluck(event, "info", "messageSource")
)
sender = self._jid_to_identifier(
self._pluck(source, "Sender") or self._pluck(source, "sender")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat") or self._pluck(source, "chat")
)
timestamps = []
raw_ids = (
self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or []
)
if isinstance(raw_ids, (list, tuple, set)):
for item in raw_ids:
try:
value = int(item)
timestamps.append(value * 1000 if value < 10**12 else value)
except Exception:
continue
read_ts = self._normalize_timestamp(
self._pluck(event, "Timestamp")
or self._pluck(event, "timestamp")
or int(time.time() * 1000)
)
receipt_type = str(self._pluck(event, "Type") or "").strip()
self._remember_contact(
sender or chat,
jid=sender,
chat=chat,
)
for candidate in self._normalize_identifier_candidates(sender, chat):
await self.ur.message_read(
self.service,
identifier=candidate,
message_timestamps=timestamps,
read_ts=read_ts,
read_by=sender or chat,
payload={
"event": "receipt",
"type": receipt_type,
"sender": str(sender),
"chat": str(chat),
},
)
async def _handle_chat_presence_event(self, event):
source = (
self._pluck(event, "MessageSource")
or self._pluck(event, "message_source")
or {}
)
sender = self._jid_to_identifier(
self._pluck(source, "Sender") or self._pluck(source, "sender")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat") or self._pluck(source, "chat")
)
state = self._pluck(event, "State") or self._pluck(event, "state")
state_text = str(state or "").strip().lower()
is_typing = state_text in {"1", "composing", "chat_presence_composing"}
self._remember_contact(
sender or chat,
jid=sender,
chat=chat,
)
for candidate in self._normalize_identifier_candidates(sender, chat):
if is_typing:
await self.ur.started_typing(
self.service,
identifier=candidate,
payload={
"presence": state_text,
"sender": str(sender),
"chat": str(chat),
},
)
else:
await self.ur.stopped_typing(
self.service,
identifier=candidate,
payload={
"presence": state_text,
"sender": str(sender),
"chat": str(chat),
},
)
async def _handle_presence_event(self, event):
sender = self._jid_to_identifier(
self._pluck(event, "From", "User") or self._pluck(event, "from", "user")
)
is_unavailable = bool(
self._pluck(event, "Unavailable") or self._pluck(event, "unavailable")
)
self._remember_contact(sender, jid=sender)
for candidate in self._normalize_identifier_candidates(sender):
if is_unavailable:
await self.ur.stopped_typing(
self.service,
identifier=candidate,
payload={"presence": "offline", "sender": str(sender)},
)
def _extract_pair_qr(self, event):
codes = self._pluck(event, "Codes") or self._pluck(event, "codes") or []
decoded_codes = self._decode_qr_payload(codes)
if decoded_codes:
return decoded_codes
for path in (
("qr",),
("qr_code",),
("code",),
("pair_code",),
("pairCode",),
("url",),
):
value = self._pluck(event, *path)
if value:
return str(value)
return ""
def _to_jid(self, recipient):
raw = str(recipient or "").strip()
if not raw:
return ""
if "@" in raw:
return raw
digits = re.sub(r"[^0-9]", "", raw)
if digits:
# Prefer direct JID formatting for phone numbers; Neonize build_jid
# can trigger a usync lookup path that intermittently times out.
return f"{digits}@s.whatsapp.net"
if self._build_jid is not None:
try:
return self._build_jid(raw)
except Exception:
pass
return raw
def _blob_key_to_compose_url(self, blob_key):
key = str(blob_key or "").strip()
if not key:
return ""
return f"/compose/media/blob/?key={quote_plus(key)}"
async def _fetch_attachment_payload(self, attachment):
blob_key = (attachment or {}).get("blob_key")
if blob_key:
row = media_bridge.get_blob(blob_key)
if row:
return row
content = (attachment or {}).get("content")
if isinstance(content, memoryview):
content = content.tobytes()
if isinstance(content, bytes):
return {
"content": content,
"filename": (attachment or {}).get("filename") or "attachment.bin",
"content_type": (attachment or {}).get("content_type")
or "application/octet-stream",
"size": len(content),
}
url = (attachment or {}).get("url")
if url:
timeout = aiohttp.ClientTimeout(total=20)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
if response.status != 200:
return None
payload = await response.read()
return {
"content": payload,
"filename": (attachment or {}).get("filename")
or url.rstrip("/").split("/")[-1]
or "attachment.bin",
"content_type": (attachment or {}).get("content_type")
or response.headers.get(
"Content-Type", "application/octet-stream"
),
"size": len(payload),
}
return None
async def send_message_raw(
self,
recipient,
text=None,
attachments=None,
command_id: str | None = None,
metadata: dict | None = None,
):
self._last_send_error = ""
if not self._client:
self._last_send_error = "client_missing"
return False
jid_str = self._to_jid(recipient)
if not jid_str:
self._last_send_error = "recipient_invalid"
return False
# Prefer direct JID string for sends to avoid Neonize usync/device-list
# lookups that can stall on some runtime sessions.
jid = jid_str
jid_obj = None
try:
if self._build_jid is not None:
maybe_jid = None
if "@" in jid_str:
local_part, server_part = jid_str.split("@", 1)
try:
maybe_jid = self._build_jid(local_part, server_part)
except TypeError:
maybe_jid = self._build_jid(jid_str)
else:
maybe_jid = self._build_jid(jid_str)
if hasattr(maybe_jid, "SerializeToString"):
jid_obj = maybe_jid
else:
self.log.warning(
"whatsapp build_jid returned non-JID object, falling back to string: type=%s repr=%s",
type(maybe_jid).__name__,
repr(maybe_jid)[:100],
)
except Exception as exc:
self.log.warning(
"whatsapp failed to build JID from %s, falling back to string: %s",
jid_str,
exc,
)
if not self._connected and hasattr(self._client, "connect"):
try:
await self._maybe_await(self._client.connect())
self._connected = True
self._publish_state(
connected=True,
last_event="send_reconnect_ok",
warning="",
last_error="",
)
except Exception as exc:
self._last_send_error = f"reconnect_failed:{exc}"
self._publish_state(
connected=False,
last_event="send_reconnect_failed",
last_error=str(exc),
warning=f"WhatsApp reconnect before send failed: {exc}",
)
sent_any = False
sent_ts = 0
metadata = dict(metadata or {})
xmpp_source_id = str(metadata.get("xmpp_source_id") or "").strip()
legacy_message_id = str(metadata.get("legacy_message_id") or "").strip()
person_identifier = None
if xmpp_source_id:
candidates = list(self._normalize_identifier_candidates(recipient, jid_str))
if candidates:
person_identifier = await sync_to_async(
lambda: PersonIdentifier.objects.filter(
service="whatsapp",
identifier__in=candidates,
)
.select_related("user", "person")
.first()
)()
def _extract_response_message_id(response):
return str(
self._pluck(response, "ID")
or self._pluck(response, "id")
or self._pluck(response, "Info", "ID")
or self._pluck(response, "info", "id")
or ""
).strip()
def _record_bridge(response, ts_value, body_hint=""):
if not xmpp_source_id or person_identifier is None:
return
transport.record_bridge_mapping(
user_id=person_identifier.user_id,
person_id=person_identifier.person_id,
service="whatsapp",
xmpp_message_id=xmpp_source_id,
xmpp_ts=int(metadata.get("xmpp_source_ts") or 0),
upstream_message_id=_extract_response_message_id(response),
upstream_ts=int(ts_value or 0),
text_preview=str(body_hint or metadata.get("xmpp_body") or ""),
local_message_id=legacy_message_id,
)
for attachment in attachments or []:
payload = await self._fetch_attachment_payload(attachment)
if not payload:
continue
mime = str(
payload.get("content_type") or "application/octet-stream"
).lower()
data = payload.get("content") or b""
filename = payload.get("filename") or "attachment.bin"
attachment_target = jid_obj if jid_obj is not None else jid
send_method = "document"
if mime.startswith("image/") and hasattr(self._client, "send_image"):
send_method = "image"
elif mime.startswith("video/") and hasattr(self._client, "send_video"):
send_method = "video"
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
send_method = "audio"
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
"whatsapp media send prep: method=%s mime=%s filename=%s size=%s",
send_method,
mime,
filename,
len(data) if isinstance(data, (bytes, bytearray)) else 0,
)
try:
if mime.startswith("image/") and hasattr(self._client, "send_image"):
response = await self._maybe_await(
self._client.send_image(attachment_target, data, caption="")
)
elif mime.startswith("video/") and hasattr(self._client, "send_video"):
response = await self._maybe_await(
self._client.send_video(attachment_target, data, caption="")
)
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
response = await self._maybe_await(
self._client.send_audio(attachment_target, data)
)
elif hasattr(self._client, "send_document"):
response = await self._maybe_await(
self._client.send_document(
attachment_target,
data,
filename=filename,
mimetype=mime,
caption="",
)
)
else:
response = None
sent_ts = max(
sent_ts,
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
)
_record_bridge(response, sent_ts, body_hint=filename)
sent_any = True
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
"whatsapp media send ok: method=%s filename=%s ts=%s",
send_method,
filename,
self._normalize_timestamp(
self._pluck(response, "Timestamp") or 0
),
)
except Exception as exc:
self.log.warning("whatsapp attachment send failed: %s", exc)
if text:
response = None
last_error = None
# Prepare cancel key (if caller provided command_id)
cancel_key = None
try:
if command_id:
cancel_key = transport._runtime_command_cancel_key(
self.service, str(command_id)
)
except Exception:
cancel_key = None
for attempt in range(2):
# Check for a cancellation marker set by transport.cancel_runtime_command
try:
if cancel_key and cache.get(cancel_key):
self.log.info("whatsapp send cancelled via cancel marker")
self._last_send_error = "cancelled"
return False
except Exception:
pass
try:
send_target = jid_obj if jid_obj is not None else jid
# Log what we're about to send for debugging
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
f"send_message attempt {attempt+1}: target_type={type(send_target).__name__} text_type={type(text).__name__} text_len={len(text)}"
)
response = await self._call_client_method(
getattr(self._client, "send_message", None),
send_target,
text,
timeout=9.0,
)
sent_any = True
last_error = None
break
except Exception as exc:
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}"
)
last_error = exc
error_text = (
f"{type(last_error).__name__}:{repr(last_error)}"
if last_error is not None
else ""
).lower()
is_transient = (
"usync" in error_text
or "timed out" in error_text
or "timeout" in error_text
or "device list" in error_text
or "serializetostring" in error_text
or not error_text.strip()
)
if is_transient and attempt < 1:
# If runtime rejected string target, try to build protobuf JID for retry.
if (
jid_obj is None
and self._build_jid is not None
and "@" in jid_str
):
local_part, server_part = jid_str.split("@", 1)
try:
maybe_retry_jid = self._build_jid(local_part, server_part)
except TypeError:
maybe_retry_jid = self._build_jid(jid_str)
except Exception:
maybe_retry_jid = None
if hasattr(maybe_retry_jid, "SerializeToString"):
jid_obj = maybe_retry_jid
if hasattr(self._client, "connect"):
try:
await self._maybe_await(self._client.connect())
self._connected = True
self._publish_state(
connected=True,
last_event="send_retry_reconnect_ok",
warning="",
)
except Exception as reconnect_exc:
self._publish_state(
connected=False,
last_event="send_retry_reconnect_failed",
last_error=str(reconnect_exc),
)
# Sleep but wake earlier if cancelled: poll small intervals
# Increase backoff time for device list queries
total_sleep = 0.8 * (attempt + 1)
slept = 0.0
while slept < total_sleep:
try:
if cancel_key and cache.get(cancel_key):
self.log.info(
"whatsapp send cancelled during retry backoff"
)
self._last_send_error = "cancelled"
return False
except Exception:
pass
await asyncio.sleep(0.2)
slept += 0.2
continue
break
if last_error is not None and not sent_any:
self.log.warning("whatsapp text send failed: %s", last_error)
self._last_send_error = (
f"text_send_failed:{type(last_error).__name__}:{repr(last_error)}"
)
return False
sent_ts = max(
sent_ts,
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
)
_record_bridge(response, sent_ts, body_hint=str(text or ""))
if not sent_any:
self._last_send_error = "no_payload_sent"
return False
self._last_send_error = ""
return sent_ts or int(time.time() * 1000)
async def start_typing(self, identifier):
if not self._client:
return False
jid = self._to_jid(identifier)
if not jid:
return False
if (
hasattr(self._client, "send_chat_presence")
and self._chat_presence is not None
and self._chat_presence_media is not None
):
try:
await self._maybe_await(
self._client.send_chat_presence(
jid,
self._chat_presence.CHAT_PRESENCE_COMPOSING,
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
)
)
return True
except Exception:
pass
if hasattr(self._client, "set_chat_presence"):
try:
await self._maybe_await(
self._client.set_chat_presence(jid, "composing")
)
return True
except Exception:
pass
return False
async def stop_typing(self, identifier):
if not self._client:
return False
jid = self._to_jid(identifier)
if not jid:
return False
if (
hasattr(self._client, "send_chat_presence")
and self._chat_presence is not None
and self._chat_presence_media is not None
):
try:
await self._maybe_await(
self._client.send_chat_presence(
jid,
self._chat_presence.CHAT_PRESENCE_PAUSED,
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
)
)
return True
except Exception:
pass
if hasattr(self._client, "set_chat_presence"):
try:
await self._maybe_await(self._client.set_chat_presence(jid, "paused"))
return True
except Exception:
pass
return False
async def send_reaction(
self,
recipient,
*,
emoji,
target_message_id="",
target_timestamp=0,
remove=False,
):
if not self._client:
return False
jid = self._to_jid(recipient)
if not jid:
return False
target_id = str(target_message_id or "").strip()
if not target_id:
return False
reaction_emoji = "" if remove else str(emoji or "").strip()
candidate_names = (
"send_reaction",
"react",
"send_message_reaction",
"reaction",
)
self.log.debug(
"reaction-bridge whatsapp-send start recipient=%s target_id=%s emoji=%s remove=%s",
recipient,
target_id,
reaction_emoji or "-",
bool(remove),
)
for method_name in candidate_names:
method = getattr(self._client, method_name, None)
if method is None:
continue
attempts = [
(jid, target_id, reaction_emoji),
(jid, target_id, reaction_emoji, bool(remove)),
(jid, reaction_emoji, target_id),
]
for args in attempts:
try:
response = await self._call_client_method(
method, *args, timeout=9.0
)
if response is not None:
self.log.debug(
"reaction-bridge whatsapp-send ok method=%s args_len=%s",
method_name,
len(args),
)
return True
except Exception as exc:
self.log.debug(
"reaction-bridge whatsapp-send miss method=%s args_len=%s error=%s",
method_name,
len(args),
exc,
)
continue
self.log.warning(
"reaction-bridge whatsapp-send failed recipient=%s target_id=%s",
recipient,
target_id,
)
return False
async def fetch_attachment(self, attachment_ref):
blob_key = (attachment_ref or {}).get("blob_key")
if blob_key:
return media_bridge.get_blob(blob_key)
return None
def get_link_qr_png(self, device_name):
_ = (device_name or "").strip()
if not self._last_qr_payload:
return None
try:
return transport._as_qr_png(self._last_qr_payload)
except Exception:
return None
async def send_message_to_contact(self, contact_jid: str, text: str) -> bool:
"""Send a text message to a WhatsApp contact."""
try:
jid = self._to_jid(contact_jid)
if not jid or self._client is None:
return False
# neonize.send_message() accepts either a Message protobuf or a plain string
# If passing a string, it auto-converts to Message(conversation=text)
response = self._client.send_message(jid, text)
return response is not None
except Exception as e:
self.log.error(f"Failed to send WhatsApp message: {e}")
return False
# If you need to send a Message object explicitly:
async def send_structured_message(self, contact_jid: str, message: Message) -> bool:
"""Send a structured Message protobuf to a WhatsApp contact."""
try:
jid = self._to_jid(contact_jid)
if not jid or self._client is None:
return False
response = self._client.send_message(jid, message)
return response is not None
except Exception as e:
self.log.error(f"Failed to send structured WhatsApp message: {e}")
return False