import asyncio import os import re import sqlite3 import time from urllib.parse import quote_plus import aiohttp from asgiref.sync import sync_to_async from django.conf import settings from core.clients import ClientBase, transport from core.messaging import history, media_bridge from core.models import PersonIdentifier class WhatsAppClient(ClientBase): """ Async WhatsApp transport backed by Neonize. Design notes: - Runs in UR process. - Publishes runtime state to shared cache via transport. - Degrades gracefully when Neonize/session is unavailable. """ def __init__(self, ur, loop, service="whatsapp"): super().__init__(ur, loop, service) self._task = None self._stopping = False self._client = None self._build_jid = None self._connected = False self._last_qr_payload = "" self._accounts = [] self._chat_presence = None self._chat_presence_media = None self._last_pair_request = 0 self._next_qr_probe_at = 0.0 self._qr_handler_registered = False self._qr_handler_supported = False self._event_hook_callable = False self.enabled = bool( str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower() in {"1", "true", "yes", "on"} ) self.client_name = str( getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp") ).strip() or "gia_whatsapp" self.database_url = str( getattr(settings, "WHATSAPP_DATABASE_URL", "") ).strip() safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp" # Use a persistent default path (under project mount) instead of /tmp so # link state and contact cache survive container restarts. default_db_dir = str( getattr(settings, "WHATSAPP_DB_DIR", "/var/tmp/whatsapp") ).strip() self.session_db = self.database_url or os.path.join( default_db_dir, f"{safe_name}_neonize_v2.db", ) transport.register_runtime_client(self.service, self) prior_state = transport.get_runtime_state(self.service) prior_accounts = prior_state.get("accounts") if not isinstance(prior_accounts, list): prior_accounts = [] self._publish_state( connected=False, warning=( "WhatsApp runtime is disabled by settings." if not self.enabled else "" ), accounts=prior_accounts, last_event="init", session_db=self.session_db, ) def _publish_state(self, **updates): state = transport.update_runtime_state(self.service, **updates) accounts = state.get("accounts") if isinstance(accounts, list): self._accounts = accounts def start(self): if not self.enabled: self.log.info("whatsapp client disabled by settings") return if self._task is None: self.log.info("whatsapp neonize client starting") self._task = self.loop.create_task(self._run()) async def _run(self): try: import neonize.aioze.client as wa_client_mod from neonize.aioze.client import NewAClient from neonize.aioze import events as wa_events try: from neonize.utils.enum import ChatPresence, ChatPresenceMedia except Exception: ChatPresence = None ChatPresenceMedia = None try: from neonize.utils import build_jid as wa_build_jid except Exception: wa_build_jid = None except Exception as exc: self._publish_state( connected=False, warning=f"Neonize not available: {exc}", accounts=[], last_event="neonize_import_failed", last_error=str(exc), ) self.log.warning("whatsapp neonize import failed: %s", exc) return # Neonize async module ships with its own global event loop object. # In this runtime we already have a live asyncio loop; bind Neonize's # globals to it so QR/pair callbacks and connect tasks actually execute. try: wa_events.event_global_loop = self.loop wa_client_mod.event_global_loop = self.loop self._publish_state( neonize_loop_bound=True, neonize_loop_type=str(type(self.loop).__name__), last_event="neonize_loop_bound", ) except Exception as exc: self._publish_state( neonize_loop_bound=False, last_event="neonize_loop_bind_failed", last_error=str(exc), ) self.log.warning("failed binding neonize loop: %s", exc) self._build_jid = wa_build_jid self._chat_presence = ChatPresence self._chat_presence_media = ChatPresenceMedia try: db_dir = os.path.dirname(self.session_db) if db_dir: os.makedirs(db_dir, exist_ok=True) except Exception as exc: self._publish_state( connected=False, warning=f"WhatsApp DB path setup failed: {exc}", last_event="db_path_setup_failed", last_error=str(exc), ) self.log.warning("whatsapp db path setup failed: %s", exc) return self._client = self._build_client(NewAClient) if self._client is None: self._publish_state( connected=False, warning="Failed to initialize Neonize client.", accounts=[], last_event="client_init_failed", last_error="client_none", ) return self._register_event_handlers(wa_events) try: await self._maybe_await(self._client.connect()) await self._after_connect_probe() except asyncio.CancelledError: raise except Exception as exc: self._publish_state( connected=False, warning=f"WhatsApp connect failed: {exc}", accounts=[], last_event="connect_failed", last_error=str(exc), ) self.log.warning("whatsapp connect failed: %s", exc) return # Keep task alive so state/callbacks remain active. next_heartbeat_at = 0.0 next_contacts_sync_at = 0.0 while not self._stopping: now = time.time() if now >= next_heartbeat_at: self._publish_state(runtime_seen_at=int(now)) next_heartbeat_at = now + 5.0 if now >= next_contacts_sync_at: await self._sync_contacts_from_client() next_contacts_sync_at = now + 30.0 self._mark_qr_wait_timeout() await self._sync_pair_request() await self._probe_pending_qr(now) await asyncio.sleep(1) async def _sync_pair_request(self): state = transport.get_runtime_state(self.service) requested_at = int(state.get("pair_requested_at") or 0) if requested_at <= 0 or requested_at <= self._last_pair_request: return self._last_pair_request = requested_at self._publish_state( connected=False, pair_qr="", warning="Waiting for WhatsApp QR from Neonize.", last_event="pair_request_seen", pair_status="pending", last_error="", pair_reconnect_attempted_at=int(time.time()), ) self._next_qr_probe_at = time.time() if self._client is None: return try: if hasattr(self._client, "disconnect"): await self._maybe_await(self._client.disconnect()) except Exception as exc: self._publish_state( last_event="pair_disconnect_failed", last_error=str(exc), ) self.log.warning("whatsapp disconnect before pairing failed: %s", exc) try: await self._maybe_await(self._client.connect()) self._publish_state( last_event="pair_refresh_connected", pair_refresh_connected_at=int(time.time()), ) await self._after_connect_probe() except Exception as exc: self._publish_state( connected=False, warning=f"WhatsApp pairing refresh failed: {exc}", last_event="pair_refresh_failed", last_error=str(exc), ) self.log.warning("whatsapp pairing refresh failed: %s", exc) async def _probe_pending_qr(self, now_ts: float): state = transport.get_runtime_state(self.service) status = str(state.get("pair_status") or "").strip().lower() if status != "pending": return if str(state.get("pair_qr") or "").strip(): return if now_ts < float(self._next_qr_probe_at or 0.0): return await self._after_connect_probe() latest = transport.get_runtime_state(self.service) if str(latest.get("pair_qr") or "").strip(): self._next_qr_probe_at = now_ts + 30.0 return error_text = str(latest.get("last_error") or "").strip().lower() # Neonize may report "client is nil" for a few seconds after connect. if "client is nil" in error_text: self._next_qr_probe_at = now_ts + 2.0 return self._next_qr_probe_at = now_ts + 5.0 async def _after_connect_probe(self): if self._client is None: return now_ts = int(time.time()) try: check_connected = getattr(self._client, "is_connected", None) if check_connected is not None: connected_value = ( await self._maybe_await(check_connected()) if callable(check_connected) else await self._maybe_await(check_connected) ) if connected_value: self._connected = True account = await self._resolve_account_identifier() account_list = [account] if account else list(self._accounts or []) if not account_list: account_list = [self.client_name] self._publish_state( connected=True, accounts=account_list, pair_status="connected", last_event="connected_probe", connected_at=now_ts, ) except Exception as exc: self._publish_state( last_event="connected_probe_failed", last_error=str(exc), ) if self._connected: await self._sync_contacts_from_client() # Neonize does not always emit QR callbacks after reconnect. Try explicit # QR-link fetch when available to surface pair data to the UI. try: if hasattr(self._client, "get_contact_qr_link"): qr_link = await self._maybe_await(self._client.get_contact_qr_link()) qr_payload = self._decode_qr_payload(qr_link) self._publish_state(last_qr_probe_at=now_ts) if qr_payload: self._last_qr_payload = qr_payload self._publish_state( connected=False, pair_qr=qr_payload, warning="Scan QR in WhatsApp Linked Devices.", last_event="qr_probe_success", pair_status="qr_ready", qr_received_at=now_ts, qr_probe_result="ok", last_error="", ) else: self._publish_state( last_event="qr_probe_empty", qr_probe_result="empty", ) except Exception as exc: self._publish_state( last_event="qr_probe_failed", qr_probe_result="error", last_error=str(exc), last_qr_probe_at=now_ts, ) def _register_event(self, event_cls, callback): if event_cls is None: return False if self._client is None: return False event_hook = getattr(self._client, "event", None) if not callable(event_hook): self._event_hook_callable = False return False self._event_hook_callable = True try: decorator = event_hook(event_cls) decorator(callback) return True except Exception as exc: self.log.warning( "whatsapp event registration failed (%s): %s", getattr(event_cls, "__name__", str(event_cls)), exc, ) self._publish_state( last_event="event_registration_failed", last_error=str(exc), ) return False def _register_qr_handler(self): if self._client is None: self._qr_handler_supported = False self._qr_handler_registered = False self._publish_state( qr_handler_supported=False, qr_handler_registered=False, ) return if not hasattr(self._client, "qr"): self._qr_handler_supported = False self._qr_handler_registered = False self._publish_state( qr_handler_supported=False, qr_handler_registered=False, last_event="qr_api_missing", ) return self._qr_handler_supported = True async def on_qr(client, raw_payload): qr_payload = self._decode_qr_payload(raw_payload) if not qr_payload: return self._last_qr_payload = qr_payload self._publish_state( connected=False, pair_qr=qr_payload, warning="Scan QR in WhatsApp Linked Devices.", last_event="qr_handler", pair_status="qr_ready", qr_received_at=int(time.time()), qr_probe_result="event", last_error="", ) try: self._client.qr(on_qr) self._qr_handler_registered = True self._publish_state( qr_handler_supported=True, qr_handler_registered=True, last_event="qr_handler_registered", ) except Exception as exc: self._qr_handler_registered = False self._publish_state( qr_handler_supported=True, qr_handler_registered=False, last_event="qr_handler_registration_failed", last_error=str(exc), ) self.log.warning("whatsapp qr handler registration failed: %s", exc) def _decode_qr_payload(self, raw_payload): if raw_payload is None: return "" if isinstance(raw_payload, memoryview): raw_payload = raw_payload.tobytes() if isinstance(raw_payload, bytes): return raw_payload.decode("utf-8", errors="ignore").strip() if isinstance(raw_payload, (list, tuple)): for item in raw_payload: candidate = self._decode_qr_payload(item) if candidate: return candidate return "" return str(raw_payload).strip() def _build_client(self, cls): # NewAClient first arg is the SQLite filename / DB string. try: return cls(self.session_db) except Exception as exc: self.log.warning("whatsapp client init failed (%s): %s", self.session_db, exc) self._publish_state( last_event="client_init_exception", last_error=str(exc), session_db=self.session_db, ) return None def _register_event_handlers(self, wa_events): connected_ev = getattr(wa_events, "ConnectedEv", None) message_ev = getattr(wa_events, "MessageEv", None) receipt_ev = getattr(wa_events, "ReceiptEv", None) chat_presence_ev = getattr(wa_events, "ChatPresenceEv", None) presence_ev = getattr(wa_events, "PresenceEv", None) pair_ev = getattr(wa_events, "PairStatusEv", None) qr_ev = getattr(wa_events, "QREv", None) history_sync_ev = getattr(wa_events, "HistorySyncEv", None) offline_sync_completed_ev = getattr(wa_events, "OfflineSyncCompletedEv", None) self._register_qr_handler() support = { "connected_ev": bool(connected_ev), "pair_ev": bool(pair_ev), "qr_ev": bool(qr_ev), "message_ev": bool(message_ev), "receipt_ev": bool(receipt_ev), "history_sync_ev": bool(history_sync_ev), "offline_sync_completed_ev": bool(offline_sync_completed_ev), } self._publish_state( event_hook_callable=bool(getattr(self._client, "event", None)), event_support=support, last_event="event_handlers_scanned", ) if connected_ev is not None: async def on_connected(client, event: connected_ev): self._connected = True account = await self._resolve_account_identifier() self._publish_state( connected=True, warning="", accounts=[account] if account else [self.client_name], pair_qr="", last_event="connected", pair_status="connected", connected_at=int(time.time()), last_error="", ) await self._sync_contacts_from_client() self._register_event(connected_ev, on_connected) if message_ev is not None: async def on_message(client, event: message_ev): await self._handle_message_event(event) self._register_event(message_ev, on_message) if receipt_ev is not None: async def on_receipt(client, event: receipt_ev): await self._handle_receipt_event(event) self._register_event(receipt_ev, on_receipt) if chat_presence_ev is not None: async def on_chat_presence(client, event: chat_presence_ev): await self._handle_chat_presence_event(event) self._register_event(chat_presence_ev, on_chat_presence) if presence_ev is not None: async def on_presence(client, event: presence_ev): await self._handle_presence_event(event) self._register_event(presence_ev, on_presence) if pair_ev is not None: async def on_pair_status(client, event: pair_ev): qr_payload = self._extract_pair_qr(event) if qr_payload: self._last_qr_payload = qr_payload self._publish_state( pair_qr=qr_payload, warning="Scan QR in WhatsApp Linked Devices.", last_event="pair_status_qr", pair_status="qr_ready", qr_received_at=int(time.time()), qr_probe_result="event", last_error="", ) status_raw = self._pluck(event, "Status") status_text = str(status_raw or "").strip().lower() if status_text in {"2", "success"}: account = await self._resolve_account_identifier() self._connected = True self._publish_state( connected=True, warning="", accounts=[account] if account else [self.client_name], pair_qr="", last_event="pair_status_success", pair_status="connected", connected_at=int(time.time()), last_error="", ) await self._sync_contacts_from_client() elif status_text in {"1", "error"}: error_text = str(self._pluck(event, "Error") or "").strip() self._publish_state( warning=error_text or "WhatsApp pairing failed. Retry scan.", last_event="pair_status_error", pair_status="error", last_error=error_text or "unknown_pair_error", ) self._register_event(pair_ev, on_pair_status) if qr_ev is not None: async def on_qr_event(client, event: qr_ev): # Once connected, ignore late/stale QR emissions so runtime state # does not regress from connected -> qr_ready. state = transport.get_runtime_state(self.service) if self._connected or bool(state.get("connected")): return qr_payload = self._extract_pair_qr(event) if not qr_payload: return self._last_qr_payload = qr_payload self._publish_state( connected=False, pair_qr=qr_payload, warning="Scan QR in WhatsApp Linked Devices.", last_event="qr_event", pair_status="qr_ready", qr_received_at=int(time.time()), qr_probe_result="event", last_error="", ) self._register_event(qr_ev, on_qr_event) if history_sync_ev is not None: async def on_history_sync(client, event: history_sync_ev): await self._handle_history_sync_event(event) self._register_event(history_sync_ev, on_history_sync) if offline_sync_completed_ev is not None: async def on_offline_sync_completed(client, event: offline_sync_completed_ev): self._publish_state(last_event="offline_sync_completed") await self._sync_contacts_from_client() self._register_event(offline_sync_completed_ev, on_offline_sync_completed) def _mark_qr_wait_timeout(self): state = transport.get_runtime_state(self.service) if str(state.get("pair_status") or "").strip().lower() != "pending": return requested_at = int(state.get("pair_requested_at") or 0) qr_received_at = int(state.get("qr_received_at") or 0) if requested_at <= 0 or qr_received_at > 0: return now = int(time.time()) age = now - requested_at # Avoid spamming writes while still surfacing a clear timeout state. if age < 15 or (age % 10) != 0: return self._publish_state( last_event="pair_waiting_no_qr", warning=( "Waiting for WhatsApp QR from Neonize. " "No QR callback received yet." ), ) async def _maybe_await(self, value): if asyncio.iscoroutine(value): return await value return value async def _resolve_account_identifier(self): if self._client is None: return "" if not hasattr(self._client, "get_me"): return self.client_name try: me = await self._maybe_await(self._client.get_me()) except Exception: return self.client_name # Support both dict-like and object-like payloads. for path in ( ("JID",), ("jid",), ): value = self._jid_to_identifier(self._pluck(me, *path)) if value: return value for path in ( ("JID", "User"), ("jid",), ("user",), ("ID",), ): value = self._pluck(me, *path) if value: return str(value) if hasattr(self._client, "get_all_devices"): try: devices = await self._maybe_await(self._client.get_all_devices()) if devices: jid = self._jid_to_identifier(self._pluck(devices[0], "JID")) if jid: return jid except Exception: pass return self.client_name def _pluck(self, obj, *path): current = obj for key in path: if current is None: return None if isinstance(current, dict): current = current.get(key) continue if hasattr(current, key): current = getattr(current, key) continue return None return current def _normalize_timestamp(self, raw_value): if raw_value is None: return int(time.time() * 1000) try: value = int(raw_value) except Exception: return int(time.time() * 1000) # WhatsApp libs often emit seconds. Promote to ms. if value < 10**12: return value * 1000 return value def _normalize_identifier_candidates(self, *values): out = set() for value in values: raw = self._jid_to_identifier(value) if not raw: continue out.add(raw) if "@" in raw: out.add(raw.split("@", 1)[0]) digits = re.sub(r"[^0-9]", "", raw) if digits: out.add(digits) if not digits.startswith("+"): out.add(f"+{digits}") return out async def _sync_contacts_from_client(self): if self._client is None: return connected_now = await self._is_contact_sync_ready() if not connected_now: self._publish_state( last_event="contacts_sync_skipped_disconnected", contacts_source="disconnected", ) return # NOTE: Neonize get_all_contacts has crashed some runtime builds with a Go panic. # Read contact-like rows directly from the session sqlite DB instead. contacts, source = await self._sync_contacts_from_sqlite() if not contacts: self.log.info("whatsapp contacts sync empty (%s)", source or "unknown") self._publish_state( last_event="contacts_sync_empty", contacts_source=source or "unknown", ) return self.log.info( "whatsapp contacts synced: count=%s source=%s", len(contacts), source or "unknown", ) self._publish_state( contacts=contacts, contacts_synced_at=int(time.time()), contacts_sync_count=len(contacts), last_event="contacts_synced", contacts_source=source or "unknown", last_error="", ) async def _sync_contacts_from_sqlite(self): def _extract(): if not self.session_db or not os.path.exists(self.session_db): return [], "sqlite_missing" try: conn = sqlite3.connect(self.session_db) conn.row_factory = sqlite3.Row except Exception: return [], "sqlite_open_failed" try: cur = conn.cursor() table_rows = cur.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() table_names = [str(row[0]) for row in table_rows if row and row[0]] # Prefer the canonical contacts table when available. if "whatsmeow_contacts" in table_names: own_ids = set() for value in ( transport.get_runtime_state(self.service).get("accounts") or [] ): base = str(value or "").strip().split("@", 1)[0] base = base.split(":", 1)[0] if base: own_ids.add(base.lower()) try: for row in cur.execute( 'SELECT DISTINCT "our_jid" FROM "whatsmeow_contacts"' ).fetchall(): base = str((row or [None])[0] or "").strip().split("@", 1)[0] base = base.split(":", 1)[0] if base: own_ids.add(base.lower()) except Exception: pass out = [] seen = set() try: rows = cur.execute( 'SELECT "their_jid", "first_name", "full_name", "push_name", "business_name" ' 'FROM "whatsmeow_contacts" LIMIT 5000' ).fetchall() except Exception: rows = [] for their_jid, first_name, full_name, push_name, business_name in rows: jid_value = str(their_jid or "").strip() if "@s.whatsapp.net" not in jid_value and "@lid" not in jid_value: continue identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0] if not identifier: continue if identifier.lower() in own_ids: continue if identifier in seen: continue seen.add(identifier) display_name = ( str(push_name or "").strip() or str(full_name or "").strip() or str(business_name or "").strip() or str(first_name or "").strip() ) out.append( { "identifier": identifier, "jid": jid_value, "name": display_name, "chat": "", "seen_at": int(time.time()), } ) if len(out) >= 500: break if out: return out, "sqlite_contacts" account_keys = { str(value or "").strip().split("@", 1)[0].lower() for value in ( transport.get_runtime_state(self.service).get("accounts") or [] ) if str(value or "").strip() } seen = set() out = [] for table in table_names: try: columns = [ str(row[1] or "") for row in cur.execute( f'PRAGMA table_info("{table}")' ).fetchall() ] except Exception: continue if not columns: continue jid_cols = [ col for col in columns if any( token in col.lower() for token in ( "jid", "phone", "chat", "sender", "recipient", "participant", "from", "to", "user", ) ) ] name_cols = [ col for col in columns if "name" in col.lower() or "push" in col.lower() ] if not jid_cols: continue select_cols = list(dict.fromkeys(jid_cols + name_cols))[:6] quoted = ", ".join(f'"{col}"' for col in select_cols) try: rows = cur.execute( f'SELECT {quoted} FROM "{table}" LIMIT 1000' ).fetchall() except Exception: continue for row in rows: row_map = {col: row[idx] for idx, col in enumerate(select_cols)} jid_value = "" for col in jid_cols: raw = str(row_map.get(col) or "").strip() if "@s.whatsapp.net" in raw: m = re.search(r"([0-9]{6,20})@s\.whatsapp\.net", raw) jid_value = ( f"{m.group(1)}@s.whatsapp.net" if m else raw.split()[0] ) break if raw.endswith("@lid"): jid_value = raw break digits = re.sub(r"[^0-9]", "", raw) if digits: jid_value = f"{digits}@s.whatsapp.net" break if not jid_value: continue identifier = jid_value.split("@", 1)[0].strip() if not identifier: continue if identifier.lower() in account_keys: continue if identifier in seen: continue seen.add(identifier) display_name = "" for col in name_cols: candidate = str(row_map.get(col) or "").strip() if candidate and candidate not in {"~", "-", "_"}: display_name = candidate break out.append( { "identifier": identifier, "jid": jid_value, "name": display_name, "chat": "", "seen_at": int(time.time()), } ) if len(out) >= 500: return out, "sqlite_tables" return out, "sqlite_tables" finally: conn.close() return await asyncio.to_thread(_extract) async def _is_contact_sync_ready(self) -> bool: if self._client is None: return False if self._connected: return True state = transport.get_runtime_state(self.service) if bool(state.get("connected")): return True pair_status = str(state.get("pair_status") or "").strip().lower() if pair_status == "connected": return True check_connected = getattr(self._client, "is_connected", None) if check_connected is None: return False try: value = ( await self._maybe_await(check_connected()) if callable(check_connected) else await self._maybe_await(check_connected) ) except Exception: return False if value: self._connected = True self._publish_state(connected=True, warning="", pair_status="connected") return True if hasattr(self._client, "get_me"): try: me = await self._maybe_await(self._client.get_me()) if me: self._connected = True self._publish_state(connected=True, warning="", pair_status="connected") return True except Exception: pass return False async def _handle_history_sync_event(self, event): data = self._pluck(event, "Data") or self._pluck(event, "data") if data is None: return pushname_rows = ( self._pluck(data, "pushnames") or self._pluck(data, "Pushnames") or [] ) pushname_map = {} for row in pushname_rows: raw_id = ( self._jid_to_identifier(self._pluck(row, "ID")) or self._jid_to_identifier(self._pluck(row, "id")) ) if not raw_id: continue pushname = str( self._pluck(row, "pushname") or self._pluck(row, "Pushname") or "" ).strip() if not pushname: continue pushname_map[raw_id] = pushname pushname_map[raw_id.split("@", 1)[0]] = pushname conversation_rows = ( self._pluck(data, "conversations") or self._pluck(data, "Conversations") or [] ) found = 0 for row in conversation_rows: jid = "" for candidate in ( self._pluck(row, "ID"), self._pluck(row, "id"), self._pluck(row, "pnJID"), self._pluck(row, "pnJid"), self._pluck(row, "newJID"), self._pluck(row, "newJid"), self._pluck(row, "oldJID"), self._pluck(row, "oldJid"), ): parsed = self._jid_to_identifier(candidate) if parsed: jid = parsed break if not jid or "@g.us" in jid or "@broadcast" in jid: continue identifier = jid.split("@", 1)[0].strip() if not identifier or not re.search(r"[0-9]{6,}", identifier): continue name = str( self._pluck(row, "displayName") or self._pluck(row, "DisplayName") or self._pluck(row, "name") or self._pluck(row, "Name") or self._pluck(row, "username") or self._pluck(row, "Username") or pushname_map.get(jid) or pushname_map.get(identifier) or "" ).strip() self._remember_contact(identifier, jid=jid, name=name) found += 1 if found: state = transport.get_runtime_state(self.service) current_count = int(state.get("contacts_sync_count") or 0) self._publish_state( contacts_source="history_sync", contacts_synced_at=int(time.time()), contacts_sync_count=max(current_count, found), last_event="history_sync_contacts", last_error="", ) def _remember_contact(self, identifier, *, jid="", name="", chat=""): cleaned = str(identifier or "").strip() if "@" in cleaned: cleaned = cleaned.split("@", 1)[0] if not cleaned: return state = transport.get_runtime_state(self.service) existing = state.get("contacts") or [] rows = [item for item in existing if isinstance(item, dict)] merged = [] seen = set() now_ts = int(time.time()) row = { "identifier": cleaned, "jid": str(jid or "").strip(), "name": str(name or "").strip(), "chat": str(chat or "").strip(), "seen_at": now_ts, } merged.append(row) seen.add(cleaned) for item in rows: candidate = str(item.get("identifier") or item.get("jid") or "").strip() if not candidate or candidate in seen: continue seen.add(candidate) merged.append(item) if len(merged) >= 500: break self._publish_state(contacts=merged, last_contact_seen_at=now_ts) def _jid_to_identifier(self, value): raw = str(value or "").strip() if not raw: return "" if not isinstance(value, str): user = self._pluck(value, "User") or self._pluck(value, "user") server = self._pluck(value, "Server") or self._pluck(value, "server") if user and server: return f"{user}@{server}" if user: return str(user) return raw def _is_media_message(self, message_obj): media_fields = ( "imageMessage", "videoMessage", "audioMessage", "documentMessage", "stickerMessage", "image_message", "video_message", "audio_message", "document_message", "sticker_message", ) for field in media_fields: value = self._pluck(message_obj, field) if value: return True return False async def _download_event_media(self, event): if not self._client: return [] msg_obj = self._pluck(event, "message") or self._pluck(event, "Message") if msg_obj is None or not self._is_media_message(msg_obj): return [] if not hasattr(self._client, "download_any"): return [] try: payload = await self._maybe_await(self._client.download_any(msg_obj)) except Exception as exc: self.log.warning("whatsapp media download failed: %s", exc) return [] if isinstance(payload, memoryview): payload = payload.tobytes() if not isinstance(payload, (bytes, bytearray)): return [] filename = ( self._pluck(msg_obj, "documentMessage", "fileName") or self._pluck(msg_obj, "document_message", "file_name") or f"wa-{int(time.time())}.bin" ) content_type = ( self._pluck(msg_obj, "documentMessage", "mimetype") or self._pluck(msg_obj, "document_message", "mimetype") or self._pluck(msg_obj, "imageMessage", "mimetype") or self._pluck(msg_obj, "image_message", "mimetype") or "application/octet-stream" ) blob_key = media_bridge.put_blob( service="whatsapp", content=bytes(payload), filename=filename, content_type=content_type, ) if not blob_key: return [] return [ { "blob_key": blob_key, "filename": filename, "content_type": content_type, "size": len(payload), } ] async def _handle_message_event(self, event): msg_obj = self._pluck(event, "message") or self._pluck(event, "Message") text = ( self._pluck(msg_obj, "conversation") or self._pluck(msg_obj, "Conversation") or self._pluck(msg_obj, "extendedTextMessage", "text") or self._pluck(msg_obj, "ExtendedTextMessage", "Text") or self._pluck(msg_obj, "extended_text_message", "text") or "" ) source = ( self._pluck(event, "Info", "MessageSource") or self._pluck(event, "info", "message_source") or self._pluck(event, "info", "messageSource") ) is_from_me = bool( self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe") ) if is_from_me: return sender = self._jid_to_identifier( self._pluck(source, "Sender") or self._pluck(source, "sender") or self._pluck(source, "SenderAlt") or self._pluck(source, "senderAlt") ) chat = self._jid_to_identifier( self._pluck(source, "Chat") or self._pluck(source, "chat") ) raw_ts = ( self._pluck(event, "Info", "Timestamp") or self._pluck(event, "info", "timestamp") or self._pluck(event, "info", "message_timestamp") or self._pluck(event, "Timestamp") or self._pluck(event, "timestamp") ) ts = self._normalize_timestamp(raw_ts) self._remember_contact( sender or chat, jid=sender, chat=chat, ) identifier_values = self._normalize_identifier_candidates(sender, chat) if not identifier_values: return identifiers = await sync_to_async(list)( PersonIdentifier.objects.filter( service="whatsapp", identifier__in=list(identifier_values), ) ) if not identifiers: return attachments = await self._download_event_media(event) xmpp_attachments = [] if attachments: fetched = await asyncio.gather( *[transport.fetch_attachment(self.service, att) for att in attachments] ) xmpp_attachments = [row for row in fetched if row] payload = { "sender": str(sender or ""), "chat": str(chat or ""), "raw_event": str(type(event).__name__), } for identifier in identifiers: uploaded_urls = await self.ur.xmpp.client.send_from_external( identifier.user, identifier, text, is_outgoing_message=False, attachments=xmpp_attachments, ) display_text = text if (not display_text) and uploaded_urls: display_text = "\n".join(uploaded_urls) if (not display_text) and attachments: media_urls = [ self._blob_key_to_compose_url((att or {}).get("blob_key")) for att in attachments ] media_urls = [url for url in media_urls if url] if media_urls: display_text = "\n".join(media_urls) session = await history.get_chat_session(identifier.user, identifier) await history.store_message( session=session, sender=str(sender or chat or ""), text=display_text, ts=ts, outgoing=False, ) await self.ur.message_received( self.service, identifier=identifier, text=display_text, ts=ts, payload=payload, ) async def _handle_receipt_event(self, event): source = ( self._pluck(event, "MessageSource") or self._pluck(event, "info", "message_source") or self._pluck(event, "info", "messageSource") ) sender = self._jid_to_identifier( self._pluck(source, "Sender") or self._pluck(source, "sender") ) chat = self._jid_to_identifier( self._pluck(source, "Chat") or self._pluck(source, "chat") ) timestamps = [] raw_ids = self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or [] if isinstance(raw_ids, (list, tuple, set)): for item in raw_ids: try: value = int(item) timestamps.append(value * 1000 if value < 10**12 else value) except Exception: continue read_ts = self._normalize_timestamp( self._pluck(event, "Timestamp") or self._pluck(event, "timestamp") or int(time.time() * 1000) ) receipt_type = str(self._pluck(event, "Type") or "").strip() self._remember_contact( sender or chat, jid=sender, chat=chat, ) for candidate in self._normalize_identifier_candidates(sender, chat): await self.ur.message_read( self.service, identifier=candidate, message_timestamps=timestamps, read_ts=read_ts, read_by=sender or chat, payload={ "event": "receipt", "type": receipt_type, "sender": str(sender), "chat": str(chat), }, ) async def _handle_chat_presence_event(self, event): source = ( self._pluck(event, "MessageSource") or self._pluck(event, "message_source") or {} ) sender = self._jid_to_identifier( self._pluck(source, "Sender") or self._pluck(source, "sender") ) chat = self._jid_to_identifier( self._pluck(source, "Chat") or self._pluck(source, "chat") ) state = self._pluck(event, "State") or self._pluck(event, "state") state_text = str(state or "").strip().lower() is_typing = state_text in {"1", "composing", "chat_presence_composing"} self._remember_contact( sender or chat, jid=sender, chat=chat, ) for candidate in self._normalize_identifier_candidates(sender, chat): if is_typing: await self.ur.started_typing( self.service, identifier=candidate, payload={"presence": state_text, "sender": str(sender), "chat": str(chat)}, ) else: await self.ur.stopped_typing( self.service, identifier=candidate, payload={"presence": state_text, "sender": str(sender), "chat": str(chat)}, ) async def _handle_presence_event(self, event): sender = self._jid_to_identifier( self._pluck(event, "From", "User") or self._pluck(event, "from", "user") ) is_unavailable = bool( self._pluck(event, "Unavailable") or self._pluck(event, "unavailable") ) self._remember_contact(sender, jid=sender) for candidate in self._normalize_identifier_candidates(sender): if is_unavailable: await self.ur.stopped_typing( self.service, identifier=candidate, payload={"presence": "offline", "sender": str(sender)}, ) def _extract_pair_qr(self, event): codes = self._pluck(event, "Codes") or self._pluck(event, "codes") or [] decoded_codes = self._decode_qr_payload(codes) if decoded_codes: return decoded_codes for path in ( ("qr",), ("qr_code",), ("code",), ("pair_code",), ("pairCode",), ("url",), ): value = self._pluck(event, *path) if value: return str(value) return "" def _to_jid(self, recipient): raw = str(recipient or "").strip() if not raw: return "" if self._build_jid is not None: try: return self._build_jid(raw) except Exception: pass if "@" in raw: return raw digits = re.sub(r"[^0-9]", "", raw) if digits: return f"{digits}@s.whatsapp.net" return raw def _blob_key_to_compose_url(self, blob_key): key = str(blob_key or "").strip() if not key: return "" return f"/compose/media/blob/?key={quote_plus(key)}" async def _fetch_attachment_payload(self, attachment): blob_key = (attachment or {}).get("blob_key") if blob_key: row = media_bridge.get_blob(blob_key) if row: return row content = (attachment or {}).get("content") if isinstance(content, memoryview): content = content.tobytes() if isinstance(content, bytes): return { "content": content, "filename": (attachment or {}).get("filename") or "attachment.bin", "content_type": (attachment or {}).get("content_type") or "application/octet-stream", "size": len(content), } url = (attachment or {}).get("url") if url: timeout = aiohttp.ClientTimeout(total=20) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as response: if response.status != 200: return None payload = await response.read() return { "content": payload, "filename": (attachment or {}).get("filename") or url.rstrip("/").split("/")[-1] or "attachment.bin", "content_type": (attachment or {}).get("content_type") or response.headers.get( "Content-Type", "application/octet-stream" ), "size": len(payload), } return None async def send_message_raw(self, recipient, text=None, attachments=None): if not self._client: return False jid = self._to_jid(recipient) if not jid: return False sent_any = False sent_ts = 0 for attachment in attachments or []: payload = await self._fetch_attachment_payload(attachment) if not payload: continue mime = str(payload.get("content_type") or "application/octet-stream").lower() data = payload.get("content") or b"" filename = payload.get("filename") or "attachment.bin" try: if mime.startswith("image/") and hasattr(self._client, "send_image"): response = await self._maybe_await( self._client.send_image(jid, data, caption="") ) elif mime.startswith("video/") and hasattr(self._client, "send_video"): response = await self._maybe_await( self._client.send_video(jid, data, caption="") ) elif mime.startswith("audio/") and hasattr(self._client, "send_audio"): response = await self._maybe_await(self._client.send_audio(jid, data)) elif hasattr(self._client, "send_document"): response = await self._maybe_await( self._client.send_document( jid, data, filename=filename, mimetype=mime, caption="", ) ) else: response = None sent_ts = max( sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) sent_any = True except Exception as exc: self.log.warning("whatsapp attachment send failed: %s", exc) if text: try: response = await self._maybe_await(self._client.send_message(jid, text)) sent_any = True except TypeError: response = await self._maybe_await( self._client.send_message(jid, message=text) ) sent_any = True except Exception as exc: self.log.warning("whatsapp text send failed: %s", exc) return False sent_ts = max( sent_ts, self._normalize_timestamp(self._pluck(response, "Timestamp") or 0), ) if not sent_any: return False return sent_ts or int(time.time() * 1000) async def start_typing(self, identifier): if not self._client: return False jid = self._to_jid(identifier) if not jid: return False if ( hasattr(self._client, "send_chat_presence") and self._chat_presence is not None and self._chat_presence_media is not None ): try: await self._maybe_await( self._client.send_chat_presence( jid, self._chat_presence.CHAT_PRESENCE_COMPOSING, self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT, ) ) return True except Exception: pass if hasattr(self._client, "set_chat_presence"): try: await self._maybe_await(self._client.set_chat_presence(jid, "composing")) return True except Exception: pass return False async def stop_typing(self, identifier): if not self._client: return False jid = self._to_jid(identifier) if not jid: return False if ( hasattr(self._client, "send_chat_presence") and self._chat_presence is not None and self._chat_presence_media is not None ): try: await self._maybe_await( self._client.send_chat_presence( jid, self._chat_presence.CHAT_PRESENCE_PAUSED, self._chat_presence_media.CHAT_PRESENCE_MEDIA_TEXT, ) ) return True except Exception: pass if hasattr(self._client, "set_chat_presence"): try: await self._maybe_await(self._client.set_chat_presence(jid, "paused")) return True except Exception: pass return False async def fetch_attachment(self, attachment_ref): blob_key = (attachment_ref or {}).get("blob_key") if blob_key: return media_bridge.get_blob(blob_key) return None def get_link_qr_png(self, device_name): _ = (device_name or "").strip() if not self._last_qr_payload: return None try: return transport._as_qr_png(self._last_qr_payload) except Exception: return None