Spaces:
Runtime error
Runtime error
| """ | |
| Universal Fast Dubbing - FastAPI 主应用 | |
| 使用 FastAPI + Jinja2 模板替代 Gradio | |
| 提供更好的性能和控制能力 | |
| 新增功能: | |
| - WebSocket 流式处理支持 | |
| - SSE (Server-Sent Events) 流式处理(HF Spaces 推荐) | |
| - 分段并行处理 | |
| - 实时进度反馈 | |
| - 10秒内开始播放优化 | |
| """ | |
| import os | |
| import json | |
| import asyncio | |
| import base64 | |
| import time | |
| from pathlib import Path | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, Request, Form, File, UploadFile, WebSocket, WebSocketDisconnect | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse | |
| from typing import Optional, AsyncGenerator | |
| import sys | |
| # 将 backend 目录添加到 Python 路径 | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend')) | |
| # 导入后端模块 | |
| from modules.gateway import GradioAPIGateway, GatewayConfig | |
| from modules.groq_client import GroqConfig | |
| from modules.siliconflow_client import SiliconFlowConfig | |
| from modules.stream_processor import StreamProcessor, StreamConfig | |
| from modules.logging_config import setup_logging, get_component_logger, Component | |
| # 配置日志 | |
| setup_logging() | |
| logger = get_component_logger(Component.SYSTEM) | |
| from contextlib import asynccontextmanager | |
# Process-wide singletons; both are populated by ``lifespan`` at startup.
gateway: Optional[GradioAPIGateway] = None
stream_processor: Optional[StreamProcessor] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup reads configuration from environment variables, builds the Groq
    and SiliconFlow configs (each only when its API key is set), then creates
    and initializes the module-level ``gateway`` and ``stream_processor``.
    Shutdown calls ``stream_processor.cleanup()``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global gateway, stream_processor

    logger.info("初始化 Universal Fast Dubbing 后端...")

    # Groq configuration (optional; only built when the key is present).
    groq_api_key = os.getenv("GROQ_API_KEY")
    if groq_api_key:
        logger.info(f"[配置] ✓ GROQ_API_KEY 已配置 (长度: {len(groq_api_key)})")
    else:
        logger.warning("[配置] ⚠ GROQ_API_KEY 未配置,将使用 SiliconFlow ASR(无时间戳)")
    groq_config = GroqConfig(
        api_key=groq_api_key,
        asr_model=os.getenv("ASR_MODEL", "whisper-large-v3"),
        llm_model=os.getenv("LLM_MODEL", "llama3-8b-8192")
    ) if groq_api_key else None

    # SiliconFlow configuration (optional; only built when the key is present).
    siliconflow_api_key = os.getenv("SILICONFLOW_API_KEY")
    if siliconflow_api_key:
        logger.info(f"[配置] ✓ SILICONFLOW_API_KEY 已配置 (长度: {len(siliconflow_api_key)})")
    else:
        logger.info("[配置] SILICONFLOW_API_KEY 未配置(可选)")
    siliconflow_config = SiliconFlowConfig(
        api_key=siliconflow_api_key,
        tts_model=os.getenv("SILICONFLOW_TTS_MODEL", "fishaudio/fish-speech-1.5")
    ) if siliconflow_api_key else None

    # Gateway.
    gateway_config = GatewayConfig(
        temp_dir=os.getenv("TEMP_DIR", "temp/gateway"),
        cache_duration=int(os.getenv("CACHE_DURATION", "3600")),
        max_sessions=int(os.getenv("MAX_SESSIONS", "10")),
        use_low_quality_audio=os.getenv("USE_LOW_QUALITY_AUDIO", "true").lower() == "true"
    )
    gateway = GradioAPIGateway(config=gateway_config, groq_config=groq_config)
    await gateway.initialize()

    # Stream processor.
    stream_config = StreamConfig(
        temp_dir=os.getenv("TEMP_DIR", "temp/stream"),
        max_segment_duration=float(os.getenv("MAX_SEGMENT_DURATION", "120")),
        first_segment_target=float(os.getenv("FIRST_SEGMENT_TARGET", "30"))
    )
    stream_processor = StreamProcessor(
        config=stream_config,
        groq_config=groq_config,
        siliconflow_config=siliconflow_config
    )
    await stream_processor.initialize()

    logger.info("后端初始化完成")
    try:
        yield  # The application serves requests while suspended here.
    finally:
        # Run cleanup even if an exception is thrown into the context.
        # No gateway shutdown call is made here; the original code had none.
        if stream_processor:
            stream_processor.cleanup()
