Spaces:
Paused
Paused
| # app_gradio_omni.py | |
| import base64 | |
| import os | |
| import tempfile | |
| import gradio as gr | |
| import requests | |
| from openai import OpenAI | |
| from vllm.assets.audio import AudioAsset | |
# === vLLM OpenAI-compatible endpoint ===
# vLLM does not validate the API key, but the OpenAI client requires one.
# Both values can be overridden via environment variables so the endpoint
# can be switched (local vs. RunPod proxy) without editing this file.
openai_api_key = os.environ.get("OPENAI_API_KEY", "EMPTY")
# Default: RunPod proxy. Set OPENAI_API_BASE to e.g. http://localhost:8000/v1
# to target a locally running vLLM server instead.
openai_api_base = os.environ.get(
    "OPENAI_API_BASE",
    "https://xdzk1d1ai8bzpv-8000.proxy.runpod.net/v1",
)
client = OpenAI(api_key=openai_api_key, base_url=openai_api_base)
# Fixed seed, reused by all sampling-param dicts below for reproducible runs.
SEED = 42
MODEL_NAME = "Qwen/Qwen2.5-Omni-7B"
| # ----------------------------- | |
| # Helpers (wie in deinem Script) | |
| # ----------------------------- | |
def encode_base64_content_from_url(content_url: str, timeout: float = 30.0) -> str:
    """Download *content_url* and return the response body base64-encoded.

    Args:
        content_url: HTTP(S) URL to fetch.
        timeout: Seconds to wait for the server. ``requests`` has no default
            timeout, so without this a dead endpoint would block forever.

    Returns:
        The raw response bytes encoded as an ASCII base64 string.

    Raises:
        requests.HTTPError: On a non-2xx status code.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    with requests.get(content_url, timeout=timeout) as response:
        response.raise_for_status()
        return base64.b64encode(response.content).decode("utf-8")
def encode_base64_content_from_file(file_path: str) -> str:
    """Read a local file and return its contents as a base64 string."""
    with open(file_path, "rb") as handle:
        return base64.b64encode(handle.read()).decode("utf-8")
| def get_video_url_from_path(video_path: str | None) -> str: | |
| if not video_path: | |
| return "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4" | |
| if video_path.startswith(("http://", "https://")): | |
| return video_path | |
| if not os.path.exists(video_path): | |
| raise FileNotFoundError(f"Video file not found: {video_path}") | |
| vp = video_path.lower() | |
| if vp.endswith(".mp4"): | |
| mime_type = "video/mp4" | |
| elif vp.endswith(".webm"): | |
| mime_type = "video/webm" | |
| elif vp.endswith(".mov"): | |
| mime_type = "video/quicktime" | |
| elif vp.endswith(".avi"): | |
| mime_type = "video/x-msvideo" | |
| elif vp.endswith(".mkv"): | |
| mime_type = "video/x-matroska" | |
| else: | |
| mime_type = "video/mp4" | |
| video_base64 = encode_base64_content_from_file(video_path) | |
| return f"data:{mime_type};base64,{video_base64}" | |
| def get_image_url_from_path(image_path: str | None) -> str: | |
| if not image_path: | |
| return "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/cherry_blossom.jpg" | |
| if image_path.startswith(("http://", "https://")): | |
| return image_path | |
| if not os.path.exists(image_path): | |
| raise FileNotFoundError(f"Image file not found: {image_path}") | |
| ip = image_path.lower() | |
| if ip.endswith((".jpg", ".jpeg")): | |
| mime_type = "image/jpeg" | |
| elif ip.endswith(".png"): | |
| mime_type = "image/png" | |
| elif ip.endswith(".gif"): | |
| mime_type = "image/gif" | |
| elif ip.endswith(".webp"): | |
| mime_type = "image/webp" | |
| else: | |
| mime_type = "image/jpeg" | |
| image_base64 = encode_base64_content_from_file(image_path) | |
| return f"data:{mime_type};base64,{image_base64}" | |
| def get_audio_url_from_path(audio_path: str | None) -> str: | |
| if not audio_path: | |
| return AudioAsset("mary_had_lamb").url | |
| if audio_path.startswith(("http://", "https://")): | |
| return audio_path | |
| if not os.path.exists(audio_path): | |
| raise FileNotFoundError(f"Audio file not found: {audio_path}") | |
| ap = audio_path.lower() | |
| if ap.endswith((".mp3", ".mpeg")): | |
| mime_type = "audio/mpeg" | |
| elif ap.endswith(".wav"): | |
| mime_type = "audio/wav" | |
| elif ap.endswith(".ogg"): | |
| mime_type = "audio/ogg" | |
| elif ap.endswith(".flac"): | |
| mime_type = "audio/flac" | |
| elif ap.endswith(".m4a"): | |
| mime_type = "audio/mp4" | |
| else: | |
| mime_type = "audio/wav" | |
| audio_base64 = encode_base64_content_from_file(audio_path) | |
| return f"data:{mime_type};base64,{audio_base64}" | |
def get_system_prompt():
    """Return the fixed system message describing Qwen-Omni's capabilities."""
    system_text = (
        "You are Qwen, a virtual human developed by the Qwen Team, "
        "Alibaba Group, capable of perceiving auditory and visual inputs, "
        "as well as generating text and speech."
    )
    return {"role": "system", "content": [{"type": "text", "text": system_text}]}
