常见的vllm推理和trl源码分析

常见的vllm推理和trl源码分析

API推理

一般来说,API请求都是IO密集型,可以利用到多进程和多线程,但是请求可能会失败,所以得把失败的重新跑一遍,这里用到两步

第一步先批量执行,然后第二步再专门处理失败的

  1. api-parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import requests
import json
from typing import Dict, Any
import yaml
import copy
import os
from jinja2 import Template

import requests
import json
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool, cpu_count
import os
import argparse # 新增:命令行参数解析

# ================= 配置区域 =================
API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
HEADERS = {
"Content-Type": "application/json",
"Authorization": "Bearer sk-xxx"
}
MODEL_NAME = "qwen3.6-max-preview" # 默认模型,可被命令行覆盖

# 并发配置
NUM_PROCESSES = 5
THREADS_PER_PROCESS = 5
# BATCH_SIZE = 200
TIMEOUT = 120

# 多进程不同配置(可选),若为 None 则使用全局 API_URL 和 MODEL_NAME
PROCESS_CONFIGS = None
# ===========================================

def process_single_request(message_data, api_url, model_name):
payload = {
"model": model_name,
"messages": message_data,
"temperature": 0.8,
"max_tokens": 8000,
"stream": False,
}

max_attempts = 2
last_error = None

for attempt in range(1, max_attempts + 1):
try:
start_time = time.time()
response = requests.post(
url=api_url,
headers=HEADERS,
json=payload,
timeout=TIMEOUT
)
elapsed = time.time() - start_time

try:
res_json = response.json()
except json.JSONDecodeError:
return {
"status_code": response.status_code,
"result": None,
"error": f"Invalid JSON: {response.text[:200]}",
"time_cost": elapsed
}

return {
"status_code": response.status_code,
"result": res_json['choices'][0]['message']['content'],
"error": None,
"time_cost": elapsed
}

except requests.exceptions.Timeout as e:
last_error = str(e)
if attempt < max_attempts:
continue
except Exception as e:
return {
"status_code": None,
"result": None,
"error": str(e),
"time_cost": 0
}

return {
"status_code": None,
"result": None,
"error": last_error,
"time_cost": 0
}


def process_batch(batch_data, api_url, model_name):
pid = os.getpid()
total = len(batch_data)
num_threads = THREADS_PER_PROCESS
print(f"[进程 {pid}] 开始处理批次,数据量: {total}, API: {api_url}, 模型: {model_name}")

base_size = total // num_threads
remainder = total % num_threads
data_chunks = []
start = 0
for i in range(num_threads):
end = start + base_size + (1 if i < remainder else 0)
data_chunks.append(batch_data[start:end])
start = end

batch_results = [None] * total
completed_counter = [0]
lock = threading.Lock()

def process_thread_chunk(thread_id, chunk):
results = []
for msg in chunk:
res = process_single_request(msg, api_url, model_name)
results.append(res)
with lock:
completed_counter[0] += 1
print(f"[进程 {pid}] 进度: {completed_counter[0]}/{total} "
f"(成功: {'是' if res.get('error') is None else '否'})")
return results

with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_id = {
executor.submit(process_thread_chunk, i, data_chunks[i]): i
for i in range(num_threads)
}

for future in as_completed(future_to_id):
thread_id = future_to_id[future]
start_idx = sum(len(data_chunks[i]) for i in range(thread_id))
try:
thread_results = future.result()
for j, res in enumerate(thread_results):
batch_results[start_idx + j] = res
except Exception as e:
chunk_len = len(data_chunks[thread_id])
for j in range(chunk_len):
batch_results[start_idx + j] = {
"error": f"Thread exception: {str(e)}"
}
with lock:
completed_counter[0] += chunk_len
print(f"[进程 {pid}] 线程 {thread_id} 异常,已标记 {chunk_len} 条数据失败")

print(f"[进程 {pid}] 批次处理完成,共处理 {total} 条数据")
return batch_results


def main():
global API_URL, MODEL_NAME, PROCESS_CONFIGS, OUTPUT_FILE # 新增 OUTPUT_FILE

# 验证数据源
if 'all_messages' not in globals():
print("错误: all_messages 未定义,请先加载数据")
return

data_to_process = all_messages
total_count = len(data_to_process)
print(f"总数据量: {total_count}, 启动 {NUM_PROCESSES} 个进程, 每进程 {THREADS_PER_PROCESS} 线程")

if PROCESS_CONFIGS is None:
process_configs = [(API_URL, MODEL_NAME)] * NUM_PROCESSES
print("使用全局 API 和模型配置")
else:
process_configs = PROCESS_CONFIGS
print("使用自定义进程配置")

# 数据分片
base_size = total_count // NUM_PROCESSES
remainder = total_count % NUM_PROCESSES
chunks = []
start = 0
for i in range(NUM_PROCESSES):
end = start + base_size + (1 if i < remainder else 0)
chunks.append(data_to_process[start:end])
start = end

if len(chunks) < NUM_PROCESSES:
print(f"警告: 数据分片数 ({len(chunks)}) 小于进程数 ({NUM_PROCESSES}),部分进程将空闲")
chunk_configs = [process_configs[i % len(process_configs)] for i in range(len(chunks))]

start_global_time = time.time()

with Pool(processes=NUM_PROCESSES) as pool:
starmap_args = [(chunk, url, model) for chunk, (url, model) in zip(chunks, chunk_configs)]
mapped_results = pool.starmap(process_batch, starmap_args)

final_results = []
for batch_res in mapped_results:
final_results.extend(batch_res)

end_global_time = time.time()
print(f"\n--- 全部完成 ---")
print(f"总耗时: {end_global_time - start_global_time:.2f} 秒")
print(f"成功处理: {sum(1 for r in final_results if r and r.get('error') is None)} / {total_count}")

for i, res in enumerate(final_results[:3]):
print(f"Index {i}: Status={res.get('status_code')}, Error={res.get('error')}")

# 使用命令行指定的输出文件路径
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
safe_results = [r if r else {"error": "Empty Result"} for r in final_results]
json.dump(safe_results, f, ensure_ascii=False, indent=2)
print(f"结果已保存至 {OUTPUT_FILE}")


if __name__ == "__main__":
# ---- 命令行参数解析 ----
parser = argparse.ArgumentParser(description="批量调用大模型接口")
parser.add_argument('-m', '--model', type=str, default='qwen3.6-max-preview',
help='指定模型名称,覆盖默认值')
parser.add_argument('-o', '--output', type=str, default='qwen3.6-max-preview-pred-v2.json',
help='指定输出文件路径')
parser.add_argument('-Y', '--yaml', type=str, default='prompt.yaml',
help='YAML文件路径')
# parser.add_argument('-i', '--input', type=str, default='prompt.yaml',
# help='指定输入文件路径')
args = parser.parse_args()

# 用命令行参数覆盖全局配置
MODEL_NAME = args.model
OUTPUT_FILE = args.output # 新增全局输出变量
YAML_FILE = args.yaml
# INPUT_FILE = args.input

# 加载 prompt 和数据(原逻辑不变)
with open(YAML_FILE,'r',encoding='utf-8') as f:
myprompt = yaml.safe_load(f)

template = Template(myprompt['system_prompt'])

with open('26_0513_货主介入评测_new.json','r',encoding='utf-8') as f:
datas = json.load(f)

all_messages = []
for data in datas:
# print(data)
prompt = template.render(history=json.dumps(data["history"], ensure_ascii=False),
cargo_info=json.dumps(data["货源信息"] if "货源信息" in data else None, ensure_ascii=False),
query=json.dumps(data["当前query"], ensure_ascii=False))
message = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
all_messages.append(message)
print(f"开始并行执行...")
main()
  1. api2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import os
import json
import copy
import time
import yaml
from typing import Dict, Any, List, Tuple
import requests
from jinja2 import Template
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import argparse

# ================== 配置 ==================
HEADERS = {
"Content-Type": "application/json",
"Authorization": "Bearer sk-480215c"
}
TIMEOUT = 120
API_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions"
# MODEL_NAME 已移除,改为命令行传入

# ================== 原始请求函数(不变) ==================
def process_single_request(message_data, api_url, model_name):
payload = {
"model": model_name,
"messages": message_data,
"temperature": 0.8,
"max_tokens": 8000,
"stream": False,
"chat_template_kwargs": {"enable_thinking": False}
}

max_attempts = 2
last_error = None

for attempt in range(1, max_attempts + 1):
try:
start_time = time.time()
response = requests.post(
url=api_url,
headers=HEADERS,
json=payload,
timeout=TIMEOUT
)
elapsed = time.time() - start_time

try:
res_json = response.json()
except json.JSONDecodeError:
return {
"status_code": response.status_code,
"result": None,
"error": f"Invalid JSON: {response.text[:200]}",
"time_cost": elapsed
}

# 安全获取 content
choices = res_json.get('choices', [])
if choices and len(choices) > 0:
content = choices[0]['message'].get('content', '')
else:
content = None

return {
"status_code": response.status_code,
"result": content,
"error": None,
"time_cost": elapsed
}

except requests.exceptions.Timeout as e:
last_error = str(e)
if attempt < max_attempts:
continue
except Exception as e:
return {
"status_code": None,
"result": None,
"error": str(e),
"time_cost": 0
}

return {
"status_code": None,
"result": None,
"error": last_error,
"time_cost": 0
}

# ================== 单个重试任务(用于并发) ==================
def retry_single_task(index: int, messages: List[Dict], model_name: str) -> Tuple[int, Dict]:
"""
对单个错误样本重新请求,返回 (index, result_dict)
增加 model_name 参数
"""
# print(f"[任务{index}] 开始重试...")
result = process_single_request(messages, API_URL, model_name)
# print(f"[任务{index}] 完成,耗时 {result.get('time_cost', 0):.2f}s")
return index, result

# ================== 并发重试主函数 ==================
def retry_errors_concurrently(
preds: List[Dict],
prepared_tasks: List[Tuple[int, List[Dict]]],
max_workers: int = 4,
use_process: bool = False,
model_name: str = "qwen3.6-max-preview" # 新增参数
) -> List[Dict]:
"""
并发重试所有有错误的条目,保持原始顺序。
"""

if not prepared_tasks:
print("没有需要重试的错误条目。")
return preds

print(f"发现 {len(prepared_tasks)} 个错误条目,使用 {'多进程' if use_process else '多线程'} 并发重试,并发数={max_workers}")

Executor = ProcessPoolExecutor if use_process else ThreadPoolExecutor
results_map = {}

with Executor(max_workers=max_workers) as executor:
future_to_index = {}
for idx, messages in prepared_tasks:
# 将 model_name 传递给任务函数
future = executor.submit(retry_single_task, idx, messages, model_name)
future_to_index[future] = idx

