# app_gradio_omni.py
"""Gradio chat UI for a Qwen2.5-Omni model behind a vLLM OpenAI-compatible endpoint.

Builds multimodal (text / image / audio / video) chat requests, sends them to the
server via the OpenAI Python client, and plays back the returned WAV audio.
"""

import base64
import os
import tempfile

import gradio as gr
import requests
from openai import OpenAI
from vllm.assets.audio import AudioAsset

# === vLLM OpenAI-compatible endpoint ===
# Local vLLM servers ignore the API key; any non-empty string works.
openai_api_key = "EMPTY"
# Point this at your endpoint (e.g. "http://localhost:8000/v1" for a local
# server). The default below is a RunPod proxy URL.
openai_api_base = "https://xdzk1d1ai8bzpv-8000.proxy.runpod.net/v1"

client = OpenAI(api_key=openai_api_key, base_url=openai_api_base)

SEED = 42
MODEL_NAME = "Qwen/Qwen2.5-Omni-7B"

# Suffix -> MIME type tables used when turning local files into data: URLs.
# Unknown suffixes fall back to the first/most common type for that modality.
_VIDEO_MIME_BY_EXT = {
    ".mp4": "video/mp4",
    ".webm": "video/webm",
    ".mov": "video/quicktime",
    ".avi": "video/x-msvideo",
    ".mkv": "video/x-matroska",
}
_IMAGE_MIME_BY_EXT = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
}
_AUDIO_MIME_BY_EXT = {
    ".mp3": "audio/mpeg",
    ".mpeg": "audio/mpeg",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
    ".flac": "audio/flac",
    ".m4a": "audio/mp4",
}


# -----------------------------
# Helpers
# -----------------------------
def encode_base64_content_from_url(content_url: str) -> str:
    """Download *content_url* and return its body as a base64 string.

    Raises:
        requests.HTTPError: if the server responds with a non-2xx status.
    """
    with requests.get(content_url) as response:
        response.raise_for_status()
        return base64.b64encode(response.content).decode("utf-8")


def encode_base64_content_from_file(file_path: str) -> str:
    """Read a local file and return its contents as a base64 string."""
    with open(file_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def get_video_url_from_path(video_path: str | None) -> str:
    """Resolve *video_path* to a URL the server can consume.

    Empty/None -> a public demo video; http(s) URLs pass through unchanged;
    local files are embedded as base64 ``data:`` URLs.

    Raises:
        FileNotFoundError: if a local path does not exist.
    """
    if not video_path:
        return "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4"
    if video_path.startswith(("http://", "https://")):
        return video_path
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")
    ext = os.path.splitext(video_path)[1].lower()
    mime_type = _VIDEO_MIME_BY_EXT.get(ext, "video/mp4")
    video_base64 = encode_base64_content_from_file(video_path)
    return f"data:{mime_type};base64,{video_base64}"


def get_image_url_from_path(image_path: str | None) -> str:
    """Resolve *image_path* to a URL (demo image / pass-through / data: URL).

    Raises:
        FileNotFoundError: if a local path does not exist.
    """
    if not image_path:
        return "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/cherry_blossom.jpg"
    if image_path.startswith(("http://", "https://")):
        return image_path
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")
    ext = os.path.splitext(image_path)[1].lower()
    mime_type = _IMAGE_MIME_BY_EXT.get(ext, "image/jpeg")
    image_base64 = encode_base64_content_from_file(image_path)
    return f"data:{mime_type};base64,{image_base64}"


def get_audio_url_from_path(audio_path: str | None) -> str:
    """Resolve *audio_path* to a URL (vLLM demo asset / pass-through / data: URL).

    Raises:
        FileNotFoundError: if a local path does not exist.
    """
    if not audio_path:
        return AudioAsset("mary_had_lamb").url
    if audio_path.startswith(("http://", "https://")):
        return audio_path
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    ext = os.path.splitext(audio_path)[1].lower()
    mime_type = _AUDIO_MIME_BY_EXT.get(ext, "audio/wav")
    audio_base64 = encode_base64_content_from_file(audio_path)
    return f"data:{mime_type};base64,{audio_base64}"


def get_system_prompt():
    """Return the fixed system message (OpenAI multimodal message format)."""
    return {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": (
                    "You are Qwen, a virtual human developed by the Qwen Team, "
                    "Alibaba Group, capable of perceiving auditory and visual inputs, "
                    "as well as generating text and speech."
                ),
            }
        ],
    }


