Files
GIA/core/events/shadow.py

149 lines
5.2 KiB
Python

from __future__ import annotations
from django.db.models import Count, Max, Q
from core.models import ConversationEvent, Person, User
def _kind_from_event_type(event_type: str) -> str:
normalized = str(event_type or "").strip().lower()
return {
"message_created": "message_sent",
"delivery_receipt": "message_delivered",
"read_receipt": "message_read",
"typing_started": "composing_started",
"typing_stopped": "composing_stopped",
"composing_abandoned": "composing_abandoned",
"presence_available": "presence_available",
"presence_unavailable": "presence_unavailable",
}.get(normalized, normalized)
def get_shadow_behavioral_availability_stats(*, user: User) -> list[dict]:
person_map = {
str(row["id"]): str(row["name"] or "")
for row in Person.objects.filter(user=user).values("id", "name")
}
rows = (
ConversationEvent.objects.filter(
user=user,
session__identifier__person__isnull=False,
)
.values("session__identifier__person_id", "origin_transport")
.annotate(
total_events=Count("id"),
presence_events=Count(
"id",
filter=Q(event_type__in=["presence_available", "presence_unavailable"]),
),
read_events=Count("id", filter=Q(event_type="read_receipt")),
typing_events=Count(
"id",
filter=Q(
event_type__in=["typing_started", "typing_stopped"]
),
),
message_events=Count("id", filter=Q(event_type="message_created")),
abandoned_events=Count("id", filter=Q(event_type="composing_abandoned")),
last_event_ts=Max("ts"),
)
.order_by("-total_events", "session__identifier__person_id", "origin_transport")
)
output = []
for row in rows:
person_id = str(row.get("session__identifier__person_id") or "").strip()
output.append(
{
"person_id": person_id,
"person_name": person_map.get(person_id, person_id or "-"),
"service": str(row.get("origin_transport") or "").strip().lower(),
"total_events": int(row.get("total_events") or 0),
"presence_events": int(row.get("presence_events") or 0),
"read_events": int(row.get("read_events") or 0),
"typing_events": int(row.get("typing_events") or 0),
"message_events": int(row.get("message_events") or 0),
"abandoned_events": int(row.get("abandoned_events") or 0),
"last_event_ts": int(row.get("last_event_ts") or 0),
}
)
return output
def get_shadow_behavioral_latest_states(
*, user: User, person_ids: list[str], transport: str = ""
) -> list[dict]:
queryset = ConversationEvent.objects.filter(
user=user,
session__identifier__person_id__in=[str(value) for value in person_ids],
event_type__in=[
"message_created",
"delivery_receipt",
"read_receipt",
"typing_started",
"typing_stopped",
"composing_abandoned",
"presence_available",
"presence_unavailable",
],
).select_related("session__identifier")
if transport:
queryset = queryset.filter(origin_transport=str(transport).strip().lower())
rows = []
seen = set()
for row in queryset.order_by(
"session__identifier__person_id", "-ts", "-created_at"
)[:500]:
person_id = str(getattr(row.session.identifier, "person_id", "") or "").strip()
if not person_id or person_id in seen:
continue
seen.add(person_id)
rows.append(
{
"person_id": person_id,
"transport": str(row.origin_transport or "").strip().lower(),
"kind": _kind_from_event_type(row.event_type),
"ts": int(row.ts or 0),
}
)
return rows
def get_shadow_behavioral_events_for_range(
*,
user: User,
person_id: str,
start_ts: int,
end_ts: int,
transport: str = "",
) -> list[dict]:
queryset = ConversationEvent.objects.filter(
user=user,
session__identifier__person_id=str(person_id or "").strip(),
ts__gte=int(start_ts),
ts__lte=int(end_ts),
event_type__in=[
"message_created",
"delivery_receipt",
"read_receipt",
"typing_started",
"typing_stopped",
"composing_abandoned",
"presence_available",
"presence_unavailable",
],
).order_by("ts", "created_at")
if transport:
queryset = queryset.filter(origin_transport=str(transport).strip().lower())
return [
{
"person_id": str(person_id or "").strip(),
"session_id": str(row.session_id or ""),
"transport": str(row.origin_transport or "").strip().lower(),
"kind": _kind_from_event_type(row.event_type),
"direction": str(row.direction or "").strip().lower(),
"ts": int(row.ts or 0),
"payload": dict(row.payload or {}),
}
for row in queryset[:1000]
]