Implement reactions and image sync
This commit is contained in:
@@ -9,6 +9,7 @@ from urllib.parse import quote_plus
|
||||
import aiohttp
|
||||
import orjson
|
||||
import qrcode
|
||||
from asgiref.sync import sync_to_async
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
|
||||
@@ -21,6 +22,7 @@ log = logs.get_logger("transport")
|
||||
_RUNTIME_STATE_TTL = 60 * 60 * 24
|
||||
_RUNTIME_COMMANDS_TTL = 60 * 15
|
||||
_RUNTIME_COMMAND_RESULT_TTL = 60
|
||||
_BRIDGE_MAP_TTL = 60 * 60 * 24 * 14
|
||||
_RUNTIME_CLIENTS: dict[str, Any] = {}
|
||||
|
||||
|
||||
@@ -48,6 +50,10 @@ def _runtime_command_meta_key(service: str, command_id: str) -> str:
|
||||
return f"gia:service:command-meta:{_service_key(service)}:{command_id}"
|
||||
|
||||
|
||||
def _bridge_map_key(user_id: int, person_id: int, service: str) -> str:
|
||||
return f"gia:bridge:map:{int(user_id)}:{int(person_id)}:{_service_key(service)}"
|
||||
|
||||
|
||||
def _gateway_base(service: str) -> str:
|
||||
key = f"{service.upper()}_HTTP_URL"
|
||||
default = f"http://{service}:8080"
|
||||
@@ -69,6 +75,50 @@ def _parse_timestamp(data: Any):
|
||||
return None
|
||||
|
||||
|
||||
def _attachment_has_inline_content(attachment: dict | None) -> bool:
|
||||
value = (attachment or {}).get("content")
|
||||
return isinstance(value, (bytes, bytearray, memoryview))
|
||||
|
||||
|
||||
def _normalize_inline_content(attachment: dict) -> dict:
|
||||
row = dict(attachment or {})
|
||||
content = row.get("content")
|
||||
if isinstance(content, memoryview):
|
||||
row["content"] = content.tobytes()
|
||||
elif isinstance(content, bytearray):
|
||||
row["content"] = bytes(content)
|
||||
if isinstance(row.get("content"), bytes) and not row.get("size"):
|
||||
row["size"] = len(row["content"])
|
||||
return row
|
||||
|
||||
|
||||
async def prepare_outbound_attachments(service: str, attachments: list | None) -> list:
|
||||
"""
|
||||
Resolve outbound attachment refs into payloads once, in parallel.
|
||||
|
||||
This is the shared media-prep layer for XMPP -> {Signal, WhatsApp} sends,
|
||||
so attachment performance improvements live in one place.
|
||||
|
||||
TODO: Stream per-attachment send as each payload resolves (as_completed)
|
||||
to reduce first-byte latency for large media batches.
|
||||
"""
|
||||
rows = [dict(att or {}) for att in (attachments or [])]
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
async def _resolve(row: dict):
|
||||
if _attachment_has_inline_content(row):
|
||||
return _normalize_inline_content(row)
|
||||
fetched = await fetch_attachment(service, row)
|
||||
if not fetched:
|
||||
return row
|
||||
merged = dict(row)
|
||||
merged.update(dict(fetched or {}))
|
||||
return _normalize_inline_content(merged)
|
||||
|
||||
return await asyncio.gather(*[_resolve(row) for row in rows])
|
||||
|
||||
|
||||
def register_runtime_client(service: str, client: Any):
|
||||
"""
|
||||
Register an in-process runtime client (UR process).
|
||||
@@ -96,6 +146,178 @@ def update_runtime_state(service: str, **updates):
|
||||
return state
|
||||
|
||||
|
||||
def record_bridge_mapping(
|
||||
*,
|
||||
user_id: int,
|
||||
person_id: int,
|
||||
service: str,
|
||||
xmpp_message_id: str = "",
|
||||
xmpp_ts: int | None = None,
|
||||
upstream_message_id: str = "",
|
||||
upstream_author: str = "",
|
||||
upstream_ts: int | None = None,
|
||||
text_preview: str = "",
|
||||
local_message_id: str = "",
|
||||
):
|
||||
key = _bridge_map_key(user_id, person_id, service)
|
||||
rows = list(cache.get(key) or [])
|
||||
now_ts = int(time.time() * 1000)
|
||||
entry = {
|
||||
"xmpp_message_id": str(xmpp_message_id or "").strip(),
|
||||
"xmpp_ts": int(xmpp_ts or 0),
|
||||
"upstream_message_id": str(upstream_message_id or "").strip(),
|
||||
"upstream_author": str(upstream_author or "").strip(),
|
||||
"upstream_ts": int(upstream_ts or 0),
|
||||
"text_preview": str(text_preview or "").strip()[:1000],
|
||||
"local_message_id": str(local_message_id or "").strip(),
|
||||
"updated_at": now_ts,
|
||||
}
|
||||
if not entry["xmpp_message_id"] and not entry["upstream_message_id"]:
|
||||
if entry["upstream_ts"] <= 0 and entry["xmpp_ts"] <= 0:
|
||||
return None
|
||||
|
||||
deduped = []
|
||||
for row in rows:
|
||||
same_xmpp = bool(entry["xmpp_message_id"]) and (
|
||||
str((row or {}).get("xmpp_message_id") or "").strip()
|
||||
== entry["xmpp_message_id"]
|
||||
)
|
||||
same_upstream = bool(entry["upstream_message_id"]) and (
|
||||
str((row or {}).get("upstream_message_id") or "").strip()
|
||||
== entry["upstream_message_id"]
|
||||
)
|
||||
if same_xmpp or same_upstream:
|
||||
continue
|
||||
deduped.append(dict(row or {}))
|
||||
|
||||
deduped.append(entry)
|
||||
if len(deduped) > 2000:
|
||||
deduped = deduped[-2000:]
|
||||
cache.set(key, deduped, timeout=_BRIDGE_MAP_TTL)
|
||||
log.debug(
|
||||
"reaction-bridge map-write service=%s user=%s person=%s xmpp_id=%s upstream_id=%s upstream_ts=%s local_id=%s rows=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
entry.get("xmpp_message_id") or "-",
|
||||
entry.get("upstream_message_id") or "-",
|
||||
entry.get("upstream_ts") or 0,
|
||||
entry.get("local_message_id") or "-",
|
||||
len(deduped),
|
||||
)
|
||||
return entry
|
||||
|
||||
|
||||
def resolve_bridge_from_xmpp(
|
||||
*, user_id: int, person_id: int, service: str, xmpp_message_id: str
|
||||
):
|
||||
target_id = str(xmpp_message_id or "").strip()
|
||||
if not target_id:
|
||||
return None
|
||||
key = _bridge_map_key(user_id, person_id, service)
|
||||
rows = list(cache.get(key) or [])
|
||||
for row in reversed(rows):
|
||||
if str((row or {}).get("xmpp_message_id") or "").strip() == target_id:
|
||||
log.debug(
|
||||
"reaction-bridge resolve-xmpp-hit service=%s user=%s person=%s xmpp_id=%s upstream_id=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
target_id,
|
||||
str((row or {}).get("upstream_message_id") or "-").strip(),
|
||||
)
|
||||
return dict(row or {})
|
||||
log.debug(
|
||||
"reaction-bridge resolve-xmpp-miss service=%s user=%s person=%s xmpp_id=%s rows=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
target_id,
|
||||
len(rows),
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def resolve_bridge_from_upstream(
|
||||
*,
|
||||
user_id: int,
|
||||
person_id: int,
|
||||
service: str,
|
||||
upstream_message_id: str = "",
|
||||
upstream_ts: int | None = None,
|
||||
):
|
||||
key = _bridge_map_key(user_id, person_id, service)
|
||||
rows = list(cache.get(key) or [])
|
||||
target_id = str(upstream_message_id or "").strip()
|
||||
if target_id:
|
||||
for row in reversed(rows):
|
||||
if str((row or {}).get("upstream_message_id") or "").strip() == target_id:
|
||||
log.debug(
|
||||
"reaction-bridge resolve-upstream-id-hit service=%s user=%s person=%s upstream_id=%s xmpp_id=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
target_id,
|
||||
str((row or {}).get("xmpp_message_id") or "-").strip(),
|
||||
)
|
||||
return dict(row or {})
|
||||
target_ts = int(upstream_ts or 0)
|
||||
if target_ts > 0:
|
||||
best = None
|
||||
best_gap = None
|
||||
for row in rows:
|
||||
row_ts = int((row or {}).get("upstream_ts") or 0)
|
||||
if row_ts <= 0:
|
||||
continue
|
||||
gap = abs(row_ts - target_ts)
|
||||
row_updated = int((row or {}).get("updated_at") or 0)
|
||||
best_updated = int((best or {}).get("updated_at") or 0) if best else 0
|
||||
if (
|
||||
best is None
|
||||
or gap < best_gap
|
||||
or (gap == best_gap and row_updated > best_updated)
|
||||
):
|
||||
best = dict(row or {})
|
||||
best_gap = gap
|
||||
if best is not None and best_gap is not None and best_gap <= 15_000:
|
||||
log.debug(
|
||||
"reaction-bridge resolve-upstream-ts-hit service=%s user=%s person=%s target_ts=%s gap_ms=%s picked_xmpp_id=%s picked_upstream_ts=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
target_ts,
|
||||
best_gap,
|
||||
str((best or {}).get("xmpp_message_id") or "-").strip(),
|
||||
int((best or {}).get("upstream_ts") or 0),
|
||||
)
|
||||
return best
|
||||
log.debug(
|
||||
"reaction-bridge resolve-upstream-miss service=%s user=%s person=%s upstream_id=%s upstream_ts=%s rows=%s",
|
||||
service,
|
||||
user_id,
|
||||
person_id,
|
||||
target_id or "-",
|
||||
target_ts,
|
||||
len(rows),
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def resolve_bridge_from_text_hint(
|
||||
*, user_id: int, person_id: int, service: str, text_hint: str
|
||||
):
|
||||
hint = str(text_hint or "").strip().lower()
|
||||
if not hint:
|
||||
return None
|
||||
key = _bridge_map_key(user_id, person_id, service)
|
||||
rows = list(cache.get(key) or [])
|
||||
for row in reversed(rows):
|
||||
preview = str((row or {}).get("text_preview") or "").strip().lower()
|
||||
if preview and (preview == hint or hint in preview):
|
||||
return dict(row or {})
|
||||
return None
|
||||
|
||||
|
||||
def enqueue_runtime_command(
|
||||
service: str, action: str, payload: dict | None = None
|
||||
) -> str:
|
||||
@@ -478,13 +700,62 @@ async def _gateway_typing(service: str, recipient: str, started: bool):
|
||||
return False
|
||||
|
||||
|
||||
async def send_message_raw(service: str, recipient: str, text=None, attachments=None):
|
||||
async def send_message_raw(
|
||||
service: str,
|
||||
recipient: str,
|
||||
text=None,
|
||||
attachments=None,
|
||||
metadata: dict | None = None,
|
||||
):
|
||||
"""
|
||||
Unified outbound send path used by models/views/UR.
|
||||
"""
|
||||
service_key = _service_key(service)
|
||||
if service_key == "signal":
|
||||
return await signalapi.send_message_raw(recipient, text, attachments or [])
|
||||
prepared_attachments = await prepare_outbound_attachments(
|
||||
service_key, attachments or []
|
||||
)
|
||||
result = await signalapi.send_message_raw(recipient, text, prepared_attachments)
|
||||
meta = dict(metadata or {})
|
||||
xmpp_source_id = str(meta.get("xmpp_source_id") or "").strip()
|
||||
if xmpp_source_id and result:
|
||||
from core.models import PersonIdentifier
|
||||
|
||||
identifier_row = await sync_to_async(
|
||||
lambda: PersonIdentifier.objects.filter(
|
||||
service="signal",
|
||||
identifier=recipient,
|
||||
)
|
||||
.select_related("user", "person")
|
||||
.first()
|
||||
)()
|
||||
if identifier_row is not None:
|
||||
record_bridge_mapping(
|
||||
user_id=identifier_row.user_id,
|
||||
person_id=identifier_row.person_id,
|
||||
service="signal",
|
||||
xmpp_message_id=xmpp_source_id,
|
||||
xmpp_ts=int(meta.get("xmpp_source_ts") or 0),
|
||||
upstream_message_id="",
|
||||
upstream_author=str(meta.get("upstream_author") or ""),
|
||||
upstream_ts=int(result) if isinstance(result, int) else 0,
|
||||
text_preview=str(meta.get("xmpp_body") or text or ""),
|
||||
local_message_id=str(meta.get("legacy_message_id") or ""),
|
||||
)
|
||||
from core.messaging import history
|
||||
|
||||
await history.save_bridge_ref(
|
||||
user=identifier_row.user,
|
||||
identifier=identifier_row,
|
||||
source_service="signal",
|
||||
local_message_id=str(meta.get("legacy_message_id") or ""),
|
||||
local_ts=int(meta.get("xmpp_source_ts") or 0),
|
||||
xmpp_message_id=xmpp_source_id,
|
||||
upstream_message_id="",
|
||||
upstream_author=str(meta.get("upstream_author") or ""),
|
||||
upstream_ts=int(result) if isinstance(result, int) else 0,
|
||||
)
|
||||
return result
|
||||
|
||||
if service_key == "whatsapp":
|
||||
runtime_client = get_runtime_client(service_key)
|
||||
@@ -493,7 +764,10 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments=
|
||||
runtime_result = await runtime_client.send_message_raw(
|
||||
recipient,
|
||||
text=text,
|
||||
attachments=attachments or [],
|
||||
attachments=await prepare_outbound_attachments(
|
||||
service_key, attachments or []
|
||||
),
|
||||
metadata=dict(metadata or {}),
|
||||
)
|
||||
if runtime_result is not False and runtime_result is not None:
|
||||
return runtime_result
|
||||
@@ -501,8 +775,11 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments=
|
||||
log.warning("%s runtime send failed: %s", service_key, exc)
|
||||
# Web/UI process cannot access UR in-process runtime client directly.
|
||||
# Hand off send to UR via shared cache command queue.
|
||||
prepared_attachments = await prepare_outbound_attachments(
|
||||
service_key, attachments or []
|
||||
)
|
||||
command_attachments = []
|
||||
for att in attachments or []:
|
||||
for att in prepared_attachments:
|
||||
row = dict(att or {})
|
||||
# Keep payload cache-friendly and avoid embedding raw bytes.
|
||||
for key in ("content",):
|
||||
@@ -515,6 +792,7 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments=
|
||||
"recipient": recipient,
|
||||
"text": text or "",
|
||||
"attachments": command_attachments,
|
||||
"metadata": dict(metadata or {}),
|
||||
},
|
||||
)
|
||||
command_result = await wait_runtime_command_result(
|
||||
@@ -540,6 +818,7 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments=
|
||||
recipient,
|
||||
text=text,
|
||||
attachments=attachments or [],
|
||||
metadata=dict(metadata or {}),
|
||||
)
|
||||
if runtime_result is not False and runtime_result is not None:
|
||||
return runtime_result
|
||||
@@ -557,6 +836,85 @@ async def send_message_raw(service: str, recipient: str, text=None, attachments=
|
||||
raise NotImplementedError(f"Unsupported service: {service}")
|
||||
|
||||
|
||||
async def send_reaction(
|
||||
service: str,
|
||||
recipient: str,
|
||||
*,
|
||||
emoji: str,
|
||||
target_message_id: str = "",
|
||||
target_timestamp: int | None = None,
|
||||
target_author: str = "",
|
||||
remove: bool = False,
|
||||
):
|
||||
service_key = _service_key(service)
|
||||
if not str(emoji or "").strip() and not remove:
|
||||
return False
|
||||
|
||||
if service_key == "signal":
|
||||
log.debug(
|
||||
"reaction-bridge send service=signal recipient=%s target_ts=%s target_author=%s remove=%s",
|
||||
recipient,
|
||||
int(target_timestamp or 0),
|
||||
str(target_author or recipient),
|
||||
bool(remove),
|
||||
)
|
||||
return await signalapi.send_reaction(
|
||||
recipient_uuid=recipient,
|
||||
emoji=str(emoji or ""),
|
||||
target_timestamp=target_timestamp,
|
||||
target_author=str(target_author or recipient),
|
||||
remove=remove,
|
||||
)
|
||||
|
||||
if service_key == "whatsapp":
|
||||
runtime_client = get_runtime_client(service_key)
|
||||
if runtime_client and hasattr(runtime_client, "send_reaction"):
|
||||
try:
|
||||
log.debug(
|
||||
"reaction-bridge send service=whatsapp runtime recipient=%s target_id=%s target_ts=%s remove=%s",
|
||||
recipient,
|
||||
str(target_message_id or "") or "-",
|
||||
int(target_timestamp or 0),
|
||||
bool(remove),
|
||||
)
|
||||
result = await runtime_client.send_reaction(
|
||||
recipient,
|
||||
emoji=str(emoji or ""),
|
||||
target_message_id=str(target_message_id or ""),
|
||||
target_timestamp=(int(target_timestamp) if target_timestamp else 0),
|
||||
remove=bool(remove),
|
||||
)
|
||||
if result:
|
||||
return True
|
||||
except Exception as exc:
|
||||
log.warning("%s runtime reaction failed: %s", service_key, exc)
|
||||
|
||||
command_id = enqueue_runtime_command(
|
||||
service_key,
|
||||
"send_reaction",
|
||||
{
|
||||
"recipient": recipient,
|
||||
"emoji": str(emoji or ""),
|
||||
"target_message_id": str(target_message_id or ""),
|
||||
"target_timestamp": int(target_timestamp or 0),
|
||||
"remove": bool(remove),
|
||||
},
|
||||
)
|
||||
command_result = await wait_runtime_command_result(
|
||||
service_key,
|
||||
command_id,
|
||||
timeout=20.0,
|
||||
)
|
||||
log.debug(
|
||||
"reaction-bridge send service=whatsapp queued-result ok=%s command_id=%s",
|
||||
bool(isinstance(command_result, dict) and command_result.get("ok")),
|
||||
command_id,
|
||||
)
|
||||
return bool(isinstance(command_result, dict) and command_result.get("ok"))
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def start_typing(service: str, recipient: str):
|
||||
service_key = _service_key(service)
|
||||
if service_key == "signal":
|
||||
|
||||
Reference in New Issue
Block a user