def get_text_query(custom_prompt: str | None = None):
    """Build a text-only user message; falls back to a default question."""
    question = custom_prompt or "Explain the system architecture for a scalable audio generation pipeline."
    return {
        "role": "user",
        "content": [{"type": "text", "text": question}],
    }


def get_mixed_modalities_query(video_path=None, image_path=None, audio_path=None, custom_prompt=None):
    """Build a user message carrying audio + image + video + text parts."""
    question = custom_prompt or "What is recited in the audio? What is the content of this image? Why is this video funny?"
    return {
        "role": "user",
        "content": [
            {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}},
            {"type": "image_url", "image_url": {"url": get_image_url_from_path(image_path)}},
            {"type": "video_url", "video_url": {"url": get_video_url_from_path(video_path)}},
            {"type": "text", "text": question},
        ],
    }


def get_use_audio_in_video_query(video_path=None, custom_prompt=None):
    """Build a user message with a video whose embedded audio track should be used."""
    question = custom_prompt or "Describe the content of the video, then convert what the baby say into text."
    return {
        "role": "user",
        "content": [
            # num_frames limits how many frames the server samples from the video.
            {"type": "video_url", "video_url": {"url": get_video_url_from_path(video_path), "num_frames": 16}},
            {"type": "text", "text": question},
        ],
    }


def get_multi_audios_query(audio_path=None, custom_prompt=None):
    """Build a user message comparing the given audio against a bundled vLLM asset."""
    question = custom_prompt or "Are these two audio clips the same?"
    return {
        "role": "user",
        "content": [
            {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}},
            {"type": "audio_url", "audio_url": {"url": AudioAsset("winning_call").url}},
            {"type": "text", "text": question},
        ],
    }


# Dispatch table: query-type name (as shown in the UI dropdown) -> builder.
query_map = {
    "mixed_modalities": get_mixed_modalities_query,
    "use_audio_in_video": get_use_audio_in_video_query,
    "multi_audios": get_multi_audios_query,
    "text": get_text_query,
}

# --- Sampling params for the three Omni stages (thinker / talker / code2wav) ---
thinker_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}

talker_sampling_params = {
    "temperature": 0.9,
    "top_p": 0.8,
    "top_k": 40,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.05,
    "stop_token_ids": [8294],
}

code2wav_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}


def build_messages_for_openai(chat_history_messages, user_prompt_block):
    """Assemble the full message list for one chat completion request.

    Args:
        chat_history_messages: Gradio messages-format history:
            [{"role": "user"|"assistant", "content": "..."}, ...]
            (content is assumed to be a plain string).
        user_prompt_block: the current user turn as an OpenAI multimodal
            message dict (one of the ``get_*_query`` results).

    Returns:
        [system prompt, text-only history..., current multimodal user turn].
    """
    msgs = [get_system_prompt()]
    # Replay history as text only, so the model does not re-receive the
    # (potentially large) video/image/audio payloads on every turn.
    for m in chat_history_messages:
        msgs.append({"role": m["role"], "content": [{"type": "text", "text": m["content"]}]})
    msgs.append(user_prompt_block)
    return msgs


def save_audio_base64_to_unique_wav(b64_data: str) -> str:
    """Decode base64 audio and write it to a unique temp .wav file.

    Returns the path of the written file (caller is responsible for cleanup).
    """
    audio_bytes = base64.b64decode(b64_data)
    fd, path = tempfile.mkstemp(prefix="omni_", suffix=".wav")
    os.close(fd)
    with open(path, "wb") as f:
        f.write(audio_bytes)
    return path


def _extract_text(msg) -> str:
    """Pull the concatenated text out of a chat-completion message, if any."""
    content = getattr(msg, "content", None)
    if not content:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # NOTE(review): assumes list parts are dicts with {"type": "text", ...};
        # SDK object parts would be skipped — confirm against the server output.
        return "".join(
            p.get("text", "") for p in content if isinstance(p, dict) and p.get("type") == "text"
        )
    return ""


