
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在构建 LLM 应用时,不同模型提供商的 API 格式差异是一个常见的痛点。OpenAI 和 Anthropic 虽然都支持工具调用和流式响应,但消息格式却大相径庭。AskTable 的 ChatMessageBuilder 提供了一个优雅的解决方案:统一的内部消息格式 + 双向转换器。
messages = [
{"role": "system", "content": "You are a helpful assistant"},
{"role": "user", "content": "What's the weather?"},
{
"role": "assistant",
"content": "Let me check",
"tool_calls": [{
"id": "call_123",
"type": "function",
"function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'}
}]
},
{"role": "tool", "tool_call_id": "call_123", "content": "Sunny, 25°C"}
]
messages = [
{
"role": "user",
"content": [{"type": "text", "text": "What's the weather?"}]
},
{
"role": "assistant",
"content": [
{"type": "text", "text": "Let me check"},
{"type": "tool_use", "id": "call_123", "name": "get_weather", "input": '{"city": "Beijing"}'}
]
},
{
"role": "user",
"content": [{"type": "tool_result", "tool_use_id": "call_123", "content": "Sunny, 25°C"}]
}
]
| 特性 | OpenAI | Anthropic |
|---|---|---|
| System Prompt | 独立的 system 消息 | 作为 API 参数传递 |
| 内容格式 | 字符串 | Content Block 数组 |
| 工具调用 | tool_calls 字段 | tool_use Content Block |
| 工具结果 | 独立的 tool 角色消息 | tool_result Content Block |
| Thinking | 不支持(部分模型支持 reasoning) | 原生支持 thinking Block |
ChatMessageBuilder 使用类 Anthropic 的 Content Block 格式作为内部表示:
# 内部消息格式
InternalMessage = {
"role": "assistant" | "user",
"content": [
{"type": "text", "text": "..."},
{"type": "thinking", "thinking": "..."},
{"type": "tool_use", "id": "...", "name": "...", "input": "..."},
{"type": "tool_result", "tool_use_id": "...", "content": "..."}
]
}
def append_openai_message(self, message: ChatCompletionMessageParam) -> None:
    """Append one OpenAI-format chat message, converting it to the
    internal content-block representation.

    Supported roles:
      - "user": stored as a single text block.
      - "assistant": text content and/or tool_calls become text /
        tool_use blocks; a message that yields no blocks is dropped.
      - "tool": stored as a tool_result block on a "user" message,
        merged into the trailing user message when one exists.

    Messages with any other role are silently ignored.
    """
    role = message["role"]
    if role == "user":
        # User message.
        # NOTE(review): assumes content is a plain string; OpenAI also
        # allows a list of content parts, which str() would mangle —
        # confirm callers only pass strings.
        self._messages.append({
            "role": "user",
            "content": [{"type": "text", "text": str(message["content"])}],
        })
    elif role == "assistant":
        # Assistant message.
        blocks: list[ContentBlock] = []
        # Text first, so it precedes any tool_use blocks.
        content = message.get("content")
        if isinstance(content, str) and content:
            blocks.append({"type": "text", "text": content})
        # Each OpenAI tool call maps 1:1 to a tool_use block; the raw
        # JSON arguments string is kept as-is in "input".
        tool_calls = message.get("tool_calls")
        if tool_calls:
            for tc in tool_calls:
                blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["function"]["name"],
                    "input": tc["function"]["arguments"],
                })
        if blocks:
            self._messages.append({"role": "assistant", "content": blocks})
    elif role == "tool":
        # Tool-result message.
        tool_call_id = message.get("tool_call_id")
        content = message.get("content", "")
        if tool_call_id:
            # Merge consecutive tool results into one user message.
            # Fix: guard the empty history — the original indexed
            # self._messages[-1] unconditionally and raised IndexError.
            if self._messages and self._messages[-1]["role"] == "user":
                self._messages[-1]["content"].append({
                    "type": "tool_result",
                    "tool_use_id": tool_call_id,
                    "content": str(content),
                })
            else:
                # Otherwise start a fresh user message.
                self._messages.append({
                    "role": "user",
                    "content": [{
                        "type": "tool_result",
                        "tool_use_id": tool_call_id,
                        "content": str(content),
                    }],
                })
