AskTable

AskTable AI Agent 工作原理:从架构到实现

AskTable 团队
AskTable 团队

AskTable AI Agent 工作原理:从架构到实现

AskTable 的核心能力来自于其强大的 AI Agent 系统。本文将深入解析 AskTable AI Agent 的架构设计和工作原理,帮助你理解如何构建一个生产级的 AI Agent 系统。

一、Agent 架构概览

1.1 核心组件

AskTable 的 AI Agent 系统由以下核心组件构成:

┌─────────────────────────────────────────────────────────┐
│                    Agent Base Class                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ Message      │  │ Tool         │  │ Output       │  │
│  │ Builder      │  │ Registry     │  │ Parser       │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└─────────────────────────────────────────────────────────┘
                          ▲
                          │
        ┌─────────────────┼─────────────────┐
        │                 │                 │
   ┌────▼────┐      ┌────▼────┐      ┌────▼────┐
   │ DB      │      │ Data    │      │ Chart   │
   │ Agent   │      │ Node    │      │ Node    │
   │         │      │ Agent   │      │ Agent   │
   └─────────┘      └─────────┘      └─────────┘

1.2 设计理念

AskTable Agent 系统遵循以下设计原则:

二、Agent 基础框架

2.1 Agent 类核心实现

Agent 基类位于

app/atserver/ai/agent_rev.py
,提供了完整的 Agent 运行框架:

class Agent:
    def __init__(
        self,
        system_prompt: Any,
        max_tool_calls: int | None = None,
        max_completions: int | None = None,
        model: str | None = None,
    ):
        self.llm_client, default_model = get_current_llm("report")
        self.model = model or default_model

        # 核心状态
        self.system_prompt = system_prompt
        self.message_builder = ChatMessageBuilder(system_prompt)
        self.tools: list[ChatCompletionToolParam] = []
        self.actions: dict[str, Callable[..., Any]] = {}
        self.output_parser: Callable[..., Any] | None = None

        # 限制设置
        self.max_tool_calls = max_tool_calls or config.at_agent_max_tool_calls
        self.max_completions = max_completions or (config.at_agent_max_tool_calls + 10)

关键组件说明

2.2 工具注册机制

Agent 通过

add_tool()
方法动态注册工具:

def add_tool(self, tool: Callable[..., Any]) -> None:
    tool_name = getattr(tool, "__name__", tool.__class__.__name__)
    self.tools.append({
        "type": "function",
        "function": {
            "name": tool_name,
            "description": tool.__doc__ or "",
            "parameters": TypeAdapter(tool).json_schema(),
        },
    })
    self.actions[tool_name] = tool

工作原理

  1. 从函数中提取名称和文档字符串
  2. 使用 Pydantic 的
    TypeAdapter
    自动生成 JSON Schema
  3. 将工具定义添加到
    tools
    列表
  4. 将执行函数映射到
    actions
    字典

这种设计使得添加新工具非常简单,只需定义一个带类型注解的函数即可。

2.3 流式执行循环

Agent 的

run()
方法实现了完整的执行循环:

async def run(self, user_input: str | None = None) -> AsyncGenerator[StreamEvent, None]:
    # 第一步:处理用户输入
    async for chunk in self.step(user_input):
        yield chunk

    # 持续执行直到完成
    while True:
        if self.completion_count >= self.max_completions:
            raise CannotHandle("超过最大完成次数")

        last_message = self.message_builder.dump_anthropic()[-1]

        # 检查是否完成
        if last_message["role"] == "assistant":
            last_block = last_message["content"][-1]
            if last_block["type"] == "text":
                if self.output_parser:
                    try:
                        self.output_parser(last_block["text"])
                        break
                    except Exception as e:
                        # 解析失败,将错误反馈给 Agent
                        async for chunk in self.step(str(e)):
                            yield chunk
                else:
                    break

        # 继续执行工具调用
        elif last_message["role"] == "user":
            if self.tool_called_count >= self.max_tool_calls:
                raise CannotHandle("超过最大工具调用次数")
            async for chunk in self.step():
                yield chunk