def call_omni(chat_history, query_type, prompt_text, video_path, image_path, audio_path,
              out_modalities, use_audio_in_video):
    """Handle one "Senden" click: build the request, call the server, update the UI.

    Returns:
        (updated chat history, path to response WAV or None, cleared prompt box).
    """
    # Build the current user turn via the dispatch table.
    qfunc = query_map[query_type]
    if query_type == "mixed_modalities":
        user_block = qfunc(video_path=video_path, image_path=image_path,
                           audio_path=audio_path, custom_prompt=prompt_text)
    elif query_type == "use_audio_in_video":
        user_block = qfunc(video_path=video_path, custom_prompt=prompt_text)
    elif query_type == "multi_audios":
        user_block = qfunc(audio_path=audio_path, custom_prompt=prompt_text)
    else:  # text
        user_block = qfunc(custom_prompt=prompt_text)

    # Per-stage sampling params are passed through vLLM's extra_body extension.
    sampling_params_list = [thinker_sampling_params, talker_sampling_params, code2wav_sampling_params]
    extra_body = {"sampling_params_list": sampling_params_list}
    if query_type == "use_audio_in_video" and use_audio_in_video:
        extra_body["mm_processor_kwargs"] = {"use_audio_in_video": True}

    request_kwargs = {
        "model": MODEL_NAME,
        "messages": build_messages_for_openai(chat_history, user_block),
        "extra_body": extra_body,
        "stream": False,  # non-streaming so we get one complete WAV back
    }
    # Only send "modalities" when the user actually requested some; explicitly
    # sending null can be rejected by OpenAI-compatible servers.
    if out_modalities:
        parsed = [m.strip() for m in out_modalities.split(",") if m.strip()]
        if parsed:
            request_kwargs["modalities"] = parsed

    resp = client.chat.completions.create(**request_kwargs)
    msg = resp.choices[0].message

    text_out = _extract_text(msg)

    audio_path_out = None
    if getattr(msg, "audio", None) and getattr(msg.audio, "data", None):
        audio_path_out = save_audio_base64_to_unique_wav(msg.audio.data)

    # Update the chat UI (messages format).
    assistant_label = text_out or ("[Audio erzeugt]" if audio_path_out else "[keine Ausgabe]")
    chat_history = chat_history + [
        {"role": "user", "content": prompt_text},
        {"role": "assistant", "content": assistant_label},
    ]
    return chat_history, audio_path_out, ""


with gr.Blocks() as demo:
    gr.Markdown("## Omni Chat (wie dein Script) – Text + WAV Output")

    # type="messages" is required: call_omni appends {"role", "content"} dicts,
    # which the legacy tuple-pairs Chatbot format would not accept.
    chat = gr.Chatbot(height=420, type="messages")
    audio_player = gr.Audio(type="filepath", autoplay=True, label="Antwort-Audio (WAV)")

    with gr.Row():
        query_type = gr.Dropdown(
            choices=list(query_map.keys()),
            value="mixed_modalities",
            label="query-type",
        )
        out_modalities = gr.Textbox(
            value="audio",
            label="modalities (z.B. audio oder text,audio)",
        )

    prompt_text = gr.Textbox(
        label="Prompt",
        value="Say hello and explain what you see/hear briefly.",
    )

    with gr.Row():
        video_path = gr.Textbox(label="video-path (optional URL/Local)", value="")
        image_path = gr.Textbox(label="image-path (optional URL/Local)", value="")
        audio_path = gr.Textbox(label="audio-path (optional URL/Local)", value="")

    use_audio_in_video = gr.Checkbox(
        value=True,
        label="mm_processor_kwargs: use_audio_in_video (nur für use_audio_in_video)",
    )

    send = gr.Button("Senden")
    send.click(
        fn=call_omni,
        inputs=[chat, query_type, prompt_text, video_path, image_path, audio_path,
                out_modalities, use_audio_in_video],
        outputs=[chat, audio_player, prompt_text],
    )

if __name__ == "__main__":
    demo.launch()