# tuandunghcmut's picture
# update
# 7801f62
import gradio as gr
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration, TextIteratorStreamer
from transformers.image_utils import load_image
from threading import Thread
import time
import torch
import spaces
import cv2
import numpy as np
from PIL import Image
from models import (
get_model_list, get_model_info, DEFAULT_GENERATION_PARAMS,
get_preset_list, get_preset_params, get_preset_description
)
def progress_bar_html(label: str) -> str:
    """
    Builds the HTML placeholder shown while a response is being generated.

    The snippet is a flex row holding `label` next to a 110px-wide track whose
    inner bar is animated by the `loading` CSS keyframes declared in the same
    snippet. `label` is interpolated as-is (no HTML escaping is applied).
    """
    # Everything that precedes the label text.
    before_label = (
        '\n'
        '<div style="display: flex; align-items: center;">\n'
        '<span style="margin-right: 10px; font-size: 14px;">'
    )
    # Everything that follows the label: the bar markup and its keyframes.
    after_label = (
        '</span>\n'
        '<div style="width: 110px; height: 5px; background-color: #9370DB; border-radius: 2px; overflow: hidden;">\n'
        '<div style="width: 100%; height: 100%; background-color: #4B0082; animation: loading 1.5s linear infinite;"></div>\n'
        '</div>\n'
        '</div>\n'
        '<style>\n'
        '@keyframes loading {\n'
        '0% { transform: translateX(-100%); }\n'
        '100% { transform: translateX(100%); }\n'
        '}\n'
        '</style>\n'
    )
    return f"{before_label}{label}{after_label}"
def downsample_video(video_path, num_frames=10):
    """
    Downsamples the video to at most `num_frames` evenly spaced frames.

    Args:
        video_path: Path of the video file, handed to cv2.VideoCapture.
        num_frames: Number of evenly spaced frames to sample (default 10).

    Returns:
        A list of (PIL image, timestamp) tuples in playback order. The
        timestamp is the frame index divided by the reported FPS, rounded to
        two decimals. The list is empty when the reported frame count or FPS
        is not positive, or when `num_frames` is less than 1. Frames that
        cannot be read are skipped.
    """
    frames = []
    if num_frames < 1:
        return frames

    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        if total_frames <= 0 or fps <= 0:
            return frames

        # When the video has fewer frames than requested, linspace repeats
        # indices; np.unique keeps each frame once (and keeps them sorted).
        frame_indices = np.unique(
            np.linspace(0, total_frames - 1, num_frames, dtype=int)
        )
        for raw_index in frame_indices:
            index = int(raw_index)
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, index)
            success, image = vidcap.read()
            if not success:
                continue
            # OpenCV decodes to BGR; PIL expects RGB.
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            timestamp = round(index / fps, 2)
            frames.append((Image.fromarray(rgb_image), timestamp))
    finally:
        # Release the capture handle even if seeking/decoding raised.
        vidcap.release()
    return frames
# Nothing is loaded at import time: load_model() fills in these three globals
# when the first request arrives.
processor = model = current_model_name = None
def load_model(model_name):
    """
    Loads the model and processor based on the model name.

    The loaded pair is kept in the module globals `model`, `processor` and
    `current_model_name`; asking for the name that is already loaded returns
    the cached pair without touching disk or network.

    Args:
        model_name: Key understood by get_model_info(); the returned info
            dict supplies the "id", "dtype" (a torch attribute name) and
            "device" entries used here.

    Returns:
        A (model, processor) tuple.
    """
    global processor, model, current_model_name
    # If the model is already loaded, return it
    if model is not None and current_model_name == model_name:
        return model, processor

    # Resolve everything that can fail cheaply before touching global state,
    # so an unknown model name leaves the currently loaded model usable.
    model_info = get_model_info(model_name)
    model_id = model_info["id"]
    dtype = getattr(torch, model_info["dtype"])

    # Forget the previous pair before loading the next one: this drops our
    # reference to the old weights while the new ones load, and guarantees
    # that a failed load cannot leave a new processor paired with an old model.
    processor = None
    model = None
    current_model_name = None

    new_processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    new_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_id,
        trust_remote_code=True,
        torch_dtype=dtype,
    ).to(model_info["device"]).eval()

    # Commit only after both loads succeeded.
    processor, model, current_model_name = new_processor, new_model, model_name
    return model, processor
_VIDEO_INFER_TAG = "@video-infer"


def _stream_generation(model, processor, messages, images, label, generation_params):
    """Runs model.generate in a background thread and yields the growing reply text."""
    prompt = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(
        text=[prompt],
        images=images if images else None,
        return_tensors="pt",
        padding=True,
    ).to(model.device)  # follow the device the model was actually loaded on

    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(inputs, streamer=streamer, **generation_params)
    errors = []

    def _generate():
        try:
            model.generate(**generation_kwargs)
        except Exception as exc:  # thread boundary: hand the failure to the consumer
            errors.append(exc)
            # Without this the consumer loop below would wait on the streamer forever.
            streamer.end()

    thread = Thread(target=_generate)
    thread.start()

    yield progress_bar_html(label)
    buffer = ""
    for new_text in streamer:
        buffer += new_text
        time.sleep(0.01)
        yield buffer

    thread.join()
    if errors:
        raise gr.Error(f"Generation failed: {errors[0]}") from errors[0]