# Create the FastAPI application; startup/shutdown is handled by ``lifespan``.
app = FastAPI(
    title="Universal Fast Dubbing",
    version="3.0.0",
    lifespan=lifespan,
)

# Template and static directories are resolved relative to this file (the same
# way the ``backend`` directory is added to ``sys.path`` above), so the app
# does not depend on the process's current working directory.
# NOTE(review): assumes ``templates/`` and ``static/`` sit next to this file.
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
templates = Jinja2Templates(directory=os.path.join(_BASE_DIR, "templates"))
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(_BASE_DIR, "static")),
    name="static",
)
async def home(request: Request):
    """Render the main page from the ``index.html`` template."""
    page_context = {
        "request": request,
        "title": "Universal Fast Dubbing",
        "version": "3.0.0",
    }
    return templates.TemplateResponse("index.html", page_context)
async def get_status():
    """Return a snapshot of system status.

    Collects statistics and health from the performance monitor and, when the
    gateway is initialized, its active-session count and cache statistics.

    Returns:
        A dict with ``timestamp`` (current UTC time, ISO-8601 with a ``Z``
        suffix), ``healthy``, ``issues``, ``performance`` and ``gateway``.
        On any failure, ``{"error": <message>, "healthy": False}``.
    """
    try:
        # Imported lazily inside the handler, as in the original code.
        from modules.performance_monitor import get_performance_monitor

        perf_monitor = get_performance_monitor()
        perf_stats = perf_monitor.get_statistics()
        health = perf_monitor.is_healthy()

        gateway_status = {}
        if gateway and gateway.is_initialized:
            gateway_status = {
                "active_sessions": len(gateway.get_active_sessions()),
                "cache_stats": gateway.get_cache_stats()
            }

        return {
            # Report the real current time; this used to be a fixed literal.
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "healthy": health["healthy"],
            "issues": health["issues"],
            "performance": {
                "memory_mb": perf_stats["current_memory_mb"],
                "cpu_percent": perf_stats["current_cpu_percent"],
                "success_rate": perf_stats["success_rate"],
                "total_operations": perf_stats["total_operations"]
            },
            "gateway": gateway_status
        }
    except Exception as e:
        logger.error(f"获取系统状态失败: {e}")
        return {"error": str(e), "healthy": False}
async def get_config():
    """Return the backend configuration derived from environment variables.

    Returns:
        A dict of provider/model settings, key-presence flags and numeric
        limits; on failure (e.g. a non-integer numeric variable),
        ``{"error": <message>}``.
    """
    env = os.getenv

    def is_true(name, default):
        # A flag is on only when its value is "true", case-insensitively.
        return env(name, default).lower() == "true"

    try:
        settings = {}
        settings["api_provider"] = env("API_PROVIDER", "auto")
        settings["has_groq_key"] = bool(env("GROQ_API_KEY"))
        settings["has_siliconflow_key"] = bool(env("SILICONFLOW_API_KEY"))
        settings["tts_provider"] = env("TTS_PROVIDER", "edge-tts")
        settings["siliconflow_tts_model"] = env("SILICONFLOW_TTS_MODEL", "fishaudio/fish-speech-1.5")
        settings["use_low_quality_audio"] = is_true("USE_LOW_QUALITY_AUDIO", "true")
        settings["max_concurrent_workers"] = int(env("MAX_CONCURRENT_WORKERS", "3"))
        settings["cache_duration"] = int(env("CACHE_DURATION", "3600"))
        settings["asr_model"] = env("ASR_MODEL", "whisper-large-v3")
        settings["llm_model"] = env("LLM_MODEL", "llama3-8b-8192")
        settings["version"] = "3.0.0"
        settings["debug"] = is_true("DEBUG", "false")
        return settings
    except Exception as e:
        logger.error(f"获取后端配置失败: {e}")
        return {"error": str(e)}