执行流程

  1. 发送用户输入到 LLM
  2. 流式接收 LLM 响应
  3. 如果 LLM 调用工具,执行工具并返回结果
  4. 将工具结果发送回 LLM
  5. 重复步骤 2-4 直到 LLM 返回最终文本
  6. 如果设置了 output_parser,验证输出格式
  7. 解析失败则将错误反馈给 LLM 重试

三、专用 Agent 实现

3.1 DBAgent:数据库查询 Agent

DBAgent
是最基础的数据库交互 Agent,提供三个核心工具:

class DBAgent:
    def __init__(
        self,
        prompt_name: str,
        datasource: DataSourceAdmin,
        meta: MetaAdmin | None = None,
        assumed_role: RoleAdmin | None = None,
        model: str | None = None,
    ):
        self.datasource = datasource
        self.assumed_role = assumed_role
        self.meta = meta or get_accessible_meta(datasource, assumed_role)

        # 构建数据库元数据
        self.db_meta = {
            "schemas": [
                {
                    "name": schema.name,
                    "tables": [{"name": table.name}]
                }
                for schema in self.meta.schemas.values()
                for table in schema.tables.values()
            ]
        }

        # 初始化 Agent
        system_prompt = get_prompt(prompt_name).compile(meta=self.db_meta)
        self.agent = Agent(system_prompt=system_prompt, model=model)

        # 注册工具
        self.add_tool = self.agent.add_tool
        self.set_output_parser = self.agent.set_output_parser

        # 数据工作区
        self.data_workspace: dict[str, dict[str, Any]] = {}

核心工具

1. show_table:显示表结构

def show_table(self, table_names: list[str]) -> str:
    """显示指定表的详细结构"""
    meta = self.meta.filter_tables_by_names(
        [(f.split(".")[0], f.split(".")[1]) for f in table_names]
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "name": table.name,
            "description": table.curr_desc,
            "fields": [
                {
                    "name": field.name,
                    "desc": field.curr_desc,
                    "data_type": field.data_type,
                }
                for field in table.fields.values()
            ],
        }
        for table in tables
    ])

2. search_metadata:语义搜索元数据

async def search_metadata(
    self,
    queries: list[str],  # 语义搜索查询
    keywords: list[str],  # 全文搜索关键词
) -> str:
    """通过语义搜索和关键词搜索查找相关表"""
    meta = await retrieve_entities(
        meta=self.meta,
        datasource=self.datasource,
        subqueries=queries,
        keywords=keywords,
    )
    tables = [
        table
        for schema in meta.schemas.values()
        for table in schema.tables.values()
    ]
    return json.dumps([
        {
            "table_full_name": table.full_name,
            "table_description": table.curr_desc,
        }
        for table in tables
    ])

3. execute_sql:执行 SQL 查询

async def execute_sql(self, sql: str) -> str:
    """执行 SQL 并将结果存储到数据工作区"""
    # 提取 SQL 注释作为描述
    description = sql.strip().split("\n")[0][3:].strip()

    # 异步执行查询
    def _connect_and_query():
        with self.datasource.accessor.connect():
            return self.datasource.accessor.query(sql)

    dataframe = await asyncio.to_thread(_connect_and_query)

    # 生成 DataFrame ID 并存储
    df_id = gen_id("df")
    self.data_workspace[df_id] = {
        "df": dataframe,
        "description": description,
        "sql": sql,
    }

    # 返回 DataFrame 摘要
    return json.dumps({
        "id": df_id,
        "dataframe": {
            "columns": dataframe.columns.tolist(),
            "shape": dataframe.shape,
            "content_head": dataframe_to_dicts(
                dataframe, self.dataframe_serialize_max_rows
            ),
        },
    })

数据工作区设计

3.2 DataNodeAgentV2:Canvas 数据节点 Agent

DataNodeAgentV2
是 Canvas 中数据节点的专用 Agent,支持更高级的功能:

class DataNodeAgentV2:
    def __init__(
        self,
        datasource: DataSourceAdmin,
        meta: MetaAdmin,
        assumed_role: RoleAdmin | None = None,
        reference_context: str | None = None,
        model: str | None = None,
        reasoning_effort: Literal["low", "medium", "high"] = "medium",
        history_messages: list[dict] | None = None,
    ):
        self.datasource = datasource
        self.meta = meta
        self.model = model or get_current_model_name("canvas")
        self.reasoning_effort = reasoning_effort

        # 虚拟文件系统(用于元数据探索)
        self.vfs = VirtualFS(self.meta)
        self._shell_interpreter = ShellInterpreter(self.vfs)

        # 注册工具
        self._register_tools()

        # 构建系统提示
        self._system_prompt = self._build_system_prompt(reference_context)

        # 数据工作区
        self.data_workspace: dict[str, dict[str, Any]] = {}
        self.submitted_df_id: str | None = None
        self.submitted_description: str | None = None