@spaces.GPU
def model_inference(input_dict, history, model_name, temperature=DEFAULT_GENERATION_PARAMS["temperature"],
                    top_p=DEFAULT_GENERATION_PARAMS["top_p"], top_k=DEFAULT_GENERATION_PARAMS["top_k"],
                    max_new_tokens=DEFAULT_GENERATION_PARAMS["max_new_tokens"],
                    do_sample=DEFAULT_GENERATION_PARAMS["do_sample"],
                    num_beams=DEFAULT_GENERATION_PARAMS["num_beams"],
                    early_stopping=DEFAULT_GENERATION_PARAMS["early_stopping"],
                    length_penalty=DEFAULT_GENERATION_PARAMS["length_penalty"],
                    no_repeat_ngram_size=DEFAULT_GENERATION_PARAMS["no_repeat_ngram_size"],
                    repetition_penalty=DEFAULT_GENERATION_PARAMS["repetition_penalty"]):
    """
    Chat handler: streams the selected model's answer to a text + image/video query.

    Args:
        input_dict: Multimodal message with a "text" entry and a "files" list.
        history: Previous chat turns; not used by this handler.
        model_name: Name passed to load_model().
        temperature, top_p, top_k, max_new_tokens, do_sample, num_beams,
        early_stopping, length_penalty, no_repeat_ngram_size,
        repetition_penalty: Forwarded unchanged to model.generate().

    Yields:
        First an HTML progress bar, then the accumulated response text after
        each streamed chunk.

    Raises:
        gr.Error: If the query text is empty, a "@video-infer" query has no
            file or its video yields no frames, or generation fails.
    """
    # Load the selected model
    model, processor = load_model(model_name)
    text = input_dict.get("text") or ""
    files = input_dict.get("files") or []

    generation_params = dict(
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        do_sample=do_sample,
        num_beams=num_beams,
        early_stopping=early_stopping,
        length_penalty=length_penalty,
        no_repeat_ngram_size=no_repeat_ngram_size,
        repetition_penalty=repetition_penalty,
    )

    # Detect and remove the tag on the same (stripped) string, so leading
    # whitespace cannot shift the slice into the tag itself.
    query = text.strip()
    if query[:len(_VIDEO_INFER_TAG)].lower() == _VIDEO_INFER_TAG:
        query = query[len(_VIDEO_INFER_TAG):].strip()
        if not files:
            raise gr.Error("Please upload a video file along with your @video-infer query.")
        # Assume the first file is a video.
        frames = downsample_video(files[0])
        if not frames:
            raise gr.Error("Could not process video.")

        # Text prompt first, then each frame preceded by its timestamp label.
        content = [{"type": "text", "text": query}]
        for image, timestamp in frames:
            content.append({"type": "text", "text": f"Frame {timestamp}:"})
            content.append({"type": "image", "image": image})
        messages = [{"role": "user", "content": content}]
        video_images = [image for image, _ in frames]

        yield from _stream_generation(
            model, processor, messages, video_images,
            f"Processing video with {model_name}", generation_params,
        )
        return

    if not query:
        if files:
            raise gr.Error("Please input a text query along with the image(s).")
        raise gr.Error("Please input a query and optionally image(s).")

    images = [load_image(path) for path in files]
    messages = [
        {
            "role": "user",
            "content": [
                *[{"type": "image", "image": image} for image in images],
                {"type": "text", "text": text},
            ],
        }
    ]
    yield from _stream_generation(
        model, processor, messages, images,
        f"Processing with {model_name}", generation_params,
    )
