Tightly integrate WhatsApp selectors into existing UIs

This commit is contained in:
2026-02-16 10:51:57 +00:00
parent a38339c809
commit 15af8af6b2
19 changed files with 2846 additions and 156 deletions

View File

@@ -46,6 +46,10 @@ COMPOSE_WS_TOKEN_SALT = "compose-ws"
COMPOSE_ENGAGE_TOKEN_SALT = "compose-engage"
COMPOSE_AI_CACHE_TTL = 60 * 30
URL_PATTERN = re.compile(r"https?://[^\s<>'\"\\]+")
SIGNAL_UUID_PATTERN = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
IMAGE_EXTENSIONS = (
".png",
".jpg",
@@ -1349,6 +1353,42 @@ def _service_icon_class(service: str) -> str:
return "fa-solid fa-address-card"
def _service_label(service: str) -> str:
key = str(service or "").strip().lower()
labels = {
"signal": "Signal",
"whatsapp": "WhatsApp",
"instagram": "Instagram",
"xmpp": "XMPP",
}
return labels.get(key, key.title() if key else "Unknown")
def _service_order(service: str) -> int:
key = str(service or "").strip().lower()
order = {
"signal": 0,
"whatsapp": 1,
"instagram": 2,
"xmpp": 3,
}
return order.get(key, 99)
def _signal_identifier_shape(value: str) -> str:
raw = str(value or "").strip()
if not raw:
return "unknown"
if SIGNAL_UUID_PATTERN.fullmatch(raw):
return "uuid"
digits = re.sub(r"[^0-9]", "", raw)
if digits and raw.replace("+", "").replace(" ", "").replace("-", "").isdigit():
return "phone"
if digits and raw.isdigit():
return "phone"
return "other"
def _manual_contact_rows(user):
rows = []
seen = set()
@@ -1397,6 +1437,7 @@ def _manual_contact_rows(user):
{
"person_name": person_name,
"linked_person_name": linked_person_name,
"person_id": str(person.id) if person else "",
"detected_name": detected,
"service": service_key,
"service_icon_class": _service_icon_class(service_key),
@@ -1489,7 +1530,94 @@ def _manual_contact_rows(user):
detected_name=detected_name,
)
rows.sort(key=lambda row: (row["person_name"].lower(), row["service"], row["identifier"]))
rows.sort(
key=lambda row: (
0 if row.get("linked_person") else 1,
row["person_name"].lower(),
_service_order(row.get("service")),
row["identifier"],
)
)
return rows
def _recent_manual_contacts(
user,
*,
current_service: str,
current_identifier: str,
current_person: Person | None,
limit: int = 12,
):
all_rows = _manual_contact_rows(user)
if not all_rows:
return []
row_by_key = {
(str(row.get("service") or "").strip().lower(), str(row.get("identifier") or "").strip()): row
for row in all_rows
}
ordered_keys = []
seen_keys = set()
recent_values = (
Message.objects.filter(
user=user,
session__identifier__isnull=False,
)
.values_list(
"session__identifier__service",
"session__identifier__identifier",
)
.order_by("-ts", "-id")[:1000]
)
for service_value, identifier_value in recent_values:
key = (
_default_service(service_value),
str(identifier_value or "").strip(),
)
if not key[1] or key in seen_keys:
continue
seen_keys.add(key)
ordered_keys.append(key)
if len(ordered_keys) >= limit:
break
current_key = (_default_service(current_service), str(current_identifier or "").strip())
if current_key[1]:
if current_key in ordered_keys:
ordered_keys.remove(current_key)
ordered_keys.insert(0, current_key)
if len(ordered_keys) > limit:
ordered_keys = ordered_keys[:limit]
rows = []
for service_key, identifier_value in ordered_keys:
row = dict(row_by_key.get((service_key, identifier_value)) or {})
if not row:
urls = _compose_urls(
service_key,
identifier_value,
current_person.id if current_person else None,
)
row = {
"person_name": identifier_value,
"linked_person_name": "",
"detected_name": "",
"service": service_key,
"service_icon_class": _service_icon_class(service_key),
"identifier": identifier_value,
"compose_url": urls["page_url"],
"compose_widget_url": urls["widget_url"],
"linked_person": False,
"source": "recent",
}
row["service_label"] = _service_label(service_key)
row["person_id"] = str(row.get("person_id") or "")
row["is_active"] = (
service_key == _default_service(current_service)
and identifier_value == str(current_identifier or "").strip()
)
rows.append(row)
return rows
@@ -1591,7 +1719,59 @@ def _panel_context(
identifier=base["identifier"],
person_id=base["person"].id if base["person"] else None,
)
ws_url = f"/ws/compose/thread/?{urlencode({'token': ws_token})}"
ws_url = ""
if bool(getattr(settings, "COMPOSE_WS_ENABLED", False)):
ws_url = f"/ws/compose/thread/?{urlencode({'token': ws_token})}"
platform_options = []
if base["person"] is not None:
linked_identifiers = list(
PersonIdentifier.objects.filter(
user=request.user,
person=base["person"],
).order_by("service", "id")
)
by_service = {}
for row in linked_identifiers:
service_key = _default_service(row.service)
identifier_value = str(row.identifier or "").strip()
if not identifier_value:
continue
if service_key not in by_service:
by_service[service_key] = identifier_value
if base["service"] and base["identifier"]:
by_service[base["service"]] = base["identifier"]
for service_key in sorted(by_service.keys(), key=_service_order):
identifier_value = by_service[service_key]
option_urls = _compose_urls(service_key, identifier_value, base["person"].id)
platform_options.append(
{
"service": service_key,
"service_label": _service_label(service_key),
"identifier": identifier_value,
"person_id": str(base["person"].id),
"page_url": option_urls["page_url"],
"widget_url": option_urls["widget_url"],
"is_active": (
service_key == base["service"]
and identifier_value == base["identifier"]
),
}
)
elif base["identifier"]:
option_urls = _compose_urls(base["service"], base["identifier"], None)
platform_options.append(
{
"service": base["service"],
"service_label": _service_label(base["service"]),
"identifier": base["identifier"],
"person_id": "",
"page_url": option_urls["page_url"],
"widget_url": option_urls["widget_url"],
"is_active": True,
}
)
unique_raw = (
f"{base['service']}|{base['identifier']}|{request.user.id}|{time.time_ns()}"
@@ -1601,6 +1781,13 @@ def _panel_context(
user_id=request.user.id,
person_id=base["person"].id if base["person"] else None,
)
recent_contacts = _recent_manual_contacts(
request.user,
current_service=base["service"],
current_identifier=base["identifier"],
current_person=base["person"],
limit=12,
)
return {
"service": base["service"],
@@ -1627,6 +1814,7 @@ def _panel_context(
"compose_engage_preview_url": reverse("compose_engage_preview"),
"compose_engage_send_url": reverse("compose_engage_send"),
"compose_quick_insights_url": reverse("compose_quick_insights"),
"compose_history_sync_url": reverse("compose_history_sync"),
"compose_ws_url": ws_url,
"ai_workspace_url": (
f"{reverse('ai_workspace')}?person={base['person'].id}"
@@ -1644,6 +1832,8 @@ def _panel_context(
"manual_icon_class": "fa-solid fa-paper-plane",
"panel_id": f"compose-panel-{unique}",
"typing_state_json": json.dumps(typing_state),
"platform_options": platform_options,
"recent_contacts": recent_contacts,
}
@@ -1757,6 +1947,28 @@ class ComposeContactMatch(LoginRequiredMixin, View):
def get(self, request):
return render(request, self.template_name, self._context(request))
def _signal_companion_identifiers(self, identifier: str) -> set[str]:
value = str(identifier or "").strip()
if not value:
return set()
source_shape = _signal_identifier_shape(value)
companions = set()
signal_rows = Chat.objects.filter(source_uuid=value) | Chat.objects.filter(
source_number=value
)
for chat in signal_rows.order_by("-id")[:1000]:
for candidate in (chat.source_uuid, chat.source_number):
cleaned = str(candidate or "").strip()
if not cleaned or cleaned == value:
continue
# Keep auto-linking conservative: only same-shape companions.
if source_shape != "other":
candidate_shape = _signal_identifier_shape(cleaned)
if candidate_shape != source_shape:
continue
companions.add(cleaned)
return companions
def post(self, request):
person_id = str(request.POST.get("person_id") or "").strip()
person_name = str(request.POST.get("person_name") or "").strip()
@@ -1800,6 +2012,38 @@ class ComposeContactMatch(LoginRequiredMixin, View):
message = f"Re-linked {identifier} ({service}) to {person.name}."
else:
message = f"{identifier} ({service}) is already linked to {person.name}."
linked_companions = 0
skipped_companions = 0
if service == "signal":
companions = self._signal_companion_identifiers(identifier)
for candidate in companions:
existing = PersonIdentifier.objects.filter(
user=request.user,
service="signal",
identifier=candidate,
).first()
if existing is None:
PersonIdentifier.objects.create(
user=request.user,
person=person,
service="signal",
identifier=candidate,
)
linked_companions += 1
continue
if existing.person_id != person.id:
skipped_companions += 1
if linked_companions:
message = (
f"{message} Added {linked_companions} companion Signal identifier"
f"{'' if linked_companions == 1 else 's'}."
)
if skipped_companions:
message = (
f"{message} Skipped {skipped_companions} companion identifier"
f"{'' if skipped_companions == 1 else 's'} already linked to another person."
)
return render(
request,
self.template_name,
@@ -1880,12 +2124,24 @@ class ComposeThread(LoginRequiredMixin, View):
latest_ts = after_ts
messages = []
seed_previous = None
session_ids = ComposeHistorySync._session_ids_for_scope(
user=request.user,
person=base["person"],
service=service,
person_identifier=base["person_identifier"],
explicit_identifier=base["identifier"],
)
if base["person_identifier"] is not None:
session, _ = ChatSession.objects.get_or_create(
user=request.user,
identifier=base["person_identifier"],
)
base_queryset = Message.objects.filter(user=request.user, session=session)
session_ids = list({*session_ids, int(session.id)})
if session_ids:
base_queryset = Message.objects.filter(
user=request.user,
session_id__in=session_ids,
)
queryset = base_queryset
if after_ts > 0:
seed_previous = (
@@ -1901,7 +2157,10 @@ class ComposeThread(LoginRequiredMixin, View):
.order_by("ts")[:limit]
)
newest = (
Message.objects.filter(user=request.user, session=session)
Message.objects.filter(
user=request.user,
session_id__in=session_ids,
)
.order_by("-ts")
.values_list("ts", flat=True)
.first()
@@ -1928,6 +2187,284 @@ class ComposeThread(LoginRequiredMixin, View):
return JsonResponse(payload)
class ComposeHistorySync(LoginRequiredMixin, View):
@staticmethod
def _session_ids_for_identifier(user, person_identifier):
if person_identifier is None:
return []
return list(
ChatSession.objects.filter(
user=user,
identifier=person_identifier,
).values_list("id", flat=True)
)
@staticmethod
def _identifier_variants(service: str, identifier: str):
raw = str(identifier or "").strip()
if not raw:
return []
values = {raw}
if service == "whatsapp":
digits = re.sub(r"[^0-9]", "", raw)
if digits:
values.add(digits)
values.add(f"+{digits}")
values.add(f"{digits}@s.whatsapp.net")
if "@" in raw:
local = raw.split("@", 1)[0].strip()
if local:
values.add(local)
return [value for value in values if value]
@classmethod
def _session_ids_for_scope(
cls,
user,
person,
service: str,
person_identifier,
explicit_identifier: str,
):
identifiers = []
if person_identifier is not None:
identifiers.append(person_identifier)
if person is not None:
identifiers.extend(
list(
PersonIdentifier.objects.filter(
user=user,
person=person,
service=service,
)
)
)
variants = cls._identifier_variants(service, explicit_identifier)
if variants:
identifiers.extend(
list(
PersonIdentifier.objects.filter(
user=user,
service=service,
identifier__in=variants,
)
)
)
unique_ids = []
seen = set()
for row in identifiers:
row_id = int(row.id)
if row_id in seen:
continue
seen.add(row_id)
unique_ids.append(row_id)
if not unique_ids:
return []
return list(
ChatSession.objects.filter(
user=user,
identifier_id__in=unique_ids,
).values_list("id", flat=True)
)
@staticmethod
def _reconcile_duplicate_messages(user, session_ids):
if not session_ids:
return 0
rows = list(
Message.objects.filter(
user=user,
session_id__in=session_ids,
)
.order_by("id")
.values("id", "session_id", "ts", "sender_uuid", "text", "custom_author")
)
seen = {}
duplicate_ids = []
for row in rows:
dedupe_key = (
int(row.get("session_id") or 0),
int(row.get("ts") or 0),
str(row.get("sender_uuid") or ""),
str(row.get("text") or ""),
str(row.get("custom_author") or ""),
)
if dedupe_key in seen:
duplicate_ids.append(row["id"])
continue
seen[dedupe_key] = row["id"]
if not duplicate_ids:
return 0
Message.objects.filter(user=user, id__in=duplicate_ids).delete()
return len(duplicate_ids)
def post(self, request):
service = _default_service(request.POST.get("service"))
identifier = str(request.POST.get("identifier") or "").strip()
person = None
person_id = request.POST.get("person")
if person_id:
person = get_object_or_404(Person, id=person_id, user=request.user)
if not identifier and person is None:
return JsonResponse(
{"ok": False, "message": "Missing contact identifier.", "level": "danger"}
)
base = _context_base(request.user, service, identifier, person)
if base["person_identifier"] is None:
return JsonResponse(
{
"ok": False,
"message": "No linked identifier for this contact yet.",
"level": "warning",
}
)
session_ids = self._session_ids_for_scope(
user=request.user,
person=base["person"],
service=base["service"],
person_identifier=base["person_identifier"],
explicit_identifier=base["identifier"],
)
before_count = 0
if session_ids:
before_count = Message.objects.filter(
user=request.user,
session_id__in=session_ids,
).count()
runtime_result = {}
if base["service"] == "whatsapp":
command_id = transport.enqueue_runtime_command(
"whatsapp",
"force_history_sync",
{
"identifier": base["identifier"],
"person_id": str(base["person"].id) if base["person"] else "",
},
)
runtime_result = async_to_sync(transport.wait_runtime_command_result)(
"whatsapp",
command_id,
timeout=25,
)
if runtime_result is None:
return JsonResponse(
{
"ok": False,
"message": (
"History sync timed out. Runtime may still be processing; "
"watch Runtime Debug and retry."
),
"level": "warning",
}
)
if not runtime_result.get("ok"):
error_text = str(runtime_result.get("error") or "history_sync_failed")
return JsonResponse(
{
"ok": False,
"message": f"History sync failed: {error_text}",
"level": "danger",
}
)
else:
return JsonResponse(
{
"ok": False,
"message": (
f"Force history sync is only available for WhatsApp right now "
f"(current: {base['service']})."
),
"level": "warning",
}
)
session_ids = self._session_ids_for_scope(
user=request.user,
person=base["person"],
service=base["service"],
person_identifier=base["person_identifier"],
explicit_identifier=base["identifier"],
)
raw_after_count = 0
if session_ids:
raw_after_count = Message.objects.filter(
user=request.user,
session_id__in=session_ids,
).count()
dedup_removed = self._reconcile_duplicate_messages(request.user, session_ids)
after_count = raw_after_count
if dedup_removed > 0:
after_count = Message.objects.filter(
user=request.user,
session_id__in=session_ids,
).count()
imported_count = max(0, int(raw_after_count) - int(before_count))
net_new_count = max(0, int(after_count) - int(before_count))
delta = max(0, int(after_count) - int(before_count))
if delta > 0:
detail = []
if imported_count:
detail.append(f"imported {imported_count}")
if dedup_removed:
detail.append(f"reconciled {dedup_removed} duplicate(s)")
suffix = f" ({', '.join(detail)})" if detail else ""
return JsonResponse(
{
"ok": True,
"message": f"History sync complete. Net +{net_new_count} message(s){suffix}.",
"level": "success",
"new_messages": net_new_count,
"imported_messages": imported_count,
"reconciled_duplicates": dedup_removed,
"before": before_count,
"after": after_count,
"runtime_result": runtime_result,
}
)
if dedup_removed > 0:
return JsonResponse(
{
"ok": True,
"message": (
f"History sync complete. Reconciled {dedup_removed} duplicate message(s)."
),
"level": "success",
"new_messages": 0,
"imported_messages": imported_count,
"reconciled_duplicates": dedup_removed,
"before": before_count,
"after": after_count,
"runtime_result": runtime_result,
}
)
return JsonResponse(
{
"ok": True,
"message": (
(
"History sync completed, but this WhatsApp runtime session does not expose "
"message text history yet "
f"({str(runtime_result.get('sqlite_error') or 'no_message_history_source')}). "
"Live incoming/outgoing messages will continue to sync."
)
if str(runtime_result.get("sqlite_error") or "").strip()
else "History sync completed. No new messages were found yet; retry in a few seconds."
),
"level": "info",
"new_messages": 0,
"imported_messages": imported_count,
"reconciled_duplicates": dedup_removed,
"before": before_count,
"after": after_count,
"runtime_result": runtime_result,
}
)
class ComposeMediaBlob(LoginRequiredMixin, View):
"""
Serve cached media blobs for authenticated compose image previews.
@@ -2151,13 +2688,15 @@ class ComposeQuickInsights(LoginRequiredMixin, View):
payload = _quick_insights_rows(conversation)
participant_state = _participant_feedback_state_label(conversation, person)
selected_platform_label = _service_label(base["service"])
return JsonResponse(
{
"ok": True,
"empty": False,
"summary": {
"person_name": person.name,
"platform": conversation.get_platform_type_display(),
"platform": selected_platform_label,
"platform_scope": "All linked platforms",
"state": participant_state
or conversation.get_stability_state_display(),
"stability_state": conversation.get_stability_state_display(),
@@ -2194,6 +2733,7 @@ class ComposeQuickInsights(LoginRequiredMixin, View):
"Each row shows current value, percent change vs previous point, and data-point count.",
"Arrow color indicates improving or risk direction for that metric.",
"State uses participant feedback (Withdrawing/Overextending/Balanced) when available.",
"Values are computed from all linked platform messages for this person.",
"Face indicator maps value range to positive, mixed, or strained climate.",
"Use this card for fast triage; open AI Workspace for full graphs and details.",
],