核心特性

1. Shell 工具:元数据探索

def shell(self, command: str) -> str:
    """执行 shell 命令探索数据库元数据(ls/cat/find/grep)"""
    result = self._shell_interpreter.execute(command)
    return result

DataNodeAgentV2 提供了一个虚拟文件系统,将数据库元数据映射为文件系统结构:

/
├── schema1/
│   ├── table1.table
│   ├── table2.table
│   └── ...
├── schema2/
│   └── ...

Agent 可以使用

ls
cat
find
grep
等命令探索元数据,就像操作真实文件系统一样。

2. Reasoning 支持

response = await self._client.chat.completions.create(
    model=self.model,
    messages=messages,
    tools=self._tools,
    stream=True,
    max_tokens=MAX_TOKENS,
    extra_body={"reasoning": {"effort": self.reasoning_effort}},
)

通过

reasoning_effort
参数控制 AI 的思考深度:

3. submit_node:提交节点结果

def submit_node(
    self,
    description: str,
    df_id: str | None = None,
) -> str:
    """
    提交最终节点结果

    如果无法回答问题(没有相关表/字段),设置 df_id 为 None
    并在 description 中说明原因
    """
    if df_id is not None and df_id not in self.data_workspace:
        raise ValueError(f"DataFrame {df_id} 不在工作区中")

    self.submitted_df_id = df_id
    self.submitted_description = description

    status_msg = "error" if df_id is None else "success"
    return json.dumps({
        "df_id": df_id,
        "description": description,
        "status": status_msg
    })

3.3 ChartNodeAgent:图表生成 Agent

ChartNodeAgent
负责根据数据节点生成可视化图表:

class ChartNodeAgent:
    def __init__(
        self,
        parent_nodes_context: list[dict],
        history_messages: list[dict] | None = None,
    ):
        # 格式化父节点信息
        parent_info_parts = []
        for i, node in enumerate(parent_nodes_context, 1):
            parent_info_parts.append(f"Node {i} (ID: {node['id']}):")
            if node.get("description"):
                parent_info_parts.append(f"  Description: {node['description']}")
            if node.get("sql"):
                parent_info_parts.append(f"  SQL: {node['sql']}")
            if node.get("dataframe"):
                df = node["dataframe"]
                parent_info_parts.append(
                    f"  Columns: {', '.join(df.get('columns', []))}"
                )

        formatted_parent_info = "\n".join(parent_info_parts)

        # 构建系统提示
        system_prompt = get_prompt("agent/canvas/create_chart_node").compile(
            formatted_parent_info=formatted_parent_info
        )

        self.agent = Agent(
            system_prompt=system_prompt,
            model=get_current_model_name("canvas")
        )
        self.agent.add_tool(self.submit_chart_code)

核心工具

def submit_chart_code(
    self,
    description: str,
    code: str | None = None,
) -> str:
    """
    提交最终图表代码

    如果无法生成图表,设置 code 为 None 并在 description 中说明原因
    """
    if code is not None:
        try:
            # 编译 JSX 代码
            self.compiled_code = compile_jsx(code)
            self.source_code = code
        except Exception as e:
            raise ValueError(f"代码编译失败: {str(e)}")
    else:
        self.compiled_code = None
        self.source_code = None

    self.submitted_description = description

    status_msg = "error" if code is None else "success"
    return json.dumps({
        "description": description,
        "status": status_msg,
        "has_code": code is not None,
    })

3.4 ReportAgent:报告生成 Agent

ReportAgent
用于生成数据报告,支持 Python 代码执行:

