# vLLMFLux / app_2.py
# (Hugging Face Spaces page chrome removed — "dtometzki's picture / Update app_2.py / a1d0de6 verified"
#  was scraped UI text, not Python source.)
# app_gradio_omni.py
import base64
import os
import tempfile
import gradio as gr
import requests
from openai import OpenAI
from vllm.assets.audio import AudioAsset
# === vLLM OpenAI-compatible endpoint ===
# vLLM does not check API keys, but the OpenAI client requires a non-empty value.
openai_api_key = "EMPTY"
# local
openai_api_base = "https://xdzk1d1ai8bzpv-8000.proxy.runpod.net/v1"
# RunPod proxy (optional) – to use it instead of the local endpoint, comment out above and set here:
# openai_api_base = "https://xdzk1d1ai8bzpv-8000.proxy.runpod.net/v1"
client = OpenAI(api_key=openai_api_key, base_url=openai_api_base)
# Fixed seed passed to every sampling-params dict for reproducible generations.
SEED = 42
MODEL_NAME = "Qwen/Qwen2.5-Omni-7B"
# -----------------------------
# Helpers (same as in the original script)
# -----------------------------
def encode_base64_content_from_url(content_url: str) -> str:
    """Download *content_url* and return the response body as a base64 string."""
    response = requests.get(content_url)
    with response:
        response.raise_for_status()
        payload = response.content
    return base64.b64encode(payload).decode("utf-8")
def encode_base64_content_from_file(file_path: str) -> str:
    """Read *file_path* in binary mode and return its contents base64-encoded."""
    with open(file_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def get_video_url_from_path(video_path: str | None) -> str:
if not video_path:
return "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4"
if video_path.startswith(("http://", "https://")):
return video_path
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path}")
vp = video_path.lower()
if vp.endswith(".mp4"):
mime_type = "video/mp4"
elif vp.endswith(".webm"):
mime_type = "video/webm"
elif vp.endswith(".mov"):
mime_type = "video/quicktime"
elif vp.endswith(".avi"):
mime_type = "video/x-msvideo"
elif vp.endswith(".mkv"):
mime_type = "video/x-matroska"
else:
mime_type = "video/mp4"
video_base64 = encode_base64_content_from_file(video_path)
return f"data:{mime_type};base64,{video_base64}"
def get_image_url_from_path(image_path: str | None) -> str:
if not image_path:
return "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/cherry_blossom.jpg"
if image_path.startswith(("http://", "https://")):
return image_path
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
ip = image_path.lower()
if ip.endswith((".jpg", ".jpeg")):
mime_type = "image/jpeg"
elif ip.endswith(".png"):
mime_type = "image/png"
elif ip.endswith(".gif"):
mime_type = "image/gif"
elif ip.endswith(".webp"):
mime_type = "image/webp"
else:
mime_type = "image/jpeg"
image_base64 = encode_base64_content_from_file(image_path)
return f"data:{mime_type};base64,{image_base64}"
def get_audio_url_from_path(audio_path: str | None) -> str:
if not audio_path:
return AudioAsset("mary_had_lamb").url
if audio_path.startswith(("http://", "https://")):
return audio_path
if not os.path.exists(audio_path):
raise FileNotFoundError(f"Audio file not found: {audio_path}")
ap = audio_path.lower()
if ap.endswith((".mp3", ".mpeg")):
mime_type = "audio/mpeg"
elif ap.endswith(".wav"):
mime_type = "audio/wav"
elif ap.endswith(".ogg"):
mime_type = "audio/ogg"
elif ap.endswith(".flac"):
mime_type = "audio/flac"
elif ap.endswith(".m4a"):
mime_type = "audio/mp4"
else:
mime_type = "audio/wav"
audio_base64 = encode_base64_content_from_file(audio_path)
return f"data:{mime_type};base64,{audio_base64}"
def get_system_prompt():
    """Return the fixed system message that introduces the Omni persona."""
    system_text = (
        "You are Qwen, a virtual human developed by the Qwen Team, "
        "Alibaba Group, capable of perceiving auditory and visual inputs, "
        "as well as generating text and speech."
    )
    return {
        "role": "system",
        "content": [{"type": "text", "text": system_text}],
    }
