
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在构建企业级 Text-to-SQL 系统时,单一 LLM 调用往往难以处理复杂的数据分析场景。本文将深入解析 AskTable 如何通过多智能体架构,将复杂任务分解为可管理的子任务,实现高质量的自然语言到 SQL 的转换。
传统的 Text-to-SQL 系统通常采用"一次性生成"的方式:用户提问 → LLM 生成 SQL → 执行查询。这种方式在面对复杂场景时存在明显局限:
AskTable 通过多智能体架构解决了这些问题,让每个 Agent 专注于特定任务,通过工具调用和流式响应实现复杂的数据分析流程。
上图展示了 Agent 的核心执行流程:接收问题后,Agent 会判断是否需要调用工具(如查询元数据、执行 SQL),如果需要则调用工具并处理结果,然后继续判断是否需要更多工具,直到生成最终答案。
AskTable 的 Agent 基类提供了完整的工具调用和流式响应能力:
class Agent:
    """Generic LLM agent with tool-calling and streaming support."""

    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        # Resolve the LLM client and default model for the "report" profile;
        # an explicit `model` argument overrides the profile default.
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model
        # Core state
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None
        # Safety limits
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        # NOTE(review): the default completion budget is derived from the
        # *config* tool-call limit, not from the possibly overridden
        # self.max_tool_calls above — confirm this is intentional.
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)
设计亮点:
add_tool() 动态注册工具函数。