async def ping(client_config: str = Form(default="{}")):
    """Connectivity check that echoes the backend configuration.

    Args:
        client_config: Optional JSON string with the client's configuration.

    Returns:
        On success, a dict with ``backend_config`` (from ``get_config``) and
        the parsed client config (``None`` when empty). On malformed JSON or
        any other error, a dict with ``success`` False and an error message.
    """
    try:
        ext_config = {}
        has_payload = bool(client_config) and client_config != "{}"
        if has_payload:
            try:
                ext_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                return {
                    "success": False,
                    "error": f"客户端配置解析失败: {e}",
                    "message": "配置格式错误"
                }
            logger.info(f"收到客户端配置: {ext_config}")

        backend_config = await get_config()
        response = {
            "success": True,
            "message": "连接成功",
            "backend_config": backend_config,
            # Empty/falsy client configs are reported as None.
            "client_config_received": ext_config or None,
        }
        return response
    except Exception as e:
        logger.error(f"ping 失败: {e}")
        return {
            "success": False,
            "error": str(e),
            "message": "连接失败"
        }
async def save_client_config(client_config: str = Form(...)):
    """Accept a client configuration and hand it to the gateway.

    The configuration is only logged and, when the gateway is initialized,
    passed to ``gateway.update_client_config``; nothing is persisted here.

    Args:
        client_config: JSON string with the client's configuration.

    Returns:
        A dict with ``success``, ``message`` and, on success, ``saved_config``.
    """
    try:
        config_data = {}
        has_payload = bool(client_config) and client_config != "{}"
        if has_payload:
            try:
                config_data = json.loads(client_config)
            except json.JSONDecodeError as e:
                return {
                    "success": False,
                    "error": f"配置解析失败: {e}",
                    "message": "配置格式错误"
                }
            logger.info(f"保存客户端配置: {config_data}")

        # Persistence (file/database) could be added here; for now the
        # configuration is only recorded in the log.
        logger.info(f"[配置保存] 客户端配置已接收并记录: {config_data}")

        gateway_ready = gateway and gateway.is_initialized
        if gateway_ready:
            # Make the client configuration available to later processing.
            gateway.update_client_config(config_data)

        outcome = {
            "success": True,
            "message": "配置保存成功",
            "saved_config": config_data,
        }
        return outcome
    except Exception as e:
        logger.error(f"保存配置失败: {e}")
        return {
            "success": False,
            "error": str(e),
            "message": "配置保存失败"
        }
