import random import os import argparse import time from vllm import LLM, SamplingParams from datetime import datetime from tqdm import tqdm from transformers import AutoTokenizer, AutoModelForCausalLM from eval_tools import apply_RL_prompt, solve_final_answer from evaluate import evaluate from utils import set_seed, load_jsonl, save_jsonl, construct_prompt from parser import * from trajectory import * from data_loader import load_data from python_executor import PythonExecutor from model_utils import load_hf_lm_and_tokenizer, generate_completions import logging ## 启动logging功能 if not os.path.exists(f'{os.environ["modelname"]}'): os.mkdir(f'{os.environ["modelname"]}') if not os.path.exists(f'{os.environ["model"]}'): os.mkdir(f'{os.environ["model"]}') DATA_NAME = os.environ["DATA_NAME"] logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=f'{os.environ["model"]}/{os.environ["mode"]}-{DATA_NAME}.log', filemode='a') print(f"logging in {os.environ['model']}/{os.environ['mode']}-{DATA_NAME}.log") logging.info(f"modelname's infor: {os.environ['modelname']}") logging.info(f"mode's infor: {os.environ['mode']}") logging.info(f"model's infor: {os.environ['model']}") with open('./special_tokens.json') as f: special_tokens = json.load(f) bins_tokens = [ special_tokens[f"{i}"] for i in range(400) ] import os import multiprocessing from multiprocessing import Process, Manager, Queue from transformers import AutoTokenizer from vllm import LLM, SamplingParams import fcntl import torch def process_chunk(chunk, args, available_gpus, stop_words, output_queue, index, two_stage=False): """ 处理一个数据块的函数 """ GPU_LOCK_FILES = {gpu_id: f"/tmp/gpu_{gpu_id}.lock" for gpu_id in available_gpus} # 设置当前进程使用的GPU os.environ["CUDA_VISIBLE_DEVICES"] = str(available_gpus[index % len(available_gpus)]) gpu_id = available_gpus[index % len(available_gpus)] lock_file = f"/tmp/gpu_{gpu_id}.lock" # 每个 GPU 对应一个锁文件 # llm = LLM( # model=args.model_name_or_path, # tensor_parallel_size=1, # pipeline_parallel_size=1, # trust_remote_code=True, # gpu_memory_utilization=0.95, # enforce_eager=True, # max_seq_len_to_capture=65536, # ) # chunk_outputs = [] # for i in range(0, len(chunk), 125): # chunk_batch = chunk[i:i + 125] # if args.use_vllm: # if os.environ['stage'] == "add": # budget = args.max_tokens_per_call + 50 # else: # budget = args.max_tokens_per_call # os.environ["position"] = 'start' # chunk_batch_outputs = llm.generate( # chunk_batch, # SamplingParams( # temperature=args.temperature, # top_p=0.9, # max_tokens=budget if not two_stage else 20, # n=1, # stop=stop_words, # stop_token_ids=( # [151645, 151643] # if "qwen2" in args.model_name_or_path.lower() # else None # ), # skip_special_tokens=False, # ), # ) # if os.path.exists('./start_positions.pt'): # os.remove('./start_positions.pt') # if os.path.exists('./early_positions.pt'): # os.remove('./early_positions.pt') # chunk_batch_outputs = sorted(chunk_batch_outputs, key=lambda x: int(x.request_id)) # chunk_batch_outputs = [output.outputs[0].text for output in chunk_batch_outputs] # batch_chunk = [single_chunk + chunk_output for single_chunk, chunk_output in zip(chunk_batch, chunk_batch_outputs)] # chunk_outputs.extend(batch_chunk) # # 将处理结果存储到共享列表中 # output_queue.put((index, chunk_outputs)) # 获取文件锁(排他锁) with open(lock_file, 'w') as f: fcntl.flock(f, fcntl.LOCK_EX) # 获取排他锁,其他进程会阻塞直到锁释放 try: # 设置当前进程使用的 GPU os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) # 初始化 LLM(确保在锁内执行,避免多个进程同时初始化) llm = LLM( model=args.model_name_or_path, tensor_parallel_size=1, pipeline_parallel_size=1, trust_remote_code=True, gpu_memory_utilization=0.95, enforce_eager=True, max_seq_len_to_capture=65536, ) chunk_outputs = [] for i in range(0, len(chunk), 125): chunk_batch = chunk[i:i + 125] if args.use_vllm: if os.environ['stage'] == "add": budget = args.max_tokens_per_call + (args.max_tokens_per_call // 50) + 50 else: budget = args.max_tokens_per_call + (args.max_tokens_per_call // 50) os.environ["position"] = 'start' chunk_batch_outputs = llm.generate( chunk_batch, SamplingParams( temperature=args.temperature, top_p=0.9, max_tokens=budget if not two_stage else 20, n=1, stop=stop_words, stop_token_ids=( [151645, 151643] if "qwen2" in args.model_name_or_path.lower() else None ), skip_special_tokens=False, ), ) if os.path.exists('./start_positions.pt'): os.remove('./start_positions.pt') if os.path.exists('./early_positions.pt'): os.remove('./early_positions.pt') chunk_batch_outputs = sorted(chunk_batch_outputs, key=lambda x: int(x.request_id)) chunk_batch_outputs = [output.outputs[0].text for output in chunk_batch_outputs] batch_chunk = [single_chunk + chunk_output for single_chunk, chunk_output in zip(chunk_batch, chunk_batch_outputs)] chunk_outputs.extend(batch_chunk) # 处理完成后,将结果存入队列 output_queue.put((index, chunk_outputs)) finally: # 显式销毁 llm 对象 if 'llm' in locals(): del llm # 确保 LLM 引擎被销毁 # torch.cuda.empty_cache() # 清理 GPU 内存 # 释放锁(确保即使发生异常也能释放) fcntl.flock(f, fcntl.LOCK_UN) def clean_code(code): for bin_token in bins_tokens: if bin_token in code: code = code.replace(bin_token, "") return code def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--ratio", type=float, default=-1, help="ratio of cot to use for generation") parser.add_argument("--data_names", default="math", type=str) parser.add_argument("--data_dir", default="./data", type=str) parser.add_argument("--model_name_or_path", default="Qwen/QwQ-32B-Preview", type=str) parser.add_argument("--output_dir", default="Qwen/QwQ-32B-Preview/math_eval", type=str) parser.add_argument("--prompt_type", default="qwen25-math-cot", type=str) parser.add_argument("--split", default="test", type=str) parser.add_argument("--num_test_sample", default=-1, type=int) # -1 for full data parser.add_argument("--seed", default=0, type=int) parser.add_argument("--start", default=0, type=int) parser.add_argument("--end", default=-1, type=int) parser.add_argument("--temperature", default=0, type=float) parser.add_argument("--n_sampling", default=1, type=int) parser.add_argument("--top_p", default=1, type=float) parser.add_argument("--max_tokens_per_call", default=4096, type=int) parser.add_argument("--shuffle", action="store_true") parser.add_argument("--use_vllm", action="store_true") parser.add_argument("--save_outputs", action="store_true") parser.add_argument("--overwrite", action="store_true") parser.add_argument("--use_safetensors", action="store_true") parser.add_argument("--num_shots", type=int, default=0) parser.add_argument("--apply_chat_template", action="store_true", help="Apply chat template to prompt.",) parser.add_argument("--pipeline_parallel_size", type=int, default=1) parser.add_argument("--adapt_few_shot", action="store_true", help="Few shot for multiple-choice questions, zero shot for others.",) args = parser.parse_args() args.top_p = (1 if args.temperature == 0 else args.top_p) # top_p must be 1 when using greedy sampling (vllm) # if args.ratio > 0: # args.max_tokens_per_call = 50 return args def set_output_path(args, data_name): # args.output_dir defines experiment path,such as outputs/12_25 model_name_list = args.model_name_or_path.split('/')[-1] model_name = model_name_list for part in model_name_list: if 'models' in part: model_name = part # print(f"args.output_dir: {args.output_dir}") # print(f"model_name: {model_name}") # print(f"args.prompt_type: {args.prompt_type}") output_dir = os.path.join(args.output_dir, model_name, args.prompt_type) out_file_prefix = f"{args.split}_{args.prompt_type}_{args.num_test_sample}_seed{args.seed}_t{args.temperature}" out_file = f"{output_dir}/{data_name}/{out_file_prefix}_s{args.start}_e{args.end}_b{int(args.max_tokens_per_call)}_original.jsonl" print(out_file) os.makedirs(f"{output_dir}/{data_name}", exist_ok=True) return out_file_prefix, output_dir, out_file def prepare_data(data_name, args): examples = load_data(data_name, args.split, args.data_dir) # sample `num_test_sample` from dataset, -1 for full data if args.num_test_sample > 0: # examples = random.sample(examples, min(args.num_test_sample, len(examples))) examples = examples[: args.num_test_sample] # shuffle if args.shuffle: random.seed(datetime.now().timestamp()) random.shuffle(examples) # select start and end examples = examples[args.start : len(examples) if args.end == -1 else args.end] # get out_file name dt_string = datetime.now().strftime("%m-%d_%H-%M") model_name = "/".join(args.model_name_or_path.split("/")[-2:]) # get out_file_prefix, output_dir and out_file out_file_prefix, output_dir, out_file = set_output_path(args, data_name) # load all processed samples processed_samples = [] if not args.overwrite: processed_files = [ f for f in os.listdir(f"{output_dir}/{data_name}/") if f.endswith(".jsonl") and f.startswith(out_file_prefix) ] for f in processed_files: processed_samples.extend( list(load_jsonl(f"{output_dir}/{data_name}/{f}")) ) # dedepulicate processed_samples = {sample["idx"]: sample for sample in processed_samples} processed_idxs = list(processed_samples.keys()) processed_samples = list(processed_samples.values()) examples = [example for example in examples if example["idx"] not in processed_idxs] return examples, processed_samples, out_file def setup(args): # load model available_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") if args.use_vllm: # llm = LLM( # model=args.model_name_or_path, # tensor_parallel_size=len(available_gpus) // args.pipeline_parallel_size, # pipeline_parallel_size=args.pipeline_parallel_size, # trust_remote_code=True, # gpu_memory_utilization=0.85, # enforce_eager=True, # max_seq_len_to_capture=5000000, # # enable_flash_attn=True # ) llm = None tokenizer = None # if args.apply_chat_template: tokenizer = AutoTokenizer.from_pretrained( args.model_name_or_path, trust_remote_code=True, max_length=16000, ) else: llm, tokenizer = load_hf_lm_and_tokenizer( model_name_or_path=args.model_name_or_path, load_in_half=True, use_fast_tokenizer=True, use_safetensors=args.use_safetensors, ) # infer & eval data_list = args.data_names.split(",") results = [] for data_name in data_list: results.append(main(llm, tokenizer, data_name, args)) # add "avg" result to data_list and results data_list.append("avg") results.append( { "acc": sum([result["acc"] for result in results]) / len(results), } ) # print all results pad = max([len(data_name) for data_name in data_list]) print("\t".join(data_name.ljust(pad, " ") for data_name in data_list)) print("\t".join([f"{result['acc']:.1f}".ljust(pad, " ") for result in results])) logging.info("\t".join(data_name.ljust(pad, " ") for data_name in data_list)) logging.info(f"os.environ['PE_MODE'] = {os.environ['PE_MODE']}") logging.info(f"path = {args.model_name_or_path}") logging.info(f"tip = {os.environ['tip']}") logging.info(f"BUDGET = {os.environ['BUDGET']}") logging.info("\t".join([f"{result['acc']:.1f}".ljust(pad, " ") for result in results])) def is_multi_choice(answer): for c in answer: if c not in ["A", "B", "C", "D", "E"]: return False return True def main(llm, tokenizer, data_name, args): examples, processed_samples, out_file = prepare_data(data_name, args) print(examples[0]) print("\n" + "-" * 50) print("data:", data_name, ", remain samples:", len(examples)) if len(examples) > 0: print(examples[0]) available_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") print(f"available_gpus = {available_gpus}") # init python executor if "pal" in args.prompt_type: executor = PythonExecutor(get_answer_expr="solution()") else: executor = PythonExecutor(get_answer_from_stdout=True) # load done samples if args.ratio > 0 : done_samples_path = out_file.replace("_r" + str(args.ratio), "") done_samples = list(load_jsonl(done_samples_path)) else: done_samples = [] done_samples = {sample["idx"]: sample for sample in done_samples} samples = [] print("\nProcessing", len(examples), "examples", "=" * 50) for example in tqdm(examples, total=len(examples)): idx = example["idx"] # parse question and answer example["question"] = parse_question(example, data_name) if example["question"] == "": continue gt_cot, gt_ans = parse_ground_truth(example, data_name) example["gt_ans"] = gt_ans full_prompt = construct_prompt(example, data_name, args) # # add ratio part of complete cot if args.ratio > 0 : done_cot = done_samples[idx]["code"][0] cut_cot = done_cot[:int(len(done_cot)*args.ratio)] # # 将prompt中的<|im_start|>assistant\n换成新内容 # full_prompt = full_prompt.replace("<|im_start|>assistant\n", "<|im_start|>assistant\n" + cut_cot + "\n\nFinal answer within \\boxed{{}}:\n") # 直接在prompt的后面添加新内容 full_prompt = full_prompt + cut_cot + "\n\nFinal answer within \\boxed{{}}:\n" if idx == args.start: print(full_prompt) sample = { "idx": idx, "question": example["question"], "gt_cot": gt_cot, "gt": gt_ans, "prompt": full_prompt, } # add remain fields for key in [ "level", "type", "unit", "solution_type", "choices", "solution", "ques_type", "ans_type", "answer_type", "dataset", "subfield", "filed", "theorem", "answer", ]: if key in example: sample[key] = example[key] samples.append(sample) # repeat n times input_prompts = [sample["prompt"] for sample in samples for _ in range(args.n_sampling)] input_prompts = apply_RL_prompt(input_prompts, args, budget = args.max_tokens_per_call) if args.apply_chat_template: input_prompts = [ tokenizer.apply_chat_template( [{"role": "user", "content": prompt.strip()}], tokenize=False, add_generation_prompt=True, ) for prompt in input_prompts ] remain_prompts = input_prompts remain_prompts = [(i, prompt) for i, prompt in enumerate(remain_prompts)] end_prompts = [] max_func_call = 1 if args.prompt_type in ["cot", "pal", "qwen25-math-cot"] else 4 stop_words = ["", "<|im_end|>", "<|endoftext|>", "<|end▁of▁sentence|>"] if args.prompt_type in ["cot"]: stop_words.append("\n\nQuestion:") if args.prompt_type in ["pal", "tool-integrated", "jiuzhang_tora"]: stop_words.extend(["\n\n---", "```output"]) elif args.prompt_type in ["wizard_zs", "platypus_fs"]: stop_words.extend(["Instruction", "Response"]) elif "jiuzhang" in args.prompt_type: stop_words.append("\n\n## Question") elif "numina" in args.prompt_type: stop_words.append("\n### Problem") elif "pure" in args.prompt_type: stop_words.append("\n\n\n") # start inference # measure time use start_time = time.time() print(f"start_time: {start_time}") for epoch in range(max_func_call): print("-" * 20, "Epoch", epoch) current_prompts = remain_prompts if len(current_prompts) == 0: break prompts = [item[1] for item in current_prompts] # prompts = apply_RL_prompt(prompts, args, budget = args.max_tokens_per_call) num_prompts = len(prompts) chunk_size = 256 #(num_prompts + 4) // 5 # 确保包含所有的 prompts outputs = [] if os.environ['tip'] == "remaining" or os.environ['tip'] == "ATD_R": for i in range(0, num_prompts, chunk_size): # print(prompts[i]) chunk = prompts[i:i + chunk_size] # 获取当前的 chunk if args.use_vllm: budget = args.max_tokens_per_call i = 0 while 50*(2**i) < budget: i += 1 i -= 1 for k in range(i, -2, -1): stop_budget = budget - 50*(2**k) if k >= 0 else 50 # print(f"stop_budget: {stop_budget}") # chunk = [data + f"\n[{budget} token]\n" for data in chunk] if budget == args.max_tokens_per_call: chunk = chunk else: # "<|end▁of▁sentence|>" chunk = [data + f"\n{budget}\n" if "<|end▁of▁sentence|>" not in data else data for data in chunk] print(f"chunk0: {chunk[0]}") if stop_budget > 0: chunk_outputs = llm.generate( chunk, SamplingParams( temperature=args.temperature, top_p=args.top_p, max_tokens=stop_budget, n=1, stop=stop_words, stop_token_ids=( [151645, 151643] if "qwen2" in args.model_name_or_path.lower() else None ), skip_special_tokens=False, #G 设置特殊token的可见性 ), ) if os.path.exists('./start_positions.pt'): os.remove('./start_positions.pt') print('start_positions.pt removed') if os.path.exists('./early_positions.pt'): os.remove('./early_positions.pt') print('early_positions.pt removed') chunk_outputs = sorted(chunk_outputs, key=lambda x: int(x.request_id)) chunk_outputs = [output.outputs[0].text for output in chunk_outputs] ### 输出已经被整理过了,不需要再进行排序 chunk = [single_chunk + chunk_output for single_chunk, chunk_output in zip(chunk, chunk_outputs)] budget = 50*(2**k) if k >= 0 else 0 chunk, end_chunk, open_chunk = solve_final_answer(chunk) print(f"len of end_chunk: {len(end_chunk)}") print(f"len of open_chunk: {len(open_chunk)}") # outputs.extend(end_chunk) # chunk = open_chunk # print(f"now budget: {budget}") # print(f"k = {k}") chunk_outputs = chunk outputs.extend(chunk_outputs) # chunk_outputs = sorted(chunk, key=lambda x: int(x.request_id)) # initial_outputs = [output.outputs[0].text for output in chunk_outputs] # Add the think/final answer tags and create new prompts # modified_outputs = [] # for output in chunk_outputs: # modified_output = output.rstrip() + "\n\n**Final Answer**\n\\boxed" # modified_outputs.append(modified_output) # # Second generation with modified outputs # second_prompts = [p + mo for p, mo in zip(chunk, modified_outputs)] # second_outputs = llm.generate( # second_prompts, # SamplingParams( # temperature=args.temperature, # top_p=args.top_p, # max_tokens=20, # n=1, # stop=stop_words, # stop_token_ids=( # [151645, 151643] # if "qwen2" in args.model_name_or_path.lower() # else None # ), # ), # ) # if os.path.exists('./start_positions.pt'): # os.remove('./start_positions.pt') # print('start_positions.pt removed') # if os.path.exists('./early_positions.pt'): # os.remove('./early_positions.pt') # print('early_positions.pt removed') # second_outputs = sorted(second_outputs, key=lambda x: int(x.request_id)) # second_outputs = [output.outputs[0].text for output in second_outputs] # # Combine initial and second outputs # combined_outputs = [init + "\n\n**Final Answer**\n\\boxed" + second for init, second in zip(second_prompts, second_outputs)] # outputs.extend(combined_outputs) else: # Similar modification for non-vllm case chunk_outputs = generate_completions( model=llm, tokenizer=tokenizer, prompts=chunk, max_new_tokens=args.max_tokens_per_call, batch_size=16, stop_id_sequences=stop_words, ) # Add the think/final answer tags and create new prompts modified_outputs = [] for output in chunk_outputs: modified_output = output.rstrip() + "\n\n\n**Final Answer**\n\\boxed" modified_outputs.append(modified_output) # Second generation with modified outputs second_prompts = [p + mo for p, mo in zip(chunk, modified_outputs)] second_outputs = generate_completions( model=llm, tokenizer=tokenizer, prompts=second_prompts, max_new_tokens=args.max_tokens_per_call, batch_size=16, stop_id_sequences=stop_words, ) # Combine initial and second outputs combined_outputs = [init + second for init, second in zip(chunk_outputs, second_outputs)] outputs.extend(combined_outputs) elif os.environ["tip"] == "TCM": for i in range(0, num_prompts, chunk_size): chunk = prompts[i:i + chunk_size] # 获取当前的 chunk if args.use_vllm: budget = args.max_tokens_per_call i = budget // 50 + 1 for k in reversed(range(i)): stop_budget = budget - 50 * k if budget == args.max_tokens_per_call: chunk = chunk else: chunk = [data + f"\n{budget}\n" if "<|end▁of▁sentence|>" not in data else data for data in chunk] print(f"chunk0: {chunk[0]}") if stop_budget > 0: chunk_outputs = llm.generate( chunk, SamplingParams( temperature=args.temperature, top_p=args.top_p, max_tokens=stop_budget, n=1, stop=stop_words, stop_token_ids=( [151645, 151643] if "qwen2" in args.model_name_or_path.lower() else None ), skip_special_tokens=False, #G 设置特殊token的可见性 ), ) if os.path.exists('./start_positions.pt'): os.remove('./start_positions.pt') print('start_positions.pt removed') if os.path.exists('./early_positions.pt'): os.remove('./early_positions.pt') print('early_positions.pt removed') chunk_outputs = sorted(chunk_outputs, key=lambda x: int(x.request_id)) chunk_outputs = [output.outputs[0].text for output in chunk_outputs] ### 输出已经被整理过了,不需要再进行排序 chunk = [single_chunk + chunk_output for single_chunk, chunk_output in zip(chunk, chunk_outputs)] budget = 50 * k if k >= 0 else 0 chunk, end_chunk, open_chunk = solve_final_answer(chunk) print(f"len of end_chunk: {len(end_chunk)}") print(f"len of open_chunk: {len(open_chunk)}") print(F"len of chunk: {len(chunk)}s") # outputs.extend(end_chunk) # chunk = open_chunk chunk_outputs = chunk outputs.extend(chunk_outputs) else: raise(ValueError("Not implemented for non-vllm mode while tip == TCM")) elif os.environ["tip"] == "SST": for i in range(0, num_prompts, chunk_size): chunk = prompts[i:i + chunk_size] # 获取当前的 chunk if args.use_vllm: budget = args.max_tokens_per_call i = budget // 50 + 1 for k in reversed(range(i)): stop_budget = budget - 50 * k if budget == args.max_tokens_per_call: chunk = chunk else: chunk = [data + f"\n\n" if "<|end▁of▁sentence|>" not in data else data for data in chunk] print(f"chunk0: {chunk[0]}") if stop_budget > 0: chunk_outputs = llm.generate( chunk, SamplingParams( temperature=args.temperature, top_p=args.top_p, max_tokens=stop_budget, n=1, stop=stop_words, stop_token_ids=( [151645, 151643] if "qwen2" in args.model_name_or_path.lower() else None ), ), ) if os.path.exists('./start_positions.pt'): os.remove('./start_positions.pt') print('start_positions.pt removed') if os.path.exists('./early_positions.pt'): os.remove('./early_positions.pt') print('early_positions.pt removed') chunk_outputs = sorted(chunk_outputs, key=lambda x: int(x.request_id)) chunk_outputs = [output.outputs[0].text for output in chunk_outputs] ### 输出已经被整理过了,不需要再进行排序 chunk = [single_chunk + chunk_output for single_chunk, chunk_output in zip(chunk, chunk_outputs)] budget = 50 * k if k >= 0 else 0 chunk, end_chunk, open_chunk = solve_final_answer(chunk) print(f"len of end_chunk: {len(end_chunk)}") print(f"len of open_chunk: {len(open_chunk)}") print(F"len of chunk: {len(chunk)}s") # outputs.extend(end_chunk) # chunk = open_chunk chunk_outputs = chunk outputs.extend(chunk_outputs) else: raise(ValueError("Not implemented for non-vllm mode while tip == TTS")) # elif os.environ["tip"] == "TCMv2": else: available_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") # 示例GPU列表,根据实际情况修改 num_gpus = len(available_gpus) manager = Manager() output_queue = Queue() processes = [] multi_outputs = [] chunk_size = len(prompts) // num_gpus chunks = [prompts[i:i + chunk_size] for i in range(0, len(prompts), chunk_size)] num_rounds = (len(chunks) + num_gpus - 1) // num_gpus #g 轮次数等于最大GPU上的任务数 for round_idx in range(num_rounds): start_idx = round_idx * num_gpus end_idx = min((round_idx + 1) * num_gpus, len(chunks)) for i in range(start_idx, end_idx): chunk = chunks[i] p = Process(target=process_chunk, args=(chunk, args, available_gpus, stop_words, output_queue, i)) processes.append(p) p.start() # for i, chunk in enumerate(chunks): # print(f"Processing chunk {i} with size {len(chunk)}") # p = Process(target=process_chunk, args=(chunk, args, available_gpus, stop_words, output_queue, i)) # processes.append(p) # p.start() for _ in range(len(chunks)): result = output_queue.get() if isinstance(result, tuple) and len(result) == 2: multi_outputs.append(result) else: print(f"Error: Received non-tuple result: {result}") # multi_outputs.extend(output_queue.get()) for p in processes: p.join() multi_outputs.sort(key=lambda x: x[0]) outputs = [] for _, chunk_output in multi_outputs: outputs.extend(chunk_output) print('stage one finished!!!\n' * 20) # print("Special tokens in tokenizer:", tokenizer.special_tokens_map) # test_token = "\n50\n" # print(f"Encoding '{test_token}':", tokenizer.encode(test_token, add_special_tokens=False)) print(outputs[:3]) #################! ###! stage? 1 or 2 or add if os.environ['stage'] == "2": print("stage 2") two_stage_outputs = [] modified_outputs = [] print(f"len of outputs: {len(outputs)}") for output in outputs: # 去除output字符串末尾的换行符,并添加和**Final Answer**\n\\boxed字符串,将结果添加到modified_outputs列表中 if "<|end▁of▁sentence|>" in output: start_index = output.index("<|end▁of▁sentence|>") output = output[:start_index] # output = output.replace("<|end▁of▁sentence|>", "") modified_output = output + "\n\n\n**Final Answer**\\boxed" modified_outputs.append(modified_output) # print(f"modified_output_len: {len(modified_output)}") available_gpus = os.environ["CUDA_VISIBLE_DEVICES"].split(",") # 示例GPU列表,根据实际情况修改 num_gpus = len(available_gpus) manager = Manager() output_queue = Queue() processes = [] multi_outputs = [] prompts = modified_outputs chunk_size = len(prompts) // num_gpus chunks = [prompts[i:i + chunk_size] for i in range(0, len(prompts), chunk_size)] num_rounds = (len(chunks) + num_gpus - 1) // num_gpus #g 轮次数等于最大GPU上的任务数 for round_idx in range(num_rounds): start_idx = round_idx * num_gpus end_idx = min((round_idx + 1) * num_gpus, len(chunks)) for i in range(start_idx, end_idx): chunk = chunks[i] p = Process(target=process_chunk, args=(chunk, args, available_gpus, stop_words, output_queue, i)) processes.append(p) p.start() # for i, chunk in enumerate(chunks): # print(f"Processing chunk {i} with size {len(chunk)}") # p = Process(target=process_chunk, args=(chunk, args, available_gpus, stop_words, output_queue, i)) # processes.append(p) # p.start() for _ in range(len(chunks)): result = output_queue.get() if isinstance(result, tuple) and len(result) == 2: multi_outputs.append(result) else: print(f"Error: Received non-tuple result: {result}") # multi_outputs.extend(output_queue.get()) for p in processes: p.join() multi_outputs.sort(key=lambda x: x[0]) outputs = [] for _, chunk_output in multi_outputs: outputs.extend(chunk_output) elif os.environ['stage'] == "1": outputs = outputs with open("/mnt/lyc/wuxinrui/LLaMA-Factory/TCMv4/TCMv4_format_random2000_answer_prompt_generate.jsonl", "w") as f: for output in outputs: f.write(output + "\n") print('TCMv4_format_random2000_answer_prompt_generated!!!\n' * 20) # elif os.environ['stage'] == "add": # llm = LLM( # model=args.model_name_or_path, # tensor_parallel_size=1, # pipeline_parallel_size=1, # trust_remote_code=True, # gpu_memory_utilization=0.85, # enforce_eager=True, # max_seq_len_to_capture=5000000, # ) # two_stage_outputs = [] # modified_outputs = [] # print(f"len of outputs: {len(outputs)}") # for output in outputs: # # 去除output字符串末尾的换行符,并添加和**Final Answer**\n\\boxed字符串,将结果添加到modified_outputs列表中 # if "<|end▁of▁sentence|>" in output: # start_index = output.index("<|end▁of▁sentence|>") # output = output[:start_index] # # output = output.replace("<|end▁of▁sentence|>", "") # modified_output = output # modified_outputs.append(modified_output) # # print(f"modified_output_len: {len(modified_output)}") # for i in range(0, num_prompts, chunk_size): # modified_chunk = outputs[i:i + chunk_size] # 获取当前的 chunk # if args.use_vllm: # os.environ["position"] = 'start' # second_outputs = llm.generate( # modified_chunk, # SamplingParams( # temperature=args.temperature, # top_p=args.top_p, # max_tokens=50, # n=1, # stop=stop_words, # stop_token_ids=( # [151645, 151643] # ), # skip_special_tokens=False, #G 设置特殊token的可见性 # ), # ) # if os.path.exists('./start_positions.pt'): # os.remove('./start_positions.pt') # print('start_positions.pt removed') # if os.path.exists('./early_positions.pt'): # os.remove('./early_positions.pt') # print('early_positions.pt removed') # second_outputs = sorted(second_outputs, key=lambda x: int(x.request_id)) # second_outputs = [output.outputs[0].text for output in second_outputs] # # Combine initial and second outputs # combined_outputs = [init + second for init, second in zip(modified_chunk, second_outputs)] # print(f"len of combined_outputs:{len(combined_outputs)}") # two_stage_outputs.extend(combined_outputs) ## 直接覆盖掉就好 # outputs = two_stage_outputs #################! print(f"outputs:{len(outputs)}") print(f"current_prompts:{len(current_prompts)}") assert len(outputs) == len(current_prompts) #g 以防万一,再次sort回答 # print(f"outputs[:5]: {outputs[:5]}") # print(f"current_prompts[:5]: {current_prompts[:5]}") # sorted_outputs = [] # for (index, prompt) in current_prompts: # for i in range(len(outputs)): # if prompt in outputs[i]: # sorted_outputs.append(outputs[i]) # outputs = sorted_outputs # print(f"outputs[:5]: {outputs[123:128]}") # print(f"current_prompts[:5]: {current_prompts[123:128]}") # assert len(outputs) == len(current_prompts) # print(outputs) # process all outputs if __name__ == "__main__": multiprocessing.set_start_method('spawn') args = parse_args() set_seed(args.seed) setup(args)