DBAgent 是专门处理数据库相关任务的智能体,它封装了元数据检索、SQL 执行等核心能力:
class DBAgent:
    """Agent specialised for database tasks (metadata retrieval, SQL execution).

    Wraps a generic ``Agent`` whose system prompt is compiled with a
    lightweight summary of the accessible database metadata.
    """

    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        # Fall back to the metadata visible to the assumed role.
        self.meta = meta or get_accessible_meta(datasource, assumed_role)
        # Build the metadata summary: ONE entry per schema, each listing all
        # of its table names.  (The previous comprehension iterated schemas
        # and tables jointly, emitting a duplicate schema entry per table,
        # each containing a single-table list.)
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [
                        {"name": table.name}
                        for table in schema.tables.values()
                    ],
                }
                for schema in self.meta.schemas.values()
            ]
        }
        # Compile the system prompt with the summary and create the agent.
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)
核心工具函数:
def show_table(
    self,
    table_names: list[str] = Field(
        ...,
        description="table full names, e.g. `schema_name.table_name`",
    ),
) -> str:
    """Show the detailed structure (fields, descriptions, data types) of the given tables."""
    # Split each full name exactly once: maxsplit=1 keeps table names that
    # themselves contain dots intact (the old code split twice and kept
    # only the second dot-component as the table name).
    meta = self.meta.filter_tables_by_names(
        [tuple(name.split(".", 1)) for name in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    # ensure_ascii=False keeps non-ASCII (e.g. Chinese) descriptions
    # readable in the tool result instead of \uXXXX escapes.
    return json.dumps(
        [
            {
                "name": table.name,
                "description": table.curr_desc,
                "fields": [
                    {
                        "name": field.name,
                        "desc": field.curr_desc,
                        "data_type": field.data_type,
                    }
                    for field in table.fields.values()
                ],
            }
            for table in tables
        ],
        ensure_ascii=False,
    )
async def search_metadata(
    self,
    queries: list[str] = Field(
        ...,
        description="subqueries to perform semantic search on meta data",
    ),
    keywords: list[str] = Field(
        ...,
        description="keywords to perform full-text search on database content",
    ),
) -> str:
    """Search table metadata by semantic subqueries and database content by keywords.

    Registered as an LLM tool — this docstring becomes the tool description
    (the previous version had none, so the tool was registered with an
    empty description).
    """
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    # ensure_ascii=False keeps non-ASCII descriptions readable for the LLM.
    return json.dumps(
        [
            {
                "table_full_name": table.full_name,
                "table_description": table.curr_desc,
            }
            for table in tables
        ],
        ensure_ascii=False,
    )
async def execute_sql(self, sql: str) -> str:
    """
    Execute sql
    ## DataFrame Management
    the data will be stored in the data_workspace automatically
    """
    # Derive a human-readable description from a leading `-- comment` line;
    # fall back to the first statement line.  (The previous version
    # unconditionally chopped the first 3 characters, which mangled SQL
    # that does not start with a comment and raised IndexError on empty
    # input.)
    stripped = sql.strip()
    first_line = stripped.splitlines()[0].strip() if stripped else ""
    if first_line.startswith("--"):
        description = first_line[2:].strip()
    else:
        description = first_line

    def _connect_and_query():
        with self.datasource.accessor.connect():
            return self.datasource.accessor.query(sql)

    # Run the blocking DB round-trip in a worker thread so the event loop
    # stays responsive.
    self.last_dataframe = await asyncio.to_thread(_connect_and_query)
    df_id = gen_id("df")
    dataframe_json = json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": self.last_dataframe.columns.tolist(),
            "shape": self.last_dataframe.shape,
            # Only the head rows are serialised to keep the tool result small.
            "content_head": dataframe_to_dicts(
                self.last_dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })
    # Cache the full DataFrame so later steps can reuse it without
    # re-running the query.
    self.data_workspace[df_id] = {
        "df": self.last_dataframe,
        "description": description,
        "sql": sql,
    }
    return dataframe_json
设计亮点:
show_table 按需返回表的详细结构;asyncio.to_thread 避免阻塞主线程。

PlanAgent 继承自 DBAgent,专门负责生成数据分析计划:
class PlanAgent(DBAgent):
    """Agent that generates a data-analysis plan wrapped in <plan>...</plan> tags."""

    # Parsed plan text, populated by output_parser.
    plan: str | None = None

    def __init__(
        self,
        datasource: DataSourceAdmin,
        type: ReportType,
        assumed_role: RoleAdmin | None = None,
    ):
        # Pick the prompt for the report type; fail fast with a clear error
        # on an unsupported type (previously `prompt_name` was left unbound
        # and a confusing NameError was raised below).
        if type == ReportType.summary:
            prompt_name = "agent/outline_generator"
        elif type == ReportType.analysis:
            prompt_name = "agent/analysis_outline_generator"
        else:
            raise errors.ParameterError(f"Unsupported report type: {type}")
        super().__init__(
            prompt_name=prompt_name,
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        # Validate/extract the plan from every final answer.
        self.set_output_parser(self.output_parser)

    def output_parser(self, output: str) -> None:
        """Extract the content between the <plan> and </plan> tags of the output."""
        import re

        match = re.search(r"<plan>(.*?)</plan>", output, re.DOTALL)
        if match:
            self.plan = match.group(1).strip()
        else:
            raise errors.ParameterError("未找到 <plan> 标签")
设计亮点:
AskTable 的工具调用机制是多智能体架构的核心,它实现了 LLM 与外部工具的无缝集成:
def add_tool(self, tool: Callable[..., Any]) -> None:
    """Register a callable as an LLM tool.

    The tool's docstring becomes its description and its signature is
    converted into a JSON Schema for the function-calling API.
    """
    # Plain functions carry __name__; callable objects fall back to
    # their class name.
    name = getattr(tool, "__name__", tool.__class__.__name__)
    spec = {
        "name": name,
        "description": tool.__doc__ or "",
        "parameters": TypeAdapter(tool).json_schema(),
    }
    self.tools.append({"type": "function", "function": spec})
    self.actions[name] = tool
技术细节:
TypeAdapter 自动生成 JSON Schema。

async def _step(self) -> AsyncGenerator[StreamEvent, None]:
    # One LLM round-trip: stream the completion, then resolve any tool calls.
    self.completion_count += 1
    completion = await self.llm_client.chat.completions.create(
        messages=self.message_builder.dump_openai(),
        model=self.model,
        tools=self.tools,
        stream=True,
    )
    # Stream the LLM response, forwarding events to the caller as they
    # are accumulated by the message builder.
    async for chunk in completion:
        delta = self.message_builder.append_openai_delta(chunk)
        if delta:
            if isinstance(delta, list):
                for event in delta:
                    yield event
            else:
                yield delta
    # Execute every tool call the model requested in this step.
    for tool_use in self.message_builder.get_unresolved_tool_use_blocks():
        func = self.actions[tool_use["name"]]
        params = json.loads(tool_use["input"])
        try:
            func_handler = func(**params)
            # Support both sync and async tools.
            if iscoroutine(func_handler):
                result = await func_handler
            else:
                result = func_handler
        except Exception as e:
            # Tool failures are fed back to the model as an error payload
            # instead of aborting the whole run.
            result = json.dumps({"error": str(e)})
        self.tool_called_count += 1
        yield self.message_builder.append_tool_result(tool_use["id"], result)
设计亮点:
async def run(
    self, user_input: str | None = None
) -> AsyncGenerator[StreamEvent, None]:
    """Keep stepping until the LLM returns a final (parseable) answer."""
    self.tool_called_count = 0
    self.completion_count = 0
    # Step 1: process the user input
    async for chunk in self.step(user_input):
        yield chunk
    counter = 0
    # Keep executing until a final answer is produced
    while True:
        counter += 1
        # NOTE(review): hard safety cap — exits silently after 50 iterations
        # even without a final answer; confirm this is intended given the
        # max_completions guard below already bounds the loop.
        if counter > 50:
            break
        if self.completion_count >= self.max_completions:
            raise CannotHandle(
                f"Completion count exceeds max completions: {self.max_completions}"
            )
        # Inspect the last message to decide what to do next.
        last_message = self.message_builder.dump_anthropic()[-1]
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                # A trailing text block is a candidate final answer.
                if not self.output_parser:
                    break
                # Validate the output format
                try:
                    self.output_parser(last_block["text"])
                    break
                except Exception as e:
                    # Bad format: feed the error back and ask the LLM
                    # to regenerate.
                    async for chunk in self.step(str(e)):
                        yield chunk
        elif last_message["role"] == "user":
            last_block = last_message["content"][-1]
            if last_block["type"] == "tool_result":
                # A tool just finished; enforce the tool-call budget,
                # then let the LLM consume the result.
                if self.tool_called_count >= self.max_tool_calls:
                    raise CannotHandle(
                        f"Tool called count exceeds max tool calls: {self.max_tool_calls}"
                    )
                # Continue with the next step
                async for chunk in self.step():
                    yield chunk
设计亮点:
用户问题:"分析上个月各地区销售额,找出增长最快的前 3 个地区,并给出原因分析"
Agent 执行流程:
用户问题:"我想了解客户表的结构"
Agent 执行流程:
# At initialisation, only a summary of table names is provided:
# one entry per schema, each listing all of its tables.  (The previous
# comprehension looped over schemas and tables jointly, duplicating the
# schema entry once per table.)
self.db_meta = {
    "schemas": [
        {
            "name": schema.name,
            "tables": [{"name": table.name} for table in schema.tables.values()],
        }
        for schema in self.meta.schemas.values()
    ]
}
# Detailed field information is only loaded on demand
def show_table(self, table_names: list[str]) -> str:
    meta = self.meta.filter_tables_by_names(...)
    # returns the detailed field information
优势:
虽然当前实现是串行执行工具调用,但架构支持并行执行:
# Future: support parallel tool-call execution
async def execute_tools_parallel(self, tool_uses):
    """Run all pending tool calls concurrently and collect their results in order."""
    pending = (self.execute_tool(tool_use) for tool_use in tool_uses)
    return await asyncio.gather(*pending)
# The DataFrame workspace automatically caches query results,
# keyed by the generated DataFrame id, for reuse in later steps.
self.data_workspace[df_id] = {
    "df": self.last_dataframe,
    "description": description,
    "sql": sql,
}
优势:
AskTable 的多智能体架构展示了如何将复杂的 Text-to-SQL 任务分解为可管理的子任务:
这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要多步推理和工具调用的 LLM 应用场景,如代码生成、数据分析、自动化运维等。
通过合理的架构设计和工程实践,我们可以构建出既强大又可控的 AI 应用系统。