veo3.1-docpro / app.py
Hamed744's picture
Update app.py
44a2f07 verified
# app.py (نسخه نهایی با کسر اعتبار سمت کلاینت)
import io
import os
import warnings
import jwt
from fastapi import FastAPI, Form, File, UploadFile, HTTPException
from fastapi.responses import StreamingResponse, FileResponse
from huggingface_hub import InferenceClient
from typing import Optional
from urllib3.exceptions import InsecureRequestWarning
# --- تنظیمات اولیه ---
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
app = FastAPI(
title="AI Video Studio API",
description="Final version with client-side credit deduction confirmation."
)
# --- کلید مخفی مشترک ---
SECRET_KEY = os.getenv("INTERNAL_API_KEY", "301101005")
# --- Endpoints ---
@app.get("/", response_class=FileResponse)
async def read_root():
return FileResponse('index.html')
async def generate_video_logic(prompt: str, token: str, image_bytes: Optional[bytes] = None):
try:
# 1. توکن JWT را اعتبارسنجی کن
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
api_token = payload.get("api_key")
# "deduction_token" در اینجا فقط برای اعتبارسنجی است و استفاده نمی‌شود
deduction_token = payload.get("deduction_token")
if not all([api_token, deduction_token]):
raise ValueError("Invalid token payload.")
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=403, detail="مجوز ساخت ویدیو منقضی شده است. لطفا دوباره تلاش کنید.")
except jwt.InvalidTokenError:
raise HTTPException(status_code=403, detail="مجوز ساخت ویدیو نامعتبر است.")
except Exception as e:
raise HTTPException(status_code=400, detail=f"خطا در پردازش مجوز: {e}")
# 2. با کلید API استخراج شده از توکن، ویدیو را بساز
try:
client = InferenceClient(provider="fal-ai", api_key=api_token)
if image_bytes:
video_bytes = client.image_to_video(image_bytes, prompt=prompt, model="akhaliq/veo3.1-fast-image-to-video")
else:
video_bytes = client.text_to_video(prompt, model="akhaliq/veo3.1-fast")
# 3. ویدیو را به همراه توکن کسر اعتبار به کلاینت برگردان
# مرورگر مسئول ارسال توکن کسر اعتبار خواهد بود.
response = StreamingResponse(io.BytesIO(video_bytes), media_type="video/mp4")
response.headers["X-Deduction-Token"] = deduction_token
return response
except Exception as e:
print(f"Hugging Face API Error: {e}")
raise HTTPException(status_code=500, detail="سرویس ساخت ویدیو با خطا مواجه شد ممکنه درخواست شما طبق قوانین نباشه لطفاً درخواست اصلاح کنید و همچنین با تصاویر کودکان از قسمت تصویر به ویدیو امکان ساخت ویدیو در این سرویس وجود نداره تصویر کودکان رو آپلود نکنید و یا افراد مشهور که باعث نقض قوانین میشه. (اعتباری کسر نشد)")
@app.post("/text-to-video/")
async def api_text_to_video(prompt: str = Form(...), token: str = Form(...)):
return await generate_video_logic(prompt, token)
@app.post("/image-to-video/")
async def api_image_to_video(prompt: str = Form(...), token: str = Form(...), image: UploadFile = File(...)):
if not image.content_type.startswith("image/"):
raise HTTPException(status_code=400, detail="فایل نامعتبر است. لطفاً یک تصویر آپلود کنید.")
image_bytes = await image.read()
return await generate_video_logic(prompt, token, image_bytes=image_bytes)