"""Translation bridge engine.

Forwards inbound chat messages across the user's enabled ``TranslationBridge``
rows, translating the text through the user's configured ``AI`` backend and
recording every attempt in ``TranslationEventLog``.
"""

from __future__ import annotations

import hashlib
import time

from asgiref.sync import sync_to_async

from core.clients import transport
from core.messaging import ai as ai_runner
from core.messaging.reply_sync import apply_sync_origin, is_mirrored_origin
from core.models import AI, Message, TranslationBridge, TranslationEventLog
from core.util import logs

log = logs.get_logger("translation_engine")


def _direction_allowed(bridge: TranslationBridge, source_side: str) -> bool:
    """Return True when ``bridge`` forwards messages that arrive on ``source_side``.

    ``source_side`` is ``"a"`` or ``"b"``. A ``"bidirectional"`` bridge accepts
    both sides; ``"a_to_b"`` only accepts side ``"a"`` and ``"b_to_a"`` only
    accepts side ``"b"``. Any other direction value is rejected.
    """
    direction = bridge.direction
    if direction == "bidirectional":
        return True
    return (source_side, direction) in (("a", "a_to_b"), ("b", "b_to_a"))


def _target_for_side(bridge: TranslationBridge, source_side: str):
    """Return ``(side, service, channel_identifier, language)`` of the opposite side.

    Side ``"a"`` maps to the bridge's ``b_*`` fields; every other value maps to
    the ``a_*`` fields.
    """
    if source_side == "a":
        return (
            "b",
            bridge.b_service,
            bridge.b_channel_identifier,
            bridge.b_language,
        )
    return (
        "a",
        bridge.a_service,
        bridge.a_channel_identifier,
        bridge.a_language,
    )


def _source_language(bridge: TranslationBridge, source_side: str):
    """Return the language configured for the side the message arrived on."""
    if source_side == "a":
        return bridge.a_language
    return bridge.b_language


def _match_side(
    bridge: TranslationBridge, source_service: str, source_channel: str
) -> str | None:
    """Return which bridge side (``"a"``/``"b"``) the source belongs to, if any.

    Side ``"a"`` is checked first, so a bridge whose two sides point at the
    same channel resolves to ``"a"``. The stored channel identifier is
    stringified and stripped before comparison; the service is compared as-is.
    """
    for side, service, channel in (
        ("a", bridge.a_service, bridge.a_channel_identifier),
        ("b", bridge.b_service, bridge.b_channel_identifier),
    ):
        if service == source_service and str(channel or "").strip() == source_channel:
            return side
    return None


async def _translate_text(user, text: str, source_lang: str, target_lang: str) -> str:
    """Translate ``text`` using the first ``AI`` row that belongs to ``user``.

    Returns ``text`` unchanged when the user has no ``AI`` configured.
    Otherwise returns the stripped prompt result, which is an empty string
    when the runner produces a falsy value.
    """
    ai_obj = await sync_to_async(lambda: AI.objects.filter(user=user).first())()
    if ai_obj is None:
        return text

    system_message = {
        "role": "system",
        "content": (
            "Translate the user text exactly for meaning and tone. "
            "Do not add commentary. Return only translated text."
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            f"Source language: {source_lang}\n"
            f"Target language: {target_lang}\n"
            f"Text:\n{text}"
        ),
    }
    result = await ai_runner.run_prompt(
        [system_message, user_message], ai_obj, operation="translation"
    )
    return str(result or "").strip()


async def process_inbound_translation(message: Message):
    """Translate ``message`` and forward it over every matching enabled bridge.

    Nothing happens when the message is ``None``, has blank text, carries a
    mirrored origin in ``message_meta``, or lacks a source service/channel.
    For each enabled bridge of ``message.user`` whose side matches the source
    and whose direction allows it, a ``TranslationEventLog`` row is created as
    ``"pending"`` and later saved as ``"ok"`` or ``"failed"``.

    Failures while translating or sending are caught per bridge, logged, and
    stored on the log row, so one broken bridge does not stop the others.
    """
    if message is None or not str(message.text or "").strip():
        return
    # Skip messages flagged as mirrored; presumably this prevents forwarding
    # loops between bridged channels (confirm against reply_sync).
    if is_mirrored_origin(message.message_meta):
        return

    source_service = str(message.source_service or "").strip().lower()
    source_channel = str(message.source_chat_id or "").strip()
    if not source_service or not source_channel:
        return

    bridges = await sync_to_async(list)(
        TranslationBridge.objects.filter(
            user=message.user,
            enabled=True,
        )
    )
    for bridge in bridges:
        side = _match_side(bridge, source_service, source_channel)
        if side is None or not _direction_allowed(bridge, side):
            continue

        _, target_service, target_channel, target_lang = _target_for_side(bridge, side)
        source_lang = _source_language(bridge, side)
        origin_tag = f"translation:{bridge.id}:{message.id}"
        content_hash = hashlib.sha1(
            f"{source_service}|{source_channel}|{message.text}".encode("utf-8")
        ).hexdigest()
        log_row = await sync_to_async(TranslationEventLog.objects.create)(
            bridge=bridge,
            source_message=message,
            target_service=target_service,
            target_channel=target_channel,
            status="pending",
            origin_tag=origin_tag,
            content_hash=content_hash,
        )
        try:
            translated = await _translate_text(
                message.user,
                str(message.text or ""),
                source_lang=source_lang,
                target_lang=target_lang,
            )
            # A blank result means the AI backend produced nothing usable.
            # Record it as a failure instead of sending an empty message and
            # reporting success.
            if not str(translated or "").strip():
                raise ValueError("empty translation result")
            # No transport send is performed here for the "web" service; only
            # the log row is updated.
            if target_service != "web":
                await transport.send_message_raw(
                    target_service,
                    target_channel,
                    text=translated,
                    attachments=[],
                    metadata={"origin_tag": origin_tag},
                )
            log_row.status = "ok"
            log_row.error = ""
        except Exception as exc:
            # Deliberate per-bridge boundary: keep processing other bridges.
            log_row.status = "failed"
            log_row.error = str(exc)
            log.warning("translation forward failed bridge=%s: %s", bridge.id, exc)
        await sync_to_async(log_row.save)(
            update_fields=["status", "error", "updated_at"]
        )


def apply_translation_origin(meta: dict | None, origin_tag: str) -> dict:
    """Return ``meta`` tagged with ``origin_tag`` by delegating to ``apply_sync_origin``."""
    return apply_sync_origin(meta, origin_tag)


def parse_quick_mode_title(raw_title: str) -> dict:
    """Parse a ``"<a_language>|<b_language>"`` title into a language mapping.

    The title is split on ``"|"``; blank segments are dropped and the rest are
    stripped. Returns ``{}`` when fewer than two segments remain, otherwise a
    dict with the first two segments lowercased under ``"a_language"`` and
    ``"b_language"``. Extra segments are ignored.
    """
    title = str(raw_title or "").strip()
    parts = [segment.strip() for segment in title.split("|") if segment.strip()]
    if len(parts) < 2:
        return {}
    first, second = parts[0], parts[1]
    return {
        "a_language": first.lower(),
        "b_language": second.lower(),
    }