流式响应需要增量构建消息:
def append_openai_delta(
    self, chunk: ChatCompletionChunk
) -> StreamEvent | list[StreamEvent] | None:
    """Incrementally fold one OpenAI streaming chunk into the assistant
    message under construction.

    Text and thinking deltas are appended to the trailing block of the
    same type (creating one if needed) and echoed back as a stream
    event. Tool-call deltas are accumulated silently; the corresponding
    ToolUse event(s) are only emitted once ``finish_reason`` arrives.

    Returns a single event, a list of events (when the chunk finishes
    with more than one pending tool call — fix: the original annotation
    claimed ``StreamEvent | None`` even though the finish path can
    return a list), or None when there is nothing to surface yet.
    """
    if not chunk.choices:
        return None
    choice = chunk.choices[0]
    delta = choice.delta
    # Make sure the message being built is an assistant message.
    if not self._messages or self._messages[-1]["role"] != "assistant":
        self._messages.append({"role": "assistant", "content": []})
    blocks = self._messages[-1]["content"]
    # Plain text delta.
    if delta.content:
        if blocks and blocks[-1]["type"] == "text":
            # Extend the trailing text block.
            blocks[-1]["text"] += delta.content
        else:
            # Start a new text block.
            blocks.append({"type": "text", "text": delta.content})
        return AssistantStreamEvent(
            role="assistant",
            content=TextDelta(type="text", text=delta.content)
        )
    # Thinking / reasoning delta; providers expose it under different
    # attribute names (reasoning_details / reasoning / reasoning_content).
    reasoning_text = None
    if hasattr(delta, "reasoning_details") and delta.reasoning_details:
        reasoning_text = delta.reasoning_details[0].get("text", "")
    elif hasattr(delta, "reasoning") and delta.reasoning is not None:
        reasoning_text = delta.reasoning
    elif hasattr(delta, "reasoning_content") and delta.reasoning_content is not None:
        reasoning_text = delta.reasoning_content
    if reasoning_text:
        if blocks and blocks[-1]["type"] == "thinking":
            blocks[-1]["thinking"] += reasoning_text
        else:
            blocks.append({"type": "thinking", "thinking": reasoning_text})
        return AssistantStreamEvent(
            role="assistant",
            content=ThinkingDelta(type="thinking", thinking=reasoning_text)
        )
    # Tool-call deltas: accumulate id/name/arguments per index.
    if delta.tool_calls:
        for tc_delta in delta.tool_calls:
            idx = tc_delta.index if tc_delta.index is not None else 0
            tool_use_block = self._get_or_create_tool_use_block(blocks, idx)
            if tc_delta.id:
                tool_use_block["id"] = tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    tool_use_block["name"] = tc_delta.function.name
                if tc_delta.function.arguments:
                    tool_use_block["input"] += tc_delta.function.arguments
        return None  # Events are deferred until the tool calls complete.
    # On finish_reason, surface every accumulated tool call.
    if choice.finish_reason:
        tool_use_blocks = [b for b in blocks if b["type"] == "tool_use"]
        if tool_use_blocks:
            events = [
                AssistantStreamEvent(
                    role="assistant",
                    content=ToolUse(
                        type="tool_use",
                        id=b["id"],
                        name=b["name"],
                        input=b["input"],
                    ),
                )
                for b in tool_use_blocks
            ]
            return events if len(events) > 1 else events[0]
    return None
def dump_openai(self, cache_control: bool = False) -> list[ChatCompletionMessageParam]:
    """Export the internal history as OpenAI chat-completions messages.

    - The builder's system prompt (if any) leads the list as a
      ``system`` message.
    - Assistant messages: text blocks are joined into ``content``;
      tool_use blocks become ``tool_calls``; thinking blocks are
      dropped (no OpenAI equivalent).
    - User messages: tool_result blocks are emitted first as ``tool``
      role messages (fix: OpenAI requires tool results to directly
      follow the assistant message that issued the calls; the original
      emitted the user text before them), then any text as a ``user``
      message.

    When ``cache_control`` is True and the final message is a plain
    string user message, it is rewrapped as a content-part list carrying
    an ephemeral ``cache_control`` marker (for Anthropic-compatible
    OpenAI endpoints).
    """
    openai_messages = []
    # Leading system prompt.
    if self.system_prompt is not None:
        openai_messages.append({"role": "system", "content": self.system_prompt})
    for msg in self._messages:
        content_blocks = msg["content"]
        # Bucket the blocks by type.
        text_parts = []
        tool_uses = []
        tool_results = []
        for block in content_blocks:
            if block["type"] == "text":
                text_parts.append(block["text"])
            elif block["type"] == "thinking":
                # OpenAI has no thinking channel; skip.
                pass
            elif block["type"] == "tool_use":
                tool_uses.append(block)
            elif block["type"] == "tool_result":
                tool_results.append(block)
        # Build the assistant message.
        if msg["role"] == "assistant":
            assistant_msg = {
                "role": "assistant",
                "content": "".join(text_parts),
            }
            if tool_uses:
                assistant_msg["tool_calls"] = [
                    {
                        "id": tool["id"],
                        "type": "function",
                        "function": {
                            "name": tool["name"],
                            "arguments": tool["input"],
                        },
                    }
                    for tool in tool_uses
                ]
            openai_messages.append(assistant_msg)
        # Build the tool and user messages.
        elif msg["role"] == "user":
            # Tool results must come immediately after the assistant
            # tool_calls message, so emit them before any user text.
            for tool_result in tool_results:
                openai_messages.append({
                    "role": "tool",
                    "tool_call_id": tool_result["tool_use_id"],
                    "content": str(tool_result["content"]),
                })
            if text_parts:
                openai_messages.append({"role": "user", "content": "".join(text_parts)})
    # Optional prompt caching (Anthropic-compatible OpenAI endpoints).
    if cache_control and openai_messages:
        last_msg = openai_messages[-1]
        if last_msg["role"] == "user":
            content = last_msg.get("content", "")
            if isinstance(content, str):
                last_msg["content"] = [{
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"},
                }]
    return openai_messages
