# cytopa99's picture
# Upload 47 files
# 68e5689 verified
"""
Universal Fast Dubbing - FastAPI 主应用
使用 FastAPI + Jinja2 模板替代 Gradio
提供更好的性能和控制能力
新增功能:
- WebSocket 流式处理支持
- SSE (Server-Sent Events) 流式处理(HF Spaces 推荐)
- 分段并行处理
- 实时进度反馈
- 10秒内开始播放优化
"""
import os
import json
import asyncio
import base64
import time
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Form, File, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from typing import Optional, AsyncGenerator
import sys
# 将 backend 目录添加到 Python 路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
# 导入后端模块
from modules.gateway import GradioAPIGateway, GatewayConfig
from modules.groq_client import GroqConfig
from modules.siliconflow_client import SiliconFlowConfig
from modules.stream_processor import StreamProcessor, StreamConfig
from modules.logging_config import setup_logging, get_component_logger, Component
# 配置日志
setup_logging()
logger = get_component_logger(Component.SYSTEM)
from contextlib import asynccontextmanager
# Process-wide singletons. Both start as None and are assigned by the
# lifespan handler at application startup, so request handlers must check
# them before use.
gateway: Optional[GradioAPIGateway] = None
stream_processor: Optional[StreamProcessor] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup builds the module-level ``gateway`` and ``stream_processor``
    singletons from environment variables and awaits their ``initialize()``
    coroutines. Shutdown calls ``stream_processor.cleanup()``.

    Environment variables read:
        GROQ_API_KEY, ASR_MODEL, LLM_MODEL,
        SILICONFLOW_API_KEY, SILICONFLOW_TTS_MODEL,
        TEMP_DIR, CACHE_DURATION, MAX_SESSIONS, USE_LOW_QUALITY_AUDIO,
        MAX_SEGMENT_DURATION, FIRST_SEGMENT_TARGET.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the app is serving.
    """
    global gateway, stream_processor

    logger.info("初始化 Universal Fast Dubbing 后端...")

    # Groq configuration: only built when an API key is present.
    groq_api_key = os.getenv("GROQ_API_KEY")
    if groq_api_key:
        logger.info(f"[配置] ✓ GROQ_API_KEY 已配置 (长度: {len(groq_api_key)})")
    else:
        logger.warning("[配置] ⚠ GROQ_API_KEY 未配置,将使用 SiliconFlow ASR(无时间戳)")
    groq_config = GroqConfig(
        api_key=groq_api_key,
        asr_model=os.getenv("ASR_MODEL", "whisper-large-v3"),
        llm_model=os.getenv("LLM_MODEL", "llama3-8b-8192")
    ) if groq_api_key else None

    # SiliconFlow configuration: optional, only built when a key is present.
    siliconflow_api_key = os.getenv("SILICONFLOW_API_KEY")
    if siliconflow_api_key:
        logger.info(f"[配置] ✓ SILICONFLOW_API_KEY 已配置 (长度: {len(siliconflow_api_key)})")
    else:
        logger.info("[配置] SILICONFLOW_API_KEY 未配置(可选)")
    siliconflow_config = SiliconFlowConfig(
        api_key=siliconflow_api_key,
        tts_model=os.getenv("SILICONFLOW_TTS_MODEL", "fishaudio/fish-speech-1.5")
    ) if siliconflow_api_key else None

    # Gateway configuration and initialization.
    gateway_config = GatewayConfig(
        temp_dir=os.getenv("TEMP_DIR", "temp/gateway"),
        cache_duration=int(os.getenv("CACHE_DURATION", "3600")),
        max_sessions=int(os.getenv("MAX_SESSIONS", "10")),
        use_low_quality_audio=os.getenv("USE_LOW_QUALITY_AUDIO", "true").lower() == "true"
    )
    gateway = GradioAPIGateway(config=gateway_config, groq_config=groq_config)
    await gateway.initialize()

    # Stream processor configuration and initialization.
    # NOTE(review): TEMP_DIR is shared with the gateway when set; only the
    # defaults differ ("temp/gateway" vs "temp/stream") — confirm intended.
    stream_config = StreamConfig(
        temp_dir=os.getenv("TEMP_DIR", "temp/stream"),
        max_segment_duration=float(os.getenv("MAX_SEGMENT_DURATION", "120")),
        first_segment_target=float(os.getenv("FIRST_SEGMENT_TARGET", "30"))
    )
    stream_processor = StreamProcessor(
        config=stream_config,
        groq_config=groq_config,
        siliconflow_config=siliconflow_config
    )
    await stream_processor.initialize()

    logger.info("后端初始化完成")

    try:
        yield  # The application serves requests while suspended here.
    finally:
        # Run cleanup even when the context is exited with an exception or
        # cancellation, so the stream processor's resources are not leaked.
        # The gateway has no shutdown step in this module.
        if stream_processor:
            stream_processor.cleanup()
# Create the FastAPI application.
app = FastAPI(
    title="Universal Fast Dubbing",
    version="3.0.0",
    lifespan=lifespan  # Startup/shutdown is handled by the lifespan context manager
)
# Configure the template and static-file directories. Both paths are
# relative, so they resolve against the process working directory.
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the landing page from the ``index.html`` template.

    Args:
        request: The incoming request, passed into the template context.

    Returns:
        The rendered template response.
    """
    page_context = {
        "request": request,
        "title": "Universal Fast Dubbing",
        "version": "3.0.0",
    }
    return templates.TemplateResponse("index.html", page_context)
