import time from django.core.cache import cache TYPING_TTL_SECONDS = 12 def _person_key(user_id, person_id): return f"compose:typing:user:{int(user_id)}:person:{int(person_id)}" def set_person_typing_state( *, user_id, person_id, started, source_service="", display_name="", ): if not user_id or not person_id: return now_ms = int(time.time() * 1000) state = { "typing": bool(started), "source_service": str(source_service or ""), "display_name": str(display_name or ""), "updated_ts": now_ms, "expires_ts": (now_ms + (TYPING_TTL_SECONDS * 1000) if started else now_ms), } cache.set( _person_key(user_id, person_id), state, timeout=max(TYPING_TTL_SECONDS * 2, 30), ) def get_person_typing_state(*, user_id, person_id): if not user_id or not person_id: return { "typing": False, "source_service": "", "display_name": "", "updated_ts": 0, "expires_ts": 0, } key = _person_key(user_id, person_id) state = dict(cache.get(key) or {}) if not state: return { "typing": False, "source_service": "", "display_name": "", "updated_ts": 0, "expires_ts": 0, } now_ms = int(time.time() * 1000) is_typing = bool(state.get("typing")) expires_ts = int(state.get("expires_ts") or 0) if is_typing and expires_ts and now_ms > expires_ts: state["typing"] = False state["updated_ts"] = now_ms cache.set(key, state, timeout=max(TYPING_TTL_SECONDS * 2, 30)) return { "typing": bool(state.get("typing")), "source_service": str(state.get("source_service") or ""), "display_name": str(state.get("display_name") or ""), "updated_ts": int(state.get("updated_ts") or 0), "expires_ts": int(state.get("expires_ts") or 0), }