OpenAI Agents SDK


The OpenAI Agents SDK recently introduced the new Responses API. Both it and the older Chat Completions format are supported, but the official docs recommend the former for agents. The Responses API has not yet been adapted for Qwen and other Chinese models, though, so I will revisit it once those models support it.

References

Python forward references and deferred resolution (a forward reference names a type or variable inside a class or function before it is defined; the type is written as a string in single or double quotes and resolved lazily; also covers mutual references) - CSDN blog

Points to note

  • contextvars.ContextVar provides coroutine-level local variables: each coroutine sees its own copy of the data

  • The dataclass decorator generates __init__() and __repr__() (among others), simplifying class definitions

  • __slots__ restricts instance attributes and can save memory, but it only applies to instance attributes of the current class — it has no effect on class attributes or on subclasses; __all__ restricts which names `from module import *` exposes — it only affects import behavior and has no impact on memory

  • Not yet clear what span means here?

  • Throughout the type annotations, class names are often wrapped in strings. This is forward referencing with deferred resolution: because Python executes a module line by line, quoting the name prevents errors from referencing a type or function that has not been declared yet

  • For logging, see the hints in the README: standardize the output of timestamp, log level, file, and line number by adding the code below. To also echo output to the console, follow the README and add logger.addHandler(logging.StreamHandler())

    logging.basicConfig(level=logging.DEBUG, filename="./logs.txt", filemode='w',
                        format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
  • qwen-plus and qwen-max both run fine, but qwen-long throws an error; the cause is still unclear
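The forward-reference and dataclass points above can be combined into one small sketch; everything here is plain stdlib Python, independent of the SDK:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    # @dataclass generates __init__ and __repr__ for us.
    # "Node" is quoted: when this class body executes, the name Node is
    # not yet bound, so Python stores the annotation as a string and
    # resolves it lazily - the same forward-reference pattern the SDK uses.
    value: int = 0
    next: Optional["Node"] = None


head = Node(1, Node(2))
print(head.next.value)  # 2
print(head)             # Node(value=1, next=Node(value=2, next=None))
```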

Model analysis

src/agents/models

interface.py

  • Defines Model, with two abstract methods: get_response and stream_response
  • Defines ModelProvider, which returns the Model instance for a given model_name; its method is abstract too
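The shape of these two interfaces can be sketched with plain abc classes. The names mirror interface.py, but the bodies below are simplified stand-ins (the real SDK methods are async and take far richer arguments):

```python
from abc import ABC, abstractmethod


class Model(ABC):
    """Abstract LLM interface: one blocking call, one streaming call."""

    @abstractmethod
    def get_response(self, prompt: str) -> str: ...

    @abstractmethod
    def stream_response(self, prompt: str): ...


class ModelProvider(ABC):
    """Maps a model_name to a concrete Model instance."""

    @abstractmethod
    def get_model(self, model_name: str) -> "Model": ...


class EchoModel(Model):
    # Hypothetical concrete model, for illustration only.
    def get_response(self, prompt: str) -> str:
        return f"echo: {prompt}"

    def stream_response(self, prompt: str):
        yield from prompt.split()


class DictProvider(ModelProvider):
    # Hypothetical provider backed by a plain dict registry.
    def __init__(self, registry: dict):
        self._registry = registry

    def get_model(self, model_name: str) -> Model:
        return self._registry[model_name]


provider = DictProvider({"echo": EchoModel()})
print(provider.get_model("echo").get_response("hi"))  # echo: hi
```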

openai_responses.py

  • Defines OpenAIResponsesModel and OpenAIChatCompletionsModel — essentially two utility classes. The former wraps the new Responses API, the latter the legacy API. Internally, get_response --> _fetch_response --> ModelResponse yields the LLM call result. Notably, _fetch_response has three definitions but only one concrete implementation: the @overload decorator exists purely for static type checking, and every call dispatches to the single implementation. In short, it is a hint that improves code readability.
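The @overload behavior can be demonstrated in isolation (hypothetical fetch function, not SDK code): the decorated stubs exist only for the type checker, and every call at runtime hits the single real implementation.

```python
from typing import Union, overload


@overload
def fetch(key: int) -> str: ...
@overload
def fetch(key: str) -> str: ...


def fetch(key: Union[int, str]) -> str:
    # The only body that exists at runtime; the @overload stubs above
    # are erased and serve static type checkers only.
    return f"got {key!r}"


print(fetch(1))    # got 1
print(fetch("a"))  # got 'a'
```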

openai_provider.py

  • Defines the OpenAIProvider class, which hands out OpenAIResponsesModel or OpenAIChatCompletionsModel instances; internally it holds an AsyncOpenAI instance

The run call chain

Taking qwen-plus as an example, the sample code is as follows

import asyncio
import os

import httpx
from openai import AsyncOpenAI

from agents import (
    Agent,
    Runner,
    function_tool,
    set_default_openai_api,
    set_default_openai_client,
    set_tracing_disabled,
)

BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
API_KEY = os.getenv("DASHSCOPE_API_KEY")  # redacted: never hardcode real keys
# qwen-long does not work here
MODEL_NAME = "qwen-plus"

if not BASE_URL or not API_KEY or not MODEL_NAME:
    raise ValueError(
        "Please set EXAMPLE_BASE_URL, EXAMPLE_API_KEY, EXAMPLE_MODEL_NAME via env var or code."
    )


"""This example uses a custom provider for all requests by default. We do three things:
1. Create a custom client.
2. Set it as the default OpenAI client, and don't use it for tracing.
3. Set the default API as Chat Completions, as most LLM providers don't yet support Responses API.

Note that in this example, we disable tracing under the assumption that you don't have an API key
from platform.openai.com. If you do have one, you can either set the `OPENAI_API_KEY` env var
or call set_tracing_export_api_key() to set a tracing specific key.
"""

client = AsyncOpenAI(
    base_url=BASE_URL,
    api_key=API_KEY,
    http_client=httpx.AsyncClient(verify=False)
)
set_default_openai_client(client=client, use_for_tracing=False)
set_default_openai_api("chat_completions")
set_tracing_disabled(disabled=True)

import logging

logger = logging.getLogger("openai.agents")
logging.basicConfig(level=logging.DEBUG, filename="./logs.txt", filemode='w',
                    format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
# logger.addHandler(logging.StreamHandler())
# from agents import enable_verbose_stdout_logging
# enable_verbose_stdout_logging()


@function_tool
def get_weather(city: str):
    print(f"[debug] getting weather for {city}")
    return f"The weather in {city} is sunny."


async def main():
    agent = Agent(
        name="Assistant",
        instructions="You are a helpful assistant. be VERY concise.",
        model=MODEL_NAME,
        tools=[get_weather],
    )

    result = await Runner.run(agent, "What's the weather in Tokyo?")
    print(result.final_output)


if __name__ == "__main__":
    asyncio.run(main())

Call flow

result = await Runner.run(agent, "What's the weather in Tokyo?")
-->hooks = RunHooks[Any]()
-->run_config = RunConfig() # initially only workflow_name='Agent workflow'; everything else is None or False
-->tool_use_tracker = AgentToolUseTracker()
-->with TraceCtxManager
-->__enter__()
-->get_current_trace() # reads the coroutine-local variable, initially None
-->self.trace = trace
-->return GLOBAL_TRACE_PROVIDER.create_trace
-->return NoOpTrace() or return TraceImpl
-->self.trace.start(mark_as_current=True)
-->NoOpTrace.start()
-->self._prev_context_token = Scope.set_current_trace(self)

------------------ logic in between ------------------
while True
-->current_agent = starting_agent
-->current_span = agent_span(name="Assistant", handoffs=[], output_type=str,)
-->GLOBAL_TRACE_PROVIDER.create_span
-->return NoOpSpan(span_data) # analogous to NoOpTrace
-->current_span.start(mark_as_current=True)
-->self._prev_span_token = Scope.set_current_span(self)
-->all_tools = await cls._get_all_tools(current_agent)
-->if current_turn == 1
# input_guardrail_results, turn_result
-->await asyncio.gather(cls._run_input_guardrails(), cls._run_single_turn())
# turn_result
-->else await cls._run_single_turn()
-->should_run_agent_start_hooks = False
# handle each kind of next_step separately
-->if isinstance(turn_result.next_step, NextStepRunAgain)
-->pass
# after the pass above, the while loop re-enters and eventually reaches here
-->elif isinstance(turn_result.next_step, NextStepFinalOutput)
-->await cls._run_output_guardrails()
-->return RunResult
-->finally current_span.finish(reset_current=True) # this runs before the return completes
-->elif isinstance(turn_result.next_step, NextStepHandoff)
-->current_agent = cast(Agent[TContext], turn_result.next_step.new_agent)
current_span.finish(reset_current=True)
current_span = None
should_run_agent_start_hooks = True
-->__exit__()
-->self.trace.finish(reset_current=True)
-->NoOpTrace.finish()
-->Scope.reset_current_trace(self._prev_context_token)
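The while-loop dispatch above boils down to a branch on the next_step type. A stand-in sketch (hypothetical types mirroring the names in the trace, not the SDK's actual classes):

```python
from dataclasses import dataclass


@dataclass
class NextStepRunAgain:          # hypothetical stand-in types
    pass


@dataclass
class NextStepFinalOutput:
    output: str


@dataclass
class NextStepHandoff:
    new_agent: str


def run_loop(steps) -> str:
    # steps: a scripted sequence standing in for successive _run_single_turn results.
    current_agent = "Assistant"
    for next_step in steps:
        if isinstance(next_step, NextStepRunAgain):
            continue                              # loop again with the same agent
        if isinstance(next_step, NextStepHandoff):
            current_agent = next_step.new_agent   # switch agents, keep looping
            continue
        if isinstance(next_step, NextStepFinalOutput):
            return f"{current_agent}: {next_step.output}"
    raise RuntimeError("no final output produced")


result = run_loop([NextStepRunAgain(), NextStepHandoff("Weather"), NextStepFinalOutput("sunny")])
print(result)  # Weather: sunny
```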

Two key functions: cls._run_input_guardrails() and cls._run_single_turn()

1. _run_input_guardrails

@classmethod
async def _run_input_guardrails(
    cls,
    agent: Agent[Any],
    guardrails: list[InputGuardrail[TContext]],
    input: str | list[TResponseInputItem],
    context: RunContextWrapper[TContext],
) -> list[InputGuardrailResult]:
    if not guardrails:
        return []

    # No guardrails were configured when the agent was initialized, so this is empty here
    guardrail_tasks = [
        asyncio.create_task(
            RunImpl.run_single_input_guardrail(agent, guardrail, input, context)
        )
        for guardrail in guardrails
    ]

    guardrail_results = []

    # Await the guardrail tasks in completion order; if tripwire_triggered
    # fires ("tripwire" as in a literal tripwire), cancel all the tasks
    for done in asyncio.as_completed(guardrail_tasks):
        result = await done
        if result.output.tripwire_triggered:
            # Cancel all guardrail tasks if a tripwire is triggered.
            for t in guardrail_tasks:
                t.cancel()
            _error_tracing.attach_error_to_current_span(
                SpanError(
                    message="Guardrail tripwire triggered",
                    data={"guardrail": result.guardrail.get_name()},
                )
            )
            raise InputGuardrailTripwireTriggered(result)
        else:
            guardrail_results.append(result)

    return guardrail_results

The logic is straightforward: build one task per guardrail, run them and collect results in completion order, and if tripwire_triggered fires along the way, cancel everything and raise an exception.
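The same concurrency pattern can be reproduced with plain asyncio. Here a hypothetical exception stands in for the SDK's tripwire_triggered flag plus InputGuardrailTripwireTriggered:

```python
import asyncio


class TripwireTriggered(Exception):
    # Hypothetical: the SDK signals via result.output.tripwire_triggered
    # and raises InputGuardrailTripwireTriggered; an exception stands in here.
    pass


async def guardrail(name: str, delay: float, trips: bool) -> str:
    await asyncio.sleep(delay)
    if trips:
        raise TripwireTriggered(name)
    return name


async def run_guardrails() -> list:
    tasks = [
        asyncio.create_task(guardrail("ok", 0.01, False)),
        asyncio.create_task(guardrail("trips", 0.02, True)),
        asyncio.create_task(guardrail("slow", 5.0, False)),
    ]
    results = []
    try:
        # Collect results in completion order, exactly like the SDK loop.
        for done in asyncio.as_completed(tasks):
            results.append(await done)
    except TripwireTriggered:
        for t in tasks:   # one tripwire cancels everything still running
            t.cancel()
        raise
    return results


try:
    asyncio.run(run_guardrails())
    outcome = "all passed"
except TripwireTriggered as e:
    outcome = f"tripwire: {e}"

print(outcome)  # tripwire: trips
```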

The task runs as follows

run_single_output_guardrail
-->RunImpl.run_single_output_guardrail
-->with guardrail_span(guardrail.get_name()) as span_guardrail
-->await guardrail.run
-->guardrail_function
-->return OutputGuardrailResult # the result is wrapped into a uniform type here

Why can with be used on a function call? Because the function's return value is a Span, a class that implements __enter__ and __exit__, which call start and finish respectively. This matches the docstring:

"""Create a new guardrail span. The span will not be started automatically, you should either
do `with guardrail_span() ...` or call `span.start()` + `span.finish()` manually.

Args:
name: The name of the guardrail.
triggered: Whether the guardrail was triggered.
span_id: The ID of the span. Optional. If not provided, we will generate an ID. We
recommend using `util.gen_span_id()` to generate a span ID, to guarantee that IDs are
correctly formatted.
parent: The parent span or trace. If not provided, we will automatically use the current
trace/span as the parent.
disabled: If True, we will return a Span but the Span will not be recorded.
"""

2. _run_single_turn

Get the model, call the API to get a response, filter it, then decide whether tools need to be called

-->if should_run_agent_start_hooks:
asyncio.gather() # in plain terms: run the hook functions first
-->await cls._get_new_response
-->model = cls._get_model(agent, run_config) # this lands in OpenAIProvider.get_model, covered above
-->new_response = await model.get_response
-->context_wrapper.usage.add(new_response.usage)
-->return new_response
-->return await cls._get_single_step_result_from_response
# returns a ProcessedResponse
-->RunImpl.process_model_response
# returns a SingleStepResult
-->await RunImpl.execute_tools_and_side_effects
# First, let's run the tool calls - function tools and computer actions
-->await asyncio.gather(execute_function_tool_calls, execute_computer_actions)
# Second, check if there are any handoffs
-->return await cls.execute_handoffs
# Third, we'll check if the tool use should result in a final output
# if check_tool_use.is_final_output: return await cls.execute_final_output
# check if the model also produced a final output
-->potential_final_output_text
# There are two possibilities that lead to a final output:
# 1. Structured output schema => always leads to a final output
# 2. Plain text output schema => only leads to a final output if there are no tool calls
-->return await cls.execute_final_output
# If there's no final output, we can just run again
-->return SingleStepResult # this is the final result

The final return value is the output inside turn_result.next_step = NextStepFinalOutput(output='The weather in Tokyo is sunny.').

Summary

The call chain differs little from the traditional Chat Completions flow, but adds modules such as tracing, spans, MCP, hooks, handoffs, and guardrails. Hooks are functions inserted to run at designated stages (though not quite callbacks?). Handoffs distribute tasks to different agents in a multi-agent setup; internally, the other agents are also converted into the function format. Guardrails are a protection mechanism, split into input and output, that checks whether the input and output are valid.

For usage details, see the official docs: OpenAI Agents SDK