Spaces: Running on Zero
from transformers import AutoProcessor, AutoModelForVision2Seq, AutoTokenizer
from qwen_vl_utils import process_vision_info
import torch
import numpy as np
import cv2, os, re

def _get_video_fps(url_or_p: str):
    # Return the native frame rate of a video file or URL.
    cap = cv2.VideoCapture(url_or_p)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {url_or_p}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    return fps
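
# Minimal usage sketch (the path "sample.mp4" is illustrative, not part of this Space):
#   _get_video_fps("sample.mp4")  # -> e.g. 24.0 for a 24-fps clip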

class eval_VideoScore2_float:
    def __init__(self, model_name: str):
        self.model, self.processor = self.load_model_processor(model_name)
        # Fall back to a standalone tokenizer if the processor does not bundle one.
        self.tokenizer = getattr(self.processor, "tokenizer", None)
        if self.tokenizer is None:
            self.tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                trust_remote_code=True,
                use_fast=False,
            )

    def load_model_processor(self, model_name):
        model = AutoModelForVision2Seq.from_pretrained(
            model_name,
            trust_remote_code=True,
        ).to("cuda")
        processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
        return model, processor

    def evaluate_video(
        self,
        user_prompt: str,
        video_path: str,
        kwargs: dict,
    ) -> tuple[float | None, float | None, float | None, str]:
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video does not exist: {video_path}")
        max_tokens = kwargs.get("max_tokens", 4096)
        infer_fps = kwargs.get("infer_fps", 2.0)
        temperature = kwargs.get("temperature", 0.7)
        # "raw" means: sample frames at the video's native frame rate.
        if infer_fps == "raw":
            infer_fps = _get_video_fps(video_path)
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": video_path,
                        "fps": infer_fps,
                    },
                    {
                        "type": "text",
                        "text": user_prompt,
                    },
                ],
            }
        ]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        try:
            image_inputs, video_inputs = process_vision_info(messages)
        except Exception as e:
            raise ValueError(f"Error when reading: {video_path}") from e
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            fps=infer_fps,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to("cuda")
        # output_scores=True keeps the per-step logits so that soft scores can be
        # computed from the probabilities of the digit tokens below.
        gen_out = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            output_scores=True,
            return_dict_in_generate=True,
            do_sample=True,
            temperature=temperature,
        )
        sequences = gen_out.sequences
        scores = gen_out.scores
        input_len = inputs["input_ids"].shape[1]
        gen_token_ids = sequences[0, input_len:].tolist()
        output_text = self.processor.batch_decode(
            sequences[:, input_len:], skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
| pattern = r"visual quality:\s*(\d+).*?text-to-video alignment:\s*(\d+).*?physical/common-sense consistency:\s*(\d+)" | |
| match = re.search(pattern, output_text, re.DOTALL | re.IGNORECASE) | |
| if match: | |
| v_score_model = int(match.group(1)) | |
| t_score_model = int(match.group(2)) | |
| p_score_model = int(match.group(3)) | |
| else: | |
| v_score_model = t_score_model = p_score_model = None | |

        def find_score_token_index_by_prompt_v0(prompt_text: str) -> int:
            # Earlier token-level matcher, kept for reference; superseded by the
            # string-level matcher below and not called anywhere.
            prompt_tokens = self.tokenizer.encode(prompt_text, add_special_tokens=False)
            gen_ids = gen_token_ids
            for i in range(len(gen_ids) - len(prompt_tokens) + 1):
                if gen_ids[i:i + len(prompt_tokens)] == prompt_tokens:
                    j = i + len(prompt_tokens)
                    while j < len(gen_ids):
                        token_str = self.tokenizer.decode([gen_ids[j]], skip_special_tokens=True).strip()
                        if token_str.isdigit():
                            return j
                        j += 1
            return -1

        def find_score_token_index_by_prompt(prompt_text: str):
            # Locate the generated token that carries the first digit after
            # `prompt_text`, by decoding growing prefixes of the generation until
            # the decoded string reaches that digit. (re is imported at module level.)
            gen_ids = gen_token_ids
            gen_str = self.tokenizer.decode(gen_ids, skip_special_tokens=False)
            pattern = r"(?:\(\d+\)\s*|\n\s*)?" + re.escape(prompt_text)
            match = re.search(pattern, gen_str, flags=re.IGNORECASE)
            if not match:
                return -1
            after_text = gen_str[match.end():]
            num_match = re.search(r"\d", after_text)
            if not num_match:
                return -1
            target_substr = gen_str[:match.end() + num_match.start() + 1]
            for i in range(len(gen_ids)):
                partial = self.tokenizer.decode(gen_ids[:i + 1], skip_special_tokens=False)
                if partial == target_substr:
                    return i
            return -1

        idx_v = find_score_token_index_by_prompt("visual quality:")
        idx_t = find_score_token_index_by_prompt("text-to-video alignment:")
        idx_p = find_score_token_index_by_prompt("physical/common-sense consistency:")

        def ll_based_soft_score_normed(hard_val, token_idx) -> float | None:
            # Turn a hard 1-5 score into a soft score: take the model's probability
            # over the digit tokens "1".."5" at the score position, renormalize it
            # over those five tokens, and scale the argmax score by its
            # renormalized probability.
            if hard_val is None or token_idx < 0:
                return None
            logits = scores[token_idx][0]  # [vocab]
            score_range = list(range(1, 6))
            score_probs = []  # [(score, prob)]
            for s in score_range:
                ids = self.tokenizer.encode(str(s), add_special_tokens=False)
                if len(ids) == 1:
                    tid = ids[0]
                    logp = torch.log_softmax(logits, dim=-1)[tid].item()
                    prob = float(np.exp(logp))
                    score_probs.append((s, prob))
                else:
                    print(f"[warn] score {s} maps to multi-token: {ids}, skipping.")
            if not score_probs:
                print("[warn] No valid score token found (1–5 all multi-token?)")
                return None
            scores_list, probs_list = zip(*score_probs)
            total_prob = sum(probs_list)
            max_prob = max(probs_list)
            max_idx = probs_list.index(max_prob)
            best_score = scores_list[max_idx]
            normalized_prob = max_prob / total_prob if total_prob > 0 else 0
            soft_score = best_score * normalized_prob
            print(f"hard score={hard_val}, token_idx={token_idx}")
            for s, p in score_probs:
                print(f"  score {s}: prob={p:.4f}")
            print(f"  max prob={max_prob:.4f} at score={best_score}, total prob={total_prob:.4f}")
            print(f"  normalized prob={normalized_prob:.4f}, soft score={soft_score:.4f}")
            return round(soft_score, 4)
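
        # Worked example of the soft score (toy numbers, not from a real run):
        # if the digit probabilities renormalize to P(1..5) = (0.02, 0.05, 0.10,
        # 0.75, 0.08), the argmax score is 4 with normalized probability 0.75,
        # so the soft score is 4 * 0.75 = 3.0.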

        v_soft = ll_based_soft_score_normed(v_score_model, idx_v)
        t_soft = ll_based_soft_score_normed(t_score_model, idx_t)
        p_soft = ll_based_soft_score_normed(p_score_model, idx_p)
        return v_soft, t_soft, p_soft, output_text
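

# Minimal usage sketch, assuming a Qwen-VL-style checkpoint. The model id, video
# path, and prompt below are placeholders, not part of this Space; the real prompt
# must make the model emit "visual quality: N", "text-to-video alignment: N", and
# "physical/common-sense consistency: N" so the regex above can parse the scores.
if __name__ == "__main__":
    evaluator = eval_VideoScore2_float("your-org/videoscore2-checkpoint")  # hypothetical id
    v, t, p, raw = evaluator.evaluate_video(
        user_prompt=(
            "Rate the video from 1 to 5 on visual quality, text-to-video "
            "alignment, and physical/common-sense consistency."  # illustrative prompt
        ),
        video_path="sample.mp4",  # hypothetical path
        kwargs={"max_tokens": 4096, "infer_fps": 2.0, "temperature": 0.7},
    )
    print(v, t, p)
    print(raw)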