Files
GIA/core/management/commands/processing.py
2025-05-09 21:01:22 +00:00

63 lines
2.0 KiB
Python

import msgpack
from django.core.management.base import BaseCommand
from django.conf import settings
from signalbot import SignalBot, Command, Context
from asgiref.sync import sync_to_async
from django.urls import reverse
import json
import asyncio
from core.util import logs
from core.lib.prompts.functions import truncate_and_summarize, messages_to_string, delete_messages
from core.lib import deferred
from core.messaging import replies, ai, natural, history, utils
from core.models import Chat, Manipulation, PersonIdentifier, ChatSession, Message, QueuedMessage
import aiohttp
from django.utils import timezone
from django.conf import settings
from core.clients.signal import NewSignalBot
from redis import asyncio as aioredis
SIGNAL_URL = "signal:8080"
log = logs.get_logger("processing")
redis = aioredis.from_url("unix://var/run/gia-redis.sock", db=10)
async def stream():
pubsub = redis.pubsub()
await pubsub.subscribe("processing")
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message is not None:
try:
data = message["data"]
unpacked = msgpack.unpackb(data, raw=False)
except TypeError:
continue
if "type" in unpacked.keys():
if unpacked["type"] == "def":
await deferred.process_deferred(unpacked)
await asyncio.sleep(0.01)
class Command(BaseCommand):
def handle(self, *args, **options):
bot = NewSignalBot({
"signal_service": SIGNAL_URL,
"phone_number": "+447490296227",
})
bot.register(HandleMessage())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
bot._event_loop = loop
loop.create_task(stream())
bot.start()
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
log.info("Process terminating")
finally:
loop.close()