Files
GIA/core/modules/mixed_protocol.py

60 lines
1.7 KiB
Python

from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class UnifiedEvent:
"""
Normalized event envelope shared across protocol adapters.
"""
service: str
event_type: str
identifier: str = ""
text: str = ""
ts: int | None = None
message_timestamps: list[int] = field(default_factory=list)
attachments: list[dict[str, Any]] = field(default_factory=list)
payload: dict[str, Any] = field(default_factory=dict)
def _coerce_int(value: Any) -> int | None:
    """
    Return ``int(value)``, or ``None`` when the value cannot be converted.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # The failures int() raises for built-in values: unsupported type,
        # unparsable string or NaN, and infinite float respectively.
        return None


def normalize_gateway_event(service: str, payload: dict[str, Any]) -> UnifiedEvent:
    """
    Build a ``UnifiedEvent`` from a raw gateway payload.

    Timestamp and attachment values that cannot be coerced are dropped
    instead of raising.

    Args:
        service: Stored unchanged on the returned event.
        payload: Raw event mapping. A falsy value (``None``, ``{}``) is
            treated as an empty mapping. Keys read, in fallback order:
            ``type``; ``identifier`` / ``source`` / ``from``; ``text``;
            ``ts`` / ``timestamp``; ``message_timestamps`` / ``timestamps``;
            ``attachments``. A fallback key is used whenever the preceding
            key is missing or falsy (so ``ts == 0`` falls back to
            ``timestamp``).

    Returns:
        A ``UnifiedEvent`` whose ``payload`` field is a shallow copy of the
        input mapping.
    """
    # Guard once up front so a missing payload is tolerated by every lookup,
    # not only by the final copy.
    source = payload or {}

    event_type = str(source.get("type") or "").strip().lower()

    raw_timestamps = (
        source.get("message_timestamps") or source.get("timestamps") or []
    )
    # A lone scalar is treated as a one-element sequence.
    if not isinstance(raw_timestamps, (list, tuple)):
        raw_timestamps = [raw_timestamps]
    message_timestamps = [
        value
        for item in raw_timestamps
        if (value := _coerce_int(item)) is not None
    ]

    # _coerce_int(None) is None, so a missing timestamp stays None.
    ts = _coerce_int(source.get("ts") or source.get("timestamp"))

    try:
        attachments = list(source.get("attachments") or [])
    except TypeError:
        # A truthy non-iterable (e.g. a number) is not a usable attachment
        # list; drop it rather than fail the whole event.
        attachments = []

    identifier = str(
        source.get("identifier")
        or source.get("source")
        or source.get("from")
        or ""
    ).strip()

    return UnifiedEvent(
        service=service,
        event_type=event_type,
        identifier=identifier,
        text=str(source.get("text") or ""),
        ts=ts,
        message_timestamps=message_timestamps,
        attachments=attachments,
        payload=dict(source),
    )