
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 应用中,用户体验的关键在于响应速度。等待 10 秒看到完整答案,远不如逐字显示来得友好。AskTable 通过流式响应和工具调用的巧妙结合,实现了真正的实时交互体验。
# ❌ 传统方式:等待完整响应
response = await openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "分析上个月的销售数据"}]
)
# 用户需要等待 10 秒才能看到结果
print(response.choices[0].message.content)
用户体验差:
# ✅ 流式响应:逐 token 返回
stream = await openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "分析上个月的销售数据"}],
stream=True
)
async for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
print(delta, end="", flush=True) # 实时显示
用户体验好:
async def _step(self) -> AsyncGenerator[StreamEvent, None]:
"""执行一步 Agent 推理"""
self.completion_count += 1
# 创建流式请求
completion = await self.llm_client.chat.completions.create(
messages=self.message_builder.dump_openai(),
model=self.model,
tools=self.tools,
stream=True, # 关键:启用流式
)
# 逐 chunk 处理
async for chunk in completion:
delta = self.message_builder.append_openai_delta(chunk)
if delta:
if isinstance(delta, list):
for event in delta:
yield event # 实时返回事件
else:
yield delta
设计亮点:
@dataclass
class StreamEvent:
type: str # "text" | "tool_call" | "tool_result"
content: str
metadata: dict
事件类型:
示例流程:
# 用户提问:"上个月的销售额是多少?"
# Event 1: text
{"type": "text", "content": "让我查询一下"}
# Event 2: tool_call
{"type": "tool_call", "name": "search_metadata", "args": {"queries": ["销售额"]}}
# Event 3: tool_result
{"type": "tool_result", "content": "找到字段:sales.amount"}
# Event 4: tool_call
{"type": "tool_call", "name": "execute_sql", "args": {"sql": "SELECT SUM(amount) FROM sales WHERE ..."}}
# Event 5: tool_result
{"type": "tool_result", "content": "结果:1,234,567 元"}
# Event 6: text
{"type": "text", "content": "上个月的销售额是 1,234,567 元"}
class ChatMessageBuilder:
def append_openai_delta(self, chunk) -> StreamEvent | list[StreamEvent] | None:
"""处理流式 chunk,返回事件"""
delta = chunk.choices[0].delta
# 文本内容
if delta.content:
self._current_message["content"] += delta.content
return StreamEvent(type="text", content=delta.content)
# 工具调用
if delta.tool_calls:
for tool_call in delta.tool_calls:
# 累积工具调用参数
self._accumulate_tool_call(tool_call)
# 工具调用完整时返回事件
if self._is_tool_call_complete():
return StreamEvent(
type="tool_call",
name=self._current_tool_call["name"],
args=self._current_tool_call["arguments"]
)
return None
关键点:
async def _step(self) -> AsyncGenerator[StreamEvent, None]:
# ... 流式接收 LLM 响应 ...
# 执行工具调用
for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
func = self.actions[tool_use["name"]]
params = json.loads(tool_use["input"])
try:
# 执行工具(支持同步和异步)
func_handler = func(**params)
if iscoroutine(func_handler):
result = await func_handler
else:
result = func_handler
except Exception as e:
result = json.dumps({"error": str(e)})
self.tool_called_count += 1
# 返回工具结果事件
yield self.message_builder.append_tool_result(tool_use["id"], result)
设计亮点:
# 同步工具
def show_table(self, table_names: list[str]) -> str:
return json.dumps([...])
# 异步工具
async def search_metadata(self, queries: list[str]) -> str:
results = await retrieve_entities(...)
return json.dumps(results)
# 自动检测并正确调用
if iscoroutine(func_handler):
result = await func_handler # 异步
else:
result = func_handler # 同步
try:
result = await func(**params)
except Exception as e:
# 将错误返回给 LLM,让它尝试其他方案
result = json.dumps({"error": str(e)})
优势:
async def run(
self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
"""持续执行直到得到最终答案"""
self.tool_called_count = 0
self.completion_count = 0
# 第一步:处理用户输入
async for chunk in self.step(user_input):
yield chunk
# 持续执行直到完成
while True:
if self.completion_count >= self.max_completions:
raise CannotHandle("Completion count exceeds max")
last_message = self.message_builder.dump_anthropic()[-1]
if last_message["role"] == "assistant":
last_block = last_message["content"][-1]
if last_block["type"] == "text":
# 得到最终答案,结束
break
elif last_message["role"] == "user":
last_block = last_message["content"][-1]
if last_block["type"] == "tool_result":
if self.tool_called_count >= self.max_tool_calls:
raise CannotHandle("Tool called count exceeds max")
# 继续执行下一步
async for chunk in self.step():
yield chunk
执行流程:
用户输入
↓
LLM 推理(流式)
↓
工具调用?
├─ 是 → 执行工具 → 返回结果 → LLM 推理(流式)→ ...
└─ 否 → 返回最终答案 → 结束
安全限制:
self.max_tool_calls = 10 # 最多调用 10 次工具
self.max_completions = 20 # 最多 20 次 LLM 请求
防止无限循环消耗资源。
用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区"
流式响应过程:
[文本] 让我先查询相关的数据表...
[工具调用] search_metadata(queries=["销售额", "地区"])
[工具结果] 找到表:sales, regions
[文本] 现在获取表的详细结构...
[工具调用] show_table(table_names=["sales", "regions"])
[工具结果] 返回字段信息
[文本] 查询上个月各地区销售额...
[工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[工具结果] 返回查询结果
[文本] 查询上上个月数据用于对比...
[工具调用] execute_sql(sql="SELECT region, SUM(amount) FROM sales WHERE ...")
[工具结果] 返回查询结果
[文本] 根据数据分析,增长最快的前 3 个地区是:
1. 华东地区:增长 45%
2. 华南地区:增长 38%
3. 西南地区:增长 32%
用户体验:
用户问题:"查询 users 表的数据"
流式响应过程:
[文本] 让我查询 users 表...
[工具调用] execute_sql(sql="SELECT * FROM users")
[工具结果] 错误:表不存在
[文本] 抱歉,users 表不存在。让我搜索相关的表...
[工具调用] search_metadata(queries=["用户"])
[工具结果] 找到表:user_info, customer
[文本] 我找到了 user_info 表,是否查询这个表?
优势:
# 避免过于频繁的事件
class EventBuffer:
def __init__(self, flush_interval=0.1):
self.buffer = []
self.last_flush = time.time()
self.flush_interval = flush_interval
def add(self, event):
self.buffer.append(event)
if time.time() - self.last_flush > self.flush_interval:
return self.flush()
return None
def flush(self):
if not self.buffer:
return None
events = self.buffer
self.buffer = []
self.last_flush = time.time()
return events
# 控制生产速度,避免消费者跟不上
class BackpressureStream:
def __init__(self, max_buffer_size=100):
self.queue = asyncio.Queue(maxsize=max_buffer_size)
async def produce(self, event):
await self.queue.put(event) # 队列满时会阻塞
async def consume(self):
while True:
event = await self.queue.get()
yield event
# ✅ 好的事件设计
@dataclass
class StreamEvent:
type: str
content: str
metadata: dict # 额外信息
# ❌ 差的事件设计
@dataclass
class StreamEvent:
data: str # 类型不明确
# ✅ 优雅降级
try:
result = await tool(**params)
except Exception as e:
result = {"error": str(e), "suggestion": "try another approach"}
yield StreamEvent(type="error", content=str(e))
# ❌ 直接抛出异常
result = await tool(**params) # 可能中断整个流程
# ✅ 提供进度信息
yield StreamEvent(type="progress", content="正在查询数据库...")
yield StreamEvent(type="progress", content="正在分析结果...")
# ❌ 长时间无反馈
# 用户不知道系统在做什么
AskTable 通过流式响应和工具调用的结合,实现了流畅的实时交互体验:
流式响应不仅是技术实现,更是用户体验的关键。通过让用户实时看到 AI 的思考过程,我们构建了更加透明、可信的 AI 应用。