diff --git a/app/settings.py b/app/settings.py index d1182b3..3350ba8 100644 --- a/app/settings.py +++ b/app/settings.py @@ -97,6 +97,12 @@ MIDDLEWARE = [ ROOT_URLCONF = "app.urls" ASGI_APPLICATION = "app.asgi.application" +COMPOSE_WS_ENABLED = os.environ.get("COMPOSE_WS_ENABLED", "false").lower() in { + "1", + "true", + "yes", + "on", +} TEMPLATES = [ { diff --git a/app/urls.py b/app/urls.py index 5ed01e5..8121d89 100644 --- a/app/urls.py +++ b/app/urls.py @@ -184,6 +184,11 @@ urlpatterns = [ compose.ComposeThread.as_view(), name="compose_thread", ), + path( + "compose/history-sync/", + compose.ComposeHistorySync.as_view(), + name="compose_history_sync", + ), path( "compose/media/blob/", compose.ComposeMediaBlob.as_view(), diff --git a/core/clients/signal.py b/core/clients/signal.py index 059617e..7b6a2d6 100644 --- a/core/clients/signal.py +++ b/core/clients/signal.py @@ -22,12 +22,7 @@ if _signal_http_url: parsed = urlparse( _signal_http_url if "://" in _signal_http_url else f"http://{_signal_http_url}" ) - configured_host = (parsed.hostname or "").strip().lower() - runtime = os.getenv("container", "").strip().lower() - if configured_host == "signal" and runtime == "podman": - SIGNAL_HOST = "127.0.0.1" - else: - SIGNAL_HOST = parsed.hostname or "signal" + SIGNAL_HOST = parsed.hostname or "signal" SIGNAL_PORT = parsed.port or 8080 else: if settings.DEBUG: @@ -276,18 +271,34 @@ class HandleMessage(Command): envelope_source_uuid = envelope.get("sourceUuid") envelope_source_number = envelope.get("sourceNumber") envelope_source = envelope.get("source") + destination_number = ( + raw.get("envelope", {}) + .get("syncMessage", {}) + .get("sentMessage", {}) + .get("destination") + ) primary_identifier = dest if is_from_bot else source_uuid - identifier_candidates = _identifier_candidates( - primary_identifier, - source_uuid, - source_number, - source_value, - envelope_source_uuid, - envelope_source_number, - envelope_source, - dest, - ) + if is_from_bot: + # Outbound events must route only by destination identity. + # Including the bot's own UUID/number leaks messages across people + # if "self" identifiers are linked anywhere. + identifier_candidates = _identifier_candidates( + dest, + destination_number, + primary_identifier, + ) + else: + identifier_candidates = _identifier_candidates( + primary_identifier, + source_uuid, + source_number, + source_value, + envelope_source_uuid, + envelope_source_number, + envelope_source, + dest, + ) if not identifier_candidates: log.warning("No Signal identifier available for message routing.") return diff --git a/core/clients/transport.py b/core/clients/transport.py index 1071814..e99b353 100644 --- a/core/clients/transport.py +++ b/core/clients/transport.py @@ -19,6 +19,8 @@ from core.util import logs log = logs.get_logger("transport") _RUNTIME_STATE_TTL = 60 * 60 * 24 +_RUNTIME_COMMANDS_TTL = 60 * 15 +_RUNTIME_COMMAND_RESULT_TTL = 60 _RUNTIME_CLIENTS: dict[str, Any] = {} @@ -30,6 +32,14 @@ def _runtime_key(service: str) -> str: return f"gia:service:runtime:{_service_key(service)}" +def _runtime_commands_key(service: str) -> str: + return f"gia:service:commands:{_service_key(service)}" + + +def _runtime_command_result_key(service: str, command_id: str) -> str: + return f"gia:service:command-result:{_service_key(service)}:{command_id}" + + def _gateway_base(service: str) -> str: key = f"{service.upper()}_HTTP_URL" default = f"http://{service}:8080" @@ -78,6 +88,59 @@ def update_runtime_state(service: str, **updates): return state +def enqueue_runtime_command(service: str, action: str, payload: dict | None = None) -> str: + service_key = _service_key(service) + command_id = secrets.token_hex(12) + command = { + "id": command_id, + "action": str(action or "").strip(), + "payload": dict(payload or {}), + "created_at": int(time.time()), + } + key = _runtime_commands_key(service_key) + queued = list(cache.get(key) or []) + queued.append(command) + # Keep queue bounded to avoid unbounded growth. + if len(queued) > 200: + queued = queued[-200:] + cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL) + return command_id + + +def pop_runtime_command(service: str) -> dict[str, Any] | None: + service_key = _service_key(service) + key = _runtime_commands_key(service_key) + queued = list(cache.get(key) or []) + if not queued: + return None + command = dict(queued.pop(0) or {}) + cache.set(key, queued, timeout=_RUNTIME_COMMANDS_TTL) + return command + + +def set_runtime_command_result(service: str, command_id: str, result: dict | None = None): + service_key = _service_key(service) + result_key = _runtime_command_result_key(service_key, command_id) + payload = dict(result or {}) + payload.setdefault("completed_at", int(time.time())) + cache.set(result_key, payload, timeout=_RUNTIME_COMMAND_RESULT_TTL) + + +async def wait_runtime_command_result(service: str, command_id: str, timeout: float = 20.0): + service_key = _service_key(service) + result_key = _runtime_command_result_key(service_key, command_id) + deadline = time.monotonic() + max(0.1, float(timeout or 0.0)) + while time.monotonic() < deadline: + payload = cache.get(result_key) + if payload is not None: + cache.delete(result_key) + if isinstance(payload, dict): + return dict(payload) + return {} + await asyncio.sleep(0.2) + return None + + def list_accounts(service: str): """ Return account identifiers for service UI list. @@ -365,7 +428,37 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments= return runtime_result except Exception as exc: log.warning("%s runtime send failed: %s", service_key, exc) - log.warning("whatsapp send skipped: runtime is unavailable or not paired") + # Web/UI process cannot access UR in-process runtime client directly. + # Hand off send to UR via shared cache command queue. + command_attachments = [] + for att in attachments or []: + row = dict(att or {}) + # Keep payload cache-friendly and avoid embedding raw bytes. + for key in ("content",): + row.pop(key, None) + command_attachments.append(row) + command_id = enqueue_runtime_command( + service_key, + "send_message_raw", + { + "recipient": recipient, + "text": text or "", + "attachments": command_attachments, + }, + ) + command_result = await wait_runtime_command_result( + service_key, + command_id, + timeout=20.0, + ) + if isinstance(command_result, dict): + if command_result.get("ok"): + ts = _parse_timestamp(command_result) + return ts if ts else True + err = str(command_result.get("error") or "").strip() + log.warning("whatsapp queued send failed: %s", err or "unknown") + return False + log.warning("whatsapp queued send timed out waiting for runtime result") return False if service_key == "instagram": diff --git a/core/clients/whatsapp.py b/core/clients/whatsapp.py index 1ff58ee..34e7f2f 100644 --- a/core/clients/whatsapp.py +++ b/core/clients/whatsapp.py @@ -11,7 +11,7 @@ from django.conf import settings from core.clients import ClientBase, transport from core.messaging import history, media_bridge -from core.models import PersonIdentifier +from core.models import Message, PersonIdentifier class WhatsAppClient(ClientBase): @@ -190,6 +190,7 @@ class WhatsAppClient(ClientBase): if now >= next_heartbeat_at: self._publish_state(runtime_seen_at=int(now)) next_heartbeat_at = now + 5.0 + await self._drain_runtime_commands() if now >= next_contacts_sync_at: await self._sync_contacts_from_client() next_contacts_sync_at = now + 30.0 @@ -460,9 +461,16 @@ class WhatsAppClient(ClientBase): "history_sync_ev": bool(history_sync_ev), "offline_sync_completed_ev": bool(offline_sync_completed_ev), } + event_names = [name for name in dir(wa_events) if name.endswith("Ev")] + message_like_names = [ + name + for name in event_names + if "message" in name.lower() or name.lower().startswith("msg") + ] self._publish_state( event_hook_callable=bool(getattr(self._client, "event", None)), event_support=support, + event_message_candidates=message_like_names[:20], last_event="event_handlers_scanned", ) @@ -485,12 +493,26 @@ class WhatsAppClient(ClientBase): self._register_event(connected_ev, on_connected) + async def on_message_any(client, event): + await self._handle_message_event(event) + + registered_message_events = [] if message_ev is not None: - - async def on_message(client, event: message_ev): - await self._handle_message_event(event) - - self._register_event(message_ev, on_message) + if self._register_event(message_ev, on_message_any): + registered_message_events.append( + getattr(message_ev, "__name__", str(message_ev)) + ) + for attr in message_like_names: + candidate = getattr(wa_events, attr, None) + if candidate is None or candidate is message_ev: + continue + if self._register_event(candidate, on_message_any): + registered_message_events.append(attr) + if registered_message_events: + self._publish_state( + registered_message_events=registered_message_events[:20], + last_event="message_events_registered", + ) if receipt_ev is not None: @@ -621,6 +643,538 @@ class WhatsAppClient(ClientBase): return await value return value + async def _drain_runtime_commands(self): + # Process a small burst each loop to keep sends responsive but avoid starvation. + for _ in range(5): + command = transport.pop_runtime_command(self.service) + if not command: + return + await self._execute_runtime_command(command) + + async def _execute_runtime_command(self, command): + command_id = str((command or {}).get("id") or "").strip() + action = str((command or {}).get("action") or "").strip() + payload = dict((command or {}).get("payload") or {}) + if not command_id: + return + + if action == "send_message_raw": + recipient = str(payload.get("recipient") or "").strip() + text = payload.get("text") + attachments = payload.get("attachments") or [] + try: + result = await self.send_message_raw( + recipient=recipient, + text=text, + attachments=attachments, + ) + if result is not False and result is not None: + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": True, + "timestamp": int(result) + if isinstance(result, int) + else int(time.time() * 1000), + }, + ) + return + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": "runtime_send_failed", + }, + ) + return + except Exception as exc: + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": str(exc), + }, + ) + return + + if action == "force_history_sync": + target_identifier = str(payload.get("identifier") or "").strip() + try: + result = await self._force_history_sync(target_identifier) + except Exception as exc: + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": str(exc), + }, + ) + return + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": True, + **dict(result or {}), + }, + ) + return + + transport.set_runtime_command_result( + self.service, + command_id, + { + "ok": False, + "error": f"unsupported_action:{action or '-'}", + }, + ) + + async def _force_history_sync(self, identifier: str = ""): + started_at = int(time.time()) + self._publish_state( + last_event="manual_history_sync_started", + history_sync_running=True, + history_sync_started_at=started_at, + history_sync_target=identifier or "", + ) + try: + await self._sync_contacts_from_client() + history_request = await self._request_on_demand_history( + identifier=identifier, + count=120, + ) + self._publish_state( + history_on_demand_requested=bool(history_request.get("requested")), + history_on_demand_error=str(history_request.get("error") or ""), + history_on_demand_anchor=str(history_request.get("anchor_key") or ""), + history_on_demand_at=int(time.time()), + ) + if history_request.get("requested"): + # Give on-demand history chunks a brief window to arrive before command returns. + await asyncio.sleep(3) + # Best-effort probe: reconnect state and QR/connection probes can unlock + # pending history sync callbacks in some runtime sessions. + await self._after_connect_probe() + sqlite_sync = await self._sync_history_from_sqlite(identifier=identifier) + self._publish_state( + history_sqlite_imported=int(sqlite_sync.get("imported", 0)), + history_sqlite_scanned=int(sqlite_sync.get("scanned", 0)), + history_sqlite_rows=int(sqlite_sync.get("rows", 0)), + history_sqlite_table=str(sqlite_sync.get("table") or ""), + history_sqlite_ts=int(time.time()), + ) + finally: + finished_at = int(time.time()) + self._publish_state( + history_sync_running=False, + history_sync_finished_at=finished_at, + history_sync_duration_ms=int((finished_at - started_at) * 1000), + last_event="manual_history_sync_finished", + ) + state = transport.get_runtime_state(self.service) + return { + "started_at": started_at, + "finished_at": int(state.get("history_sync_finished_at") or int(time.time())), + "duration_ms": int(state.get("history_sync_duration_ms") or 0), + "contacts_sync_count": int(state.get("contacts_sync_count") or 0), + "history_imported_messages": int(state.get("history_imported_messages") or 0), + "sqlite_imported_messages": int(state.get("history_sqlite_imported") or 0), + "sqlite_scanned_messages": int(state.get("history_sqlite_scanned") or 0), + "sqlite_table": str(state.get("history_sqlite_table") or ""), + "sqlite_error": str(state.get("history_sqlite_error") or ""), + "on_demand_requested": bool(state.get("history_on_demand_requested")), + "on_demand_error": str(state.get("history_on_demand_error") or ""), + "last_event": str(state.get("last_event") or ""), + } + + async def _request_on_demand_history(self, identifier: str, count: int = 120): + if not self._client: + return {"requested": False, "error": "client_missing"} + normalized = str(identifier or "").strip() + if not normalized: + return {"requested": False, "error": "identifier_missing"} + + state = transport.get_runtime_state(self.service) + anchors = state.get("history_anchors") or {} + anchor = None + anchor_key = "" + for candidate in self._normalize_identifier_candidates(normalized): + row = anchors.get(str(candidate)) + if isinstance(row, dict) and row.get("msg_id"): + anchor = row + anchor_key = str(candidate) + break + if not anchor: + anchor = await self._load_history_anchor_from_sqlite(identifier=normalized) + if isinstance(anchor, dict) and anchor.get("msg_id"): + anchor_key = str(anchor.get("anchor_key") or "") + else: + return {"requested": False, "error": "anchor_missing"} + + try: + from neonize.builder import build_history_sync_request + from neonize.proto.Neonize_pb2 import MessageInfo, MessageSource + except Exception as exc: + return {"requested": False, "error": f"history_builder_unavailable:{exc}"} + + try: + chat_raw = str(anchor.get("chat_jid") or normalized).strip() + sender_raw = str(anchor.get("sender_jid") or chat_raw).strip() + if not chat_raw: + return {"requested": False, "error": "anchor_chat_missing"} + chat_jid = self._to_jid(chat_raw) + sender_jid = self._to_jid(sender_raw) + info = MessageInfo( + MessageSource=MessageSource( + Chat=chat_jid, + Sender=sender_jid, + IsFromMe=bool(anchor.get("from_me")), + IsGroup=False, + ), + ID=str(anchor.get("msg_id") or ""), + Timestamp=int(anchor.get("ts") or int(time.time() * 1000)), + ) + request_msg = build_history_sync_request(info, max(10, min(int(count or 120), 500))) + await self._maybe_await(self._client.send_message(chat_jid, request_msg)) + self._publish_state( + last_event="history_on_demand_requested", + last_error="", + ) + return {"requested": True, "anchor_key": anchor_key} + except Exception as exc: + self._publish_state( + last_event="history_on_demand_request_failed", + last_error=str(exc), + ) + return {"requested": False, "error": str(exc), "anchor_key": anchor_key} + + async def _load_history_anchor_from_sqlite(self, identifier: str): + def _read_anchor(): + if not self.session_db or not os.path.exists(self.session_db): + return {} + try: + conn = sqlite3.connect(self.session_db) + conn.row_factory = sqlite3.Row + except Exception: + return {} + try: + cur = conn.cursor() + table_rows = cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + table_names = [str(row[0]) for row in table_rows if row and row[0]] + if "whatsmeow_message_secrets" not in table_names: + return {} + candidates = self._normalize_identifier_candidates(identifier) + local_values = set() + for value in candidates: + local = str(value or "").strip().split("@", 1)[0] + if local: + local_values.add(local) + if not local_values: + return {} + placeholders = ",".join("?" for _ in local_values) + query = ( + "SELECT our_jid, chat_jid, sender_jid, message_id " + 'FROM "whatsmeow_message_secrets" ' + f"WHERE substr(chat_jid, 1, instr(chat_jid, '@') - 1) IN ({placeholders}) " + "ORDER BY rowid DESC LIMIT 1" + ) + row = cur.execute(query, tuple(local_values)).fetchone() + if not row: + return {} + our_jid = str(row["our_jid"] or "") + own_user = our_jid.split("@", 1)[0].split(":", 1)[0].strip() + sender_jid = str(row["sender_jid"] or "") + sender_user = sender_jid.split("@", 1)[0].split(":", 1)[0].strip() + from_me = bool(own_user and sender_user and own_user == sender_user) + chat_jid = str(row["chat_jid"] or "") + msg_id = str(row["message_id"] or "") + if not chat_jid or not msg_id: + return {} + anchor_key = chat_jid.split("@", 1)[0] + return { + "chat_jid": chat_jid, + "sender_jid": sender_jid, + "msg_id": msg_id, + "from_me": from_me, + "ts": int(time.time() * 1000), + "anchor_key": anchor_key, + } + except Exception: + return {} + finally: + conn.close() + + return await asyncio.to_thread(_read_anchor) + + async def _sync_history_from_sqlite(self, identifier: str = ""): + def _extract_rows(): + if not self.session_db or not os.path.exists(self.session_db): + return {"rows": [], "table": "", "error": "sqlite_missing"} + try: + conn = sqlite3.connect(self.session_db) + conn.row_factory = sqlite3.Row + except Exception as exc: + return {"rows": [], "table": "", "error": f"sqlite_open_failed:{exc}"} + try: + cur = conn.cursor() + table_rows = cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall() + table_names = [str(row[0]) for row in table_rows if row and row[0]] + preferred = [ + "whatsmeow_messages", + "messages", + "message", + ] + selected = "" + for candidate in preferred: + if candidate in table_names: + selected = candidate + break + if not selected: + for name in table_names: + lowered = name.lower() + if "message" not in lowered: + continue + if any( + token in lowered + for token in ( + "secret", + "contacts", + "contact", + "chat_settings", + "event_buffer", + "identity", + "pre_keys", + "sender_keys", + "session", + "version", + "privacy", + "lid_map", + "device", + "app_state", + ) + ): + continue + if "contact" not in lowered: + selected = name + break + if not selected: + return {"rows": [], "table": "", "error": "messages_table_not_found"} + + columns = [ + str(row[1] or "") + for row in cur.execute(f'PRAGMA table_info("{selected}")').fetchall() + ] + if not columns: + return {"rows": [], "table": selected, "error": "no_columns"} + + def pick(options): + lowered = {col.lower(): col for col in columns} + for option in options: + if option in lowered: + return lowered[option] + for option in options: + for col in columns: + if option in col.lower(): + return col + return "" + + text_col = pick( + [ + "text", + "conversation", + "body", + "caption", + "content", + "message", + "msg", + ] + ) + ts_col = pick( + [ + "message_timestamp", + "messagetimestamp", + "timestamp", + "ts", + "time", + ] + ) + from_me_col = pick( + [ + "from_me", + "fromme", + "is_from_me", + "isfromme", + "outgoing", + ] + ) + sender_col = pick( + [ + "sender_jid", + "sender", + "participant", + "from_jid", + "from", + ] + ) + chat_col = pick( + [ + "chat_jid", + "remote_jid", + "their_jid", + "jid", + "chat", + ] + ) + if not (text_col and ts_col and (sender_col or chat_col)): + return { + "rows": [], + "table": selected, + "error": "required_message_columns_missing", + } + select_cols = [col for col in [text_col, ts_col, from_me_col, sender_col, chat_col] if col] + quoted = ", ".join(f'"{col}"' for col in select_cols) + order_expr = f'"{ts_col}" DESC' if ts_col else "ROWID DESC" + sql = f'SELECT {quoted} FROM "{selected}" ORDER BY {order_expr} LIMIT 12000' + try: + rows = cur.execute(sql).fetchall() + except Exception as exc: + return { + "rows": [], + "table": selected, + "error": f"messages_query_failed:{exc}", + } + parsed = [] + for row in rows: + row_map = {col: row[idx] for idx, col in enumerate(select_cols)} + text = str(row_map.get(text_col) or "").strip() + if not text: + continue + raw_sender = str(row_map.get(sender_col) or "").strip() if sender_col else "" + raw_chat = str(row_map.get(chat_col) or "").strip() if chat_col else "" + raw_from_me = row_map.get(from_me_col) if from_me_col else None + parsed.append( + { + "text": text, + "ts": row_map.get(ts_col), + "from_me": raw_from_me, + "sender": raw_sender, + "chat": raw_chat, + } + ) + return {"rows": parsed, "table": selected, "error": ""} + finally: + conn.close() + + extracted = await asyncio.to_thread(_extract_rows) + rows = list(extracted.get("rows") or []) + table_name = str(extracted.get("table") or "") + error_text = str(extracted.get("error") or "").strip() + if error_text: + self._publish_state( + last_event="sqlite_history_scan_failed", + history_sqlite_error=error_text, + ) + return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name} + if not rows: + return {"imported": 0, "scanned": 0, "rows": 0, "table": table_name} + + target_candidates = self._normalize_identifier_candidates(identifier) + target_local = str(identifier or "").strip().split("@", 1)[0] + if target_local: + target_candidates.update(self._normalize_identifier_candidates(target_local)) + + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter(service="whatsapp") + ) + if not identifiers: + return {"imported": 0, "scanned": 0, "rows": len(rows), "table": table_name} + + by_candidate = {} + for row in identifiers: + values = self._normalize_identifier_candidates(row.identifier) + for candidate in values: + if not candidate: + continue + by_candidate.setdefault(candidate, []).append(row) + + imported = 0 + scanned = 0 + session_cache = {} + for row in rows: + sender = self._jid_to_identifier(row.get("sender")) + chat = self._jid_to_identifier(row.get("chat")) + row_candidates = self._normalize_identifier_candidates(sender, chat) + if not row_candidates: + continue + if target_candidates and not (row_candidates & target_candidates): + continue + matched_identifiers = {} + for candidate in row_candidates: + for pi in by_candidate.get(candidate, []): + matched_identifiers[int(pi.id)] = pi + if not matched_identifiers: + continue + scanned += 1 + ts = self._normalize_timestamp(row.get("ts")) + text = str(row.get("text") or "").strip() + if not text: + continue + from_me_raw = row.get("from_me") + from_me_text = str(from_me_raw or "").strip().lower() + from_me = from_me_text in {"1", "true", "t", "yes", "y"} + sender_uuid = "" if from_me else str(sender or chat or "") + custom_author = "BOT" if from_me else None + for pi in matched_identifiers.values(): + session = session_cache.get(int(pi.id)) + if session is None: + session = await history.get_chat_session(pi.user, pi) + session_cache[int(pi.id)] = session + exists = await sync_to_async( + Message.objects.filter( + user=pi.user, + session=session, + ts=ts, + sender_uuid=sender_uuid, + text=text, + custom_author=custom_author, + ).exists + )() + if exists: + continue + await sync_to_async(Message.objects.create)( + user=pi.user, + session=session, + ts=ts, + sender_uuid=sender_uuid, + text=text, + custom_author=custom_author, + delivered_ts=ts, + ) + imported += 1 + + self.log.info( + "whatsapp sqlite history sync: table=%s scanned=%s imported=%s rows=%s target=%s", + table_name or "-", + scanned, + imported, + len(rows), + identifier or "-", + ) + return { + "imported": imported, + "scanned": scanned, + "rows": len(rows), + "table": table_name, + } + async def _resolve_account_identifier(self): if self._client is None: return "" @@ -672,6 +1226,15 @@ class WhatsAppClient(ClientBase): return None return current + def _shape_keys(self, obj): + if obj is None: + return [] + if isinstance(obj, dict): + return sorted(str(key) for key in obj.keys()) + if hasattr(obj, "__dict__"): + return sorted(str(key) for key in vars(obj).keys()) + return [] + def _normalize_timestamp(self, raw_value): if raw_value is None: return int(time.time() * 1000) @@ -686,6 +1249,8 @@ class WhatsAppClient(ClientBase): def _normalize_identifier_candidates(self, *values): out = set() + state = transport.get_runtime_state(self.service) + lid_map = state.get("lid_map") or {} for value in values: raw = self._jid_to_identifier(value) if not raw: @@ -698,6 +1263,12 @@ class WhatsAppClient(ClientBase): out.add(digits) if not digits.startswith("+"): out.add(f"+{digits}") + if "@lid" in raw: + mapped = re.sub(r"[^0-9]", "", str(lid_map.get(digits) or "")) + if mapped: + out.add(mapped) + out.add(f"+{mapped}") + out.add(f"{mapped}@s.whatsapp.net") return out async def _sync_contacts_from_client(self): @@ -712,7 +1283,7 @@ class WhatsAppClient(ClientBase): return # NOTE: Neonize get_all_contacts has crashed some runtime builds with a Go panic. # Read contact-like rows directly from the session sqlite DB instead. - contacts, source = await self._sync_contacts_from_sqlite() + contacts, source, lid_map = await self._sync_contacts_from_sqlite() if not contacts: self.log.info("whatsapp contacts sync empty (%s)", source or "unknown") self._publish_state( @@ -727,6 +1298,7 @@ class WhatsAppClient(ClientBase): ) self._publish_state( contacts=contacts, + lid_map=lid_map, contacts_synced_at=int(time.time()), contacts_sync_count=len(contacts), last_event="contacts_synced", @@ -737,18 +1309,31 @@ class WhatsAppClient(ClientBase): async def _sync_contacts_from_sqlite(self): def _extract(): if not self.session_db or not os.path.exists(self.session_db): - return [], "sqlite_missing" + return [], "sqlite_missing", {} try: conn = sqlite3.connect(self.session_db) conn.row_factory = sqlite3.Row except Exception: - return [], "sqlite_open_failed" + return [], "sqlite_open_failed", {} try: cur = conn.cursor() table_rows = cur.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() table_names = [str(row[0]) for row in table_rows if row and row[0]] + lid_map = {} + if "whatsmeow_lid_map" in table_names: + try: + lid_rows = cur.execute( + 'SELECT "lid", "pn" FROM "whatsmeow_lid_map" LIMIT 10000' + ).fetchall() + except Exception: + lid_rows = [] + for lid, pn in lid_rows: + lid_key = re.sub(r"[^0-9]", "", str(lid or "").strip()) + pn_value = re.sub(r"[^0-9]", "", str(pn or "").strip()) + if lid_key and pn_value: + lid_map[lid_key] = pn_value # Prefer the canonical contacts table when available. if "whatsmeow_contacts" in table_names: own_ids = set() @@ -784,6 +1369,11 @@ class WhatsAppClient(ClientBase): if "@s.whatsapp.net" not in jid_value and "@lid" not in jid_value: continue identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0] + if "@lid" in jid_value: + mapped = lid_map.get(re.sub(r"[^0-9]", "", identifier)) + if mapped: + identifier = mapped + jid_value = f"{mapped}@s.whatsapp.net" if not identifier: continue if identifier.lower() in own_ids: @@ -809,7 +1399,7 @@ class WhatsAppClient(ClientBase): if len(out) >= 500: break if out: - return out, "sqlite_contacts" + return out, "sqlite_contacts", lid_map account_keys = { str(value or "").strip().split("@", 1)[0].lower() @@ -911,8 +1501,8 @@ class WhatsAppClient(ClientBase): } ) if len(out) >= 500: - return out, "sqlite_tables" - return out, "sqlite_tables" + return out, "sqlite_tables", lid_map + return out, "sqlite_tables", lid_map finally: conn.close() @@ -956,8 +1546,18 @@ class WhatsAppClient(ClientBase): return False async def _handle_history_sync_event(self, event): + started_at = time.time() + self._publish_state( + history_sync_running=True, + history_sync_started_at=int(started_at), + last_event="history_sync_started", + ) data = self._pluck(event, "Data") or self._pluck(event, "data") if data is None: + self._publish_state( + history_sync_running=False, + history_sync_duration_ms=int((time.time() - started_at) * 1000), + ) return pushname_rows = ( @@ -989,6 +1589,7 @@ class WhatsAppClient(ClientBase): or [] ) found = 0 + imported_messages = 0 for row in conversation_rows: jid = "" for candidate in ( @@ -1023,6 +1624,10 @@ class WhatsAppClient(ClientBase): ).strip() self._remember_contact(identifier, jid=jid, name=name) found += 1 + imported_messages += await self._import_history_messages_for_conversation( + row=row, + chat_jid=jid, + ) if found: state = transport.get_runtime_state(self.service) @@ -1034,6 +1639,170 @@ class WhatsAppClient(ClientBase): last_event="history_sync_contacts", last_error="", ) + self._publish_state( + history_sync_running=False, + history_sync_finished_at=int(time.time()), + history_sync_duration_ms=int((time.time() - started_at) * 1000), + history_imported_messages=imported_messages, + last_event="history_sync_completed", + ) + + def _history_message_rows(self, conversation_row): + candidates = ( + self._pluck(conversation_row, "messages"), + self._pluck(conversation_row, "Messages"), + self._pluck(conversation_row, "msgs"), + self._pluck(conversation_row, "Msgs"), + ) + for value in candidates: + if isinstance(value, (list, tuple)): + return list(value) + return [] + + def _history_message_text(self, msg): + return str( + self._pluck(msg, "message", "conversation") + or self._pluck(msg, "message", "Conversation") + or self._pluck(msg, "message", "extendedTextMessage", "text") + or self._pluck(msg, "message", "ExtendedTextMessage", "Text") + or self._pluck(msg, "message", "imageMessage", "caption") + or self._pluck(msg, "message", "videoMessage", "caption") + or self._pluck(msg, "message", "documentMessage", "caption") + or self._pluck(msg, "Message", "conversation") + or self._pluck(msg, "Message", "Conversation") + or self._pluck(msg, "Message", "extendedTextMessage", "text") + or self._pluck(msg, "Message", "ExtendedTextMessage", "Text") + or self._pluck(msg, "conversation") + or self._pluck(msg, "Conversation") + or self._pluck(msg, "text") + or self._pluck(msg, "Text") + or "" + ).strip() + + def _message_text(self, msg_obj, event_obj=None): + """ + Extract user-visible text from diverse WhatsApp message payload shapes. + """ + candidates = ( + self._pluck(msg_obj, "conversation"), + self._pluck(msg_obj, "Conversation"), + self._pluck(msg_obj, "extendedTextMessage", "text"), + self._pluck(msg_obj, "ExtendedTextMessage", "Text"), + self._pluck(msg_obj, "extended_text_message", "text"), + self._pluck(msg_obj, "imageMessage", "caption"), + self._pluck(msg_obj, "videoMessage", "caption"), + self._pluck(msg_obj, "documentMessage", "caption"), + self._pluck(msg_obj, "ephemeralMessage", "message", "conversation"), + self._pluck(msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"), + self._pluck(msg_obj, "viewOnceMessage", "message", "conversation"), + self._pluck(msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"), + self._pluck(msg_obj, "viewOnceMessageV2", "message", "conversation"), + self._pluck(msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"), + self._pluck(msg_obj, "viewOnceMessageV2Extension", "message", "conversation"), + self._pluck( + msg_obj, + "viewOnceMessageV2Extension", + "message", + "extendedTextMessage", + "text", + ), + self._pluck(event_obj, "message", "conversation"), + self._pluck(event_obj, "message", "extendedTextMessage", "text"), + self._pluck(event_obj, "Message", "conversation"), + self._pluck(event_obj, "Message", "extendedTextMessage", "text"), + self._pluck(event_obj, "conversation"), + self._pluck(event_obj, "text"), + ) + for value in candidates: + text = str(value or "").strip() + if text: + return text + return "" + + def _history_message_ts(self, msg): + raw_ts = ( + self._pluck(msg, "messageTimestamp") + or self._pluck(msg, "MessageTimestamp") + or self._pluck(msg, "timestamp") + or self._pluck(msg, "Timestamp") + or self._pluck(msg, "message", "messageTimestamp") + or self._pluck(msg, "Message", "messageTimestamp") + or int(time.time() * 1000) + ) + return self._normalize_timestamp(raw_ts) + + def _history_message_from_me(self, msg): + return bool( + self._pluck(msg, "key", "fromMe") + or self._pluck(msg, "Key", "FromMe") + or self._pluck(msg, "messageKey", "fromMe") + or self._pluck(msg, "MessageKey", "FromMe") + or self._pluck(msg, "fromMe") + or self._pluck(msg, "FromMe") + ) + + def _history_message_sender_jid(self, msg, fallback_chat_jid: str): + sender = self._jid_to_identifier( + self._pluck(msg, "key", "participant") + or self._pluck(msg, "Key", "Participant") + or self._pluck(msg, "messageKey", "participant") + or self._pluck(msg, "MessageKey", "Participant") + or self._pluck(msg, "participant") + or self._pluck(msg, "Participant") + or fallback_chat_jid + ) + return str(sender or fallback_chat_jid or "").strip() + + async def _import_history_messages_for_conversation(self, row, chat_jid: str) -> int: + imported = 0 + msg_rows = self._history_message_rows(row) + if not msg_rows: + return imported + chat_identifier = str(chat_jid or "").split("@", 1)[0].strip() + if not chat_identifier: + return imported + candidate_values = self._normalize_identifier_candidates(chat_jid, chat_identifier) + if not candidate_values: + return imported + identifiers = await sync_to_async(list)( + PersonIdentifier.objects.filter( + service="whatsapp", + identifier__in=list(candidate_values), + ) + ) + if not identifiers: + return imported + + for msg in msg_rows: + ts = self._history_message_ts(msg) + text = self._history_message_text(msg) + from_me = self._history_message_from_me(msg) + sender_jid = self._history_message_sender_jid(msg, chat_jid) + for identifier in identifiers: + session = await history.get_chat_session(identifier.user, identifier) + exists = await sync_to_async( + Message.objects.filter( + user=identifier.user, + session=session, + ts=ts, + sender_uuid=(sender_jid if not from_me else ""), + text=text, + custom_author=("BOT" if from_me else None), + ).exists + )() + if exists: + continue + await sync_to_async(Message.objects.create)( + user=identifier.user, + session=session, + ts=ts, + sender_uuid=(sender_jid if not from_me else ""), + text=text, + custom_author=("BOT" if from_me else None), + delivered_ts=ts, + ) + imported += 1 + return imported def _remember_contact(self, identifier, *, jid="", name="", chat=""): cleaned = str(identifier or "").strip() @@ -1066,6 +1835,42 @@ class WhatsAppClient(ClientBase): break self._publish_state(contacts=merged, last_contact_seen_at=now_ts) + def _remember_history_anchor( + self, + *, + identifier: str, + chat_jid: str, + msg_id: str, + ts: int, + from_me: bool, + sender_jid: str = "", + ): + if not msg_id: + return + candidate_keys = self._normalize_identifier_candidates(identifier, chat_jid) + if not candidate_keys: + return + state = transport.get_runtime_state(self.service) + anchors = dict(state.get("history_anchors") or {}) + row = { + "msg_id": str(msg_id), + "ts": int(ts or int(time.time() * 1000)), + "from_me": bool(from_me), + "chat_jid": str(chat_jid or ""), + "sender_jid": str(sender_jid or ""), + "updated_at": int(time.time()), + } + for key in candidate_keys: + anchors[str(key)] = row + if len(anchors) > 400: + recent = sorted( + anchors.items(), + key=lambda item: int((item[1] or {}).get("updated_at") or 0), + reverse=True, + )[:300] + anchors = {k: v for k, v in recent} + self._publish_state(history_anchors=anchors) + def _jid_to_identifier(self, value): raw = str(value or "").strip() if not raw: @@ -1149,14 +1954,14 @@ class WhatsAppClient(ClientBase): async def _handle_message_event(self, event): msg_obj = self._pluck(event, "message") or self._pluck(event, "Message") - text = ( - self._pluck(msg_obj, "conversation") - or self._pluck(msg_obj, "Conversation") - or self._pluck(msg_obj, "extendedTextMessage", "text") - or self._pluck(msg_obj, "ExtendedTextMessage", "Text") - or self._pluck(msg_obj, "extended_text_message", "text") - or "" - ) + text = self._message_text(msg_obj, event) + if not text: + self.log.info( + "whatsapp empty-text event shape: msg_keys=%s event_keys=%s type=%s", + self._shape_keys(msg_obj), + self._shape_keys(event), + str(type(event).__name__), + ) source = ( self._pluck(event, "Info", "MessageSource") or self._pluck(event, "info", "message_source") @@ -1166,8 +1971,6 @@ class WhatsAppClient(ClientBase): self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe") ) - if is_from_me: - return sender = self._jid_to_identifier( self._pluck(source, "Sender") @@ -1186,12 +1989,27 @@ class WhatsAppClient(ClientBase): or self._pluck(event, "Timestamp") or self._pluck(event, "timestamp") ) + msg_id = str( + self._pluck(event, "Info", "ID") + or self._pluck(event, "info", "id") + or self._pluck(event, "ID") + or self._pluck(event, "id") + or "" + ).strip() ts = self._normalize_timestamp(raw_ts) self._remember_contact( sender or chat, jid=sender, chat=chat, ) + self._remember_history_anchor( + identifier=(chat or sender or ""), + chat_jid=str(chat or sender or ""), + msg_id=msg_id, + ts=ts, + from_me=is_from_me, + sender_jid=str(sender or ""), + ) identifier_values = self._normalize_identifier_candidates(sender, chat) if not identifier_values: @@ -1225,7 +2043,7 @@ class WhatsAppClient(ClientBase): identifier.user, identifier, text, - is_outgoing_message=False, + is_outgoing_message=is_from_me, attachments=xmpp_attachments, ) display_text = text @@ -1241,12 +2059,23 @@ class WhatsAppClient(ClientBase): display_text = "\n".join(media_urls) session = await history.get_chat_session(identifier.user, identifier) + duplicate_exists = await sync_to_async( + Message.objects.filter( + user=identifier.user, + session=session, + ts=ts, + sender_uuid=str(sender or chat or ""), + text=display_text, + ).exists + )() + if duplicate_exists: + continue await history.store_message( session=session, sender=str(sender or chat or ""), text=display_text, ts=ts, - outgoing=False, + outgoing=is_from_me, ) await self.ur.message_received( self.service, diff --git a/core/templates/base.html b/core/templates/base.html index 13737ec..0e4327e 100644 --- a/core/templates/base.html +++ b/core/templates/base.html @@ -576,6 +576,67 @@ window.giaWindowAnchor = null; }; + window.giaEnableFloatingWindowInteractions = function (windowEl) { + if (!windowEl || windowEl.dataset.giaWindowInteractive === "1") { + return; + } + windowEl.dataset.giaWindowInteractive = "1"; + + // Disable magnet-block global drag so text inputs remain editable. + windowEl.setAttribute("unmovable", ""); + + const heading = windowEl.querySelector(".panel-heading"); + if (!heading) { + return; + } + + let dragging = false; + let startX = 0; + let startY = 0; + let startLeft = 0; + let startTop = 0; + + const onMove = function (event) { + if (!dragging) { + return; + } + const deltaX = event.clientX - startX; + const deltaY = event.clientY - startY; + windowEl.style.left = (startLeft + deltaX) + "px"; + windowEl.style.top = (startTop + deltaY) + "px"; + windowEl.style.right = "auto"; + windowEl.style.bottom = "auto"; + }; + + const stopDrag = function () { + dragging = false; + document.removeEventListener("pointermove", onMove); + document.removeEventListener("pointerup", stopDrag); + }; + + heading.addEventListener("pointerdown", function (event) { + if (event.button !== 0) { + return; + } + const interactive = event.target.closest( + "button, a, input, textarea, select, label, .delete, .icon" + ); + if (interactive) { + return; + } + const rect = windowEl.getBoundingClientRect(); + windowEl.style.position = "fixed"; + startLeft = rect.left; + startTop = rect.top; + startX = event.clientX; + startY = event.clientY; + dragging = true; + document.addEventListener("pointermove", onMove); + document.addEventListener("pointerup", stopDrag); + event.preventDefault(); + }); + }; + document.addEventListener("click", function (event) { const trigger = event.target.closest(".js-widget-spawn-trigger"); if (!trigger) { @@ -593,12 +654,13 @@ window.giaEnableWidgetSpawnButtons(target); const targetId = (target && target.id) || ""; if (targetId === "windows-here") { - const floatingWindow = target.querySelector(".floating-window"); - if (floatingWindow) { + const floatingWindows = target.querySelectorAll(".floating-window"); + floatingWindows.forEach(function (floatingWindow) { window.setTimeout(function () { window.giaPositionFloatingWindow(floatingWindow); + window.giaEnableFloatingWindowInteractions(floatingWindow); }, 0); - } + }); } }); diff --git a/core/templates/mixins/wm/widget.html b/core/templates/mixins/wm/widget.html index 8cd36bc..cc8451a 100644 --- a/core/templates/mixins/wm/widget.html +++ b/core/templates/mixins/wm/widget.html @@ -6,6 +6,9 @@ {% block close_button %} {% include "mixins/partials/close-widget.html" %} {% endblock %} + + + diff --git a/core/templates/pages/compose-contact-match.html b/core/templates/pages/compose-contact-match.html index 2ea4072..e03c65a 100644 --- a/core/templates/pages/compose-contact-match.html +++ b/core/templates/pages/compose-contact-match.html @@ -30,12 +30,12 @@