for future in as_completed(future_to_index):
idx = future_to_index[future]
try:
index, result = future.result()
results_map[index] = result
except Exception as e:
print(f"任务 {idx} 执行异常: {e}")
results_map[idx] = {
"status_code": None,
"result": None,
"error": f"并发执行异常: {e}",
"time_cost": 0
}

for idx, result in results_map.items():
preds[idx] = result

return preds

# ================== 混合并发模式支持 (多进程+多线程) ==================
def retry_worker_process_batch(
task_batch: List[Tuple[int, List[Dict]]],
threads_per_process: int,
model_name: str # 新增参数
) -> List[Tuple[int, Dict]]:
"""
在一个独立进程中运行。
接收预渲染好的任务,内部使用多线程并发处理。
"""
results = []

total = len(task_batch)

import os
pid = os.getpid()
def thread_task(idx, msgs):
try:
# 使用传入的 model_name
res = process_single_request(msgs, API_URL, model_name)
return idx, res
except Exception as e:
return idx, {
"status_code": None,
"result": None,
"error": f"Thread Exception: {str(e)}",
"time_cost": 0
}

with ThreadPoolExecutor(max_workers=threads_per_process) as executor:
futures = {}
for idx, messages in task_batch:
future = executor.submit(thread_task, idx, messages)
futures[future] = idx

completed = 0
for future in as_completed(futures):
try:
idx, res = future.result()
results.append((idx, res))
completed += 1
print(f"进程{pid} 完成 {completed}/{total}")
except Exception as e:
print(f"Worker Process Internal Error: {e}")

return results

def retry_errors_hybrid(
preds: List[Dict],
prepared_tasks: List[Tuple[int, List[Dict]]],
num_processes: int = 2,
threads_per_process: int = 4,
model_name: str = "qwen3.6-max-preview" # 新增参数
) -> List[Dict]:
"""
混合并发模式:多进程 + 每个进程内多线程
"""
if not prepared_tasks:
return preds

total_tasks = len(prepared_tasks)
print(f"启动混合并发模式: 进程数={num_processes}, 每进程线程数={threads_per_process}")
print(f"总理论并发度: {num_processes * threads_per_process}")

# 任务分片
batches = []
avg_load = total_tasks // num_processes
remainder = total_tasks % num_processes

start_idx = 0
for i in range(num_processes):
current_batch_size = avg_load + (1 if i < remainder else 0)
if current_batch_size == 0:
break
end_idx = start_idx + current_batch_size
batch = prepared_tasks[start_idx:end_idx]
batches.append(batch)
start_idx = end_idx

valid_batches = [b for b in batches if len(b) > 0]
if not valid_batches:
return preds

all_results_map = {}

with ProcessPoolExecutor(max_workers=len(valid_batches)) as executor:
future_to_batch_idx = {}
for i, batch in enumerate(valid_batches):
future = executor.submit(
retry_worker_process_batch,
batch,
threads_per_process,
model_name # 传入 model_name
)
future_to_batch_idx[future] = i

completed_count = 0
for future in as_completed(future_to_batch_idx):
try:
batch_results = future.result()
for idx, res in batch_results:
all_results_map[idx] = res
except Exception as e:
print(f"进程执行出现严重异常: {e}")
completed_count += 1
print(f"进度: {completed_count}/{len(valid_batches)} ")

for idx, result in all_results_map.items():
preds[idx] = result

print(f"混合并发重试完成,成功更新 {len(all_results_map)}/{total_tasks} 条结果。")
return preds


# ================== 主程序 ==================
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="并发重试错误预测")
parser.add_argument("--workers", type=int, default=4, help="并发数(取决于模式)")
parser.add_argument("--mode", choices=["thread", "process", "hybrid"], default="thread",
help="并发模式:thread, process, hybrid")
parser.add_argument("--hybrid_procs", type=int, default=2, help="混合模式下的进程数量")
parser.add_argument("--input_pred", type=str, default="qwen3.6-max-preview-pred.json",
help="原始预测结果文件路径")
parser.add_argument("--output_pred", type=str, default="qwen3.6-max-preview-pred_retry.json",
help="重试后保存的结果文件路径")
parser.add_argument("--train_data", type=str, default="train_datas.json",
help="训练数据文件路径")
parser.add_argument("--prompt_file", type=str, default="prompt.yaml",
help="prompt模板文件")
# ===== 新增参数:模型名称 =====
parser.add_argument("--model_name", type=str, default="qwen3.6-max-preview",
help="要调用的模型名称")
args = parser.parse_args()

# 1. 加载数据
print("正在加载数据...")
with open(args.train_data, "r", encoding="utf-8") as f:
original = json.load(f)
with open(args.prompt_file, "r", encoding="utf-8") as f:
myprompt = yaml.safe_load(f)
with open(args.input_pred, "r", encoding="utf-8") as f:
preds = json.load(f)

# 2. 创建模板
template = Template(myprompt['system_prompt'])

# 3. 预渲染所有需要重试的 Messages
prepared_tasks = []
error_count = 0
for idx, pred_item in enumerate(preds):
if pred_item.get("error") or pred_item.get("result") == None:
cur_data = original[idx]
try:
prompt_str = template.render(
history=json.dumps(cur_data["history"], ensure_ascii=False),
cargo_info=json.dumps(cur_data["货源信息"] if "货源信息" in cur_data else None, ensure_ascii=False),
query=json.dumps(cur_data["当前query"], ensure_ascii=False)
)

messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt_str}
]
prepared_tasks.append((idx, messages))
error_count += 1
except Exception as e:
print(f"警告: 索引 {idx} 的模板渲染失败: {e}")
preds[idx]["error"] = f"Render Error: {str(e)}"

print(f"共识别出 {error_count} 个错误条目,并已完成消息预渲染。")

# 4. 执行并发重试,传入 model_name
if args.mode == "hybrid":
updated_preds = retry_errors_hybrid(
preds=preds,
prepared_tasks=prepared_tasks,
num_processes=args.hybrid_procs,
threads_per_process=args.workers,
model_name=args.model_name # 传入
)
else:
updated_preds = retry_errors_concurrently(
preds=preds,
prepared_tasks=prepared_tasks,
max_workers=args.workers,
use_process=(args.mode == "process"),
model_name=args.model_name # 传入
)

# 5. 保存结果
print(f"正在保存结果至 {args.output_pred} ...")
with open(args.output_pred, "w", encoding="utf-8") as f:
json.dump(updated_preds, f, ensure_ascii=False, indent=4)
print("全部完成。")

命令

1
2
3
4
5
6
7
8
9
python api2.py --model_name qwen3.6-max-preview --mode hybrid --input_pred v1.json --output_pred v1_new.json --prompt_file prompt-v3.yaml --train_data 26_0513_货主介入评测_new.json

python api2.py --model_name qwen3.6-max-preview --mode hybrid --input_pred v2.json --output_pred v2_new.json --prompt_file 2-prompt.yaml --train_data 26_0513_货主介入评测_new.json

python api2.py --model_name qwen3.6-max-preview --mode hybrid --input_pred v2.json --output_pred v3_new.json --prompt_file 3-prompt.yaml --train_data 26_0513_货主介入评测_new.json

python api-parser.py -o v1.json -Y prompt-v3.yaml
python api-parser.py -o v2.json -Y 2-prompt.yaml
python api-parser.py -o v3.json -Y 3-prompt.yaml

VLLM推理

和api推理的区别,vllm实例化模型到本地,直接用vllm接口;而api是把模型部署成一个服务,直接用request的方式调用

my_prompt.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import json
import re
import random
import multiprocessing
import vllm
from vllm import LLM, SamplingParams
import os
from tqdm import tqdm
from distutils.version import LooseVersion


def compare_versions(version1, version2):
v1 = LooseVersion(version1)
v2 = LooseVersion(version2)
if v1 > v2:
return 1
elif v1 < v2:
return -1
else:
return 0

def messages_to_prompt(messages, system_prompt='You are a helpful assistant.', fill_system_prompt=True):
"""
将 messages 转换为 Qwen2 模型的输入 prompt。

:param messages: 包含对话消息的列表,每个消息是一个字典,包含 'role' 和 'content' 字段。
例如: [{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': 'What is the capital of France?'}]
:return: 转换后的 prompt 字符串,适用于 Qwen2 模型。
"""
prompt = ""
if fill_system_prompt and messages[0]['role'] != 'system':
messages.insert(0, {'role': 'system', 'content': system_prompt})
for message in messages:
role = message['role']
content = message['content']
prompt += f"<|im_start|>{role}\n{content}<|im_end|>\n"

# 添加 assistant 的开始标记,表示模型需要生成回复
prompt += "<|im_start|>assistant\n"

return prompt

def get_free_gpus():
try:
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()
gpu_list = []

