# SPDX-License-Identifier: Apache-2.0
# usage:
# VLLM_USE_V1=1 python examples/offline_inference/data_parallel.py
# We need a launcher to create multiple data parallel ranks,
# and each rank creates a vLLM instance to process its own prompts.
import os
import json
import time
import argparse
import gc
import random
from multiprocessing import Process

import jsonlines
import psutil
import torch
from tqdm import tqdm
from transformers import AutoTokenizer

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port

from prompt import object_recognition_prompt_miradata, prompt_miradata_based_text

random.seed(42)
prompt_generate = [prompt_miradata_based_text]


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--save_dir", type=str,
                        default="/share/minghao/VideoProjects/Sythesis/LongVideoCaption/CaptionResults")
    parser.add_argument("--model", type=str, default="Qwen2.5-VL-72B-Instruct-AWQ")
    parser.add_argument("--GPUs_per_dp_rank", type=int, default=2)
    parser.add_argument("--DP_size", type=int, default=4)
    parser.add_argument("--start", type=int, default=0)
    parser.add_argument("--end", type=int, default=10000)
    parser.add_argument("--max_num_seqs", type=int, default=4)
    parser.add_argument("--max_model_len", type=int, default=32768)
    parser.add_argument("--max_tokens", type=int, default=8192)
    args = parser.parse_args()
    return args


def get_have_processed(save_dir):
    # Collect the clip ids that already have generated results in save_dir.
    names = os.listdir(save_dir)
    paths = [os.path.join(save_dir, tmp) for tmp in names]
    record_video_id = []
    for path in paths:
        datas = load_jsonl(path)
        for data in datas:
            video_id = data['clip_id']
            if video_id in record_video_id:
                continue
            record_video_id.append(video_id)
    return record_video_id


def load_jsonl(path):
    datas = []
    # Read the JSONL file
    with jsonlines.open(path, "r") as reader:
        for obj in reader:
            datas.append(obj)
    return datas


def load_json(path):
    with open(path, "r") as reader:
        datas = json.load(reader)
    return datas


def main(dp_size, dp_rank, dp_master_ip, dp_master_port, GPUs_per_dp_rank,
         data_inps):
    os.environ["VLLM_DP_RANK"] = str(dp_rank)
    os.environ["VLLM_DP_SIZE"] = str(dp_size)
    os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
    os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)
    # Set devices for each dp_rank.
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
        str(i) for i in range(dp_rank * GPUs_per_dp_rank,
                              (dp_rank + 1) * GPUs_per_dp_rank))

    # Sample prompts.
    # prompts = [
    #     "Hello, my name is",
    #     "The president of the United States is",
    #     "The capital of France is",
    #     "The future of AI is",
    # ]

    # With DP, each rank should process different prompts.
    # Usually all the DP ranks process a full dataset,
    # and each rank processes a different part of it.
    prompts_per_rank = len(data_inps) // dp_size
    start = dp_rank * prompts_per_rank
    end = start + prompts_per_rank
    this_data_inps = data_inps[start:end]
    if len(this_data_inps) == 0:
        # If a rank has no prompts to process, give it a placeholder entry
        # (a dict, so the 'qa_prompt' lookup below still works).
        this_data_inps = [{'qa_prompt': "Placeholder"}]
    print(f"DP rank {dp_rank} needs to process {len(this_data_inps)} prompts")

    # Create a sampling params object.
    # Since we are doing data parallel, every rank could use different
    # sampling params; here all ranks share the same settings from the CLI.
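    # (Illustrative note, not from the original script: the upstream vLLM
    # data_parallel example varies sampling params per rank for demonstration,
    # e.g. max_tokens = 16 * (dp_rank + 1); this script keeps them uniform.)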
    max_tokens = args.max_tokens
    sampling_params = SamplingParams(temperature=0.1,
                                     top_k=20,
                                     top_p=0.8,
                                     repetition_penalty=1.05,
                                     max_tokens=max_tokens)

    model_name = f"/share/minghao/Models/{args.model}"
    max_model_len = args.max_model_len
    max_num_seqs = args.max_num_seqs
    # Create an LLM.
    llm = LLM(model=model_name,
              tensor_parallel_size=GPUs_per_dp_rank,
              max_model_len=max_model_len,
              enforce_eager=True,
              gpu_memory_utilization=0.9,
              max_num_seqs=max_num_seqs)

    batch_size = 2000
    save_dir = args.save_dir
    os.makedirs(save_dir, exist_ok=True)
    save_name = f'{dp_rank}.jsonl'
    save_path = os.path.join(save_dir, save_name)
    with open(save_path, 'a') as file:
        for i in range(0, len(this_data_inps), batch_size):
            start = time.time()
            batch_this_data_inps = this_data_inps[i:i + batch_size]
            batch_prompts = [tmp['qa_prompt'] for tmp in batch_this_data_inps]
            outputs = llm.generate(batch_prompts, sampling_params)
            print(f'Inference finished. Total outputs: {len(outputs)}')
            for idx, output in enumerate(outputs):
                this_inp = batch_this_data_inps[idx]
                prompt = output.prompt
                generated_qa = output.outputs[0].text
                this_inp['qa_prompt'] = prompt
                this_inp.update({"generated_qa": generated_qa})
                file.write(json.dumps(this_inp) + "\n")
                file.flush()  # flush after every write as an extra safeguard
            end = time.time()
            del batch_this_data_inps, batch_prompts, outputs
            gc.collect()
            print(f'batch time cost: {end - start}s')
            print(f"[Memory] CPU: {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2:.2f} MB")
            print(f"[Memory] GPU: {torch.cuda.memory_allocated() / 1024 ** 2:.2f} MB")


def read_all_captions(root_caption_dir, caption_file_names):
    caption_file_dir_list = [
        os.path.join(root_caption_dir, file_name)
        for file_name in caption_file_names
    ]
    datas = []
    for caption_dir in caption_file_dir_list:
        caption_file_names = sorted(os.listdir(caption_dir))
        caption_file_paths = [
            os.path.join(caption_dir, name) for name in caption_file_names
        ]
        for path in caption_file_paths:
            datas += load_jsonl(path)
    return datas


if __name__ == "__main__":
    args = get_args()
    datas = load_json('/share/minghao/VideoProjects/Sythesis2/Candidates/miradata_youtube_31k_5_10min_filter_clips.json')
    print(f'Total Video Size: {len(datas)}')

    # Reorganize the data into clip-level entries suitable for synthesis.
    new_datas = []
    for data in tqdm(datas):
        clips = data['clips']
        for clip in clips:
            clip['clip_id'] = str(clip['clip_id']) + '_' + clip['video_id']
        new_datas.extend(clips)
    print(f'Total Clips Size: {len(new_datas)}')
    datas = new_datas

    start = args.start
    end = args.end
    datas = datas[start:end]
    print(f'Start: {start}, End: {end}')
    print(f'to process size: {len(datas)}')

    save_dir = args.save_dir
    if os.path.exists(save_dir):
        # Skip clips that already have results from a previous run.
        have_downloaded = get_have_processed(save_dir)
        filter_datas = []
        for data in tqdm(datas, desc='Filtering 2...'):
            if data['clip_id'] in have_downloaded:
                continue
            filter_datas.append(data)
        datas = filter_datas
        print(f'have_downloaded size : {len(have_downloaded)}')
        print(f'rest to process size : {len(datas)}')

    model_name = f"/share/minghao/Models/{args.model}"
    # Initialize the tokenizer.
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    prompts = []
    data_inps = []
    for data in datas:
        this_prompt = random.choice(prompt_generate)
        dense_caption = data['dense_caption']
        background_caption = data['background_caption']
        main_object_caption = data['main_object_caption']
        system_prompt, user_prompt = this_prompt(dense_caption,
                                                 background_caption,
                                                 main_object_caption)
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
        prompt = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        prompts.append(prompt)
        data['qa_prompt'] = prompt
        data_inps.append(data)

    print(f'Total size: {len(prompts)}')
    print(f'Sample show: {prompts[0]}')

    # Launch one process per data parallel rank; each rank builds its own
    # vLLM engine on its slice of GPUs and processes its share of the prompts.
    start = time.time()
    dp_master_ip = "127.0.0.1"
    dp_master_port = get_open_port()
    procs = []
    GPUs_per_dp_rank = args.GPUs_per_dp_rank
    DP_size = args.DP_size
    for i in range(DP_size):
        proc = Process(target=main,
                       args=(DP_size, i, dp_master_ip, dp_master_port,
                             GPUs_per_dp_rank, data_inps))
        proc.start()
        procs.append(proc)
    exit_code = 0
    for proc in procs:
        proc.join()
        if proc.exitcode:
            exit_code = proc.exitcode
    end = time.time()
    print(f'Total size: {len(prompts)}', f'Total time cost: {end - start}s')
    exit(exit_code)
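# --- Usage sketch (illustrative; the save_dir value below is a placeholder,
# --- not taken from the original script) -------------------------------------
# With 8 GPUs split into DP_size=4 ranks of GPUs_per_dp_rank=2 GPUs each:
#
#   VLLM_USE_V1=1 python examples/offline_inference/data_parallel.py \
#       --model Qwen2.5-VL-72B-Instruct-AWQ \
#       --DP_size 4 --GPUs_per_dp_rank 2 \
#       --start 0 --end 10000 \
#       --max_num_seqs 4 --max_model_len 32768 --max_tokens 8192 \
#       --save_dir ./CaptionResults
#
# Each DP rank writes its results to <save_dir>/<dp_rank>.jsonl, so outputs
# from different ranks never collide, and interrupted runs can be resumed
# because already-processed clip_ids are filtered out at startup.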