3164 lines
122 KiB
Python
3164 lines
122 KiB
Python
import asyncio
|
|
import inspect
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
from urllib.parse import quote_plus
|
|
|
|
import aiohttp
|
|
from asgiref.sync import sync_to_async
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
|
|
from core.clients import ClientBase, transport
|
|
from core.messaging import history, media_bridge
|
|
from core.models import Message, PersonIdentifier
|
|
|
|
|
|
class WhatsAppClient(ClientBase):
|
|
"""
|
|
Async WhatsApp transport backed by Neonize.
|
|
|
|
Design notes:
|
|
- Runs in UR process.
|
|
- Publishes runtime state to shared cache via transport.
|
|
- Degrades gracefully when Neonize/session is unavailable.
|
|
"""
|
|
|
|
def __init__(self, ur, loop, service="whatsapp"):
|
|
super().__init__(ur, loop, service)
|
|
self._task = None
|
|
self._stopping = False
|
|
self._client = None
|
|
self._build_jid = None
|
|
self._connected = False
|
|
self._last_qr_payload = ""
|
|
self._accounts = []
|
|
self._chat_presence = None
|
|
self._chat_presence_media = None
|
|
self._last_pair_request = 0
|
|
self._next_qr_probe_at = 0.0
|
|
self._qr_handler_registered = False
|
|
self._qr_handler_supported = False
|
|
self._event_hook_callable = False
|
|
self._last_send_error = ""
|
|
|
|
self.enabled = bool(
|
|
str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower()
|
|
in {"1", "true", "yes", "on"}
|
|
)
|
|
self.client_name = (
|
|
str(getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")).strip()
|
|
or "gia_whatsapp"
|
|
)
|
|
self.database_url = str(getattr(settings, "WHATSAPP_DATABASE_URL", "")).strip()
|
|
safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp"
|
|
# Use a persistent default path (under project mount) instead of /tmp so
|
|
# link state and contact cache survive container restarts.
|
|
default_db_dir = str(
|
|
getattr(settings, "WHATSAPP_DB_DIR", "/var/tmp/whatsapp")
|
|
).strip()
|
|
self.session_db = self.database_url or os.path.join(
|
|
default_db_dir,
|
|
f"{safe_name}_neonize_v2.db",
|
|
)
|
|
|
|
transport.register_runtime_client(self.service, self)
|
|
prior_state = transport.get_runtime_state(self.service)
|
|
prior_accounts = prior_state.get("accounts")
|
|
if not isinstance(prior_accounts, list):
|
|
prior_accounts = []
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=(
|
|
"WhatsApp runtime is disabled by settings." if not self.enabled else ""
|
|
),
|
|
accounts=prior_accounts,
|
|
last_event="init",
|
|
session_db=self.session_db,
|
|
)
|
|
# Reduce third-party library logging noise (neonize/grpc/protobuf).
|
|
try:
|
|
# Common noisy libraries used by Neonize/WhatsApp stacks
|
|
logging.getLogger("neonize").setLevel(logging.WARNING)
|
|
logging.getLogger("grpc").setLevel(logging.WARNING)
|
|
logging.getLogger("google").setLevel(logging.WARNING)
|
|
logging.getLogger("protobuf").setLevel(logging.WARNING)
|
|
logging.getLogger("whatsmeow").setLevel(logging.WARNING)
|
|
logging.getLogger("whatsmeow.Client").setLevel(logging.WARNING)
|
|
logging.getLogger("aiohttp.access").setLevel(logging.WARNING)
|
|
except Exception:
|
|
pass
|
|
|
|
def _publish_state(self, **updates):
|
|
state = transport.update_runtime_state(self.service, **updates)
|
|
accounts = state.get("accounts")
|
|
if isinstance(accounts, list):
|
|
self._accounts = accounts
|
|
|
|
def start(self):
|
|
if not self.enabled:
|
|
self.log.info("whatsapp client disabled by settings")
|
|
return
|
|
if self._task is None:
|
|
self.log.info("whatsapp neonize client starting")
|
|
self._task = self.loop.create_task(self._run())
|
|
|
|
async def _run(self):
|
|
try:
|
|
import neonize.aioze.client as wa_client_mod
|
|
from neonize.aioze import events as wa_events
|
|
from neonize.aioze.client import NewAClient
|
|
|
|
try:
|
|
from neonize.utils.enum import ChatPresence, ChatPresenceMedia
|
|
except Exception:
|
|
ChatPresence = None
|
|
ChatPresenceMedia = None
|
|
try:
|
|
from neonize.utils import build_jid as wa_build_jid
|
|
except Exception:
|
|
wa_build_jid = None
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"Neonize not available: {exc}",
|
|
accounts=[],
|
|
last_event="neonize_import_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp neonize import failed: %s", exc)
|
|
return
|
|
|
|
# Neonize async module ships with its own global event loop object.
|
|
# In this runtime we already have a live asyncio loop; bind Neonize's
|
|
# globals to it so QR/pair callbacks and connect tasks actually execute.
|
|
try:
|
|
wa_events.event_global_loop = self.loop
|
|
wa_client_mod.event_global_loop = self.loop
|
|
self._publish_state(
|
|
neonize_loop_bound=True,
|
|
neonize_loop_type=str(type(self.loop).__name__),
|
|
last_event="neonize_loop_bound",
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
neonize_loop_bound=False,
|
|
last_event="neonize_loop_bind_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("failed binding neonize loop: %s", exc)
|
|
|
|
self._build_jid = wa_build_jid
|
|
self._chat_presence = ChatPresence
|
|
self._chat_presence_media = ChatPresenceMedia
|
|
try:
|
|
db_dir = os.path.dirname(self.session_db)
|
|
if db_dir:
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
if db_dir and not os.access(db_dir, os.W_OK):
|
|
raise PermissionError(
|
|
f"session db directory is not writable: {db_dir}"
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp DB path setup failed: {exc}",
|
|
last_event="db_path_setup_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp db path setup failed: %s", exc)
|
|
return
|
|
self._client = self._build_client(NewAClient)
|
|
if self._client is None:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning="Failed to initialize Neonize client.",
|
|
accounts=[],
|
|
last_event="client_init_failed",
|
|
last_error="client_none",
|
|
)
|
|
return
|
|
|
|
if not self._session_has_device():
|
|
self._publish_state(
|
|
connected=False,
|
|
warning="No linked WhatsApp device. Waiting for QR pairing.",
|
|
accounts=[],
|
|
last_event="no_linked_device",
|
|
pair_status="needs_pairing",
|
|
)
|
|
self.log.info(
|
|
"whatsapp session db has no linked device — connect will "
|
|
"start QR pairing flow"
|
|
)
|
|
|
|
self._register_event_handlers(wa_events)
|
|
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
await self._after_connect_probe()
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp connect failed: {exc}",
|
|
accounts=[],
|
|
last_event="connect_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp connect failed: %s", exc)
|
|
return
|
|
|
|
# Keep task alive so state/callbacks remain active.
|
|
next_heartbeat_at = 0.0
|
|
next_contacts_sync_at = 0.0
|
|
while not self._stopping:
|
|
now = time.time()
|
|
if now >= next_heartbeat_at:
|
|
self._publish_state(runtime_seen_at=int(now))
|
|
next_heartbeat_at = now + 5.0
|
|
await self._drain_runtime_commands()
|
|
if now >= next_contacts_sync_at:
|
|
await self._sync_contacts_from_client()
|
|
next_contacts_sync_at = now + 30.0
|
|
self._mark_qr_wait_timeout()
|
|
await self._sync_pair_request()
|
|
await self._probe_pending_qr(now)
|
|
await asyncio.sleep(1)
|
|
|
|
async def _sync_pair_request(self):
|
|
state = transport.get_runtime_state(self.service)
|
|
requested_at = int(state.get("pair_requested_at") or 0)
|
|
if requested_at <= 0 or requested_at <= self._last_pair_request:
|
|
return
|
|
self._last_pair_request = requested_at
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr="",
|
|
warning="Waiting for WhatsApp QR from Neonize.",
|
|
last_event="pair_request_seen",
|
|
pair_status="pending",
|
|
last_error="",
|
|
pair_reconnect_attempted_at=int(time.time()),
|
|
)
|
|
self._next_qr_probe_at = time.time()
|
|
|
|
if self._client is None:
|
|
return
|
|
|
|
try:
|
|
if hasattr(self._client, "disconnect"):
|
|
await self._maybe_await(self._client.disconnect())
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="pair_disconnect_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp disconnect before pairing failed: %s", exc)
|
|
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
self._publish_state(
|
|
last_event="pair_refresh_connected",
|
|
pair_refresh_connected_at=int(time.time()),
|
|
)
|
|
await self._after_connect_probe()
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
warning=f"WhatsApp pairing refresh failed: {exc}",
|
|
last_event="pair_refresh_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp pairing refresh failed: %s", exc)
|
|
|
|
async def _probe_pending_qr(self, now_ts: float):
|
|
state = transport.get_runtime_state(self.service)
|
|
status = str(state.get("pair_status") or "").strip().lower()
|
|
if status != "pending":
|
|
return
|
|
if str(state.get("pair_qr") or "").strip():
|
|
return
|
|
if now_ts < float(self._next_qr_probe_at or 0.0):
|
|
return
|
|
await self._after_connect_probe()
|
|
latest = transport.get_runtime_state(self.service)
|
|
if str(latest.get("pair_qr") or "").strip():
|
|
self._next_qr_probe_at = now_ts + 30.0
|
|
return
|
|
error_text = str(latest.get("last_error") or "").strip().lower()
|
|
# Neonize may report "client is nil" for a few seconds after connect.
|
|
if "client is nil" in error_text:
|
|
self._next_qr_probe_at = now_ts + 2.0
|
|
return
|
|
self._next_qr_probe_at = now_ts + 5.0
|
|
|
|
async def _after_connect_probe(self):
|
|
if self._client is None:
|
|
return
|
|
now_ts = int(time.time())
|
|
try:
|
|
check_connected = getattr(self._client, "is_connected", None)
|
|
if check_connected is not None:
|
|
connected_value = (
|
|
await self._maybe_await(check_connected())
|
|
if callable(check_connected)
|
|
else await self._maybe_await(check_connected)
|
|
)
|
|
if connected_value:
|
|
self._connected = True
|
|
account = await self._resolve_account_identifier()
|
|
account_list = [account] if account else list(self._accounts or [])
|
|
if not account_list:
|
|
account_list = [self.client_name]
|
|
self._publish_state(
|
|
connected=True,
|
|
accounts=account_list,
|
|
pair_status="connected",
|
|
last_event="connected_probe",
|
|
connected_at=now_ts,
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="connected_probe_failed",
|
|
last_error=str(exc),
|
|
)
|
|
|
|
if self._connected:
|
|
await self._sync_contacts_from_client()
|
|
|
|
# Neonize does not always emit QR callbacks after reconnect. Try explicit
|
|
# QR-link fetch when available to surface pair data to the UI.
|
|
try:
|
|
if hasattr(self._client, "get_contact_qr_link"):
|
|
qr_link = await self._maybe_await(self._client.get_contact_qr_link())
|
|
qr_payload = self._decode_qr_payload(qr_link)
|
|
self._publish_state(last_qr_probe_at=now_ts)
|
|
if qr_payload:
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_probe_success",
|
|
pair_status="qr_ready",
|
|
qr_received_at=now_ts,
|
|
qr_probe_result="ok",
|
|
last_error="",
|
|
)
|
|
else:
|
|
self._publish_state(
|
|
last_event="qr_probe_empty",
|
|
qr_probe_result="empty",
|
|
)
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="qr_probe_failed",
|
|
qr_probe_result="error",
|
|
last_error=str(exc),
|
|
last_qr_probe_at=now_ts,
|
|
)
|
|
|
|
def _register_event(self, event_cls, callback):
|
|
if event_cls is None:
|
|
return False
|
|
if self._client is None:
|
|
return False
|
|
event_hook = getattr(self._client, "event", None)
|
|
if not callable(event_hook):
|
|
self._event_hook_callable = False
|
|
return False
|
|
self._event_hook_callable = True
|
|
try:
|
|
decorator = event_hook(event_cls)
|
|
decorator(callback)
|
|
return True
|
|
except Exception as exc:
|
|
self.log.warning(
|
|
"whatsapp event registration failed (%s): %s",
|
|
getattr(event_cls, "__name__", str(event_cls)),
|
|
exc,
|
|
)
|
|
self._publish_state(
|
|
last_event="event_registration_failed",
|
|
last_error=str(exc),
|
|
)
|
|
return False
|
|
|
|
def _register_qr_handler(self):
|
|
if self._client is None:
|
|
self._qr_handler_supported = False
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=False,
|
|
qr_handler_registered=False,
|
|
)
|
|
return
|
|
if not hasattr(self._client, "qr"):
|
|
self._qr_handler_supported = False
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=False,
|
|
qr_handler_registered=False,
|
|
last_event="qr_api_missing",
|
|
)
|
|
return
|
|
self._qr_handler_supported = True
|
|
|
|
async def on_qr(client, raw_payload):
|
|
qr_payload = self._decode_qr_payload(raw_payload)
|
|
if not qr_payload:
|
|
return
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_handler",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
|
|
try:
|
|
self._client.qr(on_qr)
|
|
self._qr_handler_registered = True
|
|
self._publish_state(
|
|
qr_handler_supported=True,
|
|
qr_handler_registered=True,
|
|
last_event="qr_handler_registered",
|
|
)
|
|
except Exception as exc:
|
|
self._qr_handler_registered = False
|
|
self._publish_state(
|
|
qr_handler_supported=True,
|
|
qr_handler_registered=False,
|
|
last_event="qr_handler_registration_failed",
|
|
last_error=str(exc),
|
|
)
|
|
self.log.warning("whatsapp qr handler registration failed: %s", exc)
|
|
|
|
def _decode_qr_payload(self, raw_payload):
|
|
if raw_payload is None:
|
|
return ""
|
|
if isinstance(raw_payload, memoryview):
|
|
raw_payload = raw_payload.tobytes()
|
|
if isinstance(raw_payload, bytes):
|
|
return raw_payload.decode("utf-8", errors="ignore").strip()
|
|
if isinstance(raw_payload, (list, tuple)):
|
|
for item in raw_payload:
|
|
candidate = self._decode_qr_payload(item)
|
|
if candidate:
|
|
return candidate
|
|
return ""
|
|
return str(raw_payload).strip()
|
|
|
|
def _build_client(self, cls):
|
|
# NewAClient first arg is the SQLite filename / DB string.
|
|
try:
|
|
return cls(self.session_db)
|
|
except Exception as exc:
|
|
self.log.warning(
|
|
"whatsapp client init failed (%s): %s", self.session_db, exc
|
|
)
|
|
self._publish_state(
|
|
last_event="client_init_exception",
|
|
last_error=str(exc),
|
|
session_db=self.session_db,
|
|
)
|
|
return None
|
|
|
|
def _session_has_device(self):
|
|
"""Check whatsmeow_device table for a linked device row.
|
|
|
|
Neonize's Go layer panics (SIGSEGV in GetMe) when connect() is called
|
|
on a session with no linked device. Querying the SQLite schema directly
|
|
lets us skip connect() and avoid crashing the entire UR process.
|
|
"""
|
|
if not os.path.exists(self.session_db):
|
|
return False
|
|
try:
|
|
conn = sqlite3.connect(self.session_db)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM sqlite_master "
|
|
"WHERE type='table' AND name='whatsmeow_device'"
|
|
)
|
|
if cursor.fetchone()[0] == 0:
|
|
conn.close()
|
|
return False
|
|
cursor.execute("SELECT COUNT(*) FROM whatsmeow_device")
|
|
count = cursor.fetchone()[0]
|
|
conn.close()
|
|
return count > 0
|
|
except Exception:
|
|
return False
|
|
|
|
def _register_event_handlers(self, wa_events):
|
|
connected_ev = getattr(wa_events, "ConnectedEv", None)
|
|
message_ev = getattr(wa_events, "MessageEv", None)
|
|
receipt_ev = getattr(wa_events, "ReceiptEv", None)
|
|
chat_presence_ev = getattr(wa_events, "ChatPresenceEv", None)
|
|
presence_ev = getattr(wa_events, "PresenceEv", None)
|
|
pair_ev = getattr(wa_events, "PairStatusEv", None)
|
|
qr_ev = getattr(wa_events, "QREv", None)
|
|
history_sync_ev = getattr(wa_events, "HistorySyncEv", None)
|
|
offline_sync_completed_ev = getattr(wa_events, "OfflineSyncCompletedEv", None)
|
|
|
|
self._register_qr_handler()
|
|
support = {
|
|
"connected_ev": bool(connected_ev),
|
|
"pair_ev": bool(pair_ev),
|
|
"qr_ev": bool(qr_ev),
|
|
"message_ev": bool(message_ev),
|
|
"receipt_ev": bool(receipt_ev),
|
|
"history_sync_ev": bool(history_sync_ev),
|
|
"offline_sync_completed_ev": bool(offline_sync_completed_ev),
|
|
}
|
|
event_names = [name for name in dir(wa_events) if name.endswith("Ev")]
|
|
message_like_names = [
|
|
name
|
|
for name in event_names
|
|
if "message" in name.lower() or name.lower().startswith("msg")
|
|
]
|
|
self._publish_state(
|
|
event_hook_callable=bool(getattr(self._client, "event", None)),
|
|
event_support=support,
|
|
event_message_candidates=message_like_names[:20],
|
|
last_event="event_handlers_scanned",
|
|
)
|
|
|
|
if connected_ev is not None:
|
|
|
|
async def on_connected(client, event: connected_ev):
|
|
self._connected = True
|
|
account = await self._resolve_account_identifier()
|
|
self._publish_state(
|
|
connected=True,
|
|
warning="",
|
|
accounts=[account] if account else [self.client_name],
|
|
pair_qr="",
|
|
last_event="connected",
|
|
pair_status="connected",
|
|
connected_at=int(time.time()),
|
|
last_error="",
|
|
)
|
|
await self._sync_contacts_from_client()
|
|
|
|
self._register_event(connected_ev, on_connected)
|
|
|
|
async def on_message_any(client, event):
|
|
await self._handle_message_event(event)
|
|
|
|
registered_message_events = []
|
|
if message_ev is not None:
|
|
if self._register_event(message_ev, on_message_any):
|
|
registered_message_events.append(
|
|
getattr(message_ev, "__name__", str(message_ev))
|
|
)
|
|
for attr in message_like_names:
|
|
candidate = getattr(wa_events, attr, None)
|
|
if candidate is None or candidate is message_ev:
|
|
continue
|
|
if self._register_event(candidate, on_message_any):
|
|
registered_message_events.append(attr)
|
|
if registered_message_events:
|
|
self._publish_state(
|
|
registered_message_events=registered_message_events[:20],
|
|
last_event="message_events_registered",
|
|
)
|
|
|
|
if receipt_ev is not None:
|
|
|
|
async def on_receipt(client, event: receipt_ev):
|
|
await self._handle_receipt_event(event)
|
|
|
|
self._register_event(receipt_ev, on_receipt)
|
|
|
|
if chat_presence_ev is not None:
|
|
|
|
async def on_chat_presence(client, event: chat_presence_ev):
|
|
await self._handle_chat_presence_event(event)
|
|
|
|
self._register_event(chat_presence_ev, on_chat_presence)
|
|
|
|
if presence_ev is not None:
|
|
|
|
async def on_presence(client, event: presence_ev):
|
|
await self._handle_presence_event(event)
|
|
|
|
self._register_event(presence_ev, on_presence)
|
|
|
|
if pair_ev is not None:
|
|
|
|
async def on_pair_status(client, event: pair_ev):
|
|
qr_payload = self._extract_pair_qr(event)
|
|
if qr_payload:
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="pair_status_qr",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
status_raw = self._pluck(event, "Status")
|
|
status_text = str(status_raw or "").strip().lower()
|
|
if status_text in {"2", "success"}:
|
|
account = await self._resolve_account_identifier()
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True,
|
|
warning="",
|
|
accounts=[account] if account else [self.client_name],
|
|
pair_qr="",
|
|
last_event="pair_status_success",
|
|
pair_status="connected",
|
|
connected_at=int(time.time()),
|
|
last_error="",
|
|
)
|
|
await self._sync_contacts_from_client()
|
|
elif status_text in {"1", "error"}:
|
|
error_text = str(self._pluck(event, "Error") or "").strip()
|
|
self._publish_state(
|
|
warning=error_text or "WhatsApp pairing failed. Retry scan.",
|
|
last_event="pair_status_error",
|
|
pair_status="error",
|
|
last_error=error_text or "unknown_pair_error",
|
|
)
|
|
|
|
self._register_event(pair_ev, on_pair_status)
|
|
|
|
if qr_ev is not None:
|
|
|
|
async def on_qr_event(client, event: qr_ev):
|
|
# Once connected, ignore late/stale QR emissions so runtime state
|
|
# does not regress from connected -> qr_ready.
|
|
state = transport.get_runtime_state(self.service)
|
|
if self._connected or bool(state.get("connected")):
|
|
return
|
|
qr_payload = self._extract_pair_qr(event)
|
|
if not qr_payload:
|
|
return
|
|
self._last_qr_payload = qr_payload
|
|
self._publish_state(
|
|
connected=False,
|
|
pair_qr=qr_payload,
|
|
warning="Scan QR in WhatsApp Linked Devices.",
|
|
last_event="qr_event",
|
|
pair_status="qr_ready",
|
|
qr_received_at=int(time.time()),
|
|
qr_probe_result="event",
|
|
last_error="",
|
|
)
|
|
|
|
self._register_event(qr_ev, on_qr_event)
|
|
|
|
if history_sync_ev is not None:
|
|
|
|
async def on_history_sync(client, event: history_sync_ev):
|
|
await self._handle_history_sync_event(event)
|
|
|
|
self._register_event(history_sync_ev, on_history_sync)
|
|
|
|
if offline_sync_completed_ev is not None:
|
|
|
|
async def on_offline_sync_completed(
|
|
client, event: offline_sync_completed_ev
|
|
):
|
|
self._publish_state(last_event="offline_sync_completed")
|
|
await self._sync_contacts_from_client()
|
|
|
|
self._register_event(offline_sync_completed_ev, on_offline_sync_completed)
|
|
|
|
def _mark_qr_wait_timeout(self):
|
|
state = transport.get_runtime_state(self.service)
|
|
if str(state.get("pair_status") or "").strip().lower() != "pending":
|
|
return
|
|
requested_at = int(state.get("pair_requested_at") or 0)
|
|
qr_received_at = int(state.get("qr_received_at") or 0)
|
|
if requested_at <= 0 or qr_received_at > 0:
|
|
return
|
|
now = int(time.time())
|
|
age = now - requested_at
|
|
# Avoid spamming writes while still surfacing a clear timeout state.
|
|
if age < 15 or (age % 10) != 0:
|
|
return
|
|
self._publish_state(
|
|
last_event="pair_waiting_no_qr",
|
|
warning=(
|
|
"Waiting for WhatsApp QR from Neonize. " "No QR callback received yet."
|
|
),
|
|
)
|
|
|
|
async def _maybe_await(self, value):
|
|
if asyncio.iscoroutine(value):
|
|
return await value
|
|
return value
|
|
|
|
async def _call_client_method(self, method, *args, timeout: float | None = None):
|
|
if method is None:
|
|
return None
|
|
if inspect.iscoroutinefunction(method):
|
|
coro = method(*args)
|
|
else:
|
|
coro = asyncio.to_thread(method, *args)
|
|
if timeout and timeout > 0:
|
|
return await asyncio.wait_for(coro, timeout=timeout)
|
|
return await coro
|
|
|
|
async def _drain_runtime_commands(self):
|
|
# Process a small burst each loop to keep sends responsive but avoid starvation.
|
|
for _ in range(5):
|
|
command = transport.pop_runtime_command(self.service)
|
|
if not command:
|
|
return
|
|
await self._execute_runtime_command(command)
|
|
|
|
async def _execute_runtime_command(self, command):
|
|
command_id = str((command or {}).get("id") or "").strip()
|
|
action = str((command or {}).get("action") or "").strip()
|
|
payload = dict((command or {}).get("payload") or {})
|
|
if not command_id:
|
|
return
|
|
self.log.debug(
|
|
"whatsapp runtime command start: id=%s action=%s",
|
|
command_id,
|
|
action,
|
|
)
|
|
|
|
if action == "send_message_raw":
|
|
recipient = str(payload.get("recipient") or "").strip()
|
|
text = payload.get("text")
|
|
attachments = payload.get("attachments") or []
|
|
metadata = dict(payload.get("metadata") or {})
|
|
send_timeout_s = 18.0
|
|
try:
|
|
# Include command_id so send_message_raw can observe cancel requests
|
|
result = await asyncio.wait_for(
|
|
self.send_message_raw(
|
|
recipient=recipient,
|
|
text=text,
|
|
attachments=attachments,
|
|
command_id=command_id,
|
|
metadata=metadata,
|
|
),
|
|
timeout=send_timeout_s,
|
|
)
|
|
if result is not False and result is not None:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": True,
|
|
"timestamp": int(result)
|
|
if isinstance(result, int)
|
|
else int(time.time() * 1000),
|
|
},
|
|
)
|
|
self.log.debug(
|
|
"whatsapp runtime command ok: id=%s action=%s",
|
|
command_id,
|
|
action,
|
|
)
|
|
return
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": str(
|
|
getattr(self, "_last_send_error", "")
|
|
or "runtime_send_failed"
|
|
),
|
|
},
|
|
)
|
|
self.log.warning(
|
|
"whatsapp runtime command failed: id=%s action=%s error=%s",
|
|
command_id,
|
|
action,
|
|
str(getattr(self, "_last_send_error", "") or "runtime_send_failed"),
|
|
)
|
|
return
|
|
except asyncio.TimeoutError:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": f"runtime_send_timeout:{int(send_timeout_s)}s",
|
|
},
|
|
)
|
|
self.log.warning(
|
|
"whatsapp runtime command timeout: id=%s action=%s timeout=%ss",
|
|
command_id,
|
|
action,
|
|
int(send_timeout_s),
|
|
)
|
|
return
|
|
except Exception as exc:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": str(exc),
|
|
},
|
|
)
|
|
self.log.warning(
|
|
"whatsapp runtime command exception: id=%s action=%s error=%s",
|
|
command_id,
|
|
action,
|
|
exc,
|
|
)
|
|
return
|
|
|
|
if action == "send_reaction":
|
|
recipient = str(payload.get("recipient") or "").strip()
|
|
emoji = str(payload.get("emoji") or "")
|
|
target_message_id = str(payload.get("target_message_id") or "").strip()
|
|
target_timestamp = int(payload.get("target_timestamp") or 0)
|
|
remove = bool(payload.get("remove"))
|
|
try:
|
|
ok = await self.send_reaction(
|
|
recipient=recipient,
|
|
emoji=emoji,
|
|
target_message_id=target_message_id,
|
|
target_timestamp=target_timestamp,
|
|
remove=remove,
|
|
)
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": bool(ok),
|
|
"timestamp": int(time.time() * 1000),
|
|
"error": "" if ok else "reaction_send_failed",
|
|
},
|
|
)
|
|
return
|
|
except Exception as exc:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": str(exc),
|
|
},
|
|
)
|
|
return
|
|
|
|
if action == "force_history_sync":
|
|
target_identifier = str(payload.get("identifier") or "").strip()
|
|
try:
|
|
result = await self._force_history_sync(target_identifier)
|
|
except Exception as exc:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": str(exc),
|
|
},
|
|
)
|
|
return
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": True,
|
|
**dict(result or {}),
|
|
},
|
|
)
|
|
return
|
|
|
|
if action == "notify_xmpp_sent":
|
|
person_identifier_id = str(
|
|
payload.get("person_identifier_id") or ""
|
|
).strip()
|
|
text = str(payload.get("text") or "")
|
|
if not person_identifier_id:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{"ok": False, "error": "missing_person_identifier_id"},
|
|
)
|
|
return
|
|
try:
|
|
identifier = await sync_to_async(
|
|
lambda: PersonIdentifier.objects.filter(id=person_identifier_id)
|
|
.select_related("user", "person")
|
|
.first()
|
|
)()
|
|
if identifier is None:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{"ok": False, "error": "person_identifier_not_found"},
|
|
)
|
|
return
|
|
await self.ur.xmpp.client.send_from_external(
|
|
identifier.user,
|
|
identifier,
|
|
text,
|
|
True,
|
|
attachments=[],
|
|
)
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{"ok": True, "timestamp": int(time.time() * 1000)},
|
|
)
|
|
return
|
|
except Exception as exc:
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{"ok": False, "error": str(exc)},
|
|
)
|
|
return
|
|
|
|
transport.set_runtime_command_result(
|
|
self.service,
|
|
command_id,
|
|
{
|
|
"ok": False,
|
|
"error": f"unsupported_action:{action or '-'}",
|
|
},
|
|
)
|
|
|
|
async def _force_history_sync(self, identifier: str = ""):
|
|
started_at = int(time.time())
|
|
self._publish_state(
|
|
last_event="manual_history_sync_started",
|
|
history_sync_running=True,
|
|
history_sync_started_at=started_at,
|
|
history_sync_target=identifier or "",
|
|
)
|
|
try:
|
|
await self._sync_contacts_from_client()
|
|
history_request = await self._request_on_demand_history(
|
|
identifier=identifier,
|
|
count=120,
|
|
)
|
|
self._publish_state(
|
|
history_on_demand_requested=bool(history_request.get("requested")),
|
|
history_on_demand_error=str(history_request.get("error") or ""),
|
|
history_on_demand_anchor=str(history_request.get("anchor_key") or ""),
|
|
history_on_demand_at=int(time.time()),
|
|
)
|
|
if history_request.get("requested"):
|
|
# Give on-demand history chunks a brief window to arrive before command returns.
|
|
await asyncio.sleep(3)
|
|
# Best-effort probe: reconnect state and QR/connection probes can unlock
|
|
# pending history sync callbacks in some runtime sessions.
|
|
await self._after_connect_probe()
|
|
sqlite_sync = await self._sync_history_from_sqlite(identifier=identifier)
|
|
self._publish_state(
|
|
history_sqlite_imported=int(sqlite_sync.get("imported", 0)),
|
|
history_sqlite_scanned=int(sqlite_sync.get("scanned", 0)),
|
|
history_sqlite_rows=int(sqlite_sync.get("rows", 0)),
|
|
history_sqlite_table=str(sqlite_sync.get("table") or ""),
|
|
history_sqlite_ts=int(time.time()),
|
|
)
|
|
finally:
|
|
finished_at = int(time.time())
|
|
self._publish_state(
|
|
history_sync_running=False,
|
|
history_sync_finished_at=finished_at,
|
|
history_sync_duration_ms=int((finished_at - started_at) * 1000),
|
|
last_event="manual_history_sync_finished",
|
|
)
|
|
state = transport.get_runtime_state(self.service)
|
|
return {
|
|
"started_at": started_at,
|
|
"finished_at": int(
|
|
state.get("history_sync_finished_at") or int(time.time())
|
|
),
|
|
"duration_ms": int(state.get("history_sync_duration_ms") or 0),
|
|
"contacts_sync_count": int(state.get("contacts_sync_count") or 0),
|
|
"history_imported_messages": int(
|
|
state.get("history_imported_messages") or 0
|
|
),
|
|
"sqlite_imported_messages": int(state.get("history_sqlite_imported") or 0),
|
|
"sqlite_scanned_messages": int(state.get("history_sqlite_scanned") or 0),
|
|
"sqlite_table": str(state.get("history_sqlite_table") or ""),
|
|
"sqlite_error": str(state.get("history_sqlite_error") or ""),
|
|
"on_demand_requested": bool(state.get("history_on_demand_requested")),
|
|
"on_demand_error": str(state.get("history_on_demand_error") or ""),
|
|
"last_event": str(state.get("last_event") or ""),
|
|
}
|
|
|
|
async def _request_on_demand_history(self, identifier: str, count: int = 120):
|
|
if not self._client:
|
|
return {"requested": False, "error": "client_missing"}
|
|
normalized = str(identifier or "").strip()
|
|
if not normalized:
|
|
return {"requested": False, "error": "identifier_missing"}
|
|
|
|
state = transport.get_runtime_state(self.service)
|
|
anchors = state.get("history_anchors") or {}
|
|
anchor = None
|
|
anchor_key = ""
|
|
for candidate in self._normalize_identifier_candidates(normalized):
|
|
row = anchors.get(str(candidate))
|
|
if isinstance(row, dict) and row.get("msg_id"):
|
|
anchor = row
|
|
anchor_key = str(candidate)
|
|
break
|
|
if not anchor:
|
|
anchor = await self._load_history_anchor_from_sqlite(identifier=normalized)
|
|
if isinstance(anchor, dict) and anchor.get("msg_id"):
|
|
anchor_key = str(anchor.get("anchor_key") or "")
|
|
else:
|
|
return {"requested": False, "error": "anchor_missing"}
|
|
|
|
try:
|
|
from neonize.builder import build_history_sync_request
|
|
from neonize.proto.Neonize_pb2 import MessageInfo, MessageSource
|
|
except Exception as exc:
|
|
return {"requested": False, "error": f"history_builder_unavailable:{exc}"}
|
|
|
|
try:
|
|
chat_raw = str(anchor.get("chat_jid") or normalized).strip()
|
|
sender_raw = str(anchor.get("sender_jid") or chat_raw).strip()
|
|
if not chat_raw:
|
|
return {"requested": False, "error": "anchor_chat_missing"}
|
|
chat_jid = self._to_jid(chat_raw)
|
|
sender_jid = self._to_jid(sender_raw)
|
|
info = MessageInfo(
|
|
MessageSource=MessageSource(
|
|
Chat=chat_jid,
|
|
Sender=sender_jid,
|
|
IsFromMe=bool(anchor.get("from_me")),
|
|
IsGroup=False,
|
|
),
|
|
ID=str(anchor.get("msg_id") or ""),
|
|
Timestamp=int(anchor.get("ts") or int(time.time() * 1000)),
|
|
)
|
|
request_msg = build_history_sync_request(
|
|
info, max(10, min(int(count or 120), 500))
|
|
)
|
|
await self._maybe_await(self._client.send_message(chat_jid, request_msg))
|
|
self._publish_state(
|
|
last_event="history_on_demand_requested",
|
|
last_error="",
|
|
)
|
|
return {"requested": True, "anchor_key": anchor_key}
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="history_on_demand_request_failed",
|
|
last_error=str(exc),
|
|
)
|
|
return {"requested": False, "error": str(exc), "anchor_key": anchor_key}
|
|
|
|
async def _load_history_anchor_from_sqlite(self, identifier: str):
|
|
def _read_anchor():
|
|
if not self.session_db or not os.path.exists(self.session_db):
|
|
return {}
|
|
try:
|
|
conn = sqlite3.connect(self.session_db)
|
|
conn.row_factory = sqlite3.Row
|
|
except Exception:
|
|
return {}
|
|
try:
|
|
cur = conn.cursor()
|
|
table_rows = cur.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()
|
|
table_names = [str(row[0]) for row in table_rows if row and row[0]]
|
|
if "whatsmeow_message_secrets" not in table_names:
|
|
return {}
|
|
candidates = self._normalize_identifier_candidates(identifier)
|
|
local_values = set()
|
|
for value in candidates:
|
|
local = str(value or "").strip().split("@", 1)[0]
|
|
if local:
|
|
local_values.add(local)
|
|
if not local_values:
|
|
return {}
|
|
placeholders = ",".join("?" for _ in local_values)
|
|
query = (
|
|
"SELECT our_jid, chat_jid, sender_jid, message_id "
|
|
'FROM "whatsmeow_message_secrets" '
|
|
f"WHERE substr(chat_jid, 1, instr(chat_jid, '@') - 1) IN ({placeholders}) "
|
|
"ORDER BY rowid DESC LIMIT 1"
|
|
)
|
|
row = cur.execute(query, tuple(local_values)).fetchone()
|
|
if not row:
|
|
return {}
|
|
our_jid = str(row["our_jid"] or "")
|
|
own_user = our_jid.split("@", 1)[0].split(":", 1)[0].strip()
|
|
sender_jid = str(row["sender_jid"] or "")
|
|
sender_user = sender_jid.split("@", 1)[0].split(":", 1)[0].strip()
|
|
from_me = bool(own_user and sender_user and own_user == sender_user)
|
|
chat_jid = str(row["chat_jid"] or "")
|
|
msg_id = str(row["message_id"] or "")
|
|
if not chat_jid or not msg_id:
|
|
return {}
|
|
anchor_key = chat_jid.split("@", 1)[0]
|
|
return {
|
|
"chat_jid": chat_jid,
|
|
"sender_jid": sender_jid,
|
|
"msg_id": msg_id,
|
|
"from_me": from_me,
|
|
"ts": int(time.time() * 1000),
|
|
"anchor_key": anchor_key,
|
|
}
|
|
except Exception:
|
|
return {}
|
|
finally:
|
|
conn.close()
|
|
|
|
return await asyncio.to_thread(_read_anchor)
|
|
|
|
async def _sync_history_from_sqlite(self, identifier: str = ""):
|
|
def _extract_rows():
|
|
if not self.session_db or not os.path.exists(self.session_db):
|
|
return {"rows": [], "table": "", "error": "sqlite_missing"}
|
|
try:
|
|
conn = sqlite3.connect(self.session_db)
|
|
conn.row_factory = sqlite3.Row
|
|
except Exception as exc:
|
|
return {"rows": [], "table": "", "error": f"sqlite_open_failed:{exc}"}
|
|
try:
|
|
cur = conn.cursor()
|
|
table_rows = cur.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()
|
|
table_names = [str(row[0]) for row in table_rows if row and row[0]]
|
|
preferred = [
|
|
"whatsmeow_messages",
|
|
"messages",
|
|
"message",
|
|
]
|
|
selected = ""
|
|
for candidate in preferred:
|
|
if candidate in table_names:
|
|
selected = candidate
|
|
break
|
|
if not selected:
|
|
for name in table_names:
|
|
lowered = name.lower()
|
|
if "message" not in lowered:
|
|
continue
|
|
if any(
|
|
token in lowered
|
|
for token in (
|
|
"secret",
|
|
"contacts",
|
|
"contact",
|
|
"chat_settings",
|
|
"event_buffer",
|
|
"identity",
|
|
"pre_keys",
|
|
"sender_keys",
|
|
"session",
|
|
"version",
|
|
"privacy",
|
|
"lid_map",
|
|
"device",
|
|
"app_state",
|
|
)
|
|
):
|
|
continue
|
|
if "contact" not in lowered:
|
|
selected = name
|
|
break
|
|
if not selected:
|
|
return {
|
|
"rows": [],
|
|
"table": "",
|
|
"error": "messages_table_not_found",
|
|
}
|
|
|
|
columns = [
|
|
str(row[1] or "")
|
|
for row in cur.execute(
|
|
f'PRAGMA table_info("{selected}")'
|
|
).fetchall()
|
|
]
|
|
if not columns:
|
|
return {"rows": [], "table": selected, "error": "no_columns"}
|
|
|
|
def pick(options):
|
|
lowered = {col.lower(): col for col in columns}
|
|
for option in options:
|
|
if option in lowered:
|
|
return lowered[option]
|
|
for option in options:
|
|
for col in columns:
|
|
if option in col.lower():
|
|
return col
|
|
return ""
|
|
|
|
text_col = pick(
|
|
[
|
|
"text",
|
|
"conversation",
|
|
"body",
|
|
"caption",
|
|
"content",
|
|
"message",
|
|
"msg",
|
|
]
|
|
)
|
|
ts_col = pick(
|
|
[
|
|
"message_timestamp",
|
|
"messagetimestamp",
|
|
"timestamp",
|
|
"ts",
|
|
"time",
|
|
]
|
|
)
|
|
from_me_col = pick(
|
|
[
|
|
"from_me",
|
|
"fromme",
|
|
"is_from_me",
|
|
"isfromme",
|
|
"outgoing",
|
|
]
|
|
)
|
|
sender_col = pick(
|
|
[
|
|
"sender_jid",
|
|
"sender",
|
|
"participant",
|
|
"from_jid",
|
|
"from",
|
|
]
|
|
)
|
|
chat_col = pick(
|
|
[
|
|
"chat_jid",
|
|
"remote_jid",
|
|
"their_jid",
|
|
"jid",
|
|
"chat",
|
|
]
|
|
)
|
|
if not (text_col and ts_col and (sender_col or chat_col)):
|
|
return {
|
|
"rows": [],
|
|
"table": selected,
|
|
"error": "required_message_columns_missing",
|
|
}
|
|
select_cols = [
|
|
col
|
|
for col in [text_col, ts_col, from_me_col, sender_col, chat_col]
|
|
if col
|
|
]
|
|
quoted = ", ".join(f'"{col}"' for col in select_cols)
|
|
order_expr = f'"{ts_col}" DESC' if ts_col else "ROWID DESC"
|
|
sql = f'SELECT {quoted} FROM "{selected}" ORDER BY {order_expr} LIMIT 12000'
|
|
try:
|
|
rows = cur.execute(sql).fetchall()
|
|
except Exception as exc:
|
|
return {
|
|
"rows": [],
|
|
"table": selected,
|
|
"error": f"messages_query_failed:{exc}",
|
|
}
|
|
parsed = []
|
|
for row in rows:
|
|
row_map = {col: row[idx] for idx, col in enumerate(select_cols)}
|
|
text = str(row_map.get(text_col) or "").strip()
|
|
if not text:
|
|
continue
|
|
raw_sender = (
|
|
str(row_map.get(sender_col) or "").strip() if sender_col else ""
|
|
)
|
|
raw_chat = (
|
|
str(row_map.get(chat_col) or "").strip() if chat_col else ""
|
|
)
|
|
raw_from_me = row_map.get(from_me_col) if from_me_col else None
|
|
parsed.append(
|
|
{
|
|
"text": text,
|
|
"ts": row_map.get(ts_col),
|
|
"from_me": raw_from_me,
|
|
"sender": raw_sender,
|
|
"chat": raw_chat,
|
|
}
|
|
)
|
|
return {"rows": parsed, "table": selected, "error": ""}
|
|
finally:
|
|
conn.close()
|
|
|
|
extracted = await asyncio.to_thread(_extract_rows)
|
|
rows = list(extracted.get("rows") or [])
|
|
table_name = str(extracted.get("table") or "")
|
|
error_text = str(extracted.get("error") or "").strip()
|
|
if error_text:
|
|
self._publish_state(
|
|
last_event="sqlite_history_scan_failed",
|
|
history_sqlite_error=error_text,
|
|
)
|
|
return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name}
|
|
if not rows:
|
|
return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name}
|
|
|
|
target_candidates = self._normalize_identifier_candidates(identifier)
|
|
target_local = str(identifier or "").strip().split("@", 1)[0]
|
|
if target_local:
|
|
target_candidates.update(
|
|
self._normalize_identifier_candidates(target_local)
|
|
)
|
|
|
|
identifiers = await sync_to_async(list)(
|
|
PersonIdentifier.objects.filter(service="whatsapp")
|
|
)
|
|
if not identifiers:
|
|
return {"imported": 0, "scanned": 0, "rows": len(rows), "table": table_name}
|
|
|
|
by_candidate = {}
|
|
for row in identifiers:
|
|
values = self._normalize_identifier_candidates(row.identifier)
|
|
for candidate in values:
|
|
if not candidate:
|
|
continue
|
|
by_candidate.setdefault(candidate, []).append(row)
|
|
|
|
imported = 0
|
|
scanned = 0
|
|
session_cache = {}
|
|
for row in rows:
|
|
sender = self._jid_to_identifier(row.get("sender"))
|
|
chat = self._jid_to_identifier(row.get("chat"))
|
|
row_candidates = self._normalize_identifier_candidates(sender, chat)
|
|
if not row_candidates:
|
|
continue
|
|
if target_candidates and not (row_candidates & target_candidates):
|
|
continue
|
|
matched_identifiers = {}
|
|
for candidate in row_candidates:
|
|
for pi in by_candidate.get(candidate, []):
|
|
matched_identifiers[int(pi.id)] = pi
|
|
if not matched_identifiers:
|
|
continue
|
|
scanned += 1
|
|
ts = self._normalize_timestamp(row.get("ts"))
|
|
text = str(row.get("text") or "").strip()
|
|
if not text:
|
|
continue
|
|
from_me_raw = row.get("from_me")
|
|
from_me_text = str(from_me_raw or "").strip().lower()
|
|
from_me = from_me_text in {"1", "true", "t", "yes", "y"}
|
|
sender_uuid = "" if from_me else str(sender or chat or "")
|
|
custom_author = "BOT" if from_me else None
|
|
for pi in matched_identifiers.values():
|
|
session = session_cache.get(int(pi.id))
|
|
if session is None:
|
|
session = await history.get_chat_session(pi.user, pi)
|
|
session_cache[int(pi.id)] = session
|
|
exists = await sync_to_async(
|
|
Message.objects.filter(
|
|
user=pi.user,
|
|
session=session,
|
|
ts=ts,
|
|
sender_uuid=sender_uuid,
|
|
text=text,
|
|
custom_author=custom_author,
|
|
).exists
|
|
)()
|
|
if exists:
|
|
continue
|
|
await sync_to_async(Message.objects.create)(
|
|
user=pi.user,
|
|
session=session,
|
|
ts=ts,
|
|
sender_uuid=sender_uuid,
|
|
text=text,
|
|
custom_author=custom_author,
|
|
delivered_ts=ts,
|
|
)
|
|
imported += 1
|
|
|
|
self.log.info(
|
|
"whatsapp sqlite history sync: table=%s scanned=%s imported=%s rows=%s target=%s",
|
|
table_name or "-",
|
|
scanned,
|
|
imported,
|
|
len(rows),
|
|
identifier or "-",
|
|
)
|
|
return {
|
|
"imported": imported,
|
|
"scanned": scanned,
|
|
"rows": len(rows),
|
|
"table": table_name,
|
|
}
|
|
|
|
async def _resolve_account_identifier(self):
|
|
if self._client is None:
|
|
return ""
|
|
if not hasattr(self._client, "get_me"):
|
|
return self.client_name
|
|
# Guard: Neonize Go GetMe() panics (SIGSEGV) when cli.LID is nil
|
|
# on unlinked sessions. Only safe to call after pairing completes
|
|
# and the device row exists in the whatsmeow_device table.
|
|
if not self._session_has_device():
|
|
return self.client_name
|
|
try:
|
|
me = await self._maybe_await(self._client.get_me())
|
|
except Exception:
|
|
return self.client_name
|
|
# Support both dict-like and object-like payloads.
|
|
for path in (
|
|
("JID",),
|
|
("jid",),
|
|
):
|
|
value = self._jid_to_identifier(self._pluck(me, *path))
|
|
if value:
|
|
return value
|
|
for path in (
|
|
("JID", "User"),
|
|
("jid",),
|
|
("user",),
|
|
("ID",),
|
|
):
|
|
value = self._pluck(me, *path)
|
|
if value:
|
|
return str(value)
|
|
if hasattr(self._client, "get_all_devices"):
|
|
try:
|
|
devices = await self._maybe_await(self._client.get_all_devices())
|
|
if devices:
|
|
jid = self._jid_to_identifier(self._pluck(devices[0], "JID"))
|
|
if jid:
|
|
return jid
|
|
except Exception:
|
|
pass
|
|
return self.client_name
|
|
|
|
def _pluck(self, obj, *path):
|
|
current = obj
|
|
for key in path:
|
|
if current is None:
|
|
return None
|
|
if isinstance(current, dict):
|
|
current = current.get(key)
|
|
continue
|
|
if hasattr(current, key):
|
|
current = getattr(current, key)
|
|
continue
|
|
return None
|
|
return current
|
|
|
|
def _shape_keys(self, obj):
|
|
if obj is None:
|
|
return []
|
|
if isinstance(obj, dict):
|
|
return sorted(str(key) for key in obj.keys())
|
|
if hasattr(obj, "__dict__"):
|
|
return sorted(str(key) for key in vars(obj).keys())
|
|
return []
|
|
|
|
def _normalize_timestamp(self, raw_value):
|
|
if raw_value is None:
|
|
return int(time.time() * 1000)
|
|
try:
|
|
value = int(raw_value)
|
|
except Exception:
|
|
return int(time.time() * 1000)
|
|
# WhatsApp libs often emit seconds. Promote to ms.
|
|
if value < 10**12:
|
|
return value * 1000
|
|
return value
|
|
|
|
def _normalize_identifier_candidates(self, *values):
|
|
out = set()
|
|
state = transport.get_runtime_state(self.service)
|
|
lid_map = state.get("lid_map") or {}
|
|
for value in values:
|
|
raw = self._jid_to_identifier(value)
|
|
if not raw:
|
|
continue
|
|
out.add(raw)
|
|
if "@" in raw:
|
|
out.add(raw.split("@", 1)[0])
|
|
digits = re.sub(r"[^0-9]", "", raw)
|
|
if digits:
|
|
out.add(digits)
|
|
if not digits.startswith("+"):
|
|
out.add(f"+{digits}")
|
|
if "@lid" in raw:
|
|
mapped = re.sub(r"[^0-9]", "", str(lid_map.get(digits) or ""))
|
|
if mapped:
|
|
out.add(mapped)
|
|
out.add(f"+{mapped}")
|
|
out.add(f"{mapped}@s.whatsapp.net")
|
|
return out
|
|
|
|
async def _sync_contacts_from_client(self):
|
|
if self._client is None:
|
|
return
|
|
connected_now = await self._is_contact_sync_ready()
|
|
if not connected_now:
|
|
self._publish_state(
|
|
last_event="contacts_sync_skipped_disconnected",
|
|
contacts_source="disconnected",
|
|
)
|
|
return
|
|
# NOTE: Neonize get_all_contacts has crashed some runtime builds with a Go panic.
|
|
# Read contact-like rows directly from the session sqlite DB instead.
|
|
contacts, source, lid_map = await self._sync_contacts_from_sqlite()
|
|
groups, groups_source = await self._sync_groups_from_client()
|
|
now_ts = int(time.time())
|
|
|
|
if contacts:
|
|
self.log.debug(
|
|
"whatsapp contacts synced: count=%s source=%s",
|
|
len(contacts),
|
|
source or "unknown",
|
|
)
|
|
self._publish_state(
|
|
contacts=contacts,
|
|
lid_map=lid_map,
|
|
contacts_synced_at=now_ts,
|
|
contacts_sync_count=len(contacts),
|
|
last_event="contacts_synced",
|
|
contacts_source=source or "unknown",
|
|
last_error="",
|
|
)
|
|
else:
|
|
self.log.debug("whatsapp contacts sync empty (%s)", source or "unknown")
|
|
self._publish_state(
|
|
last_event="contacts_sync_empty",
|
|
contacts_source=source or "unknown",
|
|
)
|
|
|
|
if groups_source:
|
|
event_name = "groups_synced" if groups else "groups_sync_empty"
|
|
self._publish_state(
|
|
groups=groups,
|
|
groups_source=groups_source,
|
|
groups_sync_count=len(groups),
|
|
groups_synced_at=now_ts,
|
|
last_event=event_name,
|
|
last_error="" if groups else "",
|
|
)
|
|
|
|
async def _sync_contacts_from_sqlite(self):
|
|
def _extract():
|
|
if not self.session_db or not os.path.exists(self.session_db):
|
|
return [], "sqlite_missing", {}
|
|
try:
|
|
conn = sqlite3.connect(self.session_db)
|
|
conn.row_factory = sqlite3.Row
|
|
except Exception:
|
|
return [], "sqlite_open_failed", {}
|
|
try:
|
|
cur = conn.cursor()
|
|
table_rows = cur.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table'"
|
|
).fetchall()
|
|
table_names = [str(row[0]) for row in table_rows if row and row[0]]
|
|
lid_map = {}
|
|
if "whatsmeow_lid_map" in table_names:
|
|
try:
|
|
lid_rows = cur.execute(
|
|
'SELECT "lid", "pn" FROM "whatsmeow_lid_map" LIMIT 10000'
|
|
).fetchall()
|
|
except Exception:
|
|
lid_rows = []
|
|
for lid, pn in lid_rows:
|
|
lid_key = re.sub(r"[^0-9]", "", str(lid or "").strip())
|
|
pn_value = re.sub(r"[^0-9]", "", str(pn or "").strip())
|
|
if lid_key and pn_value:
|
|
lid_map[lid_key] = pn_value
|
|
# Prefer the canonical contacts table when available.
|
|
if "whatsmeow_contacts" in table_names:
|
|
own_ids = set()
|
|
for value in (
|
|
transport.get_runtime_state(self.service).get("accounts") or []
|
|
):
|
|
base = str(value or "").strip().split("@", 1)[0]
|
|
base = base.split(":", 1)[0]
|
|
if base:
|
|
own_ids.add(base.lower())
|
|
try:
|
|
for row in cur.execute(
|
|
'SELECT DISTINCT "our_jid" FROM "whatsmeow_contacts"'
|
|
).fetchall():
|
|
base = (
|
|
str((row or [None])[0] or "").strip().split("@", 1)[0]
|
|
)
|
|
base = base.split(":", 1)[0]
|
|
if base:
|
|
own_ids.add(base.lower())
|
|
except Exception:
|
|
pass
|
|
|
|
out = []
|
|
seen = set()
|
|
try:
|
|
rows = cur.execute(
|
|
'SELECT "their_jid", "first_name", "full_name", "push_name", "business_name" '
|
|
'FROM "whatsmeow_contacts" LIMIT 5000'
|
|
).fetchall()
|
|
except Exception:
|
|
rows = []
|
|
for (
|
|
their_jid,
|
|
first_name,
|
|
full_name,
|
|
push_name,
|
|
business_name,
|
|
) in rows:
|
|
jid_value = str(their_jid or "").strip()
|
|
if (
|
|
"@s.whatsapp.net" not in jid_value
|
|
and "@lid" not in jid_value
|
|
):
|
|
continue
|
|
identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0]
|
|
if "@lid" in jid_value:
|
|
mapped = lid_map.get(re.sub(r"[^0-9]", "", identifier))
|
|
if mapped:
|
|
identifier = mapped
|
|
jid_value = f"{mapped}@s.whatsapp.net"
|
|
if not identifier:
|
|
continue
|
|
if identifier.lower() in own_ids:
|
|
continue
|
|
if identifier in seen:
|
|
continue
|
|
seen.add(identifier)
|
|
display_name = (
|
|
str(push_name or "").strip()
|
|
or str(full_name or "").strip()
|
|
or str(business_name or "").strip()
|
|
or str(first_name or "").strip()
|
|
)
|
|
out.append(
|
|
{
|
|
"identifier": identifier,
|
|
"jid": jid_value,
|
|
"name": display_name,
|
|
"chat": "",
|
|
"seen_at": int(time.time()),
|
|
}
|
|
)
|
|
if len(out) >= 500:
|
|
break
|
|
if out:
|
|
return out, "sqlite_contacts", lid_map
|
|
|
|
account_keys = {
|
|
str(value or "").strip().split("@", 1)[0].lower()
|
|
for value in (
|
|
transport.get_runtime_state(self.service).get("accounts") or []
|
|
)
|
|
if str(value or "").strip()
|
|
}
|
|
seen = set()
|
|
out = []
|
|
for table in table_names:
|
|
try:
|
|
columns = [
|
|
str(row[1] or "")
|
|
for row in cur.execute(
|
|
f'PRAGMA table_info("{table}")'
|
|
).fetchall()
|
|
]
|
|
except Exception:
|
|
continue
|
|
if not columns:
|
|
continue
|
|
jid_cols = [
|
|
col
|
|
for col in columns
|
|
if any(
|
|
token in col.lower()
|
|
for token in (
|
|
"jid",
|
|
"phone",
|
|
"chat",
|
|
"sender",
|
|
"recipient",
|
|
"participant",
|
|
"from",
|
|
"to",
|
|
"user",
|
|
)
|
|
)
|
|
]
|
|
name_cols = [
|
|
col
|
|
for col in columns
|
|
if "name" in col.lower() or "push" in col.lower()
|
|
]
|
|
if not jid_cols:
|
|
continue
|
|
select_cols = list(dict.fromkeys(jid_cols + name_cols))[:6]
|
|
quoted = ", ".join(f'"{col}"' for col in select_cols)
|
|
try:
|
|
rows = cur.execute(
|
|
f'SELECT {quoted} FROM "{table}" LIMIT 1000'
|
|
).fetchall()
|
|
except Exception:
|
|
continue
|
|
for row in rows:
|
|
row_map = {col: row[idx] for idx, col in enumerate(select_cols)}
|
|
jid_value = ""
|
|
for col in jid_cols:
|
|
raw = str(row_map.get(col) or "").strip()
|
|
if "@s.whatsapp.net" in raw:
|
|
m = re.search(r"([0-9]{6,20})@s\.whatsapp\.net", raw)
|
|
jid_value = (
|
|
f"{m.group(1)}@s.whatsapp.net"
|
|
if m
|
|
else raw.split()[0]
|
|
)
|
|
break
|
|
if raw.endswith("@lid"):
|
|
jid_value = raw
|
|
break
|
|
digits = re.sub(r"[^0-9]", "", raw)
|
|
if digits:
|
|
jid_value = f"{digits}@s.whatsapp.net"
|
|
break
|
|
if not jid_value:
|
|
continue
|
|
identifier = jid_value.split("@", 1)[0].strip()
|
|
if not identifier:
|
|
continue
|
|
if identifier.lower() in account_keys:
|
|
continue
|
|
if identifier in seen:
|
|
continue
|
|
seen.add(identifier)
|
|
display_name = ""
|
|
for col in name_cols:
|
|
candidate = str(row_map.get(col) or "").strip()
|
|
if candidate and candidate not in {"~", "-", "_"}:
|
|
display_name = candidate
|
|
break
|
|
out.append(
|
|
{
|
|
"identifier": identifier,
|
|
"jid": jid_value,
|
|
"name": display_name,
|
|
"chat": "",
|
|
"seen_at": int(time.time()),
|
|
}
|
|
)
|
|
if len(out) >= 500:
|
|
return out, "sqlite_tables", lid_map
|
|
return out, "sqlite_tables", lid_map
|
|
finally:
|
|
conn.close()
|
|
|
|
return await asyncio.to_thread(_extract)
|
|
|
|
async def _sync_groups_from_client(self):
|
|
if self._client is None:
|
|
return [], "client_missing"
|
|
getter = getattr(self._client, "get_joined_groups", None)
|
|
if getter is None:
|
|
return [], "get_joined_groups_missing"
|
|
try:
|
|
group_rows = await self._maybe_await(getter())
|
|
except Exception as exc:
|
|
self._publish_state(
|
|
last_event="groups_sync_failed",
|
|
last_error=str(exc),
|
|
)
|
|
return [], "get_joined_groups_failed"
|
|
|
|
out = []
|
|
now_ts = int(time.time())
|
|
for group in group_rows or []:
|
|
jid_value = self._jid_to_identifier(
|
|
self._pluck(group, "JID") or self._pluck(group, "jid")
|
|
)
|
|
identifier = (
|
|
jid_value.split("@", 1)[0].strip() if jid_value else ""
|
|
)
|
|
if not identifier:
|
|
continue
|
|
name = (
|
|
str(self._pluck(group, "GroupName", "Name") or "").strip()
|
|
or str(self._pluck(group, "GroupTopic", "Topic") or "").strip()
|
|
or identifier
|
|
)
|
|
out.append(
|
|
{
|
|
"identifier": identifier,
|
|
"jid": jid_value or f"{identifier}@g.us",
|
|
"name": name,
|
|
"chat": name,
|
|
"type": "group",
|
|
"seen_at": now_ts,
|
|
}
|
|
)
|
|
if len(out) >= 500:
|
|
break
|
|
return out, "get_joined_groups"
|
|
|
|
async def _is_contact_sync_ready(self) -> bool:
|
|
if self._client is None:
|
|
return False
|
|
if self._connected:
|
|
return True
|
|
state = transport.get_runtime_state(self.service)
|
|
if bool(state.get("connected")):
|
|
return True
|
|
pair_status = str(state.get("pair_status") or "").strip().lower()
|
|
if pair_status == "connected":
|
|
return True
|
|
check_connected = getattr(self._client, "is_connected", None)
|
|
if check_connected is None:
|
|
return False
|
|
try:
|
|
value = (
|
|
await self._maybe_await(check_connected())
|
|
if callable(check_connected)
|
|
else await self._maybe_await(check_connected)
|
|
)
|
|
except Exception:
|
|
return False
|
|
if value:
|
|
self._connected = True
|
|
self._publish_state(connected=True, warning="", pair_status="connected")
|
|
return True
|
|
# Guard: Neonize Go GetMe() panics (SIGSEGV) on unlinked sessions.
|
|
if hasattr(self._client, "get_me") and self._session_has_device():
|
|
try:
|
|
me = await self._maybe_await(self._client.get_me())
|
|
if me:
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True, warning="", pair_status="connected"
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def _handle_history_sync_event(self, event):
|
|
started_at = time.time()
|
|
self._publish_state(
|
|
history_sync_running=True,
|
|
history_sync_started_at=int(started_at),
|
|
last_event="history_sync_started",
|
|
)
|
|
data = self._pluck(event, "Data") or self._pluck(event, "data")
|
|
if data is None:
|
|
self._publish_state(
|
|
history_sync_running=False,
|
|
history_sync_duration_ms=int((time.time() - started_at) * 1000),
|
|
)
|
|
return
|
|
|
|
pushname_rows = (
|
|
self._pluck(data, "pushnames") or self._pluck(data, "Pushnames") or []
|
|
)
|
|
pushname_map = {}
|
|
for row in pushname_rows:
|
|
raw_id = self._jid_to_identifier(
|
|
self._pluck(row, "ID")
|
|
) or self._jid_to_identifier(self._pluck(row, "id"))
|
|
if not raw_id:
|
|
continue
|
|
pushname = str(
|
|
self._pluck(row, "pushname") or self._pluck(row, "Pushname") or ""
|
|
).strip()
|
|
if not pushname:
|
|
continue
|
|
pushname_map[raw_id] = pushname
|
|
pushname_map[raw_id.split("@", 1)[0]] = pushname
|
|
|
|
conversation_rows = (
|
|
self._pluck(data, "conversations")
|
|
or self._pluck(data, "Conversations")
|
|
or []
|
|
)
|
|
found = 0
|
|
imported_messages = 0
|
|
for row in conversation_rows:
|
|
jid = ""
|
|
for candidate in (
|
|
self._pluck(row, "ID"),
|
|
self._pluck(row, "id"),
|
|
self._pluck(row, "pnJID"),
|
|
self._pluck(row, "pnJid"),
|
|
self._pluck(row, "newJID"),
|
|
self._pluck(row, "newJid"),
|
|
self._pluck(row, "oldJID"),
|
|
self._pluck(row, "oldJid"),
|
|
):
|
|
parsed = self._jid_to_identifier(candidate)
|
|
if parsed:
|
|
jid = parsed
|
|
break
|
|
if not jid or "@g.us" in jid or "@broadcast" in jid:
|
|
continue
|
|
identifier = jid.split("@", 1)[0].strip()
|
|
if not identifier or not re.search(r"[0-9]{6,}", identifier):
|
|
continue
|
|
name = str(
|
|
self._pluck(row, "displayName")
|
|
or self._pluck(row, "DisplayName")
|
|
or self._pluck(row, "name")
|
|
or self._pluck(row, "Name")
|
|
or self._pluck(row, "username")
|
|
or self._pluck(row, "Username")
|
|
or pushname_map.get(jid)
|
|
or pushname_map.get(identifier)
|
|
or ""
|
|
).strip()
|
|
self._remember_contact(identifier, jid=jid, name=name)
|
|
found += 1
|
|
imported_messages += await self._import_history_messages_for_conversation(
|
|
row=row,
|
|
chat_jid=jid,
|
|
)
|
|
|
|
if found:
|
|
state = transport.get_runtime_state(self.service)
|
|
current_count = int(state.get("contacts_sync_count") or 0)
|
|
self._publish_state(
|
|
contacts_source="history_sync",
|
|
contacts_synced_at=int(time.time()),
|
|
contacts_sync_count=max(current_count, found),
|
|
last_event="history_sync_contacts",
|
|
last_error="",
|
|
)
|
|
self._publish_state(
|
|
history_sync_running=False,
|
|
history_sync_finished_at=int(time.time()),
|
|
history_sync_duration_ms=int((time.time() - started_at) * 1000),
|
|
history_imported_messages=imported_messages,
|
|
last_event="history_sync_completed",
|
|
)
|
|
|
|
def _history_message_rows(self, conversation_row):
|
|
candidates = (
|
|
self._pluck(conversation_row, "messages"),
|
|
self._pluck(conversation_row, "Messages"),
|
|
self._pluck(conversation_row, "msgs"),
|
|
self._pluck(conversation_row, "Msgs"),
|
|
)
|
|
for value in candidates:
|
|
if isinstance(value, (list, tuple)):
|
|
return list(value)
|
|
return []
|
|
|
|
def _history_message_text(self, msg):
|
|
return str(
|
|
self._pluck(msg, "message", "conversation")
|
|
or self._pluck(msg, "message", "Conversation")
|
|
or self._pluck(msg, "message", "extendedTextMessage", "text")
|
|
or self._pluck(msg, "message", "ExtendedTextMessage", "Text")
|
|
or self._pluck(msg, "message", "imageMessage", "caption")
|
|
or self._pluck(msg, "message", "videoMessage", "caption")
|
|
or self._pluck(msg, "message", "documentMessage", "caption")
|
|
or self._pluck(msg, "Message", "conversation")
|
|
or self._pluck(msg, "Message", "Conversation")
|
|
or self._pluck(msg, "Message", "extendedTextMessage", "text")
|
|
or self._pluck(msg, "Message", "ExtendedTextMessage", "Text")
|
|
or self._pluck(msg, "conversation")
|
|
or self._pluck(msg, "Conversation")
|
|
or self._pluck(msg, "text")
|
|
or self._pluck(msg, "Text")
|
|
or ""
|
|
).strip()
|
|
|
|
def _message_text(self, msg_obj, event_obj=None):
|
|
"""
|
|
Extract user-visible text from diverse WhatsApp message payload shapes.
|
|
"""
|
|
candidates = (
|
|
self._pluck(msg_obj, "conversation"),
|
|
self._pluck(msg_obj, "Conversation"),
|
|
self._pluck(msg_obj, "extendedTextMessage", "text"),
|
|
self._pluck(msg_obj, "ExtendedTextMessage", "Text"),
|
|
self._pluck(msg_obj, "extended_text_message", "text"),
|
|
self._pluck(msg_obj, "imageMessage", "caption"),
|
|
self._pluck(msg_obj, "videoMessage", "caption"),
|
|
self._pluck(msg_obj, "documentMessage", "caption"),
|
|
self._pluck(msg_obj, "ephemeralMessage", "message", "conversation"),
|
|
self._pluck(
|
|
msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"
|
|
),
|
|
self._pluck(msg_obj, "viewOnceMessage", "message", "conversation"),
|
|
self._pluck(
|
|
msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"
|
|
),
|
|
self._pluck(msg_obj, "viewOnceMessageV2", "message", "conversation"),
|
|
self._pluck(
|
|
msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"
|
|
),
|
|
self._pluck(
|
|
msg_obj, "viewOnceMessageV2Extension", "message", "conversation"
|
|
),
|
|
self._pluck(
|
|
msg_obj,
|
|
"viewOnceMessageV2Extension",
|
|
"message",
|
|
"extendedTextMessage",
|
|
"text",
|
|
),
|
|
self._pluck(event_obj, "message", "conversation"),
|
|
self._pluck(event_obj, "message", "extendedTextMessage", "text"),
|
|
self._pluck(event_obj, "Message", "conversation"),
|
|
self._pluck(event_obj, "Message", "extendedTextMessage", "text"),
|
|
self._pluck(event_obj, "conversation"),
|
|
self._pluck(event_obj, "text"),
|
|
)
|
|
for value in candidates:
|
|
text = str(value or "").strip()
|
|
if text:
|
|
return text
|
|
return ""
|
|
|
|
def _history_message_ts(self, msg):
|
|
raw_ts = (
|
|
self._pluck(msg, "messageTimestamp")
|
|
or self._pluck(msg, "MessageTimestamp")
|
|
or self._pluck(msg, "timestamp")
|
|
or self._pluck(msg, "Timestamp")
|
|
or self._pluck(msg, "message", "messageTimestamp")
|
|
or self._pluck(msg, "Message", "messageTimestamp")
|
|
or int(time.time() * 1000)
|
|
)
|
|
return self._normalize_timestamp(raw_ts)
|
|
|
|
def _history_message_from_me(self, msg):
|
|
return bool(
|
|
self._pluck(msg, "key", "fromMe")
|
|
or self._pluck(msg, "Key", "FromMe")
|
|
or self._pluck(msg, "messageKey", "fromMe")
|
|
or self._pluck(msg, "MessageKey", "FromMe")
|
|
or self._pluck(msg, "fromMe")
|
|
or self._pluck(msg, "FromMe")
|
|
)
|
|
|
|
def _history_message_sender_jid(self, msg, fallback_chat_jid: str):
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(msg, "key", "participant")
|
|
or self._pluck(msg, "Key", "Participant")
|
|
or self._pluck(msg, "messageKey", "participant")
|
|
or self._pluck(msg, "MessageKey", "Participant")
|
|
or self._pluck(msg, "participant")
|
|
or self._pluck(msg, "Participant")
|
|
or fallback_chat_jid
|
|
)
|
|
return str(sender or fallback_chat_jid or "").strip()
|
|
|
|
async def _import_history_messages_for_conversation(
|
|
self, row, chat_jid: str
|
|
) -> int:
|
|
imported = 0
|
|
msg_rows = self._history_message_rows(row)
|
|
if not msg_rows:
|
|
return imported
|
|
chat_identifier = str(chat_jid or "").split("@", 1)[0].strip()
|
|
if not chat_identifier:
|
|
return imported
|
|
candidate_values = self._normalize_identifier_candidates(
|
|
chat_jid, chat_identifier
|
|
)
|
|
if not candidate_values:
|
|
return imported
|
|
identifiers = await sync_to_async(list)(
|
|
PersonIdentifier.objects.filter(
|
|
service="whatsapp",
|
|
identifier__in=list(candidate_values),
|
|
)
|
|
)
|
|
if not identifiers:
|
|
return imported
|
|
|
|
for msg in msg_rows:
|
|
ts = self._history_message_ts(msg)
|
|
text = self._history_message_text(msg)
|
|
from_me = self._history_message_from_me(msg)
|
|
sender_jid = self._history_message_sender_jid(msg, chat_jid)
|
|
for identifier in identifiers:
|
|
session = await history.get_chat_session(identifier.user, identifier)
|
|
exists = await sync_to_async(
|
|
Message.objects.filter(
|
|
user=identifier.user,
|
|
session=session,
|
|
ts=ts,
|
|
sender_uuid=(sender_jid if not from_me else ""),
|
|
text=text,
|
|
custom_author=("BOT" if from_me else None),
|
|
).exists
|
|
)()
|
|
if exists:
|
|
continue
|
|
await sync_to_async(Message.objects.create)(
|
|
user=identifier.user,
|
|
session=session,
|
|
ts=ts,
|
|
sender_uuid=(sender_jid if not from_me else ""),
|
|
text=text,
|
|
custom_author=("BOT" if from_me else None),
|
|
delivered_ts=ts,
|
|
)
|
|
imported += 1
|
|
return imported
|
|
|
|
def _remember_contact(self, identifier, *, jid="", name="", chat=""):
|
|
cleaned = str(identifier or "").strip()
|
|
if "@" in cleaned:
|
|
cleaned = cleaned.split("@", 1)[0]
|
|
if not cleaned:
|
|
return
|
|
state = transport.get_runtime_state(self.service)
|
|
existing = state.get("contacts") or []
|
|
rows = [item for item in existing if isinstance(item, dict)]
|
|
merged = []
|
|
seen = set()
|
|
now_ts = int(time.time())
|
|
row = {
|
|
"identifier": cleaned,
|
|
"jid": str(jid or "").strip(),
|
|
"name": str(name or "").strip(),
|
|
"chat": str(chat or "").strip(),
|
|
"seen_at": now_ts,
|
|
}
|
|
merged.append(row)
|
|
seen.add(cleaned)
|
|
for item in rows:
|
|
candidate = str(item.get("identifier") or item.get("jid") or "").strip()
|
|
if not candidate or candidate in seen:
|
|
continue
|
|
seen.add(candidate)
|
|
merged.append(item)
|
|
if len(merged) >= 500:
|
|
break
|
|
self._publish_state(contacts=merged, last_contact_seen_at=now_ts)
|
|
|
|
def _remember_history_anchor(
|
|
self,
|
|
*,
|
|
identifier: str,
|
|
chat_jid: str,
|
|
msg_id: str,
|
|
ts: int,
|
|
from_me: bool,
|
|
sender_jid: str = "",
|
|
):
|
|
if not msg_id:
|
|
return
|
|
candidate_keys = self._normalize_identifier_candidates(identifier, chat_jid)
|
|
if not candidate_keys:
|
|
return
|
|
state = transport.get_runtime_state(self.service)
|
|
anchors = dict(state.get("history_anchors") or {})
|
|
row = {
|
|
"msg_id": str(msg_id),
|
|
"ts": int(ts or int(time.time() * 1000)),
|
|
"from_me": bool(from_me),
|
|
"chat_jid": str(chat_jid or ""),
|
|
"sender_jid": str(sender_jid or ""),
|
|
"updated_at": int(time.time()),
|
|
}
|
|
for key in candidate_keys:
|
|
anchors[str(key)] = row
|
|
if len(anchors) > 400:
|
|
recent = sorted(
|
|
anchors.items(),
|
|
key=lambda item: int((item[1] or {}).get("updated_at") or 0),
|
|
reverse=True,
|
|
)[:300]
|
|
anchors = {k: v for k, v in recent}
|
|
self._publish_state(history_anchors=anchors)
|
|
|
|
def _jid_to_identifier(self, value):
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return ""
|
|
if not isinstance(value, str):
|
|
user = self._pluck(value, "User") or self._pluck(value, "user")
|
|
server = self._pluck(value, "Server") or self._pluck(value, "server")
|
|
if user and server:
|
|
return f"{user}@{server}"
|
|
if user:
|
|
return str(user)
|
|
return raw
|
|
|
|
def _is_media_message(self, message_obj):
|
|
media_fields = (
|
|
"imageMessage",
|
|
"videoMessage",
|
|
"audioMessage",
|
|
"documentMessage",
|
|
"stickerMessage",
|
|
"image_message",
|
|
"video_message",
|
|
"audio_message",
|
|
"document_message",
|
|
"sticker_message",
|
|
)
|
|
for field in media_fields:
|
|
value = self._pluck(message_obj, field)
|
|
if value:
|
|
return True
|
|
return False
|
|
|
|
def _infer_media_content_type(self, message_obj):
|
|
if self._pluck(message_obj, "imageMessage") or self._pluck(
|
|
message_obj, "image_message"
|
|
):
|
|
return "image/jpeg"
|
|
if self._pluck(message_obj, "videoMessage") or self._pluck(
|
|
message_obj, "video_message"
|
|
):
|
|
return "video/mp4"
|
|
if self._pluck(message_obj, "audioMessage") or self._pluck(
|
|
message_obj, "audio_message"
|
|
):
|
|
return "audio/ogg"
|
|
if self._pluck(message_obj, "stickerMessage") or self._pluck(
|
|
message_obj, "sticker_message"
|
|
):
|
|
return "image/webp"
|
|
return "application/octet-stream"
|
|
|
|
def _extract_reaction_event(self, message_obj):
|
|
node = self._pluck(message_obj, "reactionMessage") or self._pluck(
|
|
message_obj, "reaction_message"
|
|
)
|
|
if not node:
|
|
return None
|
|
emoji = str(
|
|
self._pluck(node, "text") or self._pluck(node, "emoji") or ""
|
|
).strip()
|
|
target_msg_id = str(
|
|
self._pluck(node, "key", "id")
|
|
or self._pluck(node, "key", "ID")
|
|
or self._pluck(node, "targetMessageKey", "id")
|
|
or self._pluck(node, "target_message_key", "id")
|
|
or ""
|
|
).strip()
|
|
remove = bool(not emoji)
|
|
if not target_msg_id:
|
|
return None
|
|
return {
|
|
"emoji": emoji,
|
|
"target_message_id": target_msg_id,
|
|
"remove": remove,
|
|
}
|
|
|
|
async def _download_event_media(self, event):
|
|
if not self._client:
|
|
return []
|
|
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
|
|
if msg_obj is None or not self._is_media_message(msg_obj):
|
|
return []
|
|
if not hasattr(self._client, "download_any"):
|
|
return []
|
|
|
|
try:
|
|
payload = await self._maybe_await(self._client.download_any(msg_obj))
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp media download failed: %s", exc)
|
|
return []
|
|
|
|
if isinstance(payload, memoryview):
|
|
payload = payload.tobytes()
|
|
if not isinstance(payload, (bytes, bytearray)):
|
|
return []
|
|
|
|
filename = self._pluck(msg_obj, "documentMessage", "fileName") or self._pluck(
|
|
msg_obj, "document_message", "file_name"
|
|
)
|
|
content_type = (
|
|
self._pluck(msg_obj, "documentMessage", "mimetype")
|
|
or self._pluck(msg_obj, "document_message", "mimetype")
|
|
or self._pluck(msg_obj, "imageMessage", "mimetype")
|
|
or self._pluck(msg_obj, "image_message", "mimetype")
|
|
or self._pluck(msg_obj, "videoMessage", "mimetype")
|
|
or self._pluck(msg_obj, "video_message", "mimetype")
|
|
or self._pluck(msg_obj, "audioMessage", "mimetype")
|
|
or self._pluck(msg_obj, "audio_message", "mimetype")
|
|
or self._infer_media_content_type(msg_obj)
|
|
)
|
|
if not filename:
|
|
ext = mimetypes.guess_extension(
|
|
str(content_type or "").split(";", 1)[0].strip().lower()
|
|
)
|
|
filename = f"wa-{int(time.time())}{ext or '.bin'}"
|
|
blob_key = media_bridge.put_blob(
|
|
service="whatsapp",
|
|
content=bytes(payload),
|
|
filename=filename,
|
|
content_type=content_type,
|
|
)
|
|
if not blob_key:
|
|
return []
|
|
return [
|
|
{
|
|
"blob_key": blob_key,
|
|
"filename": filename,
|
|
"content_type": content_type,
|
|
"size": len(payload),
|
|
}
|
|
]
|
|
|
|
async def _handle_message_event(self, event):
|
|
msg_obj = self._pluck(event, "message") or self._pluck(event, "Message")
|
|
text = self._message_text(msg_obj, event)
|
|
if not text:
|
|
self.log.debug(
|
|
"whatsapp empty-text event shape: msg_keys=%s event_keys=%s type=%s",
|
|
self._shape_keys(msg_obj),
|
|
self._shape_keys(event),
|
|
str(type(event).__name__),
|
|
)
|
|
source = (
|
|
self._pluck(event, "Info", "MessageSource")
|
|
or self._pluck(event, "info", "message_source")
|
|
or self._pluck(event, "info", "messageSource")
|
|
)
|
|
is_from_me = bool(
|
|
self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe")
|
|
)
|
|
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender")
|
|
or self._pluck(source, "sender")
|
|
or self._pluck(source, "SenderAlt")
|
|
or self._pluck(source, "senderAlt")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat") or self._pluck(source, "chat")
|
|
)
|
|
raw_ts = (
|
|
self._pluck(event, "Info", "Timestamp")
|
|
or self._pluck(event, "info", "timestamp")
|
|
or self._pluck(event, "info", "message_timestamp")
|
|
or self._pluck(event, "Timestamp")
|
|
or self._pluck(event, "timestamp")
|
|
)
|
|
msg_id = str(
|
|
self._pluck(event, "Info", "ID")
|
|
or self._pluck(event, "info", "id")
|
|
or self._pluck(event, "ID")
|
|
or self._pluck(event, "id")
|
|
or ""
|
|
).strip()
|
|
ts = self._normalize_timestamp(raw_ts)
|
|
|
|
reaction_payload = self._extract_reaction_event(msg_obj)
|
|
if reaction_payload:
|
|
self.log.debug(
|
|
"reaction-bridge whatsapp-inbound msg_id=%s target_id=%s emoji=%s remove=%s sender=%s chat=%s",
|
|
msg_id or "-",
|
|
str(reaction_payload.get("target_message_id") or "") or "-",
|
|
str(reaction_payload.get("emoji") or "") or "-",
|
|
bool(reaction_payload.get("remove")),
|
|
sender or "-",
|
|
chat or "-",
|
|
)
|
|
identifier_values = self._normalize_identifier_candidates(sender, chat)
|
|
if not identifier_values:
|
|
self.log.warning(
|
|
"reaction-bridge whatsapp-identifiers-miss sender=%s chat=%s",
|
|
sender or "-",
|
|
chat or "-",
|
|
)
|
|
return
|
|
identifiers = await sync_to_async(list)(
|
|
PersonIdentifier.objects.filter(
|
|
service="whatsapp",
|
|
identifier__in=list(identifier_values),
|
|
)
|
|
)
|
|
for identifier in identifiers:
|
|
try:
|
|
await self.ur.xmpp.client.apply_external_reaction(
|
|
identifier.user,
|
|
identifier,
|
|
source_service="whatsapp",
|
|
emoji=str(reaction_payload.get("emoji") or ""),
|
|
remove=bool(reaction_payload.get("remove")),
|
|
upstream_message_id=str(
|
|
reaction_payload.get("target_message_id") or ""
|
|
),
|
|
upstream_ts=0,
|
|
actor=(sender or chat or ""),
|
|
payload={
|
|
"event": "reaction",
|
|
"message_id": msg_id,
|
|
},
|
|
)
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp reaction relay to XMPP failed: %s", exc)
|
|
return
|
|
|
|
self._remember_contact(
|
|
sender or chat,
|
|
jid=sender,
|
|
chat=chat,
|
|
)
|
|
self._remember_history_anchor(
|
|
identifier=(chat or sender or ""),
|
|
chat_jid=str(chat or sender or ""),
|
|
msg_id=msg_id,
|
|
ts=ts,
|
|
from_me=is_from_me,
|
|
sender_jid=str(sender or ""),
|
|
)
|
|
|
|
identifier_values = self._normalize_identifier_candidates(sender, chat)
|
|
if not identifier_values:
|
|
return
|
|
|
|
identifiers = await sync_to_async(list)(
|
|
PersonIdentifier.objects.filter(
|
|
service="whatsapp",
|
|
identifier__in=list(identifier_values),
|
|
)
|
|
)
|
|
if not identifiers:
|
|
return
|
|
|
|
attachments = await self._download_event_media(event)
|
|
xmpp_attachments = []
|
|
if attachments:
|
|
fetched = await asyncio.gather(
|
|
*[transport.fetch_attachment(self.service, att) for att in attachments]
|
|
)
|
|
xmpp_attachments = [row for row in fetched if row]
|
|
|
|
payload = {
|
|
"sender": str(sender or ""),
|
|
"chat": str(chat or ""),
|
|
"raw_event": str(type(event).__name__),
|
|
}
|
|
|
|
for identifier in identifiers:
|
|
uploaded_urls = await self.ur.xmpp.client.send_from_external(
|
|
identifier.user,
|
|
identifier,
|
|
text,
|
|
is_outgoing_message=is_from_me,
|
|
attachments=xmpp_attachments,
|
|
source_ref={
|
|
"upstream_message_id": str(msg_id or ""),
|
|
"upstream_author": str(sender or chat or ""),
|
|
"upstream_ts": int(ts or 0),
|
|
},
|
|
)
|
|
display_text = text
|
|
if (not display_text) and uploaded_urls:
|
|
display_text = "\n".join(uploaded_urls)
|
|
if (not display_text) and attachments:
|
|
media_urls = [
|
|
self._blob_key_to_compose_url((att or {}).get("blob_key"))
|
|
for att in attachments
|
|
]
|
|
media_urls = [url for url in media_urls if url]
|
|
if media_urls:
|
|
display_text = "\n".join(media_urls)
|
|
|
|
session = await history.get_chat_session(identifier.user, identifier)
|
|
duplicate_exists = await sync_to_async(
|
|
Message.objects.filter(
|
|
user=identifier.user,
|
|
session=session,
|
|
ts=ts,
|
|
sender_uuid=str(sender or chat or ""),
|
|
text=display_text,
|
|
).exists
|
|
)()
|
|
if duplicate_exists:
|
|
continue
|
|
await history.store_message(
|
|
session=session,
|
|
sender=str(sender or chat or ""),
|
|
text=display_text,
|
|
ts=ts,
|
|
outgoing=is_from_me,
|
|
)
|
|
await self.ur.message_received(
|
|
self.service,
|
|
identifier=identifier,
|
|
text=display_text,
|
|
ts=ts,
|
|
payload=payload,
|
|
)
|
|
|
|
async def _handle_receipt_event(self, event):
|
|
source = (
|
|
self._pluck(event, "MessageSource")
|
|
or self._pluck(event, "info", "message_source")
|
|
or self._pluck(event, "info", "messageSource")
|
|
)
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender") or self._pluck(source, "sender")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat") or self._pluck(source, "chat")
|
|
)
|
|
timestamps = []
|
|
raw_ids = (
|
|
self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or []
|
|
)
|
|
if isinstance(raw_ids, (list, tuple, set)):
|
|
for item in raw_ids:
|
|
try:
|
|
value = int(item)
|
|
timestamps.append(value * 1000 if value < 10**12 else value)
|
|
except Exception:
|
|
continue
|
|
read_ts = self._normalize_timestamp(
|
|
self._pluck(event, "Timestamp")
|
|
or self._pluck(event, "timestamp")
|
|
or int(time.time() * 1000)
|
|
)
|
|
receipt_type = str(self._pluck(event, "Type") or "").strip()
|
|
self._remember_contact(
|
|
sender or chat,
|
|
jid=sender,
|
|
chat=chat,
|
|
)
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender, chat):
|
|
await self.ur.message_read(
|
|
self.service,
|
|
identifier=candidate,
|
|
message_timestamps=timestamps,
|
|
read_ts=read_ts,
|
|
read_by=sender or chat,
|
|
payload={
|
|
"event": "receipt",
|
|
"type": receipt_type,
|
|
"sender": str(sender),
|
|
"chat": str(chat),
|
|
},
|
|
)
|
|
|
|
async def _handle_chat_presence_event(self, event):
|
|
source = (
|
|
self._pluck(event, "MessageSource")
|
|
or self._pluck(event, "message_source")
|
|
or {}
|
|
)
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(source, "Sender") or self._pluck(source, "sender")
|
|
)
|
|
chat = self._jid_to_identifier(
|
|
self._pluck(source, "Chat") or self._pluck(source, "chat")
|
|
)
|
|
state = self._pluck(event, "State") or self._pluck(event, "state")
|
|
state_text = str(state or "").strip().lower()
|
|
is_typing = state_text in {"1", "composing", "chat_presence_composing"}
|
|
self._remember_contact(
|
|
sender or chat,
|
|
jid=sender,
|
|
chat=chat,
|
|
)
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender, chat):
|
|
if is_typing:
|
|
await self.ur.started_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={
|
|
"presence": state_text,
|
|
"sender": str(sender),
|
|
"chat": str(chat),
|
|
},
|
|
)
|
|
else:
|
|
await self.ur.stopped_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={
|
|
"presence": state_text,
|
|
"sender": str(sender),
|
|
"chat": str(chat),
|
|
},
|
|
)
|
|
|
|
async def _handle_presence_event(self, event):
|
|
sender = self._jid_to_identifier(
|
|
self._pluck(event, "From", "User") or self._pluck(event, "from", "user")
|
|
)
|
|
is_unavailable = bool(
|
|
self._pluck(event, "Unavailable") or self._pluck(event, "unavailable")
|
|
)
|
|
self._remember_contact(sender, jid=sender)
|
|
|
|
for candidate in self._normalize_identifier_candidates(sender):
|
|
if is_unavailable:
|
|
await self.ur.stopped_typing(
|
|
self.service,
|
|
identifier=candidate,
|
|
payload={"presence": "offline", "sender": str(sender)},
|
|
)
|
|
|
|
def _extract_pair_qr(self, event):
|
|
codes = self._pluck(event, "Codes") or self._pluck(event, "codes") or []
|
|
decoded_codes = self._decode_qr_payload(codes)
|
|
if decoded_codes:
|
|
return decoded_codes
|
|
|
|
for path in (
|
|
("qr",),
|
|
("qr_code",),
|
|
("code",),
|
|
("pair_code",),
|
|
("pairCode",),
|
|
("url",),
|
|
):
|
|
value = self._pluck(event, *path)
|
|
if value:
|
|
return str(value)
|
|
return ""
|
|
|
|
def _to_jid(self, recipient):
|
|
raw = str(recipient or "").strip()
|
|
if not raw:
|
|
return ""
|
|
if "@" in raw:
|
|
return raw
|
|
digits = re.sub(r"[^0-9]", "", raw)
|
|
if digits:
|
|
# Prefer direct JID formatting for phone numbers; Neonize build_jid
|
|
# can trigger a usync lookup path that intermittently times out.
|
|
return f"{digits}@s.whatsapp.net"
|
|
if self._build_jid is not None:
|
|
try:
|
|
return self._build_jid(raw)
|
|
except Exception:
|
|
pass
|
|
return raw
|
|
|
|
def _blob_key_to_compose_url(self, blob_key):
|
|
key = str(blob_key or "").strip()
|
|
if not key:
|
|
return ""
|
|
return f"/compose/media/blob/?key={quote_plus(key)}"
|
|
|
|
async def _fetch_attachment_payload(self, attachment):
|
|
blob_key = (attachment or {}).get("blob_key")
|
|
if blob_key:
|
|
row = media_bridge.get_blob(blob_key)
|
|
if row:
|
|
return row
|
|
|
|
content = (attachment or {}).get("content")
|
|
if isinstance(content, memoryview):
|
|
content = content.tobytes()
|
|
if isinstance(content, bytes):
|
|
return {
|
|
"content": content,
|
|
"filename": (attachment or {}).get("filename") or "attachment.bin",
|
|
"content_type": (attachment or {}).get("content_type")
|
|
or "application/octet-stream",
|
|
"size": len(content),
|
|
}
|
|
|
|
url = (attachment or {}).get("url")
|
|
if url:
|
|
timeout = aiohttp.ClientTimeout(total=20)
|
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
|
async with session.get(url) as response:
|
|
if response.status != 200:
|
|
return None
|
|
payload = await response.read()
|
|
return {
|
|
"content": payload,
|
|
"filename": (attachment or {}).get("filename")
|
|
or url.rstrip("/").split("/")[-1]
|
|
or "attachment.bin",
|
|
"content_type": (attachment or {}).get("content_type")
|
|
or response.headers.get(
|
|
"Content-Type", "application/octet-stream"
|
|
),
|
|
"size": len(payload),
|
|
}
|
|
return None
|
|
|
|
async def send_message_raw(
|
|
self,
|
|
recipient,
|
|
text=None,
|
|
attachments=None,
|
|
command_id: str | None = None,
|
|
metadata: dict | None = None,
|
|
):
|
|
self._last_send_error = ""
|
|
if not self._client:
|
|
self._last_send_error = "client_missing"
|
|
return False
|
|
jid_str = self._to_jid(recipient)
|
|
if not jid_str:
|
|
self._last_send_error = "recipient_invalid"
|
|
return False
|
|
# Prefer direct JID string for sends to avoid Neonize usync/device-list
|
|
# lookups that can stall on some runtime sessions.
|
|
jid = jid_str
|
|
jid_obj = None
|
|
try:
|
|
if self._build_jid is not None:
|
|
maybe_jid = None
|
|
if "@" in jid_str:
|
|
local_part, server_part = jid_str.split("@", 1)
|
|
try:
|
|
maybe_jid = self._build_jid(local_part, server_part)
|
|
except TypeError:
|
|
maybe_jid = self._build_jid(jid_str)
|
|
else:
|
|
maybe_jid = self._build_jid(jid_str)
|
|
if hasattr(maybe_jid, "SerializeToString"):
|
|
jid_obj = maybe_jid
|
|
else:
|
|
self.log.warning(
|
|
"whatsapp build_jid returned non-JID object, falling back to string: type=%s repr=%s",
|
|
type(maybe_jid).__name__,
|
|
repr(maybe_jid)[:100],
|
|
)
|
|
except Exception as exc:
|
|
self.log.warning(
|
|
"whatsapp failed to build JID from %s, falling back to string: %s",
|
|
jid_str,
|
|
exc,
|
|
)
|
|
if not self._connected and hasattr(self._client, "connect"):
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True,
|
|
last_event="send_reconnect_ok",
|
|
warning="",
|
|
last_error="",
|
|
)
|
|
except Exception as exc:
|
|
self._last_send_error = f"reconnect_failed:{exc}"
|
|
self._publish_state(
|
|
connected=False,
|
|
last_event="send_reconnect_failed",
|
|
last_error=str(exc),
|
|
warning=f"WhatsApp reconnect before send failed: {exc}",
|
|
)
|
|
|
|
sent_any = False
|
|
sent_ts = 0
|
|
metadata = dict(metadata or {})
|
|
xmpp_source_id = str(metadata.get("xmpp_source_id") or "").strip()
|
|
legacy_message_id = str(metadata.get("legacy_message_id") or "").strip()
|
|
person_identifier = None
|
|
if xmpp_source_id:
|
|
candidates = list(self._normalize_identifier_candidates(recipient, jid_str))
|
|
if candidates:
|
|
person_identifier = await sync_to_async(
|
|
lambda: PersonIdentifier.objects.filter(
|
|
service="whatsapp",
|
|
identifier__in=candidates,
|
|
)
|
|
.select_related("user", "person")
|
|
.first()
|
|
)()
|
|
|
|
def _extract_response_message_id(response):
|
|
return str(
|
|
self._pluck(response, "ID")
|
|
or self._pluck(response, "id")
|
|
or self._pluck(response, "Info", "ID")
|
|
or self._pluck(response, "info", "id")
|
|
or ""
|
|
).strip()
|
|
|
|
def _record_bridge(response, ts_value, body_hint=""):
|
|
if not xmpp_source_id or person_identifier is None:
|
|
return
|
|
transport.record_bridge_mapping(
|
|
user_id=person_identifier.user_id,
|
|
person_id=person_identifier.person_id,
|
|
service="whatsapp",
|
|
xmpp_message_id=xmpp_source_id,
|
|
xmpp_ts=int(metadata.get("xmpp_source_ts") or 0),
|
|
upstream_message_id=_extract_response_message_id(response),
|
|
upstream_ts=int(ts_value or 0),
|
|
text_preview=str(body_hint or metadata.get("xmpp_body") or ""),
|
|
local_message_id=legacy_message_id,
|
|
)
|
|
|
|
for attachment in attachments or []:
|
|
payload = await self._fetch_attachment_payload(attachment)
|
|
if not payload:
|
|
continue
|
|
mime = str(
|
|
payload.get("content_type") or "application/octet-stream"
|
|
).lower()
|
|
data = payload.get("content") or b""
|
|
filename = payload.get("filename") or "attachment.bin"
|
|
attachment_target = jid_obj if jid_obj is not None else jid
|
|
send_method = "document"
|
|
if mime.startswith("image/") and hasattr(self._client, "send_image"):
|
|
send_method = "image"
|
|
elif mime.startswith("video/") and hasattr(self._client, "send_video"):
|
|
send_method = "video"
|
|
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
|
|
send_method = "audio"
|
|
|
|
if getattr(settings, "WHATSAPP_DEBUG", False):
|
|
self.log.debug(
|
|
"whatsapp media send prep: method=%s mime=%s filename=%s size=%s",
|
|
send_method,
|
|
mime,
|
|
filename,
|
|
len(data) if isinstance(data, (bytes, bytearray)) else 0,
|
|
)
|
|
|
|
try:
|
|
if mime.startswith("image/") and hasattr(self._client, "send_image"):
|
|
response = await self._maybe_await(
|
|
self._client.send_image(attachment_target, data, caption="")
|
|
)
|
|
elif mime.startswith("video/") and hasattr(self._client, "send_video"):
|
|
response = await self._maybe_await(
|
|
self._client.send_video(attachment_target, data, caption="")
|
|
)
|
|
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
|
|
response = await self._maybe_await(
|
|
self._client.send_audio(attachment_target, data)
|
|
)
|
|
elif hasattr(self._client, "send_document"):
|
|
response = await self._maybe_await(
|
|
self._client.send_document(
|
|
attachment_target,
|
|
data,
|
|
filename=filename,
|
|
mimetype=mime,
|
|
caption="",
|
|
)
|
|
)
|
|
else:
|
|
response = None
|
|
sent_ts = max(
|
|
sent_ts,
|
|
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
|
|
)
|
|
_record_bridge(response, sent_ts, body_hint=filename)
|
|
sent_any = True
|
|
if getattr(settings, "WHATSAPP_DEBUG", False):
|
|
self.log.debug(
|
|
"whatsapp media send ok: method=%s filename=%s ts=%s",
|
|
send_method,
|
|
filename,
|
|
self._normalize_timestamp(
|
|
self._pluck(response, "Timestamp") or 0
|
|
),
|
|
)
|
|
except Exception as exc:
|
|
self.log.warning("whatsapp attachment send failed: %s", exc)
|
|
|
|
if text:
|
|
response = None
|
|
last_error = None
|
|
# Prepare cancel key (if caller provided command_id)
|
|
cancel_key = None
|
|
try:
|
|
if command_id:
|
|
cancel_key = transport._runtime_command_cancel_key(
|
|
self.service, str(command_id)
|
|
)
|
|
except Exception:
|
|
cancel_key = None
|
|
|
|
for attempt in range(2):
|
|
# Check for a cancellation marker set by transport.cancel_runtime_command
|
|
try:
|
|
if cancel_key and cache.get(cancel_key):
|
|
self.log.info("whatsapp send cancelled via cancel marker")
|
|
self._last_send_error = "cancelled"
|
|
return False
|
|
except Exception:
|
|
pass
|
|
try:
|
|
send_target = jid_obj if jid_obj is not None else jid
|
|
# Log what we're about to send for debugging
|
|
if getattr(settings, "WHATSAPP_DEBUG", False):
|
|
self.log.debug(
|
|
f"send_message attempt {attempt+1}: target_type={type(send_target).__name__} text_type={type(text).__name__} text_len={len(text)}"
|
|
)
|
|
response = await self._call_client_method(
|
|
getattr(self._client, "send_message", None),
|
|
send_target,
|
|
text,
|
|
timeout=9.0,
|
|
)
|
|
sent_any = True
|
|
last_error = None
|
|
break
|
|
except Exception as exc:
|
|
if getattr(settings, "WHATSAPP_DEBUG", False):
|
|
self.log.debug(
|
|
f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}"
|
|
)
|
|
last_error = exc
|
|
|
|
error_text = (
|
|
f"{type(last_error).__name__}:{repr(last_error)}"
|
|
if last_error is not None
|
|
else ""
|
|
).lower()
|
|
is_transient = (
|
|
"usync" in error_text
|
|
or "timed out" in error_text
|
|
or "timeout" in error_text
|
|
or "device list" in error_text
|
|
or "serializetostring" in error_text
|
|
or not error_text.strip()
|
|
)
|
|
if is_transient and attempt < 1:
|
|
# If runtime rejected string target, try to build protobuf JID for retry.
|
|
if (
|
|
jid_obj is None
|
|
and self._build_jid is not None
|
|
and "@" in jid_str
|
|
):
|
|
local_part, server_part = jid_str.split("@", 1)
|
|
try:
|
|
maybe_retry_jid = self._build_jid(local_part, server_part)
|
|
except TypeError:
|
|
maybe_retry_jid = self._build_jid(jid_str)
|
|
except Exception:
|
|
maybe_retry_jid = None
|
|
if hasattr(maybe_retry_jid, "SerializeToString"):
|
|
jid_obj = maybe_retry_jid
|
|
if hasattr(self._client, "connect"):
|
|
try:
|
|
await self._maybe_await(self._client.connect())
|
|
self._connected = True
|
|
self._publish_state(
|
|
connected=True,
|
|
last_event="send_retry_reconnect_ok",
|
|
warning="",
|
|
)
|
|
except Exception as reconnect_exc:
|
|
self._publish_state(
|
|
connected=False,
|
|
last_event="send_retry_reconnect_failed",
|
|
last_error=str(reconnect_exc),
|
|
)
|
|
# Sleep but wake earlier if cancelled: poll small intervals
|
|
# Increase backoff time for device list queries
|
|
total_sleep = 0.8 * (attempt + 1)
|
|
slept = 0.0
|
|
while slept < total_sleep:
|
|
try:
|
|
if cancel_key and cache.get(cancel_key):
|
|
self.log.info(
|
|
"whatsapp send cancelled during retry backoff"
|
|
)
|
|
self._last_send_error = "cancelled"
|
|
return False
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(0.2)
|
|
slept += 0.2
|
|
continue
|
|
break
|
|
if last_error is not None and not sent_any:
|
|
self.log.warning("whatsapp text send failed: %s", last_error)
|
|
self._last_send_error = (
|
|
f"text_send_failed:{type(last_error).__name__}:{repr(last_error)}"
|
|
)
|
|
return False
|
|
sent_ts = max(
|
|
sent_ts,
|
|
self._normalize_timestamp(self._pluck(response, "Timestamp") or 0),
|
|
)
|
|
_record_bridge(response, sent_ts, body_hint=str(text or ""))
|
|
|
|
if not sent_any:
|
|
self._last_send_error = "no_payload_sent"
|
|
return False
|
|
self._last_send_error = ""
|
|
return sent_ts or int(time.time() * 1000)
|
|
|
|
async def start_typing(self, identifier):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(identifier)
|
|
if not jid:
|
|
return False
|
|
if (
|
|
hasattr(self._client, "send_chat_presence")
|
|
and self._chat_presence is not None
|
|
and self._chat_presence_media is not None
|
|
):
|
|
try:
|
|
await self._maybe_await(
|
|
self._client.send_chat_presence(
|
|
jid,
|
|
self._chat_presence.CHAT_PRESENCE_COMPOSING,
|
|
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
|
|
)
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
if hasattr(self._client, "set_chat_presence"):
|
|
try:
|
|
await self._maybe_await(
|
|
self._client.set_chat_presence(jid, "composing")
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def stop_typing(self, identifier):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(identifier)
|
|
if not jid:
|
|
return False
|
|
if (
|
|
hasattr(self._client, "send_chat_presence")
|
|
and self._chat_presence is not None
|
|
and self._chat_presence_media is not None
|
|
):
|
|
try:
|
|
await self._maybe_await(
|
|
self._client.send_chat_presence(
|
|
jid,
|
|
self._chat_presence.CHAT_PRESENCE_PAUSED,
|
|
self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT,
|
|
)
|
|
)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
if hasattr(self._client, "set_chat_presence"):
|
|
try:
|
|
await self._maybe_await(self._client.set_chat_presence(jid, "paused"))
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def send_reaction(
|
|
self,
|
|
recipient,
|
|
*,
|
|
emoji,
|
|
target_message_id="",
|
|
target_timestamp=0,
|
|
remove=False,
|
|
):
|
|
if not self._client:
|
|
return False
|
|
jid = self._to_jid(recipient)
|
|
if not jid:
|
|
return False
|
|
target_id = str(target_message_id or "").strip()
|
|
if not target_id:
|
|
return False
|
|
|
|
reaction_emoji = "" if remove else str(emoji or "").strip()
|
|
candidate_names = (
|
|
"send_reaction",
|
|
"react",
|
|
"send_message_reaction",
|
|
"reaction",
|
|
)
|
|
self.log.debug(
|
|
"reaction-bridge whatsapp-send start recipient=%s target_id=%s emoji=%s remove=%s",
|
|
recipient,
|
|
target_id,
|
|
reaction_emoji or "-",
|
|
bool(remove),
|
|
)
|
|
for method_name in candidate_names:
|
|
method = getattr(self._client, method_name, None)
|
|
if method is None:
|
|
continue
|
|
attempts = [
|
|
(jid, target_id, reaction_emoji),
|
|
(jid, target_id, reaction_emoji, bool(remove)),
|
|
(jid, reaction_emoji, target_id),
|
|
]
|
|
for args in attempts:
|
|
try:
|
|
response = await self._call_client_method(
|
|
method, *args, timeout=9.0
|
|
)
|
|
if response is not None:
|
|
self.log.debug(
|
|
"reaction-bridge whatsapp-send ok method=%s args_len=%s",
|
|
method_name,
|
|
len(args),
|
|
)
|
|
return True
|
|
except Exception as exc:
|
|
self.log.debug(
|
|
"reaction-bridge whatsapp-send miss method=%s args_len=%s error=%s",
|
|
method_name,
|
|
len(args),
|
|
exc,
|
|
)
|
|
continue
|
|
self.log.warning(
|
|
"reaction-bridge whatsapp-send failed recipient=%s target_id=%s",
|
|
recipient,
|
|
target_id,
|
|
)
|
|
return False
|
|
|
|
async def fetch_attachment(self, attachment_ref):
|
|
blob_key = (attachment_ref or {}).get("blob_key")
|
|
if blob_key:
|
|
return media_bridge.get_blob(blob_key)
|
|
return None
|
|
|
|
def get_link_qr_png(self, device_name):
|
|
_ = (device_name or "").strip()
|
|
if not self._last_qr_payload:
|
|
return None
|
|
try:
|
|
return transport._as_qr_png(self._last_qr_payload)
|
|
except Exception:
|
|
return None
|
|
|
|
async def send_message_to_contact(self, contact_jid: str, text: str) -> bool:
|
|
"""Send a text message to a WhatsApp contact."""
|
|
try:
|
|
jid = self._to_jid(contact_jid)
|
|
if not jid or self._client is None:
|
|
return False
|
|
# neonize.send_message() accepts either a Message protobuf or a plain string
|
|
# If passing a string, it auto-converts to Message(conversation=text)
|
|
response = self._client.send_message(jid, text)
|
|
return response is not None
|
|
except Exception as e:
|
|
self.log.error(f"Failed to send WhatsApp message: {e}")
|
|
return False
|
|
|
|
# If you need to send a Message object explicitly:
|
|
async def send_structured_message(self, contact_jid: str, message: Message) -> bool:
|
|
"""Send a structured Message protobuf to a WhatsApp contact."""
|
|
try:
|
|
jid = self._to_jid(contact_jid)
|
|
if not jid or self._client is None:
|
|
return False
|
|
response = self._client.send_message(jid, message)
|
|
return response is not None
|
|
except Exception as e:
|
|
self.log.error(f"Failed to send structured WhatsApp message: {e}")
|
|
return False
|