from transformers import AutoProcessor, AutoModelForVision2Seq, AutoTokenizer
from qwen_vl_utils import process_vision_info
import torch
import numpy as np
import cv2
import os
import re


def _get_video_fps(url_or_p: str) -> float:
    """Return the native FPS of a video file or URL via OpenCV."""
    cap = cv2.VideoCapture(url_or_p)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {url_or_p}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    return fps


class eval_VideoScore2_float:
    def __init__(self, model_name: str):
        self.model, self.processor = self.load_model_processor(model_name)
        # Prefer the tokenizer bundled with the processor; fall back to a
        # standalone slow tokenizer if the processor does not expose one.
        self.tokenizer = getattr(self.processor, "tokenizer", None)
        if self.tokenizer is None:
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                trust_remote_code=True,
                use_fast=False,
            )

    def load_model_processor(self, model_name):
        model = AutoModelForVision2Seq.from_pretrained(
            model_name,
            trust_remote_code=True,
        ).to("cuda")
        processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
        return model, processor

    def evaluate_video(
        self, user_prompt: str, video_path: str, kwargs: dict
    ) -> tuple[float | None, float | None, float | None, str]:
        if not os.path.exists(video_path):
            raise ValueError(f"Video does not exist: {video_path}")
        max_tokens = kwargs.get("max_tokens", 4096)
        infer_fps = kwargs.get("infer_fps", 2.0)
        temperature = kwargs.get("temperature", 0.7)
        # "raw" means: sample frames at the video's native frame rate.
        if infer_fps == "raw":
            infer_fps = _get_video_fps(video_path)

        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "video", "video": video_path, "fps": infer_fps},
                    {"type": "text", "text": user_prompt},
                ],
            }
        ]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        try:
            image_inputs, video_inputs = process_vision_info(messages)
        except Exception as e:
            raise ValueError(f"Error when reading video: {video_path}") from e
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            fps=infer_fps,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to("cuda")

        # Keep per-step scores so we can later read the probability the model
        # assigned to each digit token at the three score positions.
        gen_out = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            output_scores=True,
            return_dict_in_generate=True,
            do_sample=True,
            temperature=temperature,
        )
        sequences = gen_out.sequences
        scores = gen_out.scores  # tuple of [batch, vocab], one entry per generated token
        input_len = inputs["input_ids"].shape[1]
        gen_token_ids = sequences[0, input_len:].tolist()
        output_text = self.processor.batch_decode(
            sequences[:, input_len:],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]

        # Hard scores: parse the three integer ratings out of the decoded text.
        pattern = (
            r"visual quality:\s*(\d+).*?"
            r"text-to-video alignment:\s*(\d+).*?"
            r"physical/common-sense consistency:\s*(\d+)"
        )
        match = re.search(pattern, output_text, re.DOTALL | re.IGNORECASE)
        if match:
            v_score_model = int(match.group(1))
            t_score_model = int(match.group(2))
            p_score_model = int(match.group(3))
        else:
            v_score_model = t_score_model = p_score_model = None

        def find_score_token_index_by_prompt_v0(prompt_text: str) -> int:
            """Earlier token-level matcher (unused below): locate the prompt's
            token subsequence in the generation, then return the index of the
            first digit token after it."""
            prompt_tokens = self.tokenizer.encode(prompt_text, add_special_tokens=False)
            gen_ids = gen_token_ids
            for i in range(len(gen_ids) - len(prompt_tokens) + 1):
                if gen_ids[i:i + len(prompt_tokens)] == prompt_tokens:
                    j = i + len(prompt_tokens)
                    while j < len(gen_ids):
                        token_str = self.tokenizer.decode(
                            [gen_ids[j]], skip_special_tokens=True
                        ).strip()
                        if token_str.isdigit():
                            return j
                        j += 1
            return -1

        def find_score_token_index_by_prompt(prompt_text: str) -> int:
            """String-level matcher: find the prompt in the decoded generation,
            locate the first digit after it, then map that character position
            back to a token index by decoding prefixes of the generation."""
            gen_ids = gen_token_ids
            gen_str = self.tokenizer.decode(gen_ids, skip_special_tokens=False)
            # Tolerate an optional "(n)" enumeration prefix or a leading newline.
            pattern = r"(?:\(\d+\)\s*|\n\s*)?" + re.escape(prompt_text)
            match = re.search(pattern, gen_str, flags=re.IGNORECASE)
            if not match:
                return -1
            after_text = gen_str[match.end():]
            num_match = re.search(r"\d", after_text)
            if not num_match:
                return -1
            # Everything up to and including the score digit.
            target_substr = gen_str[: match.end() + num_match.start() + 1]
            for i in range(len(gen_ids)):
                partial = self.tokenizer.decode(gen_ids[: i + 1], skip_special_tokens=False)
                if partial == target_substr:
                    return i
            return -1

        idx_v = find_score_token_index_by_prompt("visual quality:")
        idx_t = find_score_token_index_by_prompt("text-to-video alignment:")
        idx_p = find_score_token_index_by_prompt("physical/common-sense consistency:")

        def ll_based_soft_score_normed(hard_val, token_idx) -> float | None:
            """Soften a hard 1-5 rating: take the step logits at the score
            token, renormalize the probability mass over the digit tokens
            1-5, and scale the argmax score by its normalized probability."""
            if hard_val is None or token_idx < 0:
                return None
            logits = scores[token_idx][0]  # [vocab]
            log_probs = torch.log_softmax(logits, dim=-1)
            score_range = list(range(1, 6))
            score_probs = []  # [(score, prob)]
            for s in score_range:
                ids = self.tokenizer.encode(str(s), add_special_tokens=False)
                if len(ids) == 1:
                    tid = ids[0]
                    prob = float(np.exp(log_probs[tid].item()))
                    score_probs.append((s, prob))
                else:
                    print(f"[warn] score {s} maps to multi-token: {ids}, skipping.")
            if not score_probs:
                print("[warn] No valid score token found (1–5 all multi-token?)")
                return None
            scores_list, probs_list = zip(*score_probs)
            total_prob = sum(probs_list)
            max_prob = max(probs_list)
            best_score = scores_list[probs_list.index(max_prob)]
            normalized_prob = max_prob / total_prob if total_prob > 0 else 0
            soft_score = best_score * normalized_prob
            print(f"hard score={hard_val}, token_idx={token_idx}")
            for s, p in score_probs:
                print(f"  score {s}: prob={p:.4f}")
            print(f"  max prob={max_prob:.4f} at score={best_score}, total prob={total_prob:.4f}")
            print(f"  normalized prob={normalized_prob:.4f}, soft score={soft_score:.4f}")
            return round(soft_score, 4)

        v_soft = ll_based_soft_score_normed(v_score_model, idx_v)
        t_soft = ll_based_soft_score_normed(t_score_model, idx_t)
        p_soft = ll_based_soft_score_normed(p_score_model, idx_p)
        return v_soft, t_soft, p_soft, output_text
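

# Minimal usage sketch. Assumptions not in the original: the checkpoint path
# below is a placeholder for the actual VideoScore2 model name, and the prompt
# is illustrative -- it only needs to make the model answer with the three
# labeled lines that the regex and token-index lookups above search for.
if __name__ == "__main__":
    scorer = eval_VideoScore2_float("path/to/videoscore2-checkpoint")  # placeholder

    prompt = (
        "Rate this video on a 1-5 scale and answer in the form:\n"
        "visual quality: <score>\n"
        "text-to-video alignment: <score>\n"
        "physical/common-sense consistency: <score>"
    )  # illustrative prompt; the real evaluation prompt may differ

    v_soft, t_soft, p_soft, raw_text = scorer.evaluate_video(
        prompt,
        "sample.mp4",  # placeholder video path
        {"max_tokens": 4096, "infer_fps": 2.0, "temperature": 0.7},
    )
    print(f"visual={v_soft}, alignment={t_soft}, physics={p_soft}")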