class ReportAgent(DBAgent):
    def __init__(
        self,
        datasource: DataSourceAdmin,
        assumed_role: RoleAdmin | None = None,
    ):
        super().__init__(
            prompt_name="agent/report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        self.add_tool(self.show_table)
        self.add_tool(self.search_metadata)
        self.add_tool(self.execute_sql)
        self.set_output_parser(self.output_parser)

输出解析器

def output_parser(self, output: str) -> None:
    # 提取 <code>...</code> 标签中的代码
    pattern = r"<code>(.*?)</code>"
    match = re.search(pattern, output, re.DOTALL)

    if not match:
        raise ValueError("无效的输出格式,期望: <code>...</code>")

    code = match.group(1).strip()

    # 提取所有 load_dataframe('...') 的 df_id
    load_df_pattern = r"load_dataframe\(\s*['\"]([^'\"]+)['\"]"
    referenced_dataframes = re.findall(load_df_pattern, code)

    if not referenced_dataframes:
        raise ValueError("代码中未找到 load_dataframe('df_id') 模式")

    # 编译 JSX 代码
    self.compiled_code = compile_jsx(code)
    self.source_code = code

    # 检查引用的 dataframes 是否都在 workspace 中
    missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
    if missing_ids:
        raise ValueError(f"引用的 dataframes {missing_ids} 不在数据工作区中")

    self.referenced_dataframes = referenced_dataframes

四、Prompt 管理系统

4.1 Prompt 架构

AskTable 使用 Langfuse 管理 Prompt,支持版本控制和动态更新:

class PromptProxy:
    def __init__(self, from_local: bool):
        self._from_local = from_local
        if not self._from_local:
            # 运行时从 Langfuse 获取
            self.prompt_cache = None
        else:
            # 从本地 assets/prompts.json 加载
            with open("assets/prompts.json") as f:
                prompts = json.load(f)
            self.prompt_cache = {
                prompt.name: TextPromptClient(prompt)
                for prompt in prompts
            }

    def get_prompt(self, prompt_name: str) -> PromptClient:
        if not self._from_local:
            return langfuse.get_prompt(
                prompt_name,
                label=config.observer_prompt_label
            )
        else:
            return self.prompt_cache[prompt_name]

4.2 Prompt 模板编译

Prompt 支持 Jinja2 模板语法,可以动态注入上下文:

# 获取 Prompt
system_prompt = get_prompt("agent/canvas/data_agent").compile(
    meta=db_meta,
    history=reference_context
)

Prompt 命名规范

五、流式响应机制

5.1 StreamEvent 类型

AskTable 定义了多种流式事件类型:

@dataclass
class TextDelta:
    type: Literal["text"]
    text: str

@dataclass
class AssistantStreamEvent:
    role: Literal["assistant"]
    content: TextDelta

@dataclass
class ToolUseStreamEvent:
    role: Literal["assistant"]
    tool_use: dict

@dataclass
class ToolResultStreamEvent:
    role: Literal["user"]
    tool_result: dict

5.2 流式处理流程

async def run(self, question: str) -> AsyncGenerator[StreamEvent, None]:
    # 发送用户消息
    self._message_builder.append_openai_message({
        "role": "user",
        "content": question
    })

    for _ in range(MAX_ITERATIONS):
        # 调用 LLM
        response = await self._client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=self._tools,
            stream=True,
            max_tokens=MAX_TOKENS,
        )

        # 流式接收响应
        async for chunk in response:
            event = self._message_builder.append_openai_delta(chunk)
            if event:
                yield event  # 实时返回给前端

        # 执行工具调用
        tool_use_blocks = self._message_builder.get_unresolved_tool_use_blocks()
        if not tool_use_blocks:
            return  # 完成

        for tool_use in tool_use_blocks:
            result = await self._execute_tool_use(tool_use)
            yield self._message_builder.append_tool_result(
                tool_use["id"],
                result
            )

流式响应的优势

  1. 实时反馈:用户可以看到 AI 的思考过程
  2. 降低延迟感知:即使总时间相同,流式输出让用户感觉更快
  3. 可中断:用户可以随时停止不需要的查询
  4. 透明度:显示工具调用和执行结果,增强可信度

六、错误处理与重试

6.1 工具执行错误处理

async def _execute_tool_use(self, tool_use: dict) -> str:
    func_name = tool_use["name"]
    func_args_str = tool_use["input"]

    func = self._actions.get(func_name)
    if not func:
        return json.dumps({"error": f"未知工具: {func_name}"})

    try:
        args = json.loads(func_args_str)
        if iscoroutinefunction(func):
            result = await func(**args)
        else:
            result = func(**args)
        return result
    except Exception as e:
        return json.dumps({"error": str(e)})