def get_text_query(custom_prompt: str | None = None):
question = custom_prompt or "Explain the system architecture for a scalable audio generation pipeline."
return {
"role": "user",
"content": [{"type": "text", "text": question}],
}
def get_mixed_modalities_query(video_path=None, image_path=None, audio_path=None, custom_prompt=None):
    """Build a user message carrying one audio, one image and one video part plus text."""
    question = custom_prompt or "What is recited in the audio? What is the content of this image? Why is this video funny?"
    # Each media argument is resolved (URL passthrough / data URI / default asset)
    # by its corresponding helper.
    parts = [
        {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}},
        {"type": "image_url", "image_url": {"url": get_image_url_from_path(image_path)}},
        {"type": "video_url", "video_url": {"url": get_video_url_from_path(video_path)}},
        {"type": "text", "text": question},
    ]
    return {"role": "user", "content": parts}
def get_use_audio_in_video_query(video_path=None, custom_prompt=None):
    """Build a user message with a single video part (16 sampled frames) plus text."""
    question = custom_prompt or "Describe the content of the video, then convert what the baby say into text."
    video_part = {
        "type": "video_url",
        # num_frames caps how many frames the server samples from the clip.
        "video_url": {"url": get_video_url_from_path(video_path), "num_frames": 16},
    }
    return {"role": "user", "content": [video_part, {"type": "text", "text": question}]}
def get_multi_audios_query(audio_path=None, custom_prompt=None):
    """Build a user message with two audio parts: the given clip (or default) plus
    vLLM's bundled "winning_call" asset, followed by a comparison question."""
    question = custom_prompt or "Are these two audio clips the same?"
    first_clip = {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}}
    second_clip = {"type": "audio_url", "audio_url": {"url": AudioAsset("winning_call").url}}
    return {
        "role": "user",
        "content": [first_clip, second_clip, {"type": "text", "text": question}],
    }
# Maps the UI "query-type" dropdown value to the prompt-builder function.
query_map = {
    "mixed_modalities": get_mixed_modalities_query,
    "use_audio_in_video": get_use_audio_in_video_query,
    "multi_audios": get_multi_audios_query,
    "text": get_text_query,
}
# --- Sampling params (same as the original script) ---
# Greedy decoding (temperature 0) for the "thinker" text-reasoning stage.
thinker_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}
# Stochastic sampling for the "talker" speech-token stage; stop_token_ids
# ends generation at the speech terminator token.
talker_sampling_params = {
    "temperature": 0.9,
    "top_p": 0.8,
    "top_k": 40,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.05,
    "stop_token_ids": [8294],
}
# Greedy decoding for the code2wav (waveform synthesis) stage.
code2wav_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}
def build_messages_for_openai(chat_history_messages, user_prompt_block):
    """Assemble the full OpenAI message list for one turn.

    Args:
        chat_history_messages: Gradio messages format:
            [{"role": "user|assistant", "content": "..."}, ...]
        user_prompt_block: dict in OpenAI multimodal format (the prompt dicts).

    Returns:
        [system prompt, text-only history..., current multimodal user block].
    """
    # History is replayed as plain text so Omni does not re-process
    # video/image/audio on every turn.
    history_as_text = [
        {"role": entry["role"], "content": [{"type": "text", "text": entry["content"]}]}
        for entry in chat_history_messages
    ]
    return [get_system_prompt(), *history_as_text, user_prompt_block]
def save_audio_base64_to_unique_wav(b64_data: str) -> str:
    """Decode base64 audio and write it to a fresh temp file; return its path.

    A unique file per call avoids clobbering earlier answers' audio.
    """
    descriptor, wav_path = tempfile.mkstemp(prefix="omni_", suffix=".wav")
    os.close(descriptor)  # we re-open by path below
    decoded = base64.b64decode(b64_data)
    with open(wav_path, "wb") as out:
        out.write(decoded)
    return wav_path
