# File: GIA/core/gateway/commands.py
# Repository listing metadata: 138 lines, 4.1 KiB, Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Awaitable, Callable
from asgiref.sync import sync_to_async
from core.models import GatewayCommandEvent
from core.security.command_policy import CommandSecurityContext, evaluate_command_policy
# Synchronous callback that receives one reply string produced while handling
# a command (policy-block notices and handler output are both sent through it).
GatewayEmit = Callable[[str], None]
# Async command handler: called with the command context and an emit callback;
# its awaited result is coerced to bool by the dispatcher (truthy -> "ok",
# falsy -> "ignored").
GatewayHandler = Callable[["GatewayCommandContext", GatewayEmit], Awaitable[bool]]
# Predicate over the stripped message text; the dispatcher selects the first
# route whose matcher returns a truthy value.
GatewayMatcher = Callable[[str], bool]
@dataclass(slots=True)
class GatewayCommandContext:
user: object
source_message: object
service: str
channel_identifier: str
sender_identifier: str
message_text: str
message_meta: dict
payload: dict
@dataclass(slots=True)
class GatewayCommandRoute:
name: str
scope_key: str
matcher: GatewayMatcher
handler: GatewayHandler
def _first_token(text: str) -> str:
body = str(text or "").strip()
if not body:
return ""
return str(body.split()[0] or "").strip().lower()
def _derive_unknown_scope(text: str) -> str:
    """Build a ``gateway.<token>`` scope key for text that matched no route.

    The token is the lower-cased first word of *text* with leading dots
    removed; when nothing remains (empty text, or a word made only of dots)
    the scope falls back to ``gateway.message``.
    """
    token = _first_token(text).lstrip(".") or "message"
    return f"gateway.{token}"
async def dispatch_gateway_command(
    *,
    context: GatewayCommandContext,
    routes: list[GatewayCommandRoute],
    emit: GatewayEmit,
) -> bool:
    """Match, authorise and run a gateway command, recording an audit event.

    The stripped message text is tested against ``routes`` in order and the
    first matching route wins. A ``GatewayCommandEvent`` row is created for
    every non-empty message and updated with the final outcome.

    Args:
        context: Inbound message data for the command.
        routes: Candidate routes, checked in list order.
        emit: Callback that receives each reply string (policy-block notice
            or handler output).

    Returns:
        ``False`` when the text is empty (no event is created), when no route
        matches (event status ``"ignored"``), or when the handler returns a
        falsy value (event status ``"ignored"``). ``True`` when policy blocks
        the command (``"blocked"``), when the handler raises (``"failed"``),
        or when the handler returns a truthy value (``"ok"``).
    """
    text = str(context.message_text or "").strip()
    if not text:
        return False

    route = next((row for row in routes if row.matcher(text)), None)
    if route is not None:
        scope_key = route.scope_key
        command_name = route.name
    else:
        scope_key = _derive_unknown_scope(text)
        command_name = _first_token(text).lstrip(".")

    # The event is created before any decision so that unmatched and blocked
    # commands are recorded too.
    event = await sync_to_async(GatewayCommandEvent.objects.create)(
        user=context.user,
        source_message=context.source_message,
        service=str(context.service or "").strip().lower() or "xmpp",
        channel_identifier=str(context.channel_identifier or "").strip(),
        sender_identifier=str(context.sender_identifier or "").strip(),
        scope_key=scope_key,
        command_name=command_name,
        command_text=text,
        status="pending",
        request_meta={
            "payload": dict(context.payload or {}),
            "message_meta": dict(context.message_meta or {}),
        },
    )

    if route is None:
        event.status = "ignored"
        event.error = "unmatched_gateway_command"
        await sync_to_async(event.save)(
            update_fields=["status", "error", "updated_at"]
        )
        return False

    # NOTE(review): the policy check receives the raw service and channel
    # identifier, while the event above stores normalised values — confirm
    # that evaluate_command_policy normalises them itself.
    decision = await sync_to_async(evaluate_command_policy)(
        user=context.user,
        scope_key=scope_key,
        context=CommandSecurityContext(
            service=context.service,
            channel_identifier=context.channel_identifier,
            message_meta=dict(context.message_meta or {}),
            payload=dict(context.payload or {}),
        ),
    )
    if not decision.allowed:
        # Prefer the human-readable reason; fall back to the policy code.
        emit(f"blocked by policy: {decision.reason or decision.code}")
        event.status = "blocked"
        event.error = f"{decision.code}:{decision.reason}"
        event.response_meta = {
            "policy_code": decision.code,
            "policy_reason": decision.reason,
        }
        await sync_to_async(event.save)(
            update_fields=["status", "error", "response_meta", "updated_at"]
        )
        # A blocked command still counts as consumed by the gateway.
        return True

    responses: list[str] = []

    def _captured_emit(value: str) -> None:
        # Record every reply for the audit event, then forward it unchanged.
        row = str(value or "")
        responses.append(row)
        emit(row)

    try:
        handled = await route.handler(context, _captured_emit)
    except Exception as exc:
        # Handler failures are recorded on the event instead of propagating;
        # replies emitted before the failure are kept in response_meta.
        event.status = "failed"
        event.error = f"handler_exception:{exc}"
        event.response_meta = {"responses": responses}
        await sync_to_async(event.save)(
            update_fields=["status", "error", "response_meta", "updated_at"]
        )
        return True

    event.status = "ok" if handled else "ignored"
    event.response_meta = {"responses": responses}
    await sync_to_async(event.save)(
        update_fields=["status", "response_meta", "updated_at"]
    )
    return bool(handled)