@app.get("/api/status")
async def get_status():
    """Report system health, performance counters and gateway state.

    Returns:
        On success, a dict with:
            timestamp: Current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
            healthy / issues: Taken from the performance monitor's
                ``is_healthy()`` result.
            performance: Memory, CPU, success rate and operation count from
                the performance monitor's ``get_statistics()`` result.
            gateway: Active session count and cache stats, or ``{}`` when the
                gateway is missing or not initialized.
        On any failure, ``{"error": <message>, "healthy": False}``.
    """
    # Imported locally so the block brings its own dependency into scope.
    from datetime import datetime, timezone

    try:
        # Fetch performance-monitor data.
        from modules.performance_monitor import get_performance_monitor
        perf_monitor = get_performance_monitor()
        perf_stats = perf_monitor.get_statistics()
        health = perf_monitor.is_healthy()

        # Fetch gateway state only when the gateway is ready.
        gateway_status = {}
        if gateway and gateway.is_initialized:
            gateway_status = {
                "active_sessions": len(gateway.get_active_sessions()),
                "cache_stats": gateway.get_cache_stats()
            }

        return {
            # Report the actual time of the check rather than a fixed literal,
            # keeping the same "...Z" UTC format.
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "healthy": health["healthy"],
            "issues": health["issues"],
            "performance": {
                "memory_mb": perf_stats["current_memory_mb"],
                "cpu_percent": perf_stats["current_cpu_percent"],
                "success_rate": perf_stats["success_rate"],
                "total_operations": perf_stats["total_operations"]
            },
            "gateway": gateway_status
        }
    except Exception as e:
        logger.error(f"获取系统状态失败: {e}")
        return {"error": str(e), "healthy": False}
@app.get("/api/config")
async def get_config():
    """Return the backend configuration derived from environment variables.

    API keys are never returned; only booleans indicating their presence.

    Returns:
        A dict of configuration values, or ``{"error": <message>}`` when a
        value cannot be read or converted (for example a non-integer
        ``CACHE_DURATION``).
    """
    env = os.getenv

    def flag(name: str, default: str) -> bool:
        # A flag is on only when its value equals "true", case-insensitively.
        return env(name, default).lower() == "true"

    try:
        settings = {
            "api_provider": env("API_PROVIDER", "auto"),
            "has_groq_key": bool(env("GROQ_API_KEY")),
            "has_siliconflow_key": bool(env("SILICONFLOW_API_KEY")),
            "tts_provider": env("TTS_PROVIDER", "edge-tts"),
            "siliconflow_tts_model": env("SILICONFLOW_TTS_MODEL", "fishaudio/fish-speech-1.5"),
            "use_low_quality_audio": flag("USE_LOW_QUALITY_AUDIO", "true"),
            "max_concurrent_workers": int(env("MAX_CONCURRENT_WORKERS", "3")),
            "cache_duration": int(env("CACHE_DURATION", "3600")),
            "asr_model": env("ASR_MODEL", "whisper-large-v3"),
            "llm_model": env("LLM_MODEL", "llama3-8b-8192"),
            "version": "3.0.0",
            "debug": flag("DEBUG", "false"),
        }
    except Exception as failure:
        logger.error(f"获取后端配置失败: {failure}")
        return {"error": str(failure)}
    return settings