for i in range(device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
total_memory = mem_info.total / 1024**3 # 转换为GB
free_memory = mem_info.free / 1024**3
gpu_list.append((i, free_memory, total_memory))

# 按剩余显存降序排序
sorted_gpus = sorted(gpu_list, key=lambda x: x[1], reverse=True)
return sorted_gpus

except pynvml.NVMLError as e:
print(f"Error accessing NVIDIA GPU: {e}")
return []
finally:
pynvml.nvmlShutdown()

def call_vllm(all_messages, model_path,result_queue, sample_config, LLM_config, progress=None, enforce_eager=None):
# os.environ['VLLM_USE_V1'] = '1' if len(all_messages)>1000 else '0'
print(f'{enforce_eager=}')

if compare_versions(vllm.__version__, '0.8.0')<0:
del LLM_config['disable_cascade_attn']
print(f"{LLM_config=}")
print(f"{sample_config=}")
sampling_params = SamplingParams(**sample_config)
llm = LLM(**LLM_config)
if progress is not None:
progress(0.5, f"加载模型完成,批量预测")
print("初始化模型完成:", model_path)
all_prompt = [messages_to_prompt(messages) for messages in all_messages]
print(all_prompt[0])
# 排序,充分使用kv cache
all_index, all_prompt = zip(*sorted(zip(range(len(all_prompt)), all_prompt), key=lambda x: x[1], reverse=True))
# 生成
outputs_list = llm.generate(all_prompt, sampling_params, use_tqdm=True)
# 恢复原始顺序
all_index, outputs_list = zip(*sorted(zip(all_index, outputs_list), key=lambda x: x[0]))

if int(sample_config.get('n', 1))==1:
pred = [output.outputs[0].text for output in outputs_list]
else:
pred = [[output.outputs[i].text for i in range(int(sample_config.get('n', 1)))] for output in outputs_list]
result_queue.put(pred)
if progress is not None:
progress(0.75, f"预测完成,清理显存")

def local_call_vllm(all_messages, model_path, temperature=0, gpu_memory=0.3, max_tokens=512, seed=42, progress=None, top_p=1, min_p=-1, top_k=-1, stop=["<|endoftext|>", "<|im_end|>"], gpu_num=1, enforce_eager=None, use_beam_search=0, n=1, **kwargs):

try:
# multiprocessing.set_start_method('fork')
multiprocessing.set_start_method('spawn', force=True)
# 创建通信队列
except:
pass
if enforce_eager is None:
enforce_eager = False if len(all_messages)>300000 else True
result_queue = multiprocessing.Manager().Queue()
sample_config = {
"temperature": temperature,
"top_p": top_p,
"seed": seed,
"stop": stop,
"max_tokens": max_tokens,
"min_p": min_p if min_p!=-1 else 0,
"top_k": top_k,
"n": n,
"allowed_token_ids": kwargs.get("allowed_token_ids", None)
}
if use_beam_search not in [0, '0']:
sample_config['use_beam_search'] = True
sample_config['best_of'] = use_beam_search
print('使用beam_search,best_of=', use_beam_search)
for i in ['top_k', 'min_p', 'top_p']:
if i in sample_config:
del sample_config[i]
LLM_config = {
"model": model_path,
"enforce_eager": enforce_eager,
"trust_remote_code": True,
"tokenizer_mode": "auto",
"dtype": "auto",
"gpu_memory_utilization": gpu_memory,
"disable_log_stats": True,
"max_model_len": 20000,
"enable_prefix_caching": True,
"tensor_parallel_size": gpu_num,
"disable_cascade_attn": True,
}

gpu_list = get_free_gpus()
selected_gpu = []
# 选择第一个可用的GPU
for gpu in gpu_list:
if gpu[1] <= 80*gpu_memory:
continue
selected_gpu.append(str(gpu[0]))
if len(selected_gpu)==gpu_num:
print("使用GPU", ','.join(selected_gpu))
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(selected_gpu)
break
else:
print("没有可用的GPU", gpu_list)
return None


# 创建并启动子进程
process = multiprocessing.Process(
target=call_vllm,
args=(all_messages, model_path, result_queue, sample_config, LLM_config, progress, enforce_eager)
)
process.start()
# 等待子进程完成
process.join()

if not result_queue.empty():
data = result_queue.get()
if progress is not None:
progress(1, f"done")
return data

参考代码 ipynb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import multiprocessing

import os
import sys
import yaml
import json
import copy
cur_path = r"/root/ruixinspace/zyt/LLaMA-Factory-0.9.3/debug"
print(cur_path)
sys.path.append(cur_path)

from my_prompt import get_messages,local_call_vllm,get_free_gpus


import ctypes

# 1. 获取 Conda 环境路径
conda_prefix = os.environ.get('CONDA_PREFIX')

if conda_prefix:
# 2. 构建新版 libstdc++ 的路径
new_lib_path = os.path.join(conda_prefix, 'lib', 'libstdc++.so.6')

# 3. 【核心步骤】强制加载这个新库
try:
ctypes.CDLL(new_lib_path)
print(f"✅ Successfully pre-loaded libstdc++ from: {new_lib_path}")
except OSError as e:
print(f"❌ Failed to load libstdc++: {e}")
print("Please run: conda install -c conda-forge libstdcxx-ng")

# 4. 同时也更新环境变量(作为双重保险)
current_ld_path = os.environ.get('LD_LIBRARY_PATH', '')
if conda_prefix + '/lib' not in current_ld_path:
os.environ['LD_LIBRARY_PATH'] = f"{conda_prefix}/lib:{current_ld_path}"
else:
print("Error: CONDA_PREFIX not found.")

# --- 现在再导入 vllm ---
import vllm
from vllm import LLM, SamplingParams
print("vLLM imported successfully!")



new_datas = []
with open("/root/ruixinspace/zyt/LLaMA-Factory-0.9.3/debug/train_datas.json",'r',encoding='utf-8') as f:
datas = json.load(f)

myprompt = f"""
"""

print(myprompt)

all_messages = []
for data in datas:
cur_prompt = copy.deepcopy(myprompt)
# print(data)
prompt = cur_prompt.format(his = "\n".join(data['历史对话']),input_data = data['当前query'])
# prompt+='/no_think'
message = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
all_messages.append(message)

# qwen3.5-27B模型
MODEL_PATH = r"Qwen3-235B-A22B-Thinking-2507-FP8"



output = local_call_vllm(all_messages, MODEL_PATH, temperature=0, gpu_memory=0.9, max_tokens=5000, seed=42, top_p=1, stop=["<|endoftext|>", "<|im_end|>"], enforce_eager=None, gpu_num=2, progress=None)

拿到output再后处理即可,根据带不带think,带不带json等情况,进行相应的提取即可。

后处理代码ipynb

仅供参考,以下只考虑,think和json两种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
preds = []
for index in range(len(ori_preds)):
pred = ori_preds[index]
if "</think>" in pred['result']:
cur_label = pred['result'].split("</think>")[1].strip()
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
else:
try:
cur_label = pred['result'].split("```json")[-1].strip()[:-3]
# print(index,cur_label)
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
except:
try:
cur_label = pred['result'].split("\n\n")[-1].strip()
# print(index,cur_label)
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
except:
preds.append([])

指标计算

metric.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
from collections import defaultdict, Counter
import json

def calculate_tool_metrics(labels, predictions, neg = True):
"""
计算每个工具的 PRF1(仅工具名称),支持工具重复出现并计多次命中。
引入 "__NEGATIVE__" 虚拟工具处理空样本。
neg:是否考虑负样本

统计原则:
- 标签频次 = 标签列表中该工具出现的总次数
- 预测频次 = 预测列表中该工具出现的总次数
- 命中频次 = 该工具在标签和预测中出现次数的最小值(即 min(cnt_label, cnt_pred))

Args:
labels, predictions: 格式同前

Returns:
dict: 各工具及微平均/宏平均指标
"""
tool_counts = defaultdict(lambda: {'label_count': 0, 'hit_count': 0, 'pred_count': 0})
NEGATIVE_TOOL = "__NEGATIVE__"

def extract_tool_list(data):
"""提取工具名称列表(保留重复),返回 (list, is_empty)"""
if not isinstance(data, list):
raise ValueError("数据格式错误,每个label/prediction都应该是list")
if not data :
return [], True
tools = []
for item in data:
if not (isinstance(item, dict) and 'toolName' in item):
raise ValueError("数据格式错误,每个label/prediction中的元素都应该是dict,且包含toolName字段")
if isinstance(item, dict) and 'toolName' in item:
tools.append(item['toolName'])
if len(tools)!= len(data):
raise ValueError("tools长度应该和data长度相同")
return tools, len(tools) == 0

if len(labels) != len(predictions):
raise ValueError("数据长度不一致,请检查输入数据。")
if len(labels) == 0 and len(predictions) == 0:
raise ValueError("数据为空,请检查输入数据")

for label_item, pred_item in zip(labels, predictions):
label_tools, is_label_empty = extract_tool_list(label_item)
pred_tools, is_pred_empty = extract_tool_list(pred_item)

# --- 负样本统计 (__NEGATIVE__) ---
if neg:
if is_label_empty:
tool_counts[NEGATIVE_TOOL]['label_count'] += 1
if is_pred_empty:
tool_counts[NEGATIVE_TOOL]['pred_count'] += 1
if is_label_empty and is_pred_empty:
tool_counts[NEGATIVE_TOOL]['hit_count'] += 1

# --- 具体工具统计(支持重复) ---
label_counter = Counter(label_tools)
pred_counter = Counter(pred_tools)

# 累加每个工具的出现次数(标签侧和预测侧)
for tool, cnt in label_counter.items():
tool_counts[tool]['label_count'] += cnt
for tool, cnt in pred_counter.items():
tool_counts[tool]['pred_count'] += cnt

# 命中次数:取每个工具在两个列表中出现次数的最小值
all_tools = set(label_counter) | set(pred_counter)
for tool in all_tools:
hit = min(label_counter.get(tool, 0), pred_counter.get(tool, 0))
if hit > 0:
tool_counts[tool]['hit_count'] += hit

# 计算每个工具的 PRF1
tool_metrics = {}
total_tp = total_fp = total_fn = 0

for tool, counts in tool_counts.items():
label_count = counts['label_count']
hit_count = counts['hit_count']
pred_count = counts['pred_count']

tp = hit_count
fn = label_count - hit_count
fp = pred_count - hit_count

precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

tool_metrics[tool] = {
'precision': precision,
'recall': recall,
'f1': f1,
'tp': tp, 'fp': fp, 'fn': fn,
'label_count': label_count, 'pred_count': pred_count, 'hit_count': hit_count
}

total_tp += tp
total_fp += fp
total_fn += fn

# 微平均
micro_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0
micro_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0
micro_f1 = 2 * micro_precision * micro_recall / (micro_precision + micro_recall) if (micro_precision + micro_recall) > 0 else 0

# 宏平均(如果需要排除 __NEGATIVE__ 可自行过滤)
if tool_metrics:
macro_precision = sum(m['precision'] for m in tool_metrics.values()) / len(tool_metrics)
macro_recall = sum(m['recall'] for m in tool_metrics.values()) / len(tool_metrics)
macro_f1 = sum(m['f1'] for m in tool_metrics.values()) / len(tool_metrics)
else:
macro_precision = macro_recall = macro_f1 = 0

tool_metrics['micro_avg'] = {
'precision': micro_precision,
'recall': micro_recall,
'f1': micro_f1
}
tool_metrics['macro_avg'] = {
'precision': macro_precision,
'recall': macro_recall,
'f1': macro_f1
}

return tool_metrics


def calculate_tool_with_params_metrics(labels, predictions):
"""
计算带参数的工具 PRF1(工具名+标准化参数作为唯一键),支持重复键并计多次命中。
不考虑负样本
"""
tool_counts = defaultdict(lambda: {'label_count': 0, 'hit_count': 0, 'pred_count': 0})
NEGATIVE_TOOL = "__NEGATIVE__"
print("calculate_tool_with_params_metrics start")

def format_tool_key(tool_item):
"""将工具项格式化为 toolName-normalized_params 字符串"""
param_str = []
toolName = tool_item.get('toolName')
params = tool_item.get('paramList', {})
if params!={}:
for k,v in params.items():
if type(v) == str:
tool_param = toolName + '-' + k + ":" + v
elif type(v) == list:
vv = sorted(v,key=lambda x: (len(x),x))
tool_param = toolName + '-' + k + ":" + json.dumps(vv)
elif type(v) == dict:
vv = {k: v[k] for k in sorted(v)}
tool_param = toolName +'-' + k+":"+json.dumps(vv)
elif type(v) == bool:
tool_param = toolName +'-' + k+ ":" + str(v)
elif type(v) == int:
tool_param = toolName +'-' + k+ ":" + str(v)
elif type(v) == float:
tool_param = toolName +'-' + k+ ":" + str(v)
else:
tool_param = toolName +'-' + k+ ":" + str(v)
print(f"未知类型{type(v)}")

param_str.append(tool_param)
if toolName == "truckCheck":
if "truckType" in params and "value" in params:
if params['truckType'] == "车长":
param_str.append(toolName + '-' + "truckType-" + params['truckType'] + ':' + params['value'])
elif params['truckType'] == "车型":
param_str.append(toolName + '-' + "truckType-" + params['truckType'] + ':' + params['value'])
elif toolName == "priceCheck":
if "priceType" in params and "value" in params:
if params['priceType'] == "到车价":
param_str.append(toolName + '-' + "priceType-" + params['priceType'] + ':' + params['value'])
elif params['priceType'] == "总价":
param_str.append(toolName + '-' + "priceType-" + params['priceType'] + ':' + params['value'])
elif params == {}:
# 无参数的不考虑计算
pass

return param_str


# 只计算带参数的工具,其余工具不参与计算
def extract_formatted_tool_list(data):
"""提取格式化后的工具列表(保留重复),返回 (list, is_empty)"""
if not data or not isinstance(data, list):
return [], True
tools = []
for item in data:
if isinstance(item, dict) and 'toolName' in item:
param_str = format_tool_key(item)
if param_str:
tools.extend(param_str)
return tools, len(tools) == 0

if len(labels) != len(predictions):
raise ValueError("数据长度不一致,请检查输入数据。")
if len(labels) == 0 and len(predictions) == 0:
raise ValueError("数据为空,请检查输入数据")

for label_item, pred_item in zip(labels, predictions):
label_tools, is_label_empty = extract_formatted_tool_list(label_item)
pred_tools, is_pred_empty = extract_formatted_tool_list(pred_item)

# 负样本
# if is_label_empty:
# tool_counts[NEGATIVE_TOOL]['label_count'] += 1
# if is_pred_empty:
# tool_counts[NEGATIVE_TOOL]['pred_count'] += 1
# if is_label_empty and is_pred_empty:
# tool_counts[NEGATIVE_TOOL]['hit_count'] += 1

# 具体工具(支持重复)
label_counter = {}
for param_str in label_tools:
tool_param = param_str.split(':')[0]
value = param_str.split(':')[1]
if tool_param not in label_counter:
label_counter[tool_param] = []
label_counter[tool_param].append(value)
else:
label_counter[tool_param].append(value)

pred_counter = {}
for param_str in pred_tools:
tool_param = param_str.split(':')[0]
value = param_str.split(':')[1]
if tool_param not in pred_counter:
pred_counter[tool_param] = []
pred_counter[tool_param].append(value)
else:
pred_counter[tool_param].append(value)



for tool, cnt in label_counter.items():
tool_counts[tool]['label_count'] += len(cnt)
for tool, cnt in pred_counter.items():
tool_counts[tool]['pred_count'] += len(cnt)

if len(label_tools) > len(pred_tools):
l1 = label_tools
l2 = pred_tools
else:
l1 = pred_tools
l2 = label_tools
# 遍历短的,如果也在长的里面,那就hit加1
for param_str in l2:
if param_str in l1:
tool_param = param_str.split(':')[0]
tool_counts[tool_param]['hit_count'] += 1




# 计算 PRF1 (与上方函数相同)
tool_metrics = {}
total_tp = total_fp = total_fn = 0

for tool, counts in tool_counts.items():
label_count = counts['label_count']
hit_count = counts['hit_count']
pred_count = counts['pred_count']

tp = hit_count
fn = label_count - hit_count
fp = pred_count - hit_count

precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

tool_metrics[tool] = {
'precision': precision,
'recall': recall,
'f1': f1,
'tp': tp, 'fp': fp, 'fn': fn,
'label_count': label_count, 'pred_count': pred_count, 'hit_count': hit_count
}

total_tp += tp
total_fp += fp
total_fn += fn

micro_precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0
micro_recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0
micro_f1 = 2 * micro_precision * micro_recall / (micro_precision + micro_recall) if (micro_precision + micro_recall) > 0 else 0

if tool_metrics:
macro_precision = sum(m['precision'] for m in tool_metrics.values()) / len(tool_metrics)
macro_recall = sum(m['recall'] for m in tool_metrics.values()) / len(tool_metrics)
macro_f1 = sum(m['f1'] for m in tool_metrics.values()) / len(tool_metrics)
else:
macro_precision = macro_recall = macro_f1 = 0

tool_metrics['micro_avg'] = {
'precision': micro_precision,
'recall': micro_recall,
'f1': micro_f1
}
tool_metrics['macro_avg'] = {
'precision': macro_precision,
'recall': macro_recall,
'f1': macro_f1
}
return tool_metrics


# 样本维度,计算单工具和多工具的完全匹配样本
def calculate_strict_tools(labels, predictions):
important_tools = ['priceCheck','truckCheck',"loadTimeCheck","orderManagementCheck"]
important_metrics = defaultdict(lambda: {'label_count': 0, 'hit_count': 0, 'pred_count': 0})
single_cnt = 0
single_sum = 0
multi_cnt = 0
multi_sum = 0

for label_item, pred_item in zip(labels, predictions):
if len(label_item) == 1:
single_sum += 1
if len(pred_item) == 1:
label = label_item[0]
pred = pred_item[0]
if label == pred:
single_cnt+=1
if len(label_item) > 1:
multi_sum += 1
if len(pred_item) > 1:
if len(label_item) == len(pred_item):
sorted_label_item = sorted(label_item, key=lambda x: x['toolName'])
sorted_pred_item = sorted(pred_item, key=lambda x: x['toolName'])
if sorted_label_item == sorted_pred_item:
multi_cnt += 1
return single_cnt,single_sum,multi_cnt,multi_sum




if __name__ == "__main__":
# 测试重复工具多次命中
labels = [
{"toolName": "priceCheck", "paramList": {"amount": 100}},
{"toolName": "priceCheck", "paramList": {"amount": 200}}, # 同一工具名两次
{"toolName": "settlementMethod"}
]

predictions = [
{"toolName": "priceCheck", "paramList": {"amount": 100}}, # 命中一次
{"toolName": "priceCheck", "paramList": {"amount": 999}}, # 参数不同,不命中(仅名称匹配)
{"toolName": "settlementMethod"}
]


print("=== 工具名称指标(支持重复) ===")
res1 = calculate_tool_metrics(labels, predictions)
for tool, m in res1['tools'].items():
print(f"{tool}: tp={m['tp']}, fp={m['fp']}, fn={m['fn']}, P={m['precision']:.3f}, R={m['recall']:.3f}, F1={m['f1']:.3f}")
print("Micro:", res1['micro_avg'])
print("Macro:", res1['macro_avg'])

print("\n=== 带参数指标(支持重复) ===")
res2 = calculate_tool_with_params_metrics(labels, predictions)
for tool, m in res2['tools'].items():
if tool != "__NEGATIVE__":
print(f"{tool}: tp={m['tp']}, fp={m['fp']}, fn={m['fn']}, P={m['precision']:.3f}, R={m['recall']:.3f}, F1={m['f1']:.3f}")
print("Micro:", res2['micro_avg'])
print("Macro:", res2['macro_avg'])

参考脚本 ipynb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import json


from metric import calculate_tool_with_params_metrics, calculate_tool_metrics, calculate_strict_tools



with open("train_datas.json", "r", encoding="utf-8") as f:
original = json.load(f)
labels = [label['label'] for label in original]

with open("qwen3.6-max-preview-pred-v3-retry.json", "r", encoding="utf-8") as f:
ori_preds = json.load(f)




preds = []
for index in range(len(ori_preds)):
pred = ori_preds[index]
if "</think>" in pred['result']:
cur_label = pred['result'].split("</think>")[1].strip()
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
else:
try:
cur_label = pred['result'].split("```json")[-1].strip()[:-3]
# print(index,cur_label)
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
except:
try:
cur_label = pred['result'].split("\n\n")[-1].strip()
# print(index,cur_label)
cur_label = json.loads(cur_label)
preds.append(cur_label['label'])
except:
preds.append([])




tools_metrics = calculate_tool_metrics(labels, preds,neg = False)
tools_with_params_metrics = calculate_tool_with_params_metrics(labels, preds)

single_cnt,single_sum,multi_cnt,multi_sum = calculate_strict_tools(labels, preds)





# 最后写入表格
# 转换为 DataFrame
import pandas as pd
def dict_to_df(metrics_dict):
rows = []
for tool, m in metrics_dict.items():
# print(m)
rows.append({
'工具名': tool,
'P': m['precision'],
'R': m['recall'],
'F1': m['f1'],
'label_count': m['label_count'] if 'label_count' in m else None,
'pred_count': m['pred_count'] if 'pred_count' in m else None,
'hit_count': m['hit_count'] if 'hit_count' in m else None,
})
return pd.DataFrame(rows)

def data_label_pred(labels,preds,original):
assert len(labels) == len(preds) and len(original) == len(labels), print('长度不一致')
rows = []
for index in range(len(original)):
rows.append({
"url" : original[index]['url'],
"历史对话": "\n".join(original[index]['历史对话']),
"query": original[index]['当前query'],
"label": original[index]['label'],
"pred": preds[index],
})
return pd.DataFrame(rows)



df1 = dict_to_df(tools_metrics)
df2 = dict_to_df(tools_with_params_metrics)
df3 = data_label_pred(labels, preds,original)

# 写入同一个 Excel 文件的不同 Sheet
output_file = 'qwen3.6-max-preview-v3.xlsx'
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name='工具指标', index=False, float_format='%.4f')
df2.to_excel(writer, sheet_name='参数指标', index=False, float_format='%.4f')
df3.to_excel(writer, sheet_name='badcase', index=False)

