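"""Standalone inference script for the NVOmni video model.

Loads the model with trust_remote_code from a local checkpoint directory and
generates a text response for a local video file, optionally including its audio track.
"""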
from transformers import AutoProcessor, AutoModel, AutoConfig, GenerationConfig
import torch
import os
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import logging
import sys

# Run fully offline: resolve the model from the local path instead of the Hugging Face Hub.
os.environ["HF_HUB_OFFLINE"] = "1"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def add_to_sys_path_direct(model_path):
    """Add model path directly to sys.path."""
    if model_path not in sys.path:
        sys.path.insert(0, model_path)
        print(f"✓ Added to sys.path: {model_path}")
    else:
        print(f"Already in sys.path: {model_path}")
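
# Minimal usage sketch (paths are placeholders; see main() below for the full example):
#
#   inferencer = NVOmniVideoInference("./path/to/model", torch_dtype=torch.float16)
#   answer = inferencer.generate_response("clip.mp4", "Describe this video.")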

class NVOmniVideoInference:
    """A class to handle NVOmni video model inference with improved error handling and flexibility."""

    def __init__(self, model_path: str, torch_dtype=torch.float16, device_map="auto"):
        """
        Initialize the NVOmni model for video inference.

        Args:
            model_path (str): Path to the model directory
            torch_dtype: PyTorch data type for model weights
            device_map (str): Device mapping strategy for model loading
        """
        self.model_path = model_path
        self.torch_dtype = torch_dtype
        self.device_map = device_map
        self.model = None
        self.processor = None
        self.config = None
        self.device = None

        self.load_model()

    def validate_paths(self, model_path: str, video_path: Optional[str] = None) -> bool:
        """Validate that required paths exist."""
        if not Path(model_path).exists():
            logger.error(f"Model path does not exist: {model_path}")
            return False

        if video_path and not Path(video_path).exists():
            logger.error(f"Video path does not exist: {video_path}")
            return False

        return True

    def load_model(self) -> bool:
        """Load the model, processor, and config with error handling."""
        if not self.validate_paths(self.model_path):
            return False

        try:
            logger.info("Loading model configuration...")
            self.config = AutoConfig.from_pretrained(self.model_path, trust_remote_code=True)

            logger.info("Loading model...")
            start_time = time.time()
            self.model = AutoModel.from_pretrained(
                self.model_path,
                trust_remote_code=True,
                torch_dtype=self.torch_dtype,
                device_map=self.device_map,
                low_cpu_mem_usage=True
            )
            load_time = time.time() - start_time
            logger.info(f"Model loaded in {load_time:.2f} seconds")

            logger.info("Loading processor...")
            self.processor = AutoProcessor.from_pretrained(self.model_path, trust_remote_code=True)

            # Work out which device the weights ended up on.
            if hasattr(self.model, 'device'):
                self.device = self.model.device
            else:
                try:
                    self.device = next(self.model.parameters()).device
                except StopIteration:
                    self.device = torch.device('cpu')

            logger.info(f"Model successfully loaded on device: {self.device}")
            self._print_model_info()
            return True
        except Exception as exc:
            logger.error(f"Failed to load model: {exc}")
            self.model = None
            return False

    def _print_model_info(self):
        """Print useful information about the loaded model."""
        logger.info("=" * 50)
        logger.info("MODEL INFORMATION")
        logger.info("=" * 50)

        if self.config:
            logger.info(f"Model type: {getattr(self.config, 'model_type', 'Unknown')}")
            logger.info(f"Hidden size: {getattr(self.config, 'hidden_size', 'Unknown')}")

        if self.model and torch.cuda.is_available():
            logger.info(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
            logger.info(f"GPU memory reserved: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

    def create_conversation(self, video_path: str, text_prompt: str) -> List[Dict[str, Any]]:
        """
        Create a conversation format for the model.

        Args:
            video_path (str): Path to the video file
            text_prompt (str): Text prompt for the model

        Returns:
            List[Dict]: Conversation in the expected format
        """
        return [{
            "role": "user",
            "content": [
                {"type": "video", "video": video_path},
                {"type": "text", "text": text_prompt}
            ]
        }]

    @torch.inference_mode()
    def generate_response(
        self,
        video_path: str,
        text_prompt: str,
        max_new_tokens: int = 256,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        do_sample: Optional[bool] = None,
        num_video_frames: int = -1,
        load_audio_in_video: bool = True,
        audio_length: Union[int, str] = "max_3600",
    ) -> Optional[str]:
        """
        Generate a response from the model given a video and text prompt.

        Args:
            video_path (str): Path to the video file
            text_prompt (str): Text prompt for the model
            max_new_tokens (int): Maximum number of new tokens to generate
            temperature (float): Sampling temperature
            top_p (float): Top-p sampling parameter
            do_sample (bool): Whether to use sampling
            num_video_frames (int): Number of video frames to use; -1 keeps the model default
            load_audio_in_video (bool): Whether to load the video's audio track
            audio_length (Union[int, str]): Audio chunk length setting, e.g. "max_3600"

        Returns:
            Optional[str]: Generated response or None if failed
        """
        if not self.model or not self.processor:
            logger.error("Model or processor not loaded. Please initialize the model first.")
            return None

        if not self.validate_paths(self.model_path, video_path):
            return None

        try:
            logger.info(f"Processing video: {video_path}")
            logger.info(f"Text prompt: {text_prompt}")

            conversation = self.create_conversation(video_path, text_prompt)

            text = self.processor.apply_chat_template(
                conversation,
                tokenize=False,
                add_generation_prompt=True
            )
            logger.info("Chat template applied")

            # Propagate the media settings to both the model and processor configs.
            self.model.config.load_audio_in_video = load_audio_in_video
            self.processor.config.load_audio_in_video = load_audio_in_video
            if num_video_frames > 0:
                self.model.config.num_video_frames = num_video_frames
                self.processor.config.num_video_frames = num_video_frames
            if audio_length != -1:
                self.model.config.audio_chunk_length = audio_length
                self.processor.config.audio_chunk_length = audio_length
            logger.info(
                f"Model config - load_audio_in_video: {self.model.config.load_audio_in_video}, "
                f"num_video_frames: {self.model.config.num_video_frames}, "
                f"audio_chunk_length: {self.model.config.audio_chunk_length}"
            )

            start_time = time.time()
            inputs = self.processor([text])

            # Move the token ids to the model's device.
            if hasattr(inputs, 'input_ids') and inputs.input_ids is not None:
                inputs.input_ids = inputs.input_ids.to(self.device)

            processing_time = time.time() - start_time
            logger.info(f"Input processing completed in {processing_time:.2f} seconds")

            logger.info("Generating response...")
            start_time = time.time()

            generation_kwargs = {"max_new_tokens": max_new_tokens, "max_length": 99999999}
            if top_p is not None:
                generation_kwargs["top_p"] = top_p
            if do_sample is not None:
                generation_kwargs["do_sample"] = do_sample
            if temperature is not None:
                generation_kwargs["temperature"] = temperature

            generation_config = self.model.default_generation_config
            generation_config.update(**generation_kwargs)

            logger.info(f"Generation config: {generation_config.to_dict()}")

            output_ids = self.model.generate(
                input_ids=inputs.input_ids,
                media=getattr(inputs, 'media', None),
                media_config=getattr(inputs, 'media_config', None),
                generation_config=generation_config,
            )

            generation_time = time.time() - start_time
            logger.info(f"Generation completed in {generation_time:.2f} seconds")

            response = self.processor.tokenizer.batch_decode(
                output_ids,
                skip_special_tokens=True
            )[0]

            return response
        except Exception as exc:
            logger.error(f"Generation failed: {exc}")
            return None

    def batch_generate(
        self,
        video_text_pairs: List[tuple],
        **generation_kwargs
    ) -> List[Optional[str]]:
        """
        Generate responses for multiple video-text pairs.

        Args:
            video_text_pairs (List[tuple]): List of (video_path, text_prompt) tuples
            **generation_kwargs: Arguments passed to generate_response

        Returns:
            List[Optional[str]]: List of generated responses
        """
        responses = []
        for i, (video_path, text_prompt) in enumerate(video_text_pairs):
            logger.info(f"Processing batch item {i+1}/{len(video_text_pairs)}")
            response = self.generate_response(video_path, text_prompt, **generation_kwargs)
            responses.append(response)

            # Release cached GPU memory between items.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        return responses


def main():
    """Main function demonstrating usage of the NVOmni model."""
    MODEL_PATH = "./"
    VIDEO_PATH = "xxx.mp4"
    TEXT_PROMPT = "Assess the video, followed by a detailed description of its video and audio contents."

    num_video_frames = 128
    audio_length = "max_3600"
    load_audio_in_video = True

    add_to_sys_path_direct(MODEL_PATH)

    logger.info("Initializing NVOmni Video Inference...")
    inferencer = NVOmniVideoInference(MODEL_PATH, torch_dtype=torch.float16)

    if inferencer.model is None:
        logger.error("Failed to initialize model. Exiting.")
        return

    logger.info("Starting inference...")
    response = inferencer.generate_response(
        video_path=VIDEO_PATH,
        text_prompt=TEXT_PROMPT,
        num_video_frames=num_video_frames,
        load_audio_in_video=load_audio_in_video,
        audio_length=audio_length,
        max_new_tokens=1024,
    )

    if response:
        print("\n" + "=" * 60)
        print("GENERATED RESPONSE")
        print("=" * 60)
        print(response)
        print("=" * 60)
    else:
        logger.error("Failed to generate response")

    # Flip this flag to also run the batch-processing example.
    run_batch_example = False
    if run_batch_example:
        logger.info("\nExample: Batch processing")
        batch_pairs = [
            (VIDEO_PATH, "What is happening in this video?"),
            (VIDEO_PATH, "Describe the audio content of this video."),
        ]

        batch_responses = inferencer.batch_generate(batch_pairs, max_new_tokens=128)

        for i, (pair, batch_response) in enumerate(zip(batch_pairs, batch_responses)):
            print(f"\n--- Batch Response {i+1} ---")
            print(f"Prompt: {pair[1]}")
            print(f"Response: {batch_response}")
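
# To run this script end to end, set MODEL_PATH to the local checkpoint directory and
# VIDEO_PATH to an existing video file in main() above, then execute it directly, e.g.
# `python inference.py` (the filename here is illustrative).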
if __name__ == "__main__":
    main()