async def process_dubbing(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    client_config: str = Form(default="{}")
):
    """Run a complete (non-streaming) dubbing job through the gateway.

    Args:
        mode: ``"url"`` (requires ``url``) or ``"record"`` (requires
            ``audio_file``).
        url: Video URL for URL mode.
        audio_file: Uploaded audio for record mode.
        client_config: Optional JSON string with client settings; malformed
            JSON is logged and treated as an empty configuration.

    Returns:
        On success, a dict with ``audio_url``, ``processing_time`` and
        ``segments_processed``; otherwise ``{"success": False, "error": ...}``.
    """
    try:
        # Require an initialized gateway, as the other endpoints do.
        if not gateway or not gateway.is_initialized:
            return {"success": False, "error": "后端未初始化"}

        ext_config = {}
        if client_config and client_config != "{}":
            try:
                ext_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                # Best effort: continue with defaults, but leave a trace.
                logger.warning(f"[配音处理] 配置解析失败: {e}")

        data = {"client_config": ext_config}
        if mode == "url" and url:
            data["url"] = url
        elif mode == "record" and audio_file:
            data["audio_data"] = await audio_file.read()
        else:
            return {"success": False, "error": "无效的输入参数"}

        # Consume gateway updates until a terminal state is reported.
        result = None
        async for update in gateway.process_request(mode, data):
            state = update.get("state")
            if state == "completed":
                result = update.get("result", {})
                break
            if state == "failed":
                return {"success": False, "error": update.get("message", "处理失败")}

        if not result:
            return {"success": False, "error": "处理未返回结果"}
        return {
            "success": True,
            "audio_url": result.get("audio_url"),
            "processing_time": result.get("processing_time", 0),
            "segments_processed": result.get("segments_processed", 0)
        }
    except Exception as e:
        logger.error(f"配音处理失败: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
# NOTE: the server is started by the ``if __name__ == "__main__"`` guard at
# the end of this file. A second guard used to sit here; because
# ``uvicorn.run`` blocks, running the file as a script started the server
# before any definition below this point had been executed.
async def get_capabilities():
    """Describe the backend's feature set.

    Clients use this to decide which processing paths they may use.

    Returns:
        A dict of feature flags, limits, supported modes and a version string.
    """
    capabilities = dict(
        websocket_supported=True,
        sse_supported=True,         # SSE is the more stable option on HF Spaces
        segmented_processing=True,
        range_download=True,        # segmented download is supported
        parallel_tts=True,
        max_segment_duration=120,   # at most two minutes per segment
        first_segment_target=10,    # aim to start playback within 10 seconds
        supported_modes=["url", "record"],
        version="3.1.0",
    )
    return capabilities
async def process_stream_sse(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    audio: Optional[UploadFile] = File(None),
    segment_duration: int = Form(default=120),
    client_config: str = Form(default="{}")
):
    """SSE streaming endpoint backed by the gateway's segmented processing.

    Two input modes are supported:
      1. URL mode: a video URL is supplied.
      2. Upload mode: an audio file is uploaded.

    Args:
        mode: Processing mode, ``"url"`` or ``"upload"``.
        url: Video URL (URL mode).
        audio: Audio file (upload mode).
        segment_duration: Segment length in seconds (default 120).
        client_config: Client configuration as a JSON string; malformed JSON
            is treated as an empty configuration.

    Returns:
        A ``text/event-stream`` response whose events have ``type`` set to
        ``progress``, ``segment_ready``, ``complete`` or ``error``.
    """
    def sse(payload: dict) -> str:
        """Format one SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    # Read the upload before streaming starts. The generator runs after this
    # handler has returned, when the framework may already have closed the
    # uploaded file.
    audio_content: Optional[bytes] = None
    upload_error: Optional[str] = None
    if mode == "upload" and audio:
        try:
            audio_content = await audio.read()
        except Exception as e:
            logger.error(f"[SSE] 处理错误: {e}")
            upload_error = str(e)

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames translated from gateway updates."""
        try:
            if not gateway or not gateway.is_initialized:
                yield sse({'type': 'error', 'message': '后端未初始化'})
                return
            if upload_error is not None:
                yield sse({'type': 'error', 'message': upload_error})
                return

            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                except json.JSONDecodeError as e:
                    # Best effort: fall back to an empty configuration.
                    logger.warning(f"[SSE] 配置解析失败: {e}")

            process_data = {
                "client_config": ext_config,
                "segment_duration": segment_duration
            }

            if mode == "url" and url:
                process_data["url"] = url
                logger.info(f"[SSE] URL模式处理: {url[:50]}..., 分段时长: {segment_duration}秒")
            elif audio_content is not None:
                # Only set when mode == "upload" and a file was supplied.
                process_data["audio_data"] = audio_content
                logger.info(f"[SSE] 上传模式处理: {len(audio_content)} bytes, 分段时长: {segment_duration}秒")
            else:
                yield sse({'type': 'error', 'message': '缺少视频URL或音频文件'})
                return

            yield sse({'type': 'progress', 'progress': 5, 'message': '正在分析音频...'})

            segment_index = 0
            total_duration = 0
            async for update in gateway.process_request_segmented(mode, process_data):
                update_type = update.get("type", "progress")
                if update_type == "progress":
                    yield sse({
                        'type': 'progress',
                        'progress': update.get('progress', 0),
                        'message': update.get('message', ''),
                    })
                elif update_type == "segment_ready":
                    segment_data = update.get("segment", {})
                    audio_data = segment_data.get("audio_data")
                    # Raw bytes are not JSON-serializable; send them as base64.
                    if isinstance(audio_data, bytes):
                        audio_data = base64.b64encode(audio_data).decode('utf-8')
                    yield sse({
                        'type': 'segment_ready',
                        'index': segment_index,
                        'startTime': segment_data.get('start_time', total_duration),
                        'duration': segment_data.get('duration', segment_duration),
                        'audioData': audio_data,
                    })
                    total_duration += segment_data.get("duration", segment_duration)
                    segment_index += 1
                elif update_type == "complete":
                    yield sse({
                        'type': 'complete',
                        'totalSegments': segment_index,
                        'totalDuration': total_duration,
                        'processingTime': update.get('processing_time', 0),
                    })
                    break
                elif update_type == "error":
                    yield sse({'type': 'error', 'message': update.get('message', '处理失败')})
                    break

            logger.info(f"[SSE] 处理完成: {segment_index}段, 总时长: {total_duration}秒")
        except Exception as e:
            logger.error(f"[SSE] 处理错误: {e}")
            yield sse({'type': 'error', 'message': str(e)})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx response buffering
        }
    )