print(f"已将表格写入 {output_file}")

数据处理

一般来说,对于本地数据,主要是file或者scirpt的方式

  • file就是本地直接按照llamafactory的格式准备好了json

  • script的方式是本地有json格式的文件,然后写一个继承自datasets.GeneratorBasedBuilder 的类,重写三个方法即可,然后映射到llamafactory的指定格式

Trainer的典型流程

1
2
3
4
5
6
7
8
9
train
_inner_training_loop
training_step
compute_loss
_maybe_log_save_evaluate 这里是在每gradient_accumulation_steps整数倍进行

evaluate
evaluation_loop 新版本prediction也是走这
prediction_step 在这里拿到losses, logits, labels

以gradient_accumulation_steps=6为例,batch_size为64,会重复找dataloader拿6轮数据,每轮64条,每一轮都会计算loss,但是不回传梯度,只累计,在每个gradient_accumulation_steps的末尾去同步更新

1
2
3
4
5
for _ in range(total_updates):
update_step += 1
num_batches = args.gradient_accumulation_steps if update_step != (total_updates - 1) else remainder
batch_samples, num_items_in_batch = self.get_batch_samples(epoch_iterator, num_batches, args.device)
for i, inputs in enumerate(batch_samples):

_maybe_log_save_evaluate _evaluate_determine_best_metric两个重点函数,后面是用来评估具体metric中哪个指标,是greater好还是less好,返回是一个bool值,代表有没有找到一个更好的模型,对应True