| def get_text_query(custom_prompt: str | None = None): | |
| question = custom_prompt or "Explain the system architecture for a scalable audio generation pipeline." | |
| return { | |
| "role": "user", | |
| "content": [{"type": "text", "text": question}], | |
| } | |
def get_mixed_modalities_query(video_path=None, image_path=None, audio_path=None, custom_prompt=None):
    """Build a user message combining one audio, one image, one video and text.

    Each media argument is resolved through its ``get_*_url_from_path``
    helper (remote URL, local file as data URL, or a demo-asset fallback).
    """
    if custom_prompt:
        question = custom_prompt
    else:
        question = "What is recited in the audio? What is the content of this image? Why is this video funny?"
    content = [
        {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}},
        {"type": "image_url", "image_url": {"url": get_image_url_from_path(image_path)}},
        {"type": "video_url", "video_url": {"url": get_video_url_from_path(video_path)}},
        {"type": "text", "text": question},
    ]
    return {"role": "user", "content": content}
def get_use_audio_in_video_query(video_path=None, custom_prompt=None):
    """Build a user message with a single video (16 sampled frames) plus text."""
    if custom_prompt:
        question = custom_prompt
    else:
        question = "Describe the content of the video, then convert what the baby say into text."
    video_block = {
        "type": "video_url",
        "video_url": {"url": get_video_url_from_path(video_path), "num_frames": 16},
    }
    return {"role": "user", "content": [video_block, {"type": "text", "text": question}]}
def get_multi_audios_query(audio_path=None, custom_prompt=None):
    """Build a user message comparing two audio clips.

    The first clip comes from *audio_path* (or the default demo asset); the
    second is always the vLLM "winning_call" asset.
    """
    if custom_prompt:
        question = custom_prompt
    else:
        question = "Are these two audio clips the same?"
    content = [
        {"type": "audio_url", "audio_url": {"url": get_audio_url_from_path(audio_path)}},
        {"type": "audio_url", "audio_url": {"url": AudioAsset("winning_call").url}},
        {"type": "text", "text": question},
    ]
    return {"role": "user", "content": content}
# Dispatch table: UI "query-type" value -> prompt-builder function.
# Insertion order matters: list(query_map.keys()) defines the choice order
# of the Gradio dropdown below.
query_map = {
    "mixed_modalities": get_mixed_modalities_query,
    "use_audio_in_video": get_use_audio_in_video_query,
    "multi_audios": get_multi_audios_query,
    "text": get_text_query,
}
# --- Sampling parameter dicts, sent as sampling_params_list[0..2] in call_omni ---
# First entry ("thinker", presumably the text-generation stage — names come
# from the variable names only): greedy decoding for reproducible text.
thinker_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,  # -1 disables top-k filtering
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}
# Second entry ("talker"): sampled decoding, unlike the other two stages.
talker_sampling_params = {
    "temperature": 0.9,
    "top_p": 0.8,
    "top_k": 40,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.05,
    # NOTE(review): presumably the talker's end-of-speech token id —
    # confirm against the model's config before changing.
    "stop_token_ids": [8294],
}
# Third entry ("code2wav"): greedy, same settings as the thinker.
code2wav_sampling_params = {
    "temperature": 0.0,
    "top_p": 1.0,
    "top_k": -1,
    "max_tokens": 2048,
    "seed": SEED,
    "detokenize": True,
    "repetition_penalty": 1.1,
}
def build_messages_for_openai(chat_history_messages, user_prompt_block):
    """Assemble the complete message list for one chat request.

    Args:
        chat_history_messages: Gradio messages format:
            ``[{"role": "user"|"assistant", "content": "..."}, ...]``.
        user_prompt_block: One user message dict in the OpenAI multimodal
            format (the prompt dicts built by the ``get_*_query`` helpers).

    Returns:
        ``[system message, *history as text-only turns, user_prompt_block]``.
        History is re-sent as plain text only, so the model does not
        re-process video/image/audio payloads on every turn.
    """
    history_as_text = [
        {"role": turn["role"], "content": [{"type": "text", "text": turn["content"]}]}
        for turn in chat_history_messages
    ]
    return [get_system_prompt(), *history_as_text, user_prompt_block]
def save_audio_base64_to_unique_wav(b64_data: str) -> str:
    """Decode base64 audio and persist it to a uniquely named ``.wav`` file.

    Returns the path of the temp file; the caller is responsible for cleanup.
    """
    raw = base64.b64decode(b64_data)
    handle, wav_path = tempfile.mkstemp(prefix="omni_", suffix=".wav")
    try:
        # Write through the open descriptor instead of reopening by path.
        os.write(handle, raw)
    finally:
        os.close(handle)
    return wav_path
