Work on fixing bugs and reformat

This commit is contained in:
2026-02-16 16:01:17 +00:00
parent 8cfd93d0d2
commit d11355a46b
32 changed files with 1100 additions and 442 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import logging
import os
import re
import sqlite3
@@ -8,6 +9,7 @@ from urllib.parse import quote_plus
import aiohttp
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.cache import cache
from core.clients import ClientBase, transport
from core.messaging import history, media_bridge
@@ -45,12 +47,11 @@ class WhatsAppClient(ClientBase):
str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower()
in {"1", "true", "yes", "on"}
)
self.client_name = str(
getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")
).strip() or "gia_whatsapp"
self.database_url = str(
getattr(settings, "WHATSAPP_DATABASE_URL", "")
).strip()
self.client_name = (
str(getattr(settings, "WHATSAPP_CLIENT_NAME", "gia_whatsapp")).strip()
or "gia_whatsapp"
)
self.database_url = str(getattr(settings, "WHATSAPP_DATABASE_URL", "")).strip()
safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", self.client_name) or "gia_whatsapp"
# Use a persistent default path (under project mount) instead of /tmp so
# link state and contact cache survive container restarts.
@@ -70,14 +71,24 @@ class WhatsAppClient(ClientBase):
self._publish_state(
connected=False,
warning=(
"WhatsApp runtime is disabled by settings."
if not self.enabled
else ""
"WhatsApp runtime is disabled by settings." if not self.enabled else ""
),
accounts=prior_accounts,
last_event="init",
session_db=self.session_db,
)
# Reduce third-party library logging noise (neonize/grpc/protobuf).
try:
# Common noisy libraries used by Neonize/WhatsApp stacks
logging.getLogger("neonize").setLevel(logging.WARNING)
logging.getLogger("grpc").setLevel(logging.WARNING)
logging.getLogger("google").setLevel(logging.WARNING)
logging.getLogger("protobuf").setLevel(logging.WARNING)
logging.getLogger("whatsmeow").setLevel(logging.WARNING)
logging.getLogger("whatsmeow.Client").setLevel(logging.WARNING)
logging.getLogger("aiohttp.access").setLevel(logging.WARNING)
except Exception:
pass
def _publish_state(self, **updates):
state = transport.update_runtime_state(self.service, **updates)
@@ -96,8 +107,9 @@ class WhatsAppClient(ClientBase):
async def _run(self):
try:
import neonize.aioze.client as wa_client_mod
from neonize.aioze.client import NewAClient
from neonize.aioze import events as wa_events
from neonize.aioze.client import NewAClient
try:
from neonize.utils.enum import ChatPresence, ChatPresenceMedia
except Exception:
@@ -432,7 +444,9 @@ class WhatsAppClient(ClientBase):
try:
return cls(self.session_db)
except Exception as exc:
self.log.warning("whatsapp client init failed (%s): %s", self.session_db, exc)
self.log.warning(
"whatsapp client init failed (%s): %s", self.session_db, exc
)
self._publish_state(
last_event="client_init_exception",
last_error=str(exc),
@@ -611,7 +625,9 @@ class WhatsAppClient(ClientBase):
if offline_sync_completed_ev is not None:
async def on_offline_sync_completed(client, event: offline_sync_completed_ev):
async def on_offline_sync_completed(
client, event: offline_sync_completed_ev
):
self._publish_state(last_event="offline_sync_completed")
await self._sync_contacts_from_client()
@@ -633,8 +649,7 @@ class WhatsAppClient(ClientBase):
self._publish_state(
last_event="pair_waiting_no_qr",
warning=(
"Waiting for WhatsApp QR from Neonize. "
"No QR callback received yet."
"Waiting for WhatsApp QR from Neonize. " "No QR callback received yet."
),
)
@@ -663,10 +678,12 @@ class WhatsAppClient(ClientBase):
text = payload.get("text")
attachments = payload.get("attachments") or []
try:
# Include command_id so send_message_raw can observe cancel requests
result = await self.send_message_raw(
recipient=recipient,
text=text,
attachments=attachments,
command_id=command_id,
)
if result is not False and result is not None:
transport.set_runtime_command_result(
@@ -778,10 +795,14 @@ class WhatsAppClient(ClientBase):
state = transport.get_runtime_state(self.service)
return {
"started_at": started_at,
"finished_at": int(state.get("history_sync_finished_at") or int(time.time())),
"finished_at": int(
state.get("history_sync_finished_at") or int(time.time())
),
"duration_ms": int(state.get("history_sync_duration_ms") or 0),
"contacts_sync_count": int(state.get("contacts_sync_count") or 0),
"history_imported_messages": int(state.get("history_imported_messages") or 0),
"history_imported_messages": int(
state.get("history_imported_messages") or 0
),
"sqlite_imported_messages": int(state.get("history_sqlite_imported") or 0),
"sqlite_scanned_messages": int(state.get("history_sqlite_scanned") or 0),
"sqlite_table": str(state.get("history_sqlite_table") or ""),
@@ -838,7 +859,9 @@ class WhatsAppClient(ClientBase):
ID=str(anchor.get("msg_id") or ""),
Timestamp=int(anchor.get("ts") or int(time.time() * 1000)),
)
request_msg = build_history_sync_request(info, max(10, min(int(count or 120), 500)))
request_msg = build_history_sync_request(
info, max(10, min(int(count or 120), 500))
)
await self._maybe_await(self._client.send_message(chat_jid, request_msg))
self._publish_state(
last_event="history_on_demand_requested",
@@ -966,11 +989,17 @@ class WhatsAppClient(ClientBase):
selected = name
break
if not selected:
return {"rows": [], "table": "", "error": "messages_table_not_found"}
return {
"rows": [],
"table": "",
"error": "messages_table_not_found",
}
columns = [
str(row[1] or "")
for row in cur.execute(f'PRAGMA table_info("{selected}")').fetchall()
for row in cur.execute(
f'PRAGMA table_info("{selected}")'
).fetchall()
]
if not columns:
return {"rows": [], "table": selected, "error": "no_columns"}
@@ -1039,7 +1068,11 @@ class WhatsAppClient(ClientBase):
"table": selected,
"error": "required_message_columns_missing",
}
select_cols = [col for col in [text_col, ts_col, from_me_col, sender_col, chat_col] if col]
select_cols = [
col
for col in [text_col, ts_col, from_me_col, sender_col, chat_col]
if col
]
quoted = ", ".join(f'"{col}"' for col in select_cols)
order_expr = f'"{ts_col}" DESC' if ts_col else "ROWID DESC"
sql = f'SELECT {quoted} FROM "{selected}" ORDER BY {order_expr} LIMIT 12000'
@@ -1057,8 +1090,12 @@ class WhatsAppClient(ClientBase):
text = str(row_map.get(text_col) or "").strip()
if not text:
continue
raw_sender = str(row_map.get(sender_col) or "").strip() if sender_col else ""
raw_chat = str(row_map.get(chat_col) or "").strip() if chat_col else ""
raw_sender = (
str(row_map.get(sender_col) or "").strip() if sender_col else ""
)
raw_chat = (
str(row_map.get(chat_col) or "").strip() if chat_col else ""
)
raw_from_me = row_map.get(from_me_col) if from_me_col else None
parsed.append(
{
@@ -1089,7 +1126,9 @@ class WhatsAppClient(ClientBase):
target_candidates = self._normalize_identifier_candidates(identifier)
target_local = str(identifier or "").strip().split("@", 1)[0]
if target_local:
target_candidates.update(self._normalize_identifier_candidates(target_local))
target_candidates.update(
self._normalize_identifier_candidates(target_local)
)
identifiers = await sync_to_async(list)(
PersonIdentifier.objects.filter(service="whatsapp")
@@ -1348,7 +1387,9 @@ class WhatsAppClient(ClientBase):
for row in cur.execute(
'SELECT DISTINCT "our_jid" FROM "whatsmeow_contacts"'
).fetchall():
base = str((row or [None])[0] or "").strip().split("@", 1)[0]
base = (
str((row or [None])[0] or "").strip().split("@", 1)[0]
)
base = base.split(":", 1)[0]
if base:
own_ids.add(base.lower())
@@ -1364,9 +1405,18 @@ class WhatsAppClient(ClientBase):
).fetchall()
except Exception:
rows = []
for their_jid, first_name, full_name, push_name, business_name in rows:
for (
their_jid,
first_name,
full_name,
push_name,
business_name,
) in rows:
jid_value = str(their_jid or "").strip()
if "@s.whatsapp.net" not in jid_value and "@lid" not in jid_value:
if (
"@s.whatsapp.net" not in jid_value
and "@lid" not in jid_value
):
continue
identifier = jid_value.split("@", 1)[0].strip().split(":", 1)[0]
if "@lid" in jid_value:
@@ -1539,7 +1589,9 @@ class WhatsAppClient(ClientBase):
me = await self._maybe_await(self._client.get_me())
if me:
self._connected = True
self._publish_state(connected=True, warning="", pair_status="connected")
self._publish_state(
connected=True, warning="", pair_status="connected"
)
return True
except Exception:
pass
@@ -1561,22 +1613,17 @@ class WhatsAppClient(ClientBase):
return
pushname_rows = (
self._pluck(data, "pushnames")
or self._pluck(data, "Pushnames")
or []
self._pluck(data, "pushnames") or self._pluck(data, "Pushnames") or []
)
pushname_map = {}
for row in pushname_rows:
raw_id = (
self._jid_to_identifier(self._pluck(row, "ID"))
or self._jid_to_identifier(self._pluck(row, "id"))
)
raw_id = self._jid_to_identifier(
self._pluck(row, "ID")
) or self._jid_to_identifier(self._pluck(row, "id"))
if not raw_id:
continue
pushname = str(
self._pluck(row, "pushname")
or self._pluck(row, "Pushname")
or ""
self._pluck(row, "pushname") or self._pluck(row, "Pushname") or ""
).strip()
if not pushname:
continue
@@ -1693,12 +1740,20 @@ class WhatsAppClient(ClientBase):
self._pluck(msg_obj, "videoMessage", "caption"),
self._pluck(msg_obj, "documentMessage", "caption"),
self._pluck(msg_obj, "ephemeralMessage", "message", "conversation"),
self._pluck(msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"),
self._pluck(
msg_obj, "ephemeralMessage", "message", "extendedTextMessage", "text"
),
self._pluck(msg_obj, "viewOnceMessage", "message", "conversation"),
self._pluck(msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"),
self._pluck(
msg_obj, "viewOnceMessage", "message", "extendedTextMessage", "text"
),
self._pluck(msg_obj, "viewOnceMessageV2", "message", "conversation"),
self._pluck(msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"),
self._pluck(msg_obj, "viewOnceMessageV2Extension", "message", "conversation"),
self._pluck(
msg_obj, "viewOnceMessageV2", "message", "extendedTextMessage", "text"
),
self._pluck(
msg_obj, "viewOnceMessageV2Extension", "message", "conversation"
),
self._pluck(
msg_obj,
"viewOnceMessageV2Extension",
@@ -1753,7 +1808,9 @@ class WhatsAppClient(ClientBase):
)
return str(sender or fallback_chat_jid or "").strip()
async def _import_history_messages_for_conversation(self, row, chat_jid: str) -> int:
async def _import_history_messages_for_conversation(
self, row, chat_jid: str
) -> int:
imported = 0
msg_rows = self._history_message_rows(row)
if not msg_rows:
@@ -1761,7 +1818,9 @@ class WhatsAppClient(ClientBase):
chat_identifier = str(chat_jid or "").split("@", 1)[0].strip()
if not chat_identifier:
return imported
candidate_values = self._normalize_identifier_candidates(chat_jid, chat_identifier)
candidate_values = self._normalize_identifier_candidates(
chat_jid, chat_identifier
)
if not candidate_values:
return imported
identifiers = await sync_to_async(list)(
@@ -1968,8 +2027,7 @@ class WhatsAppClient(ClientBase):
or self._pluck(event, "info", "messageSource")
)
is_from_me = bool(
self._pluck(source, "IsFromMe")
or self._pluck(source, "isFromMe")
self._pluck(source, "IsFromMe") or self._pluck(source, "isFromMe")
)
sender = self._jid_to_identifier(
@@ -1979,8 +2037,7 @@ class WhatsAppClient(ClientBase):
or self._pluck(source, "senderAlt")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat")
or self._pluck(source, "chat")
self._pluck(source, "Chat") or self._pluck(source, "chat")
)
raw_ts = (
self._pluck(event, "Info", "Timestamp")
@@ -2092,14 +2149,15 @@ class WhatsAppClient(ClientBase):
or self._pluck(event, "info", "messageSource")
)
sender = self._jid_to_identifier(
self._pluck(source, "Sender")
or self._pluck(source, "sender")
self._pluck(source, "Sender") or self._pluck(source, "sender")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat") or self._pluck(source, "chat")
)
timestamps = []
raw_ids = self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or []
raw_ids = (
self._pluck(event, "MessageIDs") or self._pluck(event, "message_ids") or []
)
if isinstance(raw_ids, (list, tuple, set)):
for item in raw_ids:
try:
@@ -2141,8 +2199,7 @@ class WhatsAppClient(ClientBase):
or {}
)
sender = self._jid_to_identifier(
self._pluck(source, "Sender")
or self._pluck(source, "sender")
self._pluck(source, "Sender") or self._pluck(source, "sender")
)
chat = self._jid_to_identifier(
self._pluck(source, "Chat") or self._pluck(source, "chat")
@@ -2161,19 +2218,26 @@ class WhatsAppClient(ClientBase):
await self.ur.started_typing(
self.service,
identifier=candidate,
payload={"presence": state_text, "sender": str(sender), "chat": str(chat)},
payload={
"presence": state_text,
"sender": str(sender),
"chat": str(chat),
},
)
else:
await self.ur.stopped_typing(
await self.ur.stopped_typing(
self.service,
identifier=candidate,
payload={"presence": state_text, "sender": str(sender), "chat": str(chat)},
payload={
"presence": state_text,
"sender": str(sender),
"chat": str(chat),
},
)
async def _handle_presence_event(self, event):
sender = self._jid_to_identifier(
self._pluck(event, "From", "User")
or self._pluck(event, "from", "user")
self._pluck(event, "From", "User") or self._pluck(event, "from", "user")
)
is_unavailable = bool(
self._pluck(event, "Unavailable") or self._pluck(event, "unavailable")
@@ -2271,7 +2335,9 @@ class WhatsAppClient(ClientBase):
}
return None
async def send_message_raw(self, recipient, text=None, attachments=None):
async def send_message_raw(
self, recipient, text=None, attachments=None, command_id: str | None = None
):
if not self._client:
return False
if self._build_jid is None:
@@ -2283,8 +2349,12 @@ class WhatsAppClient(ClientBase):
try:
jid = self._build_jid(jid_str)
# Verify it's a proper JID object with SerializeToString method
if not hasattr(jid, 'SerializeToString'):
self.log.error("whatsapp build_jid returned non-JID object: type=%s repr=%s", type(jid).__name__, repr(jid)[:100])
if not hasattr(jid, "SerializeToString"):
self.log.error(
"whatsapp build_jid returned non-JID object: type=%s repr=%s",
type(jid).__name__,
repr(jid)[:100],
)
return False
except Exception as exc:
self.log.warning("whatsapp failed to build JID from %s: %s", jid_str, exc)
@@ -2313,7 +2383,9 @@ class WhatsAppClient(ClientBase):
payload = await self._fetch_attachment_payload(attachment)
if not payload:
continue
mime = str(payload.get("content_type") or "application/octet-stream").lower()
mime = str(
payload.get("content_type") or "application/octet-stream"
).lower()
data = payload.get("content") or b""
filename = payload.get("filename") or "attachment.bin"
@@ -2327,7 +2399,9 @@ class WhatsAppClient(ClientBase):
self._client.send_video(jid, data, caption="")
)
elif mime.startswith("audio/") and hasattr(self._client, "send_audio"):
response = await self._maybe_await(self._client.send_audio(jid, data))
response = await self._maybe_await(
self._client.send_audio(jid, data)
)
elif hasattr(self._client, "send_document"):
response = await self._maybe_await(
self._client.send_document(
@@ -2351,21 +2425,46 @@ class WhatsAppClient(ClientBase):
if text:
response = None
last_error = None
for attempt in range(3):
# Prepare cancel key (if caller provided command_id)
cancel_key = None
try:
if command_id:
cancel_key = transport._runtime_command_cancel_key(
self.service, str(command_id)
)
except Exception:
cancel_key = None
for attempt in range(5): # Increased from 3 to 5 attempts
# Check for a cancellation marker set by transport.cancel_runtime_command
try:
if cancel_key and cache.get(cancel_key):
self.log.info("whatsapp send cancelled via cancel marker")
return False
except Exception:
pass
try:
# Log what we're about to send for debugging
self.log.debug(f"send_message attempt {attempt+1}: jid={jid} text_type={type(text).__name__} text_len={len(text)}")
response = await self._maybe_await(self._client.send_message(jid, text))
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
f"send_message attempt {attempt+1}: jid={jid} text_type={type(text).__name__} text_len={len(text)}"
)
response = await self._maybe_await(
self._client.send_message(jid, text)
)
sent_any = True
last_error = None
break
except Exception as exc:
self.log.debug(f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}")
if getattr(settings, "WHATSAPP_DEBUG", False):
self.log.debug(
f"send_message attempt {attempt+1} failed: {type(exc).__name__}: {exc}"
)
last_error = exc
error_text = str(last_error or "").lower()
is_transient = "usync query" in error_text or "timed out" in error_text
if is_transient and attempt < 2:
if is_transient and attempt < 4: # Updated to match new attempt range
if hasattr(self._client, "connect"):
try:
await self._maybe_await(self._client.connect())
@@ -2381,7 +2480,21 @@ class WhatsAppClient(ClientBase):
last_event="send_retry_reconnect_failed",
last_error=str(reconnect_exc),
)
await asyncio.sleep(0.8 * (attempt + 1))
# Sleep but wake earlier if cancelled: poll small intervals
# Increase backoff time for device list queries
total_sleep = 1.5 * (attempt + 1)
slept = 0.0
while slept < total_sleep:
try:
if cancel_key and cache.get(cancel_key):
self.log.info(
"whatsapp send cancelled during retry backoff"
)
return False
except Exception:
pass
await asyncio.sleep(0.2)
slept += 0.2
continue
break
if last_error is not None and not sent_any:
@@ -2420,7 +2533,9 @@ class WhatsAppClient(ClientBase):
pass
if hasattr(self._client, "set_chat_presence"):
try:
await self._maybe_await(self._client.set_chat_presence(jid, "composing"))
await self._maybe_await(
self._client.set_chat_presence(jid, "composing")
)
return True
except Exception:
pass