1
2
3
4
5
6
if self.control.should_evaluate:
metrics = self._evaluate(trial, ignore_keys_for_eval)
is_new_best_metric = self._determine_best_metric(metrics=metrics, trial=trial)

if self.args.save_strategy == SaveStrategy.BEST:
self.control.should_save = is_new_best_metric

_evaluate内部会走到evaluate,然后走到evaluation_loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
override = eval_dataset is not None
eval_dataset = eval_dataset if override else self.eval_dataset
if isinstance(eval_dataset, dict):
metrics = {}
for eval_dataset_name, _eval_dataset in eval_dataset.items():
dataset_metrics = self.evaluate(
eval_dataset=_eval_dataset if override else eval_dataset_name,
ignore_keys=ignore_keys,
metric_key_prefix=f"{metric_key_prefix}_{eval_dataset_name}",
)
metrics.update(dataset_metrics)
return metrics

# memory metrics - must set up as early as possible
self._memory_tracker.start()

eval_dataloader = self.get_eval_dataloader(eval_dataset)
if self.is_fsdp_xla_v2_enabled:
eval_dataloader = tpu_spmd_dataloader(eval_dataloader)

start_time = time.time()

eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
output = eval_loop(
eval_dataloader,
description="Evaluation",
# No point gathering the predictions if there are no metrics, otherwise we defer to
# self.args.prediction_loss_only
prediction_loss_only=True if self.compute_metrics is None else None,
ignore_keys=ignore_keys,
metric_key_prefix=metric_key_prefix,
)

evaluation_loop内部流程比较复杂,涉及到多卡结果的gather等,但最终返回的就是一个包含metric的EvalLoopOutput

for训练依次从eval dataloader里面取数据,然后走prediction_step计算

1
2
3
4
5
6
7
8
9
10
11
12
# Main evaluation loop
for step, inputs in enumerate(dataloader):
# Update the observed num examples
observed_batch_size = find_batch_size(inputs)
if observed_batch_size is not None:
observed_num_examples += observed_batch_size
# For batch samplers, batch_size is not known by the dataloader in advance.
if batch_size is None:
batch_size = observed_batch_size

# Prediction step
losses, logits, labels = self.prediction_step(model, inputs, prediction_loss_only, ignore_keys=ignore_keys)

我们先看内部的prediction_step,以llamafactory的CustomSeq2SeqTrainer为例子,根据继承关系,会依次调用prediction_step

首先走到CustomSeq2SeqTrainer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@override
def prediction_step(
self,
model: "torch.nn.Module",
inputs: dict[str, Union["torch.Tensor", Any]],
prediction_loss_only: bool,
ignore_keys: Optional[list[str]] = None,
**gen_kwargs,
) -> tuple[Optional[float], Optional["torch.Tensor"], Optional["torch.Tensor"]]:
r"""Remove the prompt part in the generated tokens.

Subclass and override to inject custom behavior.
"""
if self.args.predict_with_generate: # do not pass labels to model when generate
labels = inputs.pop("labels", None)
else:
labels = inputs.get("labels")

loss, generated_tokens, _ = super().prediction_step(
model, inputs, prediction_loss_only=prediction_loss_only, ignore_keys=ignore_keys, **gen_kwargs
)
if generated_tokens is not None and self.args.predict_with_generate:
generated_tokens[:, : inputs["input_ids"].size(-1)] = self.processing_class.pad_token_id
generated_tokens = generated_tokens.contiguous()

return loss, generated_tokens, labels

这里调试可以看到,每个inputs包含8条数据,对应默认的eval batch大小

(Pdb) inputs.keys()
dict_keys([‘input_ids’, ‘attention_mask’])
(Pdb) len(inputs[‘input_ids’])
8
(Pdb) self.args.predict_with_generate
True

然后走到Seq2SeqTrainer,内部就是generate一次,然后返回loss, generated_tokens, labels,最后回到上面的for循环取eval数据,关注一下pad_across_processes

(Pdb) labels.shape
torch.Size([8, 88])
(Pdb) logits.shape
torch.Size([8, 3189])

1
2
3
4
5
6
7
8
9
10
if labels is not None:
# Pad labels here, preparing for preprocess_logits_for_metrics in next logits block.
labels = self.accelerator.pad_across_processes(labels, dim=1, pad_index=-100)
if logits is not None:
logits = self.accelerator.pad_across_processes(logits, dim=1, pad_index=-100)
if self.preprocess_logits_for_metrics is not None:
logits = self.preprocess_logits_for_metrics(logits, labels)
logits = self.gather_function(logits)
if not self.args.batch_eval_metrics or description == "Prediction":
all_preds.add(logits)

经过pad_across_processes以后会将同一批次右填充到当前批次的最大长度,默认是右填充,pad_first = False

经过上面的调试以后,还有几个问题。

  • 什么时候把输出padding到统一的长度的?
  • eval的时候,tokenizer的padding_size是right还是left?在哪里修改的?

per_device_eval_batch_size改成64试试

KTO典型流程

以llamafactory0.9.3为例,CustomKTOTrainer–>KTOTrainer–>BaseTrainer–>Trainer,各自重写了部分方法

1
2
3
4
5
6
7
8
9
10
11
12
# 以下是trl==0.9.6
loss计算过程,有点绕,kto_loss函数里面很详细的写了正负样本的loss和rewards计算
Trainer.compute_loss
KTOTrainer.compute_loss
CustomKTOTrainer.get_batch_loss_metrics
CustomKTOTrainer.concatenated_forward
CustomKTOTrainer.compute_reference_log_probs
KTOTrainer.kto_loss

CustomKTOTrainer.concatenated_forward-->CustomKTOTrainer.forward-->get_batch_logps

get_batch_logps内部这里将token-level概率求和转换成seq-level联合概率,ld_alpha控制LD-DPO,解决长度偏差问题,防止chosen过长

trl最新版,又进行了改动。底层逻辑不变,对函数封装进行了优化。目前按照0.9.6的版本进行分析

首先看一下ref_model的创建,finetuning_args.ref_model一般不会设置的,因此这里会创建一个和策略模型一模一样的参考模型,如果是lora微调的话,就返回None,用于后续lora adapter的适配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def create_ref_model(
model_args: "ModelArguments", finetuning_args: "FinetuningArguments", add_valuehead: bool = False
) -> Optional[Union["PreTrainedModel", "AutoModelForCausalLMWithValueHead"]]:
r"""Create reference model for PPO/DPO training. Evaluation mode is not supported.

The valuehead parameter is randomly initialized since it is useless for PPO training.
"""
if finetuning_args.ref_model is not None:
ref_model_args = ModelArguments.copyfrom(
model_args,
model_name_or_path=finetuning_args.ref_model,
adapter_name_or_path=finetuning_args.ref_model_adapters,
quantization_bit=finetuning_args.ref_model_quantization_bit,
)
ref_finetuning_args = FinetuningArguments()
tokenizer = load_tokenizer(ref_model_args)["tokenizer"]
ref_model = load_model(
tokenizer, ref_model_args, ref_finetuning_args, is_trainable=False, add_valuehead=add_valuehead
)
logger.info_rank0(f"Created reference model from {finetuning_args.ref_model}")
else:
if finetuning_args.finetuning_type == "lora":
ref_model = None
else:
ref_model_args = ModelArguments.copyfrom(model_args)
ref_finetuning_args = FinetuningArguments()
tokenizer = load_tokenizer(ref_model_args)["tokenizer"]
ref_model = load_model(
tokenizer, ref_model_args, ref_finetuning_args, is_trainable=False, add_valuehead=add_valuehead
)
logger.info_rank0("Created reference model from the model itself.")

return ref_model

CustomKTOTrainer.get_batch_loss_metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def get_batch_loss_metrics(
self,
model: "PreTrainedModel",
batch: dict[str, "torch.Tensor"],
) -> tuple["torch.Tensor", dict[str, "torch.Tensor"]]:
r"""Compute the DPO loss and other metrics for the given batch of inputs for train or test."""
metrics = {}
(
policy_chosen_logps,
policy_rejected_logps,
policy_chosen_logits,
policy_rejected_logits,
policy_kl_logps,
policy_chosen_logps_avg,
) = self.concatenated_forward(model, batch)
reference_chosen_logps, reference_rejected_logps, reference_kl_logps = self.compute_reference_log_probs(
model, batch
)
losses, chosen_rewards, rejected_rewards, kl = self.kto_loss(
policy_chosen_logps,
policy_rejected_logps,
policy_kl_logps,
reference_chosen_logps,
reference_rejected_logps,
reference_kl_logps,
)
losses = losses.nanmean()

if self.ftx_gamma > 1e-6 and len(policy_chosen_logps) > 0: # remember to rescale
sft_loss = -policy_chosen_logps_avg
losses += self.ftx_gamma * sft_loss.nanmean() / len(policy_chosen_logps) * len(batch["labels"])

num_chosen = len(chosen_rewards)
num_rejected = len(rejected_rewards)
if num_chosen > 0:
metrics["rewards/chosen_sum"] = chosen_rewards.nansum().item()
metrics["logps/chosen_sum"] = policy_chosen_logps.nansum().item()
metrics["logits/chosen_sum"] = policy_chosen_logits.nansum().item()
metrics["count/chosen"] = float(num_chosen)

if num_rejected > 0:
metrics["rewards/rejected_sum"] = rejected_rewards.nansum().item()
metrics["logps/rejected_sum"] = policy_rejected_logps.nansum().item()
metrics["logits/rejected_sum"] = policy_rejected_logits.nansum().item()
metrics["count/rejected"] = float(num_rejected)

metrics["kl"] = kl.item()
return losses, metrics

首先会走到forward,会走两次

  • 第一次,在内部用策略模型完整计算一个batch的logits、logps、logps/valid_length
  • 第二次,用参考模型再计算一次,因为这里ref和policy是同一个,所以直接no_gra推理一边即可,得到_, kl_logps, _,只关注kl_logps

然后经过CustomKTOTrainer.concatenated_forward拿到choose、reject的logits、logps,期望样本的平均对数概率,参考模型的kl_logps

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@override
def concatenated_forward(
self, model: "PreTrainedModel", batch: dict[str, "torch.Tensor"]
) -> tuple["torch.Tensor", "torch.Tensor", "torch.Tensor", "torch.Tensor", "torch.Tensor", "torch.Tensor"]:
target_logits, target_logps, target_logps_avg = self.forward(model, batch)
with torch.no_grad():
_, kl_logps, _ = self.forward(model, batch, prefix="kl_")

if len(target_logps) != len(batch["kto_tags"]):
raise ValueError("Mismatched shape of inputs and labels.")