错误处理策略

6.2 输出解析错误处理

if last_block["type"] == "text":
    if self.output_parser:
        try:
            self.output_parser(last_block["text"])
            break
        except Exception as e:
            # 将解析错误反馈给 LLM
            async for chunk in self.step(str(e)):
                yield chunk

自动修正机制

  1. LLM 输出不符合预期格式
  2. Output Parser 抛出异常
  3. 将错误信息作为新的用户消息发送给 LLM
  4. LLM 根据错误提示修正输出
  5. 重复直到输出正确或达到最大重试次数

6.3 限制与保护

# 最大工具调用次数
if self.tool_called_count >= self.max_tool_calls:
    raise CannotHandle("超过最大工具调用次数")

# 最大完成次数
if self.completion_count >= self.max_completions:
    raise CannotHandle("超过最大完成次数")

保护机制

七、最佳实践

7.1 设计 Agent 工具

好的工具设计

def execute_sql(
    self,
    sql: str = Field(..., description="要执行的 SQL 查询")
) -> str:
    """
    执行 SQL 查询

    ## 代码风格
    SQL 开头用注释描述查询目的:
    ```sql
    -- 上周华东地区所有门店的销量
    SELECT ...
    ```

    ## DataFrame 管理
    查询结果会自动存储到数据工作区
    """
    ...

设计要点

  1. 清晰的函数签名:使用 Pydantic Field 添加参数描述
  2. 详细的文档字符串:说明工具用途、使用方法、注意事项
  3. 结构化返回:返回 JSON 格式的结果,便于 LLM 解析
  4. 错误信息:返回有意义的错误信息,帮助 LLM 理解问题

7.2 编写 System Prompt

Prompt 结构

你是一个数据分析 Agent,负责根据用户问题生成 SQL 查询。

## 可用工具
- show_table: 查看表结构
- search_metadata: 搜索相关表
- execute_sql: 执行 SQL 查询

## 工作流程
1. 使用 search_metadata 找到相关表
2. 使用 show_table 查看表结构
3. 编写并执行 SQL 查询
4. 调用 submit_node 提交结果

## 数据库元数据
{{ meta }}

## 注意事项
- SQL 必须以注释开头说明查询目的
- 优先使用已有的表和字段
- 如果找不到相关数据,设置 df_id 为 None

Prompt 要点

  1. 明确角色:告诉 Agent 它是谁,要做什么
  2. 工具说明:列出可用工具及其用途
  3. 工作流程:给出推荐的执行步骤
  4. 上下文注入:使用模板变量注入动态信息
  5. 约束条件:说明输出格式、注意事项等

7.3 管理对话历史

历史消息持久化

# 导出对话历史
messages = agent.get_messages()

# 保存到数据库
save_to_database(messages)

# 恢复对话
agent = DataNodeAgentV2.create(
    datasource=datasource,
    history_messages=messages
)

历史消息的作用

7.4 优化性能

Prompt Caching

messages = self._message_builder.dump_openai(cache_control=True)

启用 Prompt Caching 可以:

并发执行

# 并发执行多个独立的工具调用
results = await asyncio.gather(
    self.search_metadata(["销售"], ["华东"]),
    self.search_metadata(["库存"], ["仓库"]),
)

数据采样

# 只返回前 N 行数据给 LLM
"content_head": dataframe_to_dicts(
    dataframe,
    DATAFRAME_SERIALIZE_MAX_ROWS
)

八、总结

AskTable 的 AI Agent 系统展示了如何构建一个生产级的 Agent 架构:

  1. 模块化设计:基础 Agent 类 + 专用 Agent 实现
  2. 工具驱动:通过动态注册工具扩展能力
  3. 流式响应:实时反馈提升用户体验
  4. 错误处理:优雅处理失败并自动重试
  5. 状态管理:维护对话历史和数据工作区
  6. 性能优化:Prompt Caching、数据采样、并发执行

通过理解这些设计原则和实现细节,你可以构建自己的 AI Agent 系统,或者更好地使用 AskTable 的 Agent 能力。

相关资源