# Example queries for the chat UI: each row is a single multimodal message
# made of a prompt and one media file.
examples = [
    [{"text": prompt, "files": [media_path]}]
    for prompt, media_path in (
        ("Describe the Image?", "example_images/document.jpg"),
        ("@video-infer Explain the content of the Advertisement", "example_images/videoplayback.mp4"),
        ("@video-infer Explain the content of the video in detail", "example_images/breakfast.mp4"),
        ("@video-infer Explain the content of the video.", "example_images/sky.mp4"),
    )
]
def create_interface():
    """
    Assembles the Gradio Blocks app and returns it without launching.

    The page holds a "Model Settings" accordion (model picker, preset picker
    with description, and the generation-parameter controls) followed by a
    multimodal ChatInterface backed by model_inference.
    """
    # Choices offered by the two dropdowns.
    available_models = get_model_list()
    available_presets = get_preset_list()
    defaults = DEFAULT_GENERATION_PARAMS

    # Preset keys, in the same order as the controls the preset button updates.
    preset_keys = (
        "temperature",
        "top_p",
        "top_k",
        "max_new_tokens",
        "do_sample",
        "num_beams",
        "early_stopping",
        "length_penalty",
        "no_repeat_ngram_size",
        "repetition_penalty",
    )

    def apply_preset(preset_name):
        """Returns the preset's control values followed by its description."""
        preset = get_preset_params(preset_name)
        values = [preset[key] for key in preset_keys]
        values.append(get_preset_description(preset_name))
        return values

    with gr.Blocks() as demo:
        gr.Markdown("# **Qwen2.5 Series (add `@video-infer` for video understanding)**")

        with gr.Accordion("Model Settings", open=True):
            with gr.Row():
                model_dropdown = gr.Dropdown(
                    choices=available_models,
                    value=available_models[0],
                    label="Select Model",
                )

            with gr.Row():
                preset_dropdown = gr.Dropdown(
                    choices=available_presets,
                    value="Default",
                    label="Parameter Preset",
                )
                preset_description = gr.Textbox(
                    value=get_preset_description("Default"),
                    label="Preset Description",
                    interactive=False,
                )

            # Pressing this copies the chosen preset into the controls below.
            preset_button = gr.Button("Apply Preset")

            with gr.Row():
                temperature = gr.Slider(
                    minimum=0.0,
                    maximum=2.0,
                    value=defaults["temperature"],
                    step=0.1,
                    label="Temperature",
                    info="Higher values produce more diverse outputs",
                )
                top_p = gr.Slider(
                    minimum=0.0,
                    maximum=1.0,
                    value=defaults["top_p"],
                    step=0.05,
                    label="Top P",
                    info="Nucleus sampling: limit sampling to top P% of probability mass",
                )

            with gr.Row():
                top_k = gr.Slider(
                    minimum=1,
                    maximum=100,
                    value=defaults["top_k"],
                    step=1,
                    label="Top K",
                    info="Limit sampling to top K most likely tokens",
                )
                max_tokens = gr.Slider(
                    minimum=64,
                    maximum=2048,
                    value=defaults["max_new_tokens"],
                    step=64,
                    label="Max New Tokens",
                    info="Maximum number of tokens to generate",
                )

            with gr.Row():
                do_sample = gr.Checkbox(
                    value=defaults["do_sample"],
                    label="Do Sample",
                    info="When enabled, uses sampling; when disabled, uses greedy decoding",
                )
                num_beams = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=defaults["num_beams"],
                    step=1,
                    label="Beam Size",
                    info="Number of beams for beam search (1 = no beam search)",
                )

            with gr.Accordion("Advanced Parameters", open=False):
                with gr.Row():
                    repetition_penalty = gr.Slider(
                        minimum=0.1,
                        maximum=2.0,
                        value=defaults["repetition_penalty"],
                        step=0.1,
                        label="Repetition Penalty",
                        info="Penalize repetition (1.0 = no penalty, > 1.0 = penalty)",
                    )
                    length_penalty = gr.Slider(
                        minimum=-2.0,
                        maximum=2.0,
                        value=defaults["length_penalty"],
                        step=0.1,
                        label="Length Penalty",
                        info="<1 favors shorter, >1 favors longer generations",
                    )

                with gr.Row():
                    no_repeat_ngram_size = gr.Slider(
                        minimum=0,
                        maximum=10,
                        value=defaults["no_repeat_ngram_size"],
                        step=1,
                        label="No Repeat NGram Size",
                        info="Size of ngrams that can't be repeated (0 = no constraint)",
                    )
                    early_stopping = gr.Checkbox(
                        value=defaults["early_stopping"],
                        label="Early Stopping",
                        info="Stop beam search when best beam is found",
                    )

        # One list drives both the preset outputs and the chat handler's
        # extra inputs; its order mirrors preset_keys and model_inference.
        generation_controls = [
            temperature,
            top_p,
            top_k,
            max_tokens,
            do_sample,
            num_beams,
            early_stopping,
            length_penalty,
            no_repeat_ngram_size,
            repetition_penalty,
        ]

        # Apply Preset: fill every control and refresh the description.
        preset_button.click(
            fn=apply_preset,
            inputs=[preset_dropdown],
            outputs=generation_controls + [preset_description],
        )

        # Picking a preset only refreshes the description text.
        preset_dropdown.change(
            fn=lambda x: get_preset_description(x),
            inputs=[preset_dropdown],
            outputs=[preset_description],
        )

        gr.ChatInterface(
            fn=model_inference,
            additional_inputs=[model_dropdown] + generation_controls,
            examples=examples,
            fill_height=True,
            textbox=gr.MultimodalTextbox(label="Query Input", file_types=["image", "video"], file_count="multiple"),
            stop_btn="Stop",
            multimodal=True,
            cache_examples=False,
            type="messages",
        )

    return demo
# Build the UI once at module level so `demo` is importable, then start serving.
demo = create_interface()
# `mcp_server` is the launch() keyword that enables the MCP endpoint; launch()
# does not accept a keyword named `mcp`.
demo.launch(debug=True, mcp_server=True)