chosen_logits = target_logits[batch["kto_tags"]]
chosen_logps = target_logps[batch["kto_tags"]]
rejected_logits = target_logits[~batch["kto_tags"]]
rejected_logps = target_logps[~batch["kto_tags"]]
chosen_logps_avg = target_logps_avg[batch["kto_tags"]]
return chosen_logps, rejected_logps, chosen_logits, rejected_logits, kl_logps, chosen_logps_avg

同理,经过compute_reference_log_probs,用ref_model也计算一边,拿到reference_chosen_logps, reference_rejected_logps, reference_kl_logps

最后看一下最核心的kto_loss,很清晰简洁。trl源码确实值得深入探索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def kto_loss(
self,
policy_chosen_logps: torch.FloatTensor,
policy_rejected_logps: torch.FloatTensor,
policy_KL_logps: torch.FloatTensor,
reference_chosen_logps: torch.FloatTensor,
reference_rejected_logps: torch.FloatTensor,
reference_KL_logps: torch.FloatTensor,
) -> Tuple[torch.FloatTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
"""Compute the KTO loss for a batch of policy and reference model log probabilities.

Args:
policy_chosen_logps: Log probabilities of the policy model for the chosen responses. Shape: (num(chosen) in batch_size,)
policy_rejected_logps: Log probabilities of the policy model for the rejected responses. Shape: (num(rejected) in batch_size,)
policy_KL_logps: Log probabilities of the policy model for the KL responses. Shape: (batch_size,)
reference_chosen_logps: Log probabilities of the reference model for the chosen responses. Shape: (num(chosen) in batch_size,)
reference_rejected_logps: Log probabilities of the reference model for the rejected responses. Shape: (num(rejected) in batch_size,)
reference_KL_logps: Log probabilities of the reference model for the KL responses. Shape: (batch_size,)

Returns:
A tuple of four tensors: (losses, chosen_rewards, rejected_rewards, KL).
The losses tensor contains the KTO loss for each example in the batch.
The chosen_rewards and rejected_rewards tensors contain the rewards for the chosen and rejected responses, respectively.
The KL tensor contains the detached KL divergence estimate between the policy and reference models.
"""
kl = (policy_KL_logps - reference_KL_logps).mean().detach()
kl = self.accelerator.gather(kl).mean().clamp(min=0)

if policy_chosen_logps.shape[0] != 0 or reference_chosen_logps.shape[0] != 0:
chosen_logratios = policy_chosen_logps - reference_chosen_logps
chosen_losses = 1 - F.sigmoid(self.beta * (chosen_logratios - kl))
chosen_rewards = self.beta * chosen_logratios.detach()
else:
# lists can't be empty -- if they are, then accelerate.gather will hang
chosen_losses = torch.Tensor([]).to(self.accelerator.device)
chosen_rewards = torch.Tensor([]).to(self.accelerator.device)

if policy_rejected_logps.shape[0] != 0 or reference_rejected_logps.shape[0] != 0:
rejected_logratios = policy_rejected_logps - reference_rejected_logps
rejected_losses = 1 - F.sigmoid(self.beta * (kl - rejected_logratios))
rejected_rewards = self.beta * rejected_logratios.detach()
else:
# lists can't be empty -- if they are, then accelerate.gather will hang
rejected_losses = torch.Tensor([]).to(self.accelerator.device)
rejected_rewards = torch.Tensor([]).to(self.accelerator.device)

losses = torch.cat(
(self.desirable_weight * chosen_losses, self.undesirable_weight * rejected_losses),
0,
)

return losses, chosen_rewards, rejected_rewards, kl

步骤

1. 对数概率定义

xx 为提示,yy 为回答,y|y| 为回答的 token 数。模型 π\pi 在回答 yy 上的总对数概率与平均每 token 对数概率分别为:

Lπ(yx)=t=1ylogπ(ytx,y<t),(1)L_{\pi}(y \mid x) = \sum_{t=1}^{|y|} \log \pi(y_t \mid x, y_{<t}), \tag{1}

Lˉπ(yx)=1yLπ(yx).(2)\bar{L}_{\pi}(y \mid x) = \frac{1}{|y|} L_{\pi}(y \mid x). \tag{2}

对于完整序列 seqi\mathrm{seq}_i(包含 prompt 和 completion),记其平均每 token 对数概率为 Lˉπ(seqi)\bar{L}_{\pi}(\mathrm{seq}_i)

2. 批次 KL 散度估计

设批次大小为 NNπθ\pi_\theta 为当前策略模型,πref\pi_{\mathrm{ref}} 为冻结的参考模型。批次级别的 KL 散度估计为:

KL^=max ⁣(0,  1Ni=1N[Lˉπθ(seqi)Lˉπref(seqi)]).(3)\widehat{\mathrm{KL}} = \max\!\left(0,\; \frac{1}{N}\sum_{i=1}^{N} \Bigl[ \bar{L}_{\pi_\theta}(\mathrm{seq}_i) - \bar{L}_{\pi_{\mathrm{ref}}}(\mathrm{seq}_i) \Bigr] \right). \tag{3}

3. 回答级对数比率 (log-ratio)

对期望回答 (chosen) 与不期望回答 (rejected) 分别定义对数比率:

rch,i=Lπθ(yichxi)Lπref(yichxi),(4)r_{\mathrm{ch},i} = L_{\pi_\theta}(y_i^{\mathrm{ch}} \mid x_i) - L_{\pi_{\mathrm{ref}}}(y_i^{\mathrm{ch}} \mid x_i), \tag{4}

rrej,j=Lπθ(yjrejxj)Lπref(yjrejxj).(5)r_{\mathrm{rej},j} = L_{\pi_\theta}(y_j^{\mathrm{rej}} \mid x_j) - L_{\pi_{\mathrm{ref}}}(y_j^{\mathrm{rej}} \mid x_j). \tag{5}

4. 非对称 KTO 损失 (原论文 Eq.7 实现)

σ()\sigma(\cdot) 为 sigmoid 函数,β\beta 为温度系数。两类样本的损失分别为:

ich=1σ ⁣(β(rch,iKL^)),(desirable examples)(6)\ell_i^{\mathrm{ch}} = 1 - \sigma\!\Bigl( \beta \cdot \bigl( r_{\mathrm{ch},i} - \widehat{\mathrm{KL}} \bigr) \Bigr), \qquad\text{(desirable examples)} \tag{6}

jrej=1σ ⁣(β(KL^rrej,j)).(undesirable examples)(7)\ell_j^{\mathrm{rej}} = 1 - \sigma\!\Bigl( \beta \cdot \bigl( \widehat{\mathrm{KL}} - r_{\mathrm{rej},j} \bigr) \Bigr). \qquad\text{(undesirable examples)} \tag{7}

5. 加权损失聚合

NchN_{\mathrm{ch}}NrejN_{\mathrm{rej}} 为批次中期望/不期望样本的数量,λD\lambda_DλU\lambda_U 为对应权重,则主损失为:

LKTO=1Nch+Nrej(λDi=1Nchich+λUj=1Nrejjrej).(8)\mathcal{L}_{\mathrm{KTO}} = \frac{1}{N_{\mathrm{ch}} + N_{\mathrm{rej}}} \left( \lambda_D \sum_{i=1}^{N_{\mathrm{ch}}} \ell_i^{\mathrm{ch}} + \lambda_U \sum_{j=1}^{N_{\mathrm{rej}}} \ell_j^{\mathrm{rej}} \right). \tag{8}

6. 辅助 SFT 损失与最终损失

期望回答上的平均每 token 对数概率的负值构成辅助 SFT 损失:

LSFT=1Nchi=1NchLˉπθ(yichxi).(9)\mathcal{L}_{\mathrm{SFT}} = -\frac{1}{N_{\mathrm{ch}}} \sum_{i=1}^{N_{\mathrm{ch}}} \bar{L}_{\pi_\theta}(y_i^{\mathrm{ch}} \mid x_i). \tag{9}

最终训练损失为两者加权和:

L=LKTO+γftxNch+NrejNchLSFT,(10)\mathcal{L} = \mathcal{L}_{\mathrm{KTO}} + \gamma_{\mathrm{ftx}} \cdot \frac{N_{\mathrm{ch}}+N_{\mathrm{rej}}}{N_{\mathrm{ch}}} \cdot \mathcal{L}_{\mathrm{SFT}}, \tag{10}

其中 γftx\gamma_{\mathrm{ftx}} 为混合系数(ftx_gamma)。

7. 隐式奖励(仅用于监控)

对数比率缩放后得到隐式奖励,不参与梯度计算:

Rich=βrch,i,(11)R_i^{\mathrm{ch}} = \beta \cdot r_{\mathrm{ch},i}, \tag{11}

Rjrej=βrrej,j.(12)R_j^{\mathrm{rej}} = \beta \cdot r_{\mathrm{rej},j}. \tag{12}

以上就是llamafactory0.9.3+trl0.9.6完整的KTO流程

PPO流程

在llamafactory里面ref model也会加上value head,好像是为了工程上的简便处理。

actor和critic共用主干网络,减少显存占用

1
2
3
4
5
6
7
8
model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train, add_valuehead=True)

tokenizer.padding_side = "left" # use left-padding in generation while using right-padding in training
data_collator = MultiModalDataCollatorForSeq2Seq(template=template, model=model, **tokenizer_module)

# Create reference model and reward model
ref_model = create_ref_model(model_args, finetuning_args, add_valuehead=True)
reward_model = create_reward_model(model, model_args, finetuning_args)

可以看到,model和ref model都加了valuehead,为什么这么做?只能去CustomPPOTrainer里面寻找答案了

CustomPPOTrainer其实就是继承自PPOTrainer和Trainer,PPOTrainer是trl实现的,最新版本的trl没有step方法,改成了train方法,看后续llamafactory怎么兼容吧,目前只支持trl到0.9.6,但0.24实现会更优雅一点

PPOTrainer内部train方法就是把ppo实现了一遍,包括pg loss+critic loss,然后CustomPPOTrainer再进行了一次封装,做了一定解耦,支持自定义的reward和inputs,具体细节可以看PPOTrainer内部,重点是GAE的计算、adapter里面value head和policy head的切换等。

OPD流程

以trl库为例,要求trl>=0.29.0 transformers>=5.6.0(相比4.0,5.0代码更简洁,train流程抽象成了几个函数)

继承关系:Trainer->_BaseTrainer->SFTTrainer->GKDTrainer