def dump_anthropic(self) -> list[InternalMessage]:
    """
    Export as Anthropic format (the internal representation is already
    Anthropic-shaped, so it is returned directly).

    NOTE: this returns a live reference to the builder's internal
    message list, not a copy — mutating the result mutates the builder.
    """
    return self._messages
ChatMessageBuilder 可以追踪哪些工具调用还没有返回结果:
def get_unresolved_tool_use_blocks(self) -> list[ContentBlock]:
    """Return the tool_use blocks of the most recent assistant message
    that have no matching tool_result anywhere in the history."""
    # Locate the last assistant message, if any.
    last_assistant = next(
        (m for m in reversed(self._messages) if m["role"] == "assistant"),
        None,
    )
    if last_assistant is None:
        return []
    pending = [b for b in last_assistant["content"] if b["type"] == "tool_use"]
    if not pending:
        return []
    # Gather the ids already answered by a tool_result block.
    answered = set()
    for m in self._messages:
        if m["role"] != "user":
            continue
        for b in m["content"]:
            if b["type"] == "tool_result":
                answered.add(b["tool_use_id"])
    return [b for b in pending if b["id"] not in answered]
def append_tool_result(self, tool_call_id: str, content: str) -> StreamEvent:
    """Record a tool execution result and return the stream event for it.

    The result is stored as a tool_result content block on a "user"
    message: appended to the trailing user message when one exists,
    otherwise wrapped in a fresh user message.
    """
    # Build the tool_result block.
    tool_result_block = {
        "type": "tool_result",
        "tool_use_id": tool_call_id,
        "content": content,
    }
    # Fix: guard the empty history — the original indexed
    # self._messages[-1] unconditionally and raised IndexError when no
    # message had been appended yet.
    if self._messages and self._messages[-1]["role"] == "user":
        self._messages[-1]["content"].append(tool_result_block)
    else:
        self._messages.append({"role": "user", "content": [tool_result_block]})
    return StreamUserEvent(
        role="user",
        content=ToolResult(
            type="tool_result",
            tool_use_id=tool_call_id,
            content=content
        ),
    )
# 初始化
builder = ChatMessageBuilder(system_prompt="You are a helpful assistant")
# 添加用户消息
builder.append_openai_message({
"role": "user",
"content": "What's the weather in Beijing?"
})
# 使用 OpenAI API
openai_messages = builder.dump_openai()
response = openai.chat.completions.create(
model="gpt-4",
messages=openai_messages
)
# 或使用 Anthropic API
anthropic_messages = builder.dump_anthropic()
response = anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
system=builder.system_prompt,
messages=anthropic_messages
)
builder = ChatMessageBuilder()
# 流式接收 OpenAI 响应
stream = openai.chat.completions.create(
model="gpt-4",
messages=messages,
stream=True
)
for chunk in stream:
event = builder.append_openai_delta(chunk)
if event:
# 发送给前端
yield event
builder = ChatMessageBuilder()
# 用户消息
builder.append_openai_message({
"role": "user",
"content": "What's the weather?"
})
# LLM 响应(包含工具调用)
builder.append_openai_message({
"role": "assistant",
"content": "Let me check",
"tool_calls": [{
"id": "call_123",
"type": "function",
"function": {"name": "get_weather", "arguments": '{"city": "Beijing"}'}
}]
})
# 检查未解决的工具调用
unresolved = builder.get_unresolved_tool_use_blocks()
print(unresolved) # [{"type": "tool_use", "id": "call_123", ...}]
# 添加工具结果
builder.append_tool_result("call_123", "Sunny, 25°C")
# 继续对话
messages = builder.dump_openai()
ChatMessageBuilder 支持多种 thinking/reasoning 格式:
# OpenAI o1 格式
delta.reasoning_details = [{"text": "Let me think..."}]
# OpenRouter 格式
delta.reasoning = "Let me think..."
# Qwen 格式
delta.reasoning_content = "Let me think..."
所有格式都会被转换为统一的
thinking block:
{"type": "thinking", "thinking": "Let me think..."}
流式处理时增量构建消息,避免重复解析:
# 增量追加文本
if blocks and blocks[-1]["type"] == "text":
blocks[-1]["text"] += delta.content
只在需要时才导出为特定格式:
# 内部格式保持不变
builder._messages # 始终是统一格式
# 按需导出
openai_messages = builder.dump_openai() # 仅在调用时转换
对于相同的消息历史,可以缓存导出结果(注意:在实例方法上直接使用 lru_cache 会以 self 作为缓存键并阻止实例被垃圾回收,生产环境建议改用按实例的缓存字典):
@lru_cache(maxsize=128)
def dump_openai_cached(self, messages_hash: str):
return self.dump_openai()
ChatMessageBuilder 通过统一的内部格式和双向转换器,优雅地解决了多模型 API 兼容性问题:
这种设计不仅简化了多模型集成,还为未来支持更多模型提供了可扩展的基础。