47 lines
1.6 KiB
Python
47 lines
1.6 KiB
Python
from __future__ import annotations
|
|
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
|
|
from core.events.manticore import upsert_conversation_event
|
|
from core.models import ConversationEvent
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Backfill behavioral events into Manticore from ConversationEvent rows."
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
"--from-conversation-events",
|
|
action="store_true",
|
|
help="Replay ConversationEvent rows into the Manticore event table.",
|
|
)
|
|
parser.add_argument("--user-id", type=int, default=None)
|
|
parser.add_argument("--limit", type=int, default=5000)
|
|
|
|
def handle(self, *args, **options):
|
|
if not bool(options.get("from_conversation_events")):
|
|
raise CommandError("Pass --from-conversation-events to run this backfill.")
|
|
|
|
queryset = (
|
|
ConversationEvent.objects.select_related("session__identifier")
|
|
.order_by("ts", "created_at")
|
|
)
|
|
user_id = options.get("user_id")
|
|
if user_id is not None:
|
|
queryset = queryset.filter(user_id=int(user_id))
|
|
|
|
scanned = 0
|
|
indexed = 0
|
|
limit = max(1, int(options.get("limit") or 5000))
|
|
for event in queryset[:limit]:
|
|
scanned += 1
|
|
upsert_conversation_event(event)
|
|
indexed += 1
|
|
|
|
self.stdout.write(
|
|
self.style.SUCCESS(
|
|
"manticore-backfill scanned=%s indexed=%s user=%s"
|
|
% (scanned, indexed, user_id if user_id is not None else "-")
|
|
)
|
|
)
|