从GKDTrainer.training_step开始看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def training_step(
self, model: nn.Module, inputs: dict[str, torch.Tensor | Any], num_items_in_batch: int | None = None
) -> torch.Tensor:
"""
Perform a training step for the Generalized Knowledge Distillation (GKD) model.

This method implements the on-policy learning approach described in the GKD paper. With probability
`self.lmbda`, it generates new responses using the student model, which are then used for training instead of
the original inputs.
"""
if random.random() <= self.lmbda:
with (
unwrap_model_for_generation(
model,
self.accelerator,
generation_kwargs=self.generation_kwargs, # Override model.generation_config with generation_kwargs to fix transformers#42762
) as unwrapped_model
):
new_input_ids, new_attention_mask, new_labels = self.generate_on_policy_outputs(
unwrapped_model, inputs, self.generation_config, self.processing_class.pad_token_id
)
inputs["input_ids"] = new_input_ids
inputs["attention_mask"] = new_attention_mask
inputs["labels"] = new_labels
elif self.seq_kd:
with (
unwrap_model_for_generation(
self.teacher_model,
self.accelerator,
generation_kwargs=self.generation_kwargs, # Override model.generation_config with generation_kwargs to fix transformers#42762
) as unwrapped_model
):
new_input_ids, new_attention_mask, new_labels = self.generate_on_policy_outputs(
unwrapped_model, inputs, self.generation_config, self.processing_class.pad_token_id
)
inputs["input_ids"] = new_input_ids
inputs["attention_mask"] = new_attention_mask
inputs["labels"] = new_labels

loss = super().training_step(model, inputs, num_items_in_batch)
return loss

三个分支,分别对应student on-policy,也就是OPD,seq_kd对应传统的模型蒸馏,最后else对应传统的sft,这里就是是否修改了inputs。

generate_on_policy_outputs内部会负责处理修正新的response_id、attention_mask、labels

  • opd:修改了response,学生自己推理的
  • gkd:修改了response,教师推理的,比传统sft慢很多
  • stf:不改inputs,直接用标签的response

接下来SFTTrainer.training_step–>Trainer.training_step,最后走到GKDTrainer重写的compute_loss–>generalized_jsd_loss

generalized_jsd_loss在内部计算jsd散度,beta控制是forward KL、reverse KL、mix等

新版本transformers的train流程

trl基本上在内部都是复用了Trainer类,重写各种子类方法,实现各种sft、kto、dpo等

1
2
3
4
5
6
train-->
_inner_training_loop-->
_run_epoch-->
training_step-->
compute_loss

dataset流程

以sft为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
第一次加载数据是在_load_single_dataset中的load_dataset,此时拿到的是
(Pdb) dataset
Dataset({
features: ['system', 'instruction', 'output', 'history'],
num_rows: 50000
})
内部走到数据脚本中_generate_examples里面的yield,然后保存起来
yield key, {
"system": "You are a helpful assistant.",
"instruction": prompt,
"output": json.dumps(data['label'], ensure_ascii=False),
"history": [],
}

第二次在align_dataset
def align_dataset(
dataset: Union["Dataset", "IterableDataset"],
dataset_attr: "DatasetAttr",
data_args: "DataArguments",
training_args: "Seq2SeqTrainingArguments",
) -> Union["Dataset", "IterableDataset"]:
r"""Align the dataset to a specific format.

Aligned dataset:
_prompt: [{"role": "user", "content": "..."}] * (2T - 1)
_response: [{"role": "assistant", "content": "..."}] * N (N > 1 for ranking dataset)
_system: "..."
_tools: "..."
_images: []
_videos: []
_audios: []
"""
column_names = list(next(iter(dataset)).keys())
kwargs = {}
if not data_args.streaming:
kwargs = dict(
num_proc=data_args.preprocessing_num_workers,
load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0),
desc="Converting format of dataset",
)