def call_omni(chat_history, query_type, prompt_text, video_path, image_path, audio_path, out_modalities, use_audio_in_video):
    """Run one chat turn against the Omni endpoint.

    Builds the multimodal user message for the selected *query_type*, sends
    it (with the text-only history) via the module-level ``client``, and
    returns ``(updated chat history, path to generated WAV or None, "")`` —
    the empty string clears the prompt textbox in the UI.
    """
    # Build the user prompt block for the selected query type.
    qfunc = query_map[query_type]
    if query_type == "mixed_modalities":
        user_block = qfunc(video_path=video_path, image_path=image_path, audio_path=audio_path, custom_prompt=prompt_text)
    elif query_type == "use_audio_in_video":
        user_block = qfunc(video_path=video_path, custom_prompt=prompt_text)
    elif query_type == "multi_audios":
        user_block = qfunc(audio_path=audio_path, custom_prompt=prompt_text)
    else:  # text
        user_block = qfunc(custom_prompt=prompt_text)
    # Per-stage sampling params, forwarded via extra_body (server-side extension).
    sampling_params_list = [thinker_sampling_params, talker_sampling_params, code2wav_sampling_params]
    extra_body = {"sampling_params_list": sampling_params_list}
    if query_type == "use_audio_in_video" and use_audio_in_video:
        extra_body["mm_processor_kwargs"] = {"use_audio_in_video": True}
    # Parse the comma-separated modalities textbox (e.g. "text,audio") into a list.
    modalities = None
    if out_modalities:
        modalities = [m.strip() for m in out_modalities.split(",") if m.strip()]
    resp = client.chat.completions.create(
        model=MODEL_NAME,
        messages=build_messages_for_openai(chat_history, user_block),
        modalities=modalities,
        extra_body=extra_body,
        stream=False,  # non-streaming: the response carries one complete WAV
    )
    msg = resp.choices[0].message
    # Extract the text part of the reply (content may be a str or a list of parts).
    text_out = ""
    if getattr(msg, "content", None):
        if isinstance(msg.content, str):
            text_out = msg.content
        elif isinstance(msg.content, list):
            parts = []
            for p in msg.content:
                if isinstance(p, dict) and p.get("type") == "text":
                    parts.append(p.get("text", ""))
            text_out = "".join(parts)
    # If the reply carries base64 audio, persist it to a temp WAV for playback.
    audio_path_out = None
    if getattr(msg, "audio", None) and getattr(msg.audio, "data", None):
        audio_path_out = save_audio_base64_to_unique_wav(msg.audio.data)
    # Append the user turn and the assistant's reply to the chat transcript.
    assistant_label = text_out or ("[Audio erzeugt]" if audio_path_out else "[keine Ausgabe]")
    chat_history = chat_history + [
        {"role": "user", "content": prompt_text},
        {"role": "assistant", "content": assistant_label},
    ]
    return chat_history, audio_path_out, ""
# --- Gradio UI: chat transcript, WAV player, and request controls ---
with gr.Blocks() as demo:
    gr.Markdown("## Omni Chat (wie dein Script) – Text + WAV Output")
    chat = gr.Chatbot(height=420)
    # Plays the WAV returned by call_omni as soon as the output is set.
    audio_player = gr.Audio(type="filepath", autoplay=True, label="Antwort-Audio (WAV)")
    with gr.Row():
        # Choices come straight from the query_map dispatch table.
        query_type = gr.Dropdown(
            choices=list(query_map.keys()),
            value="mixed_modalities",
            label="query-type",
        )
        # Comma-separated output modalities, parsed in call_omni.
        out_modalities = gr.Textbox(
            value="audio",
            label="modalities (z.B. audio oder text,audio)",
        )
    prompt_text = gr.Textbox(
        label="Prompt",
        value="Say hello and explain what you see/hear briefly.",
    )
    with gr.Row():
        # Media inputs: URL, local path, or empty for the built-in defaults.
        video_path = gr.Textbox(label="video-path (optional URL/Local)", value="")
        image_path = gr.Textbox(label="image-path (optional URL/Local)", value="")
        audio_path = gr.Textbox(label="audio-path (optional URL/Local)", value="")
    # Only takes effect for the "use_audio_in_video" query type (see call_omni).
    use_audio_in_video = gr.Checkbox(value=True, label="mm_processor_kwargs: use_audio_in_video (nur für use_audio_in_video)")
    send = gr.Button("Senden")
    # One click = one call_omni turn; outputs update the chat and the player,
    # and clear the prompt textbox.
    send.click(
        fn=call_omni,
        inputs=[chat, query_type, prompt_text, video_path, image_path, audio_path, out_modalities, use_audio_in_video],
        outputs=[chat, audio_player, prompt_text],
    )
demo.launch()