Files
GIA/core/management/commands/memory_hygiene.py

41 lines
1.4 KiB
Python

from __future__ import annotations

import json

from django.core.management.base import BaseCommand, CommandError

from core.memory.pipeline import run_memory_hygiene
class Command(BaseCommand):
    """Run the memory hygiene pipeline and print what it reported.

    The work itself is delegated to ``run_memory_hygiene``; this command only
    parses options, calls the pipeline once, and formats the result either as
    JSON or as a single summary line.
    """

    help = "Run memory hygiene checks (stale decay + contradiction queueing)."

    def add_arguments(self, parser):
        """Register the ``--user-id``, ``--dry-run`` and ``--json`` options."""
        parser.add_argument(
            "--user-id",
            default="",
            help="Integer user id forwarded to the pipeline; omit to pass None.",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            default=False,
            help="Forward dry_run=True to the pipeline.",
        )
        parser.add_argument(
            "--json",
            action="store_true",
            default=False,
            help="Print the result as indented JSON instead of a summary line.",
        )

    def handle(self, *args, **options):
        """Execute the hygiene run and write the report to ``self.stdout``.

        Raises:
            CommandError: if ``--user-id`` is given but is not an integer.
        """
        # Options may arrive from argparse (strings) or from call_command
        # (arbitrary objects), so normalise through str() before parsing.
        user_id_raw = str(options.get("user_id") or "").strip()
        dry_run = bool(options.get("dry_run"))
        as_json = bool(options.get("json"))

        if user_id_raw:
            try:
                user_id = int(user_id_raw)
            except ValueError as exc:
                # Report bad input as a command error rather than letting a
                # bare ValueError surface as a traceback on the CLI.
                raise CommandError(
                    f"--user-id must be an integer, got {user_id_raw!r}"
                ) from exc
        else:
            user_id = None

        result = run_memory_hygiene(user_id=user_id, dry_run=dry_run)

        if as_json:
            payload = {
                "user_id": user_id,
                "dry_run": dry_run,
                "result": result,
            }
            self.stdout.write(json.dumps(payload, indent=2, sort_keys=True))
            return

        # Missing or falsy counters are reported as 0 in the summary line.
        self.stdout.write(
            "memory-hygiene "
            f"user={user_id if user_id is not None else '-'} "
            f"dry_run={'yes' if dry_run else 'no'} "
            f"expired={int(result.get('expired') or 0)} "
            f"contradictions={int(result.get('contradictions') or 0)} "
            f"queued={int(result.get('queued_requests') or 0)}"
        )