import asyncio import json import time from datetime import datetime, timezone as dt_timezone from urllib.parse import parse_qs from asgiref.sync import sync_to_async from django.core import signing from core.models import ChatSession, Message, PersonIdentifier from core.views.compose import COMPOSE_WS_TOKEN_SALT def _safe_int(value, default=0): try: return int(value) except (TypeError, ValueError): return default def _fmt_ts(ts_value): try: dt = datetime.fromtimestamp(int(ts_value) / 1000, tz=dt_timezone.utc) return dt.strftime("%H:%M") except Exception: return str(ts_value or "") def _serialize_message(msg): author = str(msg.custom_author or "").strip() return { "id": str(msg.id), "ts": int(msg.ts or 0), "display_ts": _fmt_ts(msg.ts), "text": str(msg.text or ""), "author": author, "outgoing": author.upper() in {"USER", "BOT"}, } def _load_since(user_id, service, identifier, person_id, after_ts, limit): person_identifier = None if person_id: person_identifier = ( PersonIdentifier.objects.filter( user_id=user_id, person_id=person_id, service=service, ).first() or PersonIdentifier.objects.filter( user_id=user_id, person_id=person_id, ).first() ) if person_identifier is None and identifier: person_identifier = PersonIdentifier.objects.filter( user_id=user_id, service=service, identifier=identifier, ).first() if person_identifier is None: return {"messages": [], "last_ts": after_ts} session = ChatSession.objects.filter( user_id=user_id, identifier=person_identifier, ).first() if session is None: return {"messages": [], "last_ts": after_ts} qs = Message.objects.filter( user_id=user_id, session=session, ).order_by("ts") if after_ts > 0: qs = qs.filter(ts__gt=after_ts) rows = list(qs[: max(10, min(limit, 200))]) newest = ( Message.objects.filter(user_id=user_id, session=session) .order_by("-ts") .values_list("ts", flat=True) .first() ) return { "messages": [_serialize_message(row) for row in rows], "last_ts": int(newest or after_ts or 0), } async def compose_ws_application(scope, receive, send): if scope.get("type") != "websocket": return query_string = (scope.get("query_string") or b"").decode("utf-8", errors="ignore") params = parse_qs(query_string) token = (params.get("token") or [""])[0] try: payload = signing.loads(token, salt=COMPOSE_WS_TOKEN_SALT) except Exception: await send({"type": "websocket.close", "code": 4401}) return if _safe_int(payload.get("exp")) < int(time.time()): await send({"type": "websocket.close", "code": 4401}) return user_id = _safe_int(payload.get("u")) service = str(payload.get("s") or "").strip() identifier = str(payload.get("i") or "").strip() person_id = str(payload.get("p") or "").strip() if user_id <= 0 or (not identifier and not person_id): await send({"type": "websocket.close", "code": 4401}) return await send({"type": "websocket.accept"}) last_ts = 0 limit = 100 while True: event = None try: event = await asyncio.wait_for(receive(), timeout=1.2) except asyncio.TimeoutError: event = None if event and event.get("type") == "websocket.disconnect": break if event and event.get("type") == "websocket.receive": try: body = json.loads(event.get("text") or "{}") except Exception: body = {} if body.get("kind") == "sync": last_ts = max(last_ts, _safe_int(body.get("last_ts"), 0)) payload = await sync_to_async(_load_since)( user_id=user_id, service=service, identifier=identifier, person_id=person_id, after_ts=last_ts, limit=limit, ) messages = payload.get("messages") or [] latest = _safe_int(payload.get("last_ts"), last_ts) if messages: last_ts = max(last_ts, latest) await send( { "type": "websocket.send", "text": json.dumps( { "messages": messages, "last_ts": last_ts, } ), } ) else: last_ts = max(last_ts, latest)