def call_omni(chat_history, query_type, prompt_text, video_path, image_path, audio_path, out_modalities, use_audio_in_video):
    """Run one chat turn against the vLLM Omni endpoint.

    Builds the multimodal prompt for *query_type*, sends it with the three
    per-stage sampling-param dicts, extracts text and/or audio from the
    response, and appends the turn to *chat_history*.

    Returns:
        (updated chat_history, path to the answer WAV or None, "" to clear the prompt box).
    """
    # Build the prompt block for the selected query type.
    qfunc = query_map[query_type]
    if query_type == "mixed_modalities":
        user_block = qfunc(video_path=video_path, image_path=image_path, audio_path=audio_path, custom_prompt=prompt_text)
    elif query_type == "use_audio_in_video":
        user_block = qfunc(video_path=video_path, custom_prompt=prompt_text)
    elif query_type == "multi_audios":
        user_block = qfunc(audio_path=audio_path, custom_prompt=prompt_text)
    else:  # text
        user_block = qfunc(custom_prompt=prompt_text)
    # Thinker/talker/code2wav sampling params are forwarded verbatim via extra_body
    # (vLLM-specific, not part of the standard OpenAI API).
    sampling_params_list = [thinker_sampling_params, talker_sampling_params, code2wav_sampling_params]
    extra_body = {"sampling_params_list": sampling_params_list}
    if query_type == "use_audio_in_video" and use_audio_in_video:
        extra_body["mm_processor_kwargs"] = {"use_audio_in_video": True}
    # "audio" / "text,audio" textbox value -> list of requested output modalities.
    modalities = None
    if out_modalities:
        modalities = [m.strip() for m in out_modalities.split(",") if m.strip()]
    resp = client.chat.completions.create(
        model=MODEL_NAME,
        messages=build_messages_for_openai(chat_history, user_block),
        modalities=modalities,
        extra_body=extra_body,
        stream=False,  # non-streaming: the response carries one complete WAV
    )
    msg = resp.choices[0].message
    # Extract text (optional) — content may be a plain string or a list of parts.
    text_out = ""
    if getattr(msg, "content", None):
        if isinstance(msg.content, str):
            text_out = msg.content
        elif isinstance(msg.content, list):
            parts = []
            for p in msg.content:
                if isinstance(p, dict) and p.get("type") == "text":
                    parts.append(p.get("text", ""))
            text_out = "".join(parts)
    # Save returned base64 audio (if any) to a unique temp WAV for the player.
    audio_path_out = None
    if getattr(msg, "audio", None) and getattr(msg.audio, "data", None):
        audio_path_out = save_audio_base64_to_unique_wav(msg.audio.data)
    # Update the chat UI (labels are German: "audio produced" / "no output").
    assistant_label = text_out or ("[Audio erzeugt]" if audio_path_out else "[keine Ausgabe]")
    chat_history = chat_history + [
        {"role": "user", "content": prompt_text},
        {"role": "assistant", "content": assistant_label},
    ]
    return chat_history, audio_path_out, ""
# Gradio UI: one chat panel plus the query-type / modality / media-path controls.
with gr.Blocks() as demo:
    gr.Markdown("## Omni Chat (wie dein Script) – Text + WAV Output")
    # FIX: call_omni stores history as {"role": ..., "content": ...} dicts and
    # build_messages_for_openai indexes m["role"], so the Chatbot must use the
    # "messages" format; the legacy default (tuple pairs) would break the history.
    chat = gr.Chatbot(height=420, type="messages")
    audio_player = gr.Audio(type="filepath", autoplay=True, label="Antwort-Audio (WAV)")
    with gr.Row():
        query_type = gr.Dropdown(
            choices=list(query_map.keys()),
            value="mixed_modalities",
            label="query-type",
        )
        out_modalities = gr.Textbox(
            value="audio",
            label="modalities (z.B. audio oder text,audio)",
        )
    prompt_text = gr.Textbox(
        label="Prompt",
        value="Say hello and explain what you see/hear briefly.",
    )
    with gr.Row():
        video_path = gr.Textbox(label="video-path (optional URL/Local)", value="")
        image_path = gr.Textbox(label="image-path (optional URL/Local)", value="")
        audio_path = gr.Textbox(label="audio-path (optional URL/Local)", value="")
    use_audio_in_video = gr.Checkbox(value=True, label="mm_processor_kwargs: use_audio_in_video (nur für use_audio_in_video)")
    send = gr.Button("Senden")
    # Third output ("") clears the prompt textbox after each send.
    send.click(
        fn=call_omni,
        inputs=[chat, query_type, prompt_text, video_path, image_path, audio_path, out_modalities, use_audio_in_video],
        outputs=[chat, audio_player, prompt_text],
    )
demo.launch()