dataset_converter = get_dataset_converter(dataset_attr.formatting, dataset_attr, data_args)
return dataset.map(
dataset_converter,
batched=False,
remove_columns=column_names,
**kwargs,
)
(Pdb) n
> /root/ruixinspace/zyt/LLaMA-Factory-0.9.3/src/llamafactory/data/converter.py(279)align_dataset()
-> return dataset.map(
(Pdb) column_names
['system', 'instruction', 'output', 'history']
(Pdb) data_args.streaming
False
(Pdb) kwargs
{'num_proc': 32, 'load_from_cache_file': False, 'desc': 'Converting format of dataset'}
(Pdb) dataset_converter
AlpacaDatasetConverter(dataset_attr=amh_shipper_zyt, data_args=DataArguments(template='qwen', dataset=['amh_shipper_zyt'], eval_dataset=['amh_shipper_zyt_val'], dataset_dir='data', media_dir='data', cutoff_len=4096, train_on_prompt=False, mask_history=False, streaming=False, buffer_size=16384, mix_strategy='concat', interleave_probs=None, overwrite_cache=True, preprocessing_batch_size=1000, preprocessing_num_workers=32, max_samples=None, eval_num_beams=None, ignore_pad_token_for_loss=True, val_size=0.0, eval_on_each_dataset=False, packing=False, neat_packing=False, tool_format=None, default_system=None, enable_thinking=True, tokenized_path=None, data_shared_file_system=False))
默认的formatting就是alpaca,这里实际就是再进行一次映射,利用AlpacaDatasetConverter映射成如下形式
{
"_prompt": prompt,
"_response": response,
"_system": example[self.dataset_attr.system] if self.dataset_attr.system else "",
"_tools": example[self.dataset_attr.tools] if self.dataset_attr.tools else "",
"_images": self._find_medias(example[self.dataset_attr.images]) if self.dataset_attr.images else None,
"_videos": self._find_medias(example[self.dataset_attr.videos]) if self.dataset_attr.videos else None,
"_audios": self._find_medias(example[self.dataset_attr.audios]) if self.dataset_attr.audios else None,
}

也就是_get_merged_dataset得到
(Pdb) datasets.values()
dict_values([Dataset({
features: ['_prompt', '_response', '_system', '_tools', '_images', '_videos', '_audios'],
num_rows: 50000
})])


第三次是在_get_preprocessed_dataset里面的dataset.map
-> dataset = dataset.map(
(Pdb) kwargs
{'num_proc': 32, 'load_from_cache_file': False, 'desc': 'Running tokenizer on dataset'}
(Pdb) column_names
['_prompt', '_response', '_system', '_tools', '_images', '_videos', '_audios']
(Pdb) dataset_processor
SupervisedDatasetProcessor(template=Template(format_user=StringFormatter(slots=['<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n'], tool_format=None), format_assistant=StringFormatter(slots=['{{content}}<|im_end|>\n'], tool_format=None), format_system=StringFormatter(slots=['<|im_start|>system\n{{content}}<|im_end|>\n'], tool_format=None), format_function=FunctionFormatter(slots=['{{content}}<|im_end|>\n'], tool_format='qwen'), format_observation=StringFormatter(slots=['<|im_start|>user\n<tool_response>\n{{content}}\n</tool_response><|im_end|>\n<|im_start|>assistant\n'], tool_format=None), format_tools=ToolFormatter(slots=[], tool_format='qwen'), format_prefix=EmptyFormatter(slots=[], tool_format=None), default_system='You are Qwen, created by Alibaba Cloud. You are a helpful assistant.', stop_words=['<|im_end|>'], thought_words=('<think>', '</think>'), efficient_eos=False, replace_eos=True, replace_jinja_template=False, enable_thinking=True, mm_plugin=BasePlugin(image_token=None, video_token=None, audio_token=None, expand_mm_tokens=True)), tokenizer=Qwen2TokenizerFast(name_or_path='/root/ruixinspace/zyt/Qwen3-0.6B', vocab_size=151643, model_max_length=131072, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'eos_token': '<|im_end|>', 'pad_token': '<|endoftext|>', 'additional_special_tokens': ['<|im_start|>', '<|im_end|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']}, clean_up_tokenization_spaces=False, added_tokens_decoder={
151643: AddedToken("<|endoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151644: AddedToken("<|im_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151645: AddedToken("<|im_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151646: AddedToken("<|object_ref_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151647: AddedToken("<|object_ref_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151648: AddedToken("<|box_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151649: AddedToken("<|box_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151650: AddedToken("<|quad_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151651: AddedToken("<|quad_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151652: AddedToken("<|vision_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151653: AddedToken("<|vision_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151654: AddedToken("<|vision_pad|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151655: AddedToken("<|image_pad|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151656: AddedToken("<|video_pad|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
151657: AddedToken("<tool_call>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151658: AddedToken("</tool_call>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151659: AddedToken("<|fim_prefix|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151660: AddedToken("<|fim_middle|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151661: AddedToken("<|fim_suffix|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151662: AddedToken("<|fim_pad|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151663: AddedToken("<|repo_name|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151664: AddedToken("<|file_sep|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151665: AddedToken("<tool_response>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151666: AddedToken("</tool_response>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151667: AddedToken("<think>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
151668: AddedToken("</think>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=False),
}
), processor=None, data_args=DataArguments(template='qwen', dataset=['amh_shipper_zyt'], eval_dataset=['amh_shipper_zyt_val'], dataset_dir='data', media_dir='data', cutoff_len=4096, train_on_prompt=False, mask_history=False, streaming=False, buffer_size=16384, mix_strategy='concat', interleave_probs=None, overwrite_cache=True, preprocessing_batch_size=1000, preprocessing_num_workers=32, max_samples=None, eval_num_beams=None, ignore_pad_token_for_loss=True, val_size=0.0, eval_on_each_dataset=False, packing=False, neat_packing=False, tool_format=None, default_system=None, enable_thinking=True, tokenized_path=None, data_shared_file_system=False))

这里会涉及到template的注册,事先注册了很多template,我们这里关注qwen即可
register_template(
name="qwen",
format_user=StringFormatter(slots=["<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n"]),
format_assistant=StringFormatter(slots=["{{content}}<|im_end|>\n"]),
format_system=StringFormatter(slots=["<|im_start|>system\n{{content}}<|im_end|>\n"]),
format_function=FunctionFormatter(slots=["{{content}}<|im_end|>\n"], tool_format="qwen"),
format_observation=StringFormatter(
slots=["<|im_start|>user\n<tool_response>\n{{content}}\n</tool_response><|im_end|>\n<|im_start|>assistant\n"]
),
format_tools=ToolFormatter(tool_format="qwen"),
default_system="You are Qwen, created by Alibaba Cloud. You are a helpful assistant.",
stop_words=["<|im_end|>"],
replace_eos=True,
)

(Pdb) template
Template(format_user=StringFormatter(slots=['<|im_start|>user\n{{content}}<|im_end|>\n<|im_start|>assistant\n'], tool_format=None), format_assistant=StringFormatter(slots=['{{content}}<|im_end|>\n'], tool_format=None), format_system=StringFormatter(slots=['<|im_start|>system\n{{content}}<|im_end|>\n'], tool_format=None), format_function=FunctionFormatter(slots=['{{content}}<|im_end|>\n'], tool_format='qwen'), format_observation=StringFormatter(slots=['<|im_start|>user\n<tool_response>\n{{content}}\n</tool_response><|im_end|>\n<|im_start|>assistant\n'], tool_format=None), format_tools=ToolFormatter(slots=[], tool_format='qwen'), format_prefix=EmptyFormatter(slots=[], tool_format=None), default_system='You are Qwen, created by Alibaba Cloud. You are a helpful assistant.', stop_words=['<|im_end|>'], thought_words=('<think>', '</think>'), efficient_eos=False, replace_eos=True, replace_jinja_template=False, enable_thinking=True, mm_plugin=BasePlugin(image_token=None, video_token=None, audio_token=None, expand_mm_tokens=True))
(Pdb) template.mm_plugin
BasePlugin(image_token=None, video_token=None, audio_token=None, expand_mm_tokens=True)

BasePlugin只是做一些校验,直接跳过。重点在SupervisedDatasetProcessor内部的preprocess_dataset
preprocess_dataset-->_encode_data_example,在内部会计算input_ids和labels,也就是输入和输出。
这里涉及到template的注册,以及内部的一系列_encode过程,实际上在内部只是在拼接不同role的message格式,每个content注入到类似于<|im_start|>assistant\n{{content}}<|im_end|>\n等

此时拿到的数据是
(Pdb) dataset
Dataset({
features: ['input_ids', 'attention_mask', 'labels', 'images', 'videos', 'audios'],
num_rows: 50000
})

这里还有一个细节是,SupervisedDatasetProcessor在进行_encode_data_example的时候,实际上每个元素的'input_ids', 'attention_mask', 'labels'长度都是一样长的,这是为了方便对齐统一计算交叉熵loss。
(Pdb) len(train[1]['input_ids'])
2381
(Pdb) len(train[1]['labels'])
2381
(Pdb) len(train[1]['attention_mask'])
2381
但是你会发现labels的末尾包含了一个198的换行符,并不是<|im_end|>,也就是说,训练的时候,见到<|im_end|>,并不会停止,还会让模型去学会输出一个198的换行符!但是在推理的时候,见到<|im_end|>我们就截断了


最后训练得到的数据,需要经过data_collator再map一次
这里是llamafactory自己实现了SFTDataCollatorWith4DAttentionMask,最终也是走到DataCollatorForSeq2Seq里面的call

def __call__(self, features, return_tensors=None):
if return_tensors is None:
return_tensors = self.return_tensors

label_name = "label" if "label" in features[0].keys() else "labels"
labels = [feature[label_name] for feature in features] if label_name in features[0].keys() else None
# reconvert list[None] to None if necessary
# this might occur when we pass {..., "labels": None}
if labels is not None and all(label is None for label in labels):
labels = None
non_labels_features = [{k: v for k, v in feature.items() if k != label_name} for feature in features]

# run through tokenizer without labels to ensure no side effects
batch = pad_without_fast_tokenizer_warning(
self.tokenizer,
non_labels_features,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=return_tensors,
)

# we have to pad the labels manually as we cannot rely on `tokenizer.pad` and we need them to be of the same length to return tensors
no_padding = self.padding is False or self.padding == PaddingStrategy.DO_NOT_PAD
if labels is not None:
if no_padding:
if isinstance(features[0][label_name], list):
batch["labels"] = list(labels)
else:
batch["labels"] = [np.concatenate([label, []]) for label in labels]
else:
max_padding = self.padding == PaddingStrategy.MAX_LENGTH and self.max_length is not None
max_label_length = max(len(l) for l in labels) if not max_padding else self.max_length
if self.pad_to_multiple_of is not None:
max_label_length = (
(max_label_length + self.pad_to_multiple_of - 1)
// self.pad_to_multiple_of
* self.pad_to_multiple_of
)

padding_side = self.tokenizer.padding_side
if isinstance(features[0][label_name], list):
batch["labels"] = [
label + [self.label_pad_token_id] * (max_label_length - len(label))
if padding_side == "right"
else [self.label_pad_token_id] * (max_label_length - len(label)) + label
for label in labels
]
else:
batch["labels"] = [
np.concatenate(
[
label,
np.array([self.label_pad_token_id] * (max_label_length - len(label)), dtype=np.int64),
]
)
if padding_side == "right"
else np.concatenate(
[
np.array([self.label_pad_token_id] * (max_label_length - len(label)), dtype=np.int64),
label,
]
)
for label in labels
]

# reintroduce side effects via tokenizer that return respective datatypes for the `return_tensors` argument
if batch.get("labels", None) is not None:
if return_tensors == "pt":
import torch

batch["labels"] = torch.tensor(batch["labels"], dtype=torch.int64)
elif return_tensors == "tf":
import tensorflow as tf

batch["labels"] = tf.constant(batch["labels"], dtype=tf.int64)
else:
batch["labels"] = np.array(batch["labels"], dtype=np.int64)
else:
batch["labels"] = None

# prepare decoder_input_ids
if (
labels is not None
and self.model is not None
and hasattr(self.model, "prepare_decoder_input_ids_from_labels")
):
decoder_input_ids = self.model.prepare_decoder_input_ids_from_labels(labels=batch["labels"])
batch["decoder_input_ids"] = decoder_input_ids

return batch

内部逻辑也很清晰,就是填充非label的字段,然后再手动填充label,因为label填充是用的-100,其余字段填充是用的pad

# Keyword arguments for `model.generate`
gen_kwargs = generating_args.to_dict(obey_generation_config=True)
gen_kwargs["eos_token_id"] = [tokenizer.eos_token_id] + tokenizer.additional_special_tokens_ids
gen_kwargs["pad_token_id"] = tokenizer.pad_token_id
这里会发现pad_token_id 就是<|endoftext|>,而eos_token_id除了<|im_end|>以外,还包括各种<|im_start|>等,这样做是为了防止模型只输出当前轮次,强约束,防止模型出现幻觉?

padding side一些思考

参考:

第二个回复说的很好,贴出来一下

这是一个很专业、也很常见的疑问,而且你引用的那个 warning 本身并不等价于“训练一定有问题”。我分层把这个问题讲清楚,结合 PPO / GRPO(RLHF / RLAIF 场景) 来看。

一、先给结论(TL;DR)

PPO / GRPO 训练代码里不显式设置 padding_side=‘left’,通常不会影响训练效果,前提是:
attention mask、position ids、loss mask 处理是正确的。

那个 warning 主要针对“推理 / generation 阶段”,而不是 RL 训练本身。

二、为什么博客都在说「decoder-only 必须 left padding」?

你看到的 warning:

⚠️ A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set padding_side=‘left’

它的语境是:model.generate() 推理阶段。

原因是 decoder-only(GPT / LLaMA)的位置编码机制:
• decoder-only 是 严格因果注意力
• 每个 token 的 position id 很重要
• 如果你 right padding,那么:

[真实token][真实token][PAD][PAD][PAD]

在 batch 中:
• 不同样本的「最后一个真实 token」位置不一致
• generation 时用的是 最后一个 position
• 容易导致生成时 position 对齐错误

👉 所以:推理 / batch generate 时,left padding 是推荐做法

三、那为什么 PPO / GRPO 训练代码里不显式设?

关键点:PPO / GRPO ≠ generate 推理

在 PPO / GRPO 中,模型主要做三件事:
\1. 前向计算 logprob
\2. 根据 mask 算 loss
\3. 对已有 token 序列做 policy gradient

👉 并不是在调用 model.generate() 做自由生成

四、训练阶段 right padding 为什么通常没问题?

1️⃣ 训练时「position id 是连续的」

在训练中,一般是这样:

input_ids
attention_mask
labels / loss_mask

模型看到的是:
• PAD token 被 attention_mask=0 屏蔽
• loss 只在有效 token 上算
• position embedding 仍然是从 0,1,2,… 连续增长

✅ 只要 attention mask 正确,padding 在左还是右,本质等价

2️⃣ PPO / GRPO 的 loss 不是“最后一个 token”

PPO / GRPO 的 loss 形式是:

\mathbb{E}\left[ \min(r_t A_t,; \text{clip}(r_t, 1-\epsilon, 1+\epsilon) A_t) \right]
• 是 逐 token 的 logprob
• 通过 mask 控制哪些 token 参与
• 不是用“序列最后一个 token 的 hidden state”

👉 这和 generate 的需求完全不同

3️⃣ 实际工程中常见的做法

很多 PPO / GRPO 实现:
• 训练阶段:right padding
• 推理 / rollout generate 阶段:left padding

这是一个非常常见、且合理的工程折中。

五、那 warning 为什么会出现?

这个 warning 的触发条件通常是:

tokenizer.padding_side == “right”
model.generate(…)

也就是说:
• 不是因为“模型结构 + padding”就一定错
• 而是 decoder-only + generate + right padding

如果你在 PPO 中:
• rollout 阶段自己写了 generate
• 或者调用 HF generate

那 确实应该在 rollout 时用 left padding

六、总结对你问题的逐条回答

❓ 为什么 PPO / GRPO 代码中没显式设置 padding_side=‘left’?

✔️ 因为:
• 训练阶段不依赖 generate
• loss 是 masked token-level loss
• right padding 不影响数学正确性
• 显式设置反而可能影响 DataLoader / collate 简洁性

❓ 不加会影响模型训练效果吗?

通常不会,只要满足:
• attention_mask 正确
• loss_mask 正确
• position_id 没被错误重写

⚠️ 真正会影响训练效果的不是 padding_side,而是:
• reward / advantage 对齐是否正确
• logprob 计算是否错位
• reference model 是否一致

七、给你一个「推荐实践」

如果你在写 PPO / GRPO 框架,可以用下面策略:

【训练 forward / loss】

  • right padding(默认即可)
  • 正确 attention_mask + loss_mask

【rollout / generate】

  • tokenizer.padding_side = “left”
  • batch generate

这是目前业界最稳妥、最常见的做法。

在llamafactory中,训练的时候默认是right padding,推理的时候,需要left padding。所以lf在train之前初始化了padding为right,然后在do_eval之前设置了left padding。如果我想在train的时候,一边train一边eval,貌似会存在一些问题,一直是right padding,导致F1偏低。但是最后一次eval,是发生在do_eval的时候,此时llamafactory已经修改了padding side为left。因此最后一次的eval是正常的。

在TrainingArguments中,只要设置了eval_strategy,也会把do_eval设置为True

1
2
if self.do_eval is False and self.eval_strategy != IntervalStrategy.NO:
self.do_eval = True

一些零碎知识

为什么预训练的loss下界是语言的熵,而SFT的loss下界却是0呢?

预训练的目标分布是语言中天然存在的条件概率(熵>0),损失下界是语言熵;SFT 的目标分布是人工固化后的唯一序列(熵=0),损失下界可以为 0

  • :rocket: 各种rl+opd,看一下训练结果

  • :rocket:agent项目得一个,asyncio等系统学习一下

  • :star: cv+lc,一周时间准备?

    • kto,dpo区别,数据准备