@app.post("/api/ping")
async def ping(client_config: str = Form(default="{}")):
    """Connectivity check that echoes the client config and backend config.

    Args:
        client_config: JSON-encoded client configuration; an empty value or
            ``"{}"`` is treated as "no config".

    Returns:
        A dict with ``success``, ``message`` and either the backend config
        plus the parsed client config (``None`` when empty), or an ``error``.
    """
    try:
        received = {}
        has_payload = bool(client_config) and client_config != "{}"
        if has_payload:
            try:
                received = json.loads(client_config)
            except json.JSONDecodeError as parse_error:
                # Malformed JSON is reported to the caller instead of raised.
                return {
                    "success": False,
                    "error": f"客户端配置解析失败: {parse_error}",
                    "message": "配置格式错误"
                }
            else:
                logger.info(f"收到客户端配置: {received}")

        # Reuse the /api/config handler to collect the backend configuration.
        backend_settings = await get_config()
        return {
            "success": True,
            "message": "连接成功",
            "backend_config": backend_settings,
            "client_config_received": received or None
        }
    except Exception as failure:
        logger.error(f"ping 失败: {failure}")
        return {
            "success": False,
            "error": str(failure),
            "message": "连接失败"
        }
@app.post("/api/save_config")
async def save_client_config(client_config: str = Form(...)):
    """Accept a client configuration and hand it to the gateway.

    The configuration is only logged and forwarded via
    ``gateway.update_client_config``; nothing is persisted here.

    Args:
        client_config: JSON-encoded client configuration.

    Returns:
        A dict with ``success``, ``message`` and either ``saved_config`` or
        ``error``.
    """
    try:
        parsed = {}
        if client_config and client_config != "{}":
            try:
                parsed = json.loads(client_config)
            except json.JSONDecodeError as parse_error:
                return {
                    "success": False,
                    "error": f"配置解析失败: {parse_error}",
                    "message": "配置格式错误"
                }
            else:
                logger.info(f"保存客户端配置: {parsed}")

        # No file or database persistence: the config is only logged.
        logger.info(f"[配置保存] 客户端配置已接收并记录: {parsed}")

        # Forward the config to the gateway for later processing, when ready.
        gateway_ready = gateway and gateway.is_initialized
        if gateway_ready:
            gateway.update_client_config(parsed)

        return {
            "success": True,
            "message": "配置保存成功",
            "saved_config": parsed
        }
    except Exception as failure:
        logger.error(f"保存配置失败: {failure}")
        return {
            "success": False,
            "error": str(failure),
            "message": "配置保存失败"
        }
@app.post("/api/process")
async def process_dubbing(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    client_config: str = Form(default="{}")
):
    """Run a complete dubbing job and return its final result.

    Args:
        mode: ``"url"`` (requires ``url``) or ``"record"`` (requires
            ``audio_file``).
        url: Source video URL for URL mode.
        audio_file: Uploaded audio for record mode.
        client_config: JSON-encoded client configuration. Malformed JSON is
            logged and treated as an empty config.

    Returns:
        ``{"success": True, "audio_url", "processing_time",
        "segments_processed"}`` on completion, otherwise
        ``{"success": False, "error": <message>}``.
    """
    try:
        # Require an initialized gateway, as the other endpoints do.
        if not gateway or not gateway.is_initialized:
            return {"success": False, "error": "后端未初始化"}

        # Parse the client configuration (best effort).
        ext_config = {}
        if client_config and client_config != "{}":
            try:
                ext_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                # Keep going with an empty config, but leave a trace.
                logger.warning(f"[配音处理] 配置解析失败: {e}")

        # Build the request payload for the selected mode.
        data = {"client_config": ext_config}
        if mode == "url" and url:
            data["url"] = url
            actual_mode = "url"
        elif mode == "record" and audio_file:
            data["audio_data"] = await audio_file.read()
            actual_mode = "record"
        else:
            return {"success": False, "error": "无效的输入参数"}

        # Consume gateway updates until a terminal state is reached.
        result = None
        async for update in gateway.process_request(actual_mode, data):
            state = update.get("state")
            if state == "completed":
                result = update.get("result", {})
                break
            if state == "failed":
                return {"success": False, "error": update.get("message", "处理失败")}

        if result:
            return {
                "success": True,
                "audio_url": result.get("audio_url"),
                "processing_time": result.get("processing_time", 0),
                "segments_processed": result.get("segments_processed", 0)
            }
        return {"success": False, "error": "处理未返回结果"}
    except Exception as e:
        logger.error(f"配音处理失败: {e}")
        return {"success": False, "error": str(e)}