async def quick_start_dubbing(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Quick-start dubbing: upload audio and stream results over SSE.

    The uploaded audio is written to a temporary file and processed by the
    stream processor; every update it yields is forwarded as an SSE event.

    Args:
        audio: The audio file to dub.
        client_config: Client configuration as a JSON string; malformed JSON
            is treated as an empty configuration.

    Returns:
        A ``text/event-stream`` response. Events emitted here have ``type``
        ``progress`` or ``error``; all others come from the stream processor.
    """
    def sse(payload: dict) -> str:
        """Format one SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    # Read the upload before streaming starts. The generator runs after this
    # handler has returned, when the framework may already have closed the
    # uploaded file.
    upload_name = audio.filename
    audio_content: Optional[bytes] = None
    upload_error: Optional[str] = None
    try:
        audio_content = await audio.read()
    except Exception as e:
        logger.error(f"[快速启动] 错误: {e}")
        upload_error = str(e)

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames produced by the stream processor."""
        temp_audio_path = None
        try:
            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return
            if upload_error is not None:
                yield sse({'type': 'error', 'message': upload_error})
                return

            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                except json.JSONDecodeError as e:
                    # Best effort: fall back to an empty configuration.
                    logger.warning(f"[快速启动] 配置解析失败: {e}")

            logger.info(f"[快速启动] 收到音频文件: {upload_name}")

            # Persist the audio to a uniquely named temporary file.
            temp_audio_path = f"temp/stream/upload_{os.urandom(4).hex()}.wav"
            os.makedirs(os.path.dirname(temp_audio_path), exist_ok=True)
            with open(temp_audio_path, 'wb') as f:
                f.write(audio_content)

            yield sse({'type': 'progress', 'progress': 5, 'message': '音频上传完成,开始处理...'})

            async for update in stream_processor.process_stream(temp_audio_path, ext_config):
                yield sse(update)
        except Exception as e:
            logger.error(f"[快速启动] 错误: {e}")
            yield sse({'type': 'error', 'message': str(e)})
        finally:
            # Remove the temporary file on every exit path, including errors
            # and client disconnects.
            if temp_audio_path:
                try:
                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                except OSError as e:
                    logger.warning(f"[快速启动] 清理临时文件失败: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
# Tasks that have been uploaded but not yet processed, keyed by task_id.
# Each value records the saved file path, parsed client config, original
# filename, size in bytes and creation time.
pending_tasks: dict = {}


async def upload_audio(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Store an uploaded audio file without processing it.

    The file is saved under ``temp/tasks/`` and registered in
    ``pending_tasks``; the returned ``task_id`` identifies it for later
    processing.

    Args:
        audio: The audio file to store.
        client_config: Client configuration as a JSON string; malformed JSON
            is logged and treated as an empty configuration.

    Returns:
        On success, a dict with ``task_id``, ``file_name``, ``file_size`` and
        a message; on failure, ``{"success": False, "error": ...}``.
    """
    try:
        parsed_config = {}
        if client_config and client_config != "{}":
            try:
                parsed_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                logger.warning(f"[上传] 配置解析失败: {e}")

        task_id = os.urandom(8).hex()

        # Keep the upload's own extension, defaulting to ".mp3".
        filename = audio.filename or "unknown"
        _, suffix = os.path.splitext(filename)
        file_ext = suffix or ".mp3"
        temp_audio_path = f"temp/tasks/{task_id}{file_ext}"
        os.makedirs(os.path.dirname(temp_audio_path), exist_ok=True)

        payload = await audio.read()
        size_bytes = len(payload)
        file_size_mb = size_bytes / (1024 * 1024)
        with open(temp_audio_path, 'wb') as out:
            out.write(payload)

        task_record = {
            "file_path": temp_audio_path,
            "config": parsed_config,
            "filename": filename,
            "file_size": size_bytes,
            "created_at": time.time(),
        }
        pending_tasks[task_id] = task_record

        logger.info(f"[上传] 文件已保存: {temp_audio_path}, 大小: {file_size_mb:.2f}MB, task_id: {task_id}")
        return {
            "success": True,
            "task_id": task_id,
            "file_name": filename,
            "file_size": size_bytes,
            "message": f"文件已上传 ({file_size_mb:.1f}MB)"
        }
    except Exception as e:
        logger.error(f"[上传] 错误: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
async def process_task(task_id: str):
    """Process a previously uploaded task and stream progress over SSE.

    Args:
        task_id: Identifier of an entry in ``pending_tasks``.

    Returns:
        A ``text/event-stream`` response. The task is consumed (removed from
        ``pending_tasks`` and its file deleted) once processing is attempted.
    """
    def sse(payload: dict) -> str:
        """Format one SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames for the task identified by ``task_id``."""
        temp_audio_path = None
        try:
            if task_id not in pending_tasks:
                yield sse({'type': 'error', 'message': '任务不存在或已过期'})
                return

            # Check readiness BEFORE consuming the task, so the upload is kept
            # and the client can retry when the processor is not ready.
            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return

            # Take ownership of the task; from here on the file is removed in
            # ``finally`` whatever happens.
            task_info = pending_tasks.pop(task_id)
            temp_audio_path = task_info["file_path"]
            ext_config = task_info["config"]

            if not os.path.exists(temp_audio_path):
                yield sse({'type': 'error', 'message': '音频文件不存在'})
                return

            file_size_mb = os.path.getsize(temp_audio_path) / (1024 * 1024)
            logger.info(f"[处理任务] 开始处理: {task_id}, 文件: {temp_audio_path}")
            logger.info(f"[处理任务] 流式处理器状态: initialized={stream_processor.is_initialized}, groq_client={stream_processor.groq_client is not None}")

            yield sse({'type': 'progress', 'progress': 5, 'message': f'开始处理音频 ({file_size_mb:.1f}MB)...'})

            logger.info("[处理任务] 准备调用 stream_processor.process_stream")
            try:
                async for update in stream_processor.process_stream(temp_audio_path, ext_config):
                    logger.debug(f"[处理任务] SSE 事件: {update.get('type', 'unknown')}")
                    yield sse(update)
                logger.info("[处理任务] stream_processor.process_stream 完成")
            except Exception as stream_error:
                logger.error(f"[处理任务] stream_processor.process_stream 异常: {stream_error}", exc_info=True)
                yield sse({'type': 'error', 'message': f'流式处理异常: {str(stream_error)}'})
        except Exception as e:
            logger.error(f"[处理任务] 错误: {e}", exc_info=True)
            yield sse({'type': 'error', 'message': str(e)})
        finally:
            # Remove the temporary file once the task has been consumed.
            if temp_audio_path:
                try:
                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                except Exception as e:
                    logger.warning(f"[处理任务] 清理临时文件失败: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
async def upload_and_process(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Upload audio and stream the dubbing result over SSE in one request.

    The upload is saved to a temporary file (keeping its extension,
    defaulting to ``.mp3``) and passed to the stream processor; every update
    it yields is forwarded as an SSE event. The temporary file is always
    removed afterwards.

    Args:
        audio: The audio file to dub.
        client_config: Client configuration as a JSON string; malformed JSON
            is logged and treated as an empty configuration.
            NOTE(review): the original docstring lists keys ``asrProvider``,
            ``ttsProvider``, ``voiceMale``, ``voiceFemale`` and ``syncOffset``;
            this function does not read them itself, so confirm against the
            stream processor.

    Returns:
        A ``text/event-stream`` response.
    """
    def sse(payload: dict) -> str:
        """Format one SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    # Read the upload before streaming starts. The generator runs after this
    # handler has returned, when the framework may already have closed the
    # uploaded file.
    filename = audio.filename or "unknown"
    content_type = audio.content_type or "audio/mpeg"
    audio_content: Optional[bytes] = None
    upload_error: Optional[str] = None
    try:
        audio_content = await audio.read()
    except Exception as e:
        logger.error(f"[上传处理] 错误: {e}", exc_info=True)
        upload_error = str(e)

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames produced by the stream processor."""
        temp_audio_path = None
        try:
            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return
            if upload_error is not None:
                yield sse({'type': 'error', 'message': upload_error})
                return

            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                    # Log only the keys, not the values.
                    logger.info(f"[上传处理] 客户端配置: {list(ext_config.keys())}")
                except json.JSONDecodeError as e:
                    logger.warning(f"[上传处理] 配置解析失败: {e}")

            logger.info(f"[上传处理] 收到文件: {filename}, 类型: {content_type}")

            # Persist the audio to a uniquely named temporary file.
            file_ext = os.path.splitext(filename)[1] or ".mp3"
            temp_audio_path = f"temp/stream/upload_{os.urandom(4).hex()}{file_ext}"
            os.makedirs(os.path.dirname(temp_audio_path), exist_ok=True)
            file_size_mb = len(audio_content) / (1024 * 1024)
            with open(temp_audio_path, 'wb') as f:
                f.write(audio_content)
            logger.info(f"[上传处理] 音频已保存: {temp_audio_path}, 大小: {file_size_mb:.2f}MB")

            yield sse({'type': 'progress', 'progress': 5, 'message': f'音频上传完成 ({file_size_mb:.1f}MB),开始处理...'})

            async for update in stream_processor.process_stream(temp_audio_path, ext_config):
                yield sse(update)
        except Exception as e:
            logger.error(f"[上传处理] 错误: {e}", exc_info=True)
            yield sse({'type': 'error', 'message': str(e)})
        finally:
            # Remove the temporary file on every exit path.
            if temp_audio_path:
                try:
                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                except Exception as e:
                    logger.warning(f"[上传处理] 清理临时文件失败: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
async def websocket_process(websocket: WebSocket):
    """WebSocket endpoint for segmented, streamed processing of a video URL.

    Message format:
      - client sends: ``{"type": "start_process", "url": "...", "config": {...}}``
      - server sends: ``{"type": "progress|segment_ready|complete|error", ...}``

    Args:
        websocket: The WebSocket connection. It is accepted on entry and
            closed on exit.
    """
    await websocket.accept()
    logger.info("[WebSocket] 新连接建立")
    try:
        # The first message must be a processing request.
        data = await websocket.receive_json()
        if data.get("type") != "start_process":
            await websocket.send_json({
                "type": "error",
                "message": "无效的请求类型"
            })
            return

        url = data.get("url")
        # ``or {}`` also covers an explicit ``"config": null``.
        config = data.get("config") or {}
        segment_duration = config.get("segmentDuration", 120)  # default: 2 minutes
        if not url:
            await websocket.send_json({
                "type": "error",
                "message": "缺少视频URL"
            })
            return

        logger.info(f"[WebSocket] 开始处理: {url}, 分段时长: {segment_duration}秒")
        await websocket.send_json({
            "type": "progress",
            "progress": 5,
            "message": "正在分析视频..."
        })

        if not gateway or not gateway.is_initialized:
            await websocket.send_json({
                "type": "error",
                "message": "后端未初始化"
            })
            return

        process_data = {
            "url": url,
            "client_config": config,
            "segmented": True,
            "segment_duration": segment_duration
        }

        segment_index = 0
        total_duration = 0
        async for update in gateway.process_request_segmented("url", process_data):
            update_type = update.get("type", "progress")
            if update_type == "progress":
                await websocket.send_json({
                    "type": "progress",
                    "progress": update.get("progress", 0),
                    "message": update.get("message", "")
                })
            elif update_type == "segment_ready":
                segment_data = update.get("segment", {})
                audio_data = segment_data.get("audio_data")
                # Raw bytes are not JSON-serializable; send them as base64.
                if isinstance(audio_data, bytes):
                    audio_data = base64.b64encode(audio_data).decode('utf-8')
                await websocket.send_json({
                    "type": "segment_ready",
                    "index": segment_index,
                    "startTime": segment_data.get("start_time", total_duration),
                    "duration": segment_data.get("duration", segment_duration),
                    "audioData": audio_data
                })
                total_duration += segment_data.get("duration", segment_duration)
                segment_index += 1
            elif update_type == "complete":
                await websocket.send_json({
                    "type": "complete",
                    "totalSegments": segment_index,
                    "totalDuration": total_duration,
                    "processingTime": update.get("processing_time", 0)
                })
                break
            elif update_type == "error":
                await websocket.send_json({
                    "type": "error",
                    "message": update.get("message", "处理失败")
                })
                break

        logger.info(f"[WebSocket] 处理完成: {segment_index}段, 总时长: {total_duration}秒")
    except WebSocketDisconnect:
        logger.info("[WebSocket] 客户端断开连接")
    except Exception as e:
        logger.error(f"[WebSocket] 处理错误: {e}")
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except Exception:
            # The socket may already be closed; nothing more can be reported.
            # ``Exception`` (not a bare except) lets task cancellation through.
            pass
    finally:
        try:
            await websocket.close()
        except Exception:
            # Closing an already-closed socket is harmless here.
            pass
async def process_segment(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    start_time: int = Form(default=0),
    duration: int = Form(default=120),
    segment_index: int = Form(default=0),
    client_config: str = Form(default="{}")
):
    """Process one time range of a video through the gateway.

    Args:
        mode: Processing mode. NOTE(review): accepted but not consulted; the
            gateway is always called in ``"url"`` mode.
        url: Video URL (required).
        start_time: Start offset in seconds.
        duration: Length to process, in seconds.
        segment_index: Index of this segment.
        client_config: Client configuration as a JSON string; malformed JSON
            is logged and treated as an empty configuration.

    Returns:
        On success, a dict with ``audio_data`` (base64 when the gateway
        returned bytes), ``actual_duration``, ``is_last_segment`` and
        ``segment_index``; a dict with ``no_more_segments`` True when the
        gateway reports nothing is left; otherwise
        ``{"success": False, "error": ...}``.
    """
    try:
        if not gateway or not gateway.is_initialized:
            return {"success": False, "error": "后端未初始化"}

        ext_config = {}
        if client_config and client_config != "{}":
            try:
                ext_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                # Best effort: continue with defaults, but leave a trace.
                logger.warning(f"[分段处理] 配置解析失败: {e}")

        if not url:
            return {"success": False, "error": "缺少视频URL"}

        logger.info(f"[分段处理] 段落{segment_index}: {start_time}s - {start_time + duration}s")

        process_data = {
            "url": url,
            "client_config": ext_config,
            "start_time": start_time,
            "duration": duration,
            "segment_index": segment_index
        }
        result = await gateway.process_single_segment("url", process_data)

        if result.get("no_more_segments"):
            return {
                "success": True,
                "no_more_segments": True,
                "message": "视频已处理完毕"
            }
        if not result.get("success"):
            return {
                "success": False,
                "error": result.get("error", "分段处理失败")
            }

        audio_data = result.get("audio_data")
        # Raw bytes are not JSON-serializable; return them as base64.
        if isinstance(audio_data, bytes):
            audio_data = base64.b64encode(audio_data).decode('utf-8')

        return {
            "success": True,
            "audio_data": audio_data,
            "actual_duration": result.get("actual_duration", duration),
            "is_last_segment": result.get("is_last_segment", False),
            "segment_index": segment_index
        }
    except Exception as e:
        logger.error(f"[分段处理] 错误: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)