# Do not start the server at this point in the module: ``uvicorn.run`` blocks,
# so launching here would serve the app before the route handlers defined
# further down are registered. The ``if __name__ == "__main__"`` guard at the
# end of the file starts the server once every route exists.
@app.get("/api/capabilities")
async def get_capabilities():
    """Describe the backend's feature set.

    Clients use this to decide which transports and modes they can rely on.
    All values are fixed constants.

    Returns:
        A dict of capability flags, limits, supported modes and a version.
    """
    return dict(
        websocket_supported=True,
        sse_supported=True,             # SSE is the more stable option on HF Spaces
        segmented_processing=True,
        range_download=True,            # Segmented download is supported
        parallel_tts=True,
        max_segment_duration=120,       # At most 2 minutes per segment
        first_segment_target=10,        # Aim to start playback within 10 seconds
        supported_modes=["url", "record"],
        version="3.1.0",
    )
@app.post("/api/process_stream")
async def process_stream_sse(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    audio: Optional[UploadFile] = File(None),
    segment_duration: int = Form(default=120),
    client_config: str = Form(default="{}")
):
    """Stream segmented dubbing results as Server-Sent Events.

    Two input modes are supported: ``"url"`` (a video URL) and ``"upload"``
    (an uploaded audio file).

    Args:
        mode: Processing mode, ``"url"`` or ``"upload"``.
        url: Video URL (URL mode).
        audio: Audio file (upload mode).
        segment_duration: Length of each segment in seconds (default 120).
        client_config: JSON-encoded client configuration. Malformed JSON is
            logged and treated as an empty config.

    Returns:
        A ``text/event-stream`` response whose events have ``type``:
            progress: progress update.
            segment_ready: one segment finished (audio is base64-encoded).
            complete: processing finished.
            error: error information.
    """
    # Read the upload here, not inside the generator: the generator only runs
    # while the response is being streamed, and by then the framework may
    # already have closed the uploaded file.
    audio_content: Optional[bytes] = None
    read_error: Optional[Exception] = None
    if mode == "upload" and audio is not None:
        try:
            audio_content = await audio.read()
        except Exception as e:
            # Deferred so the failure is still reported as an SSE error event.
            read_error = e

    def sse(payload: dict) -> str:
        # Format one SSE "data:" frame carrying a JSON payload.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames for the whole processing run."""
        try:
            if not gateway or not gateway.is_initialized:
                yield sse({'type': 'error', 'message': '后端未初始化'})
                return

            # Parse the client configuration (best effort).
            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                except json.JSONDecodeError as e:
                    logger.warning(f"[SSE] 配置解析失败: {e}")

            process_data = {
                "client_config": ext_config,
                "segment_duration": segment_duration
            }

            # Attach the mode-specific input.
            actual_mode = mode
            if mode == "url" and url:
                process_data["url"] = url
                logger.info(f"[SSE] URL模式处理: {url[:50]}..., 分段时长: {segment_duration}秒")
            elif mode == "upload" and audio:
                if read_error is not None:
                    raise read_error
                process_data["audio_data"] = audio_content
                logger.info(f"[SSE] 上传模式处理: {len(audio_content)} bytes, 分段时长: {segment_duration}秒")
            else:
                yield sse({'type': 'error', 'message': '缺少视频URL或音频文件'})
                return

            # Initial progress event.
            yield sse({'type': 'progress', 'progress': 5, 'message': '正在分析音频...'})

            # Relay updates from the segmented gateway pipeline.
            segment_index = 0
            total_duration = 0
            async for update in gateway.process_request_segmented(actual_mode, process_data):
                update_type = update.get("type", "progress")
                if update_type == "progress":
                    yield sse({
                        'type': 'progress',
                        'progress': update.get('progress', 0),
                        'message': update.get('message', '')
                    })
                elif update_type == "segment_ready":
                    segment_data = update.get("segment", {})
                    audio_data = segment_data.get("audio_data")
                    # JSON cannot carry raw bytes, so binary audio is base64-encoded.
                    if isinstance(audio_data, bytes):
                        audio_data = base64.b64encode(audio_data).decode('utf-8')
                    yield sse({
                        'type': 'segment_ready',
                        'index': segment_index,
                        'startTime': segment_data.get('start_time', total_duration),
                        'duration': segment_data.get('duration', segment_duration),
                        'audioData': audio_data
                    })
                    total_duration += segment_data.get("duration", segment_duration)
                    segment_index += 1
                elif update_type == "complete":
                    yield sse({
                        'type': 'complete',
                        'totalSegments': segment_index,
                        'totalDuration': total_duration,
                        'processingTime': update.get('processing_time', 0)
                    })
                    break
                elif update_type == "error":
                    yield sse({'type': 'error', 'message': update.get('message', '处理失败')})
                    break

            logger.info(f"[SSE] 处理完成: {segment_index}段, 总时长: {total_duration}秒")
        except Exception as e:
            logger.error(f"[SSE] 处理错误: {e}")
            yield sse({'type': 'error', 'message': str(e)})

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx response buffering
        }
    )
@app.post("/api/quick_start")
async def quick_start_dubbing(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Process an uploaded audio file and stream the results as SSE.

    The upload is written to a temporary file, passed to
    ``stream_processor.process_stream`` and every update is relayed as an
    SSE frame. The temporary file is removed when the stream ends, whether
    it ends normally or with an error.

    Args:
        audio: Audio file uploaded by the client.
        client_config: JSON-encoded client configuration. Malformed JSON is
            logged and treated as an empty config.

    Returns:
        A ``text/event-stream`` response. Events carry a ``type`` such as
        ``progress``, ``segment_ready``, ``complete`` or ``error``.
    """
    # Read the upload here, not inside the generator: the generator only runs
    # while the response is being streamed, and by then the framework may
    # already have closed the uploaded file.
    audio_content: Optional[bytes] = None
    read_error: Optional[Exception] = None
    try:
        audio_content = await audio.read()
    except Exception as e:
        # Deferred so the failure is still reported as an SSE error event.
        read_error = e

    def sse(payload: dict) -> str:
        # Format one SSE "data:" frame carrying a JSON payload.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames produced by the stream processor."""
        temp_audio_path = None
        try:
            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return

            # Parse the client configuration (best effort).
            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                except json.JSONDecodeError as e:
                    logger.warning(f"[快速启动] 配置解析失败: {e}")

            logger.info(f"[快速启动] 收到音频文件: {audio.filename}")
            if read_error is not None:
                raise read_error

            # Save the audio to a temporary file for the stream processor.
            # NOTE(review): the path always ends in ".wav" regardless of the
            # uploaded format — confirm the processor does not rely on it.
            temp_audio_path = f"temp/stream/upload_{os.urandom(4).hex()}.wav"
            os.makedirs(os.path.dirname(temp_audio_path), exist_ok=True)
            with open(temp_audio_path, 'wb') as f:
                f.write(audio_content)

            # Initial progress event.
            yield sse({'type': 'progress', 'progress': 5, 'message': '音频上传完成,开始处理...'})

            # Relay every update from the stream processor.
            async for update in stream_processor.process_stream(temp_audio_path, ext_config):
                yield sse(update)
        except Exception as e:
            logger.error(f"[快速启动] 错误: {e}")
            yield sse({'type': 'error', 'message': str(e)})
        finally:
            # Remove the temporary file on every exit path, including errors
            # and client disconnects.
            if temp_audio_path:
                try:
                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                except OSError as e:
                    logger.warning(f"[快速启动] 清理临时文件失败: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
# Registry of uploaded-but-not-yet-processed tasks, keyed by task_id. Each
# value is a dict holding the saved file path, the parsed client config, the
# original filename, the file size and the creation timestamp.
# NOTE(review): entries are removed only when /api/process_task consumes them;
# nothing in this module expires stale tasks — confirm whether that is intended.
pending_tasks: dict = {}
@app.post("/api/upload_audio")
async def upload_audio(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Store an uploaded audio file and register it as a pending task.

    Nothing is processed here. The caller later opens
    ``/api/process_task/{task_id}`` to receive SSE progress.

    Args:
        audio: Audio file to store.
        client_config: JSON-encoded client configuration. Malformed JSON is
            logged and treated as an empty config.

    Returns:
        ``{"success": True, "task_id", "file_name", "file_size", "message"}``
        on success, otherwise ``{"success": False, "error": <message>}``.
    """
    try:
        # Parse the configuration (best effort).
        parsed_config = {}
        if client_config and client_config != "{}":
            try:
                parsed_config = json.loads(client_config)
            except json.JSONDecodeError as e:
                logger.warning(f"[上传] 配置解析失败: {e}")

        # A random hex task id also names the stored file.
        task_id = os.urandom(8).hex()

        # Keep the original extension, defaulting to ".mp3" when absent.
        original_name = audio.filename or "unknown"
        suffix = os.path.splitext(original_name)[1] or ".mp3"
        stored_path = f"temp/tasks/{task_id}{suffix}"
        os.makedirs(os.path.dirname(stored_path), exist_ok=True)

        payload = await audio.read()
        size_bytes = len(payload)
        size_mb = size_bytes / (1024 * 1024)
        with open(stored_path, 'wb') as sink:
            sink.write(payload)

        # Register the task for /api/process_task.
        pending_tasks[task_id] = {
            "file_path": stored_path,
            "config": parsed_config,
            "filename": original_name,
            "file_size": size_bytes,
            "created_at": time.time()
        }
        logger.info(f"[上传] 文件已保存: {stored_path}, 大小: {size_mb:.2f}MB, task_id: {task_id}")

        return {
            "success": True,
            "task_id": task_id,
            "file_name": original_name,
            "file_size": size_bytes,
            "message": f"文件已上传 ({size_mb:.1f}MB)"
        }
    except Exception as failure:
        logger.error(f"[上传] 错误: {failure}", exc_info=True)
        return {"success": False, "error": str(failure)}
@app.get("/api/process_task/{task_id}")
async def process_task(task_id: str):
    """Process a previously uploaded task and stream progress as SSE.

    The task is removed from ``pending_tasks`` as soon as it is picked up,
    and its audio file is deleted when the stream ends.

    Args:
        task_id: Task identifier returned by ``/api/upload_audio``.

    Returns:
        A ``text/event-stream`` response.
    """
    def sse(payload: dict) -> str:
        # Format one SSE "data:" frame carrying a JSON payload.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        audio_path = None
        try:
            # Take the task out of the registry; a miss means unknown/expired.
            task_info = pending_tasks.pop(task_id, None)
            if task_info is None:
                yield sse({'type': 'error', 'message': '任务不存在或已过期'})
                return

            audio_path = task_info["file_path"]
            task_config = task_info["config"]

            if not os.path.exists(audio_path):
                yield sse({'type': 'error', 'message': '音频文件不存在'})
                return

            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return

            size_mb = os.path.getsize(audio_path) / (1024 * 1024)
            logger.info(f"[处理任务] 开始处理: {task_id}, 文件: {audio_path}")
            logger.info(f"[处理任务] 流式处理器状态: initialized={stream_processor.is_initialized}, groq_client={stream_processor.groq_client is not None}")

            # Tell the client that processing has started.
            yield sse({'type': 'progress', 'progress': 5, 'message': f'开始处理音频 ({size_mb:.1f}MB)...'})

            # Relay updates; processor failures get their own error message.
            logger.info("[处理任务] 准备调用 stream_processor.process_stream")
            try:
                async for event in stream_processor.process_stream(audio_path, task_config):
                    logger.debug(f"[处理任务] SSE 事件: {event.get('type', 'unknown')}")
                    yield sse(event)
                logger.info("[处理任务] stream_processor.process_stream 完成")
            except Exception as stream_error:
                logger.error(f"[处理任务] stream_processor.process_stream 异常: {stream_error}", exc_info=True)
                yield sse({'type': 'error', 'message': f'流式处理异常: {str(stream_error)}'})
        except Exception as failure:
            logger.error(f"[处理任务] 错误: {failure}", exc_info=True)
            yield sse({'type': 'error', 'message': str(failure)})
        finally:
            # Delete the uploaded file once the task has been picked up.
            if audio_path:
                try:
                    if os.path.exists(audio_path):
                        os.remove(audio_path)
                except Exception as cleanup_error:
                    logger.warning(f"[处理任务] 清理临时文件失败: {cleanup_error}")

    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=stream_headers,
    )
@app.post("/api/upload_and_process")
async def upload_and_process(
    audio: UploadFile = File(...),
    client_config: str = Form(default="{}")
):
    """Upload an audio file and stream its dubbing results as SSE.

    The upload is saved to a temporary file (keeping its extension, default
    ``.mp3``), passed to ``stream_processor.process_stream`` and every update
    is relayed as an SSE frame. The temporary file is removed when the
    stream ends.

    Args:
        audio: Audio file downloaded or recorded by the client.
        client_config: JSON-encoded client configuration. Per the original
            endpoint documentation it may include ``asrProvider``,
            ``ttsProvider``, ``voiceMale``, ``voiceFemale`` and
            ``syncOffset`` (milliseconds). Malformed JSON is logged and
            treated as an empty config.

    Returns:
        A ``text/event-stream`` response.
    """
    # Read the upload here, not inside the generator: the generator only runs
    # while the response is being streamed, and by then the framework may
    # already have closed the uploaded file.
    audio_content: Optional[bytes] = None
    read_error: Optional[Exception] = None
    try:
        audio_content = await audio.read()
    except Exception as e:
        # Deferred so the failure is still reported as an SSE error event.
        read_error = e

    def sse(payload: dict) -> str:
        # Format one SSE "data:" frame carrying a JSON payload.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames produced by the stream processor."""
        temp_audio_path = None
        try:
            if not stream_processor or not stream_processor.is_initialized:
                yield sse({'type': 'error', 'message': '流式处理器未初始化'})
                return

            # Parse the client configuration (best effort).
            ext_config = {}
            if client_config and client_config != "{}":
                try:
                    ext_config = json.loads(client_config)
                    # Log only the key names, never the values.
                    logger.info(f"[上传处理] 客户端配置: {list(ext_config.keys())}")
                except json.JSONDecodeError as e:
                    logger.warning(f"[上传处理] 配置解析失败: {e}")

            # File metadata.
            filename = audio.filename or "unknown"
            content_type = audio.content_type or "audio/mpeg"
            logger.info(f"[上传处理] 收到文件: {filename}, 类型: {content_type}")

            if read_error is not None:
                raise read_error

            # Save the upload to a temporary file, keeping its extension.
            file_ext = os.path.splitext(filename)[1] or ".mp3"
            temp_audio_path = f"temp/stream/upload_{os.urandom(4).hex()}{file_ext}"
            os.makedirs(os.path.dirname(temp_audio_path), exist_ok=True)
            file_size_mb = len(audio_content) / (1024 * 1024)
            with open(temp_audio_path, 'wb') as f:
                f.write(audio_content)
            logger.info(f"[上传处理] 音频已保存: {temp_audio_path}, 大小: {file_size_mb:.2f}MB")

            # Initial progress event.
            yield sse({'type': 'progress', 'progress': 5, 'message': f'音频上传完成 ({file_size_mb:.1f}MB),开始处理...'})

            # Relay every update from the stream processor.
            async for update in stream_processor.process_stream(temp_audio_path, ext_config):
                yield sse(update)
        except Exception as e:
            logger.error(f"[上传处理] 错误: {e}", exc_info=True)
            yield sse({'type': 'error', 'message': str(e)})
        finally:
            # Remove the temporary file on every exit path.
            if temp_audio_path:
                try:
                    if os.path.exists(temp_audio_path):
                        os.remove(temp_audio_path)
                except Exception as e:
                    logger.warning(f"[上传处理] 清理临时文件失败: {e}")

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
@app.websocket("/ws/process")
async def websocket_process(websocket: WebSocket):
    """Stream segmented dubbing results over a WebSocket.

    Message format:
        Client sends: ``{"type": "start_process", "url": "...", "config": {...}}``
        Server sends: ``{"type": "progress|segment_ready|complete|error", ...}``

    The segment length is taken from ``config["segmentDuration"]`` (default
    120 seconds). The socket is closed when the handler finishes.

    Args:
        websocket: The client connection.
    """
    await websocket.accept()
    logger.info("[WebSocket] 新连接建立")
    try:
        # Receive the processing request.
        data = await websocket.receive_json()
        if data.get("type") != "start_process":
            await websocket.send_json({
                "type": "error",
                "message": "无效的请求类型"
            })
            return

        url = data.get("url")
        # Tolerate an explicit null config as well as a missing one.
        config = data.get("config") or {}
        segment_duration = config.get("segmentDuration", 120)  # Default: 2 minutes

        if not url:
            await websocket.send_json({
                "type": "error",
                "message": "缺少视频URL"
            })
            return

        logger.info(f"[WebSocket] 开始处理: {url}, 分段时长: {segment_duration}秒")

        # Initial progress message.
        await websocket.send_json({
            "type": "progress",
            "progress": 5,
            "message": "正在分析视频..."
        })

        # The gateway must be ready before processing can start.
        if not gateway or not gateway.is_initialized:
            await websocket.send_json({
                "type": "error",
                "message": "后端未初始化"
            })
            return

        process_data = {
            "url": url,
            "client_config": config,
            "segmented": True,
            "segment_duration": segment_duration
        }

        # Relay updates from the segmented gateway pipeline.
        segment_index = 0
        total_duration = 0
        async for update in gateway.process_request_segmented("url", process_data):
            update_type = update.get("type", "progress")
            if update_type == "progress":
                await websocket.send_json({
                    "type": "progress",
                    "progress": update.get("progress", 0),
                    "message": update.get("message", "")
                })
            elif update_type == "segment_ready":
                segment_data = update.get("segment", {})
                audio_data = segment_data.get("audio_data")
                # JSON cannot carry raw bytes, so binary audio is base64-encoded.
                if isinstance(audio_data, bytes):
                    audio_data = base64.b64encode(audio_data).decode('utf-8')
                await websocket.send_json({
                    "type": "segment_ready",
                    "index": segment_index,
                    "startTime": segment_data.get("start_time", total_duration),
                    "duration": segment_data.get("duration", segment_duration),
                    "audioData": audio_data
                })
                total_duration += segment_data.get("duration", segment_duration)
                segment_index += 1
            elif update_type == "complete":
                await websocket.send_json({
                    "type": "complete",
                    "totalSegments": segment_index,
                    "totalDuration": total_duration,
                    "processingTime": update.get("processing_time", 0)
                })
                break
            elif update_type == "error":
                await websocket.send_json({
                    "type": "error",
                    "message": update.get("message", "处理失败")
                })
                break

        logger.info(f"[WebSocket] 处理完成: {segment_index}段, 总时长: {total_duration}秒")
    except WebSocketDisconnect:
        logger.info("[WebSocket] 客户端断开连接")
    except Exception as e:
        logger.error(f"[WebSocket] 处理错误: {e}")
        # Best effort: the socket may already be unusable. Catch Exception
        # rather than everything so task cancellation still propagates.
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except Exception:
            pass
    finally:
        # Closing an already-closed socket may raise; that is safe to ignore,
        # but cancellation must not be swallowed.
        try:
            await websocket.close()
        except Exception:
            pass
@app.post("/api/process_segment")
async def process_segment(
    mode: str = Form(...),
    url: Optional[str] = Form(None),
    start_time: int = Form(default=0),
    duration: int = Form(default=120),
    segment_index: int = Form(default=0),
    client_config: str = Form(default="{}")
):
    """Process one time window of a video.

    Args:
        mode: Processing mode submitted by the client. The gateway is always
            called in ``"url"`` mode regardless of this value.
        url: Video URL (required).
        start_time: Window start, in seconds.
        duration: Window length, in seconds.
        segment_index: Index of the segment being requested.
        client_config: JSON-encoded client configuration; malformed JSON is
            ignored and an empty config is used.

    Returns:
        On success: ``success``, ``audio_data`` (base64 when the gateway
        returned bytes), ``actual_duration``, ``is_last_segment`` and
        ``segment_index``. When no segments remain: ``success`` with
        ``no_more_segments``. Otherwise ``success: False`` with ``error``.
    """
    try:
        gateway_ready = gateway and gateway.is_initialized
        if not gateway_ready:
            return {"success": False, "error": "后端未初始化"}

        # Parse the client configuration; malformed JSON falls back to {}.
        parsed_config = {}
        if client_config and client_config != "{}":
            try:
                parsed_config = json.loads(client_config)
            except json.JSONDecodeError:
                pass

        if not url:
            return {"success": False, "error": "缺少视频URL"}

        logger.info(f"[分段处理] 段落{segment_index}: {start_time}s - {start_time + duration}s")

        # Ask the gateway to process just this one segment.
        request_payload = {
            "url": url,
            "client_config": parsed_config,
            "start_time": start_time,
            "duration": duration,
            "segment_index": segment_index
        }
        outcome = await gateway.process_single_segment("url", request_payload)

        if outcome.get("no_more_segments"):
            return {
                "success": True,
                "no_more_segments": True,
                "message": "视频已处理完毕"
            }

        if not outcome.get("success"):
            return {
                "success": False,
                "error": outcome.get("error", "分段处理失败")
            }

        # Binary audio is base64-encoded; any other value passes through.
        raw_audio = outcome.get("audio_data")
        encoded_audio = (
            base64.b64encode(raw_audio).decode('utf-8')
            if isinstance(raw_audio, bytes)
            else raw_audio
        )
        return {
            "success": True,
            "audio_data": encoded_audio,
            "actual_duration": outcome.get("actual_duration", duration),
            "is_last_segment": outcome.get("is_last_segment", False),
            "segment_index": segment_index
        }
    except Exception as failure:
        logger.error(f"[分段处理] 错误: {failure}")
        return {"success": False, "error": str(failure)}
# Script entry point: when the module is executed directly, serve the app
# with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)