
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 驱动的数据分析系统中,如何让 LLM 生成的代码安全、高效地执行,是一个核心挑战。AskTable 的 ReportAgent 采用了一种创新的架构:JSX 编译 + Python 沙箱执行,实现了数据报告的动态生成。
ReportAgent 负责生成数据报告组件,其核心流程包括:解析 LLM 输出、提取 load_dataframe() 引用、验证数据源、编译 JSX 并缓存编译结果。
# Core flow: parse the LLM output, extract load_dataframe() references,
# validate the data sources, then compile the JSX.
class ReportAgent(DBAgent):
    """Agent that turns LLM output into a compiled data-report component."""

    def __init__(self, datasource: DataSourceAdmin, assumed_role: RoleAdmin | None = None):
        super().__init__(
            prompt_name="agent/report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        # Tools the LLM may call while drafting the report.
        for tool in (self.show_table, self.search_metadata, self.execute_sql):
            self.add_tool(tool)
        self.set_output_parser(self.output_parser)
        # Filled in by the output parser once generation succeeds.
        self.compiled_code: str | None = None
        self.source_code: str | None = None
        self.referenced_dataframes: list[str] = []
LLM 生成的代码需要包裹在
<code>...</code> 标签中:
def output_parser(self, output: str) -> None:
    """Parse the LLM output: extract the <code> block, collect the
    load_dataframe() references, validate them against the data workspace,
    then compile the JSX and cache the results on the agent.

    Raises:
        ValueError: no <code> block, no load_dataframe() reference, or a
            referenced dataframe is missing from ``self.data_workspace``.
    """
    # 1. Extract the code block wrapped in <code>...</code>.
    match = re.search(r"<code>(.*?)</code>", output, re.DOTALL)
    if not match:
        raise ValueError("Invalid output format. Expected: <code>...</code>")
    code = match.group(1).strip()
    # 2. Collect DataFrame references.
    #    FIX: the previous pattern required a literal space before "df_"
    #    inside the quotes, so valid calls like load_dataframe('df_abc')
    #    never matched. "_" is now also accepted inside ids so names such
    #    as 'df_raw_sales' (used elsewhere in this file) match fully.
    load_df_pattern = r"load_dataframe\(\s*['\"](df_[A-Za-z0-9_]+)['\"]\s*\)"
    referenced_dataframes = re.findall(load_df_pattern, code)
    if not referenced_dataframes:
        raise ValueError("No load_dataframe('df_id') pattern found in code.")
    # 3. Validate references BEFORE paying for remote compilation, so a bad
    #    reference leaves no half-initialized compiled_code behind.
    missing_ids = set(referenced_dataframes) - set(self.data_workspace.keys())
    if missing_ids:
        raise ValueError(f"Referenced dataframes {missing_ids} are not in the data workspace")
    # 4. Compile the JSX via the remote compile service and cache everything.
    self.compiled_code = compile_jsx(code)
    self.source_code = code
    self.referenced_dataframes = referenced_dataframes
JSX 编译通过远程服务实现,避免在 Python 环境中引入 Node.js 依赖:
# Base URL of the Node.js JSX compile service.
BASE_URL = "http://localhost:5300/jsx"

def compile_jsx(code: str) -> str:
    """Compile a JSX source string via the remote Babel compile service.

    Returns the compiled JavaScript source.

    Raises:
        Exception: with the service-reported error on a non-200 response.
    """
    # FIX: a timeout was missing — a hung compile service blocked forever.
    response = requests.post(BASE_URL, json={"code": code}, timeout=30)
    if response.status_code != 200:
        raise Exception(response.json()["error"])
    return response.json()["compiledCode"]
编译服务使用 Babel 进行 JSX 转译:
// Node.js compile service: transpiles JSX to CommonJS via Babel.
const express = require('express');
const babel = require('@babel/core');

const app = express();      // FIX: "app" was used below without ever being created
app.use(express.json());    // FIX: without a JSON body parser, req.body is undefined

app.post('/jsx', (req, res) => {
  const { code } = req.body;
  try {
    const result = babel.transformSync(code, {
      presets: ['@babel/preset-react'],
      plugins: ['@babel/plugin-transform-modules-commonjs']
    });
    res.json({ compiledCode: result.code });
  } catch (error) {
    // Babel syntax errors surface here; report them as HTTP 400.
    res.status(400).json({ error: error.message });
  }
});

// Port matches the Python client's BASE_URL (http://localhost:5300/jsx).
app.listen(5300);
<code>
// Example component emitted by the LLM. `load_dataframe` is assumed to be
// injected by the report runtime (mirrors the Python-side API) — TODO confirm.
import { BarChart } from '@/components/charts';
function SalesReport() {
const data = load_dataframe('df_abc123');
return (
<div className="report">
<h2>月度销售报告</h2>
{/* Bar chart over the dataframe's month/sales columns */}
<BarChart
data={data}
xField="month"
yField="sales"
title="2024 年销售趋势"
/>
{/* Total = sum of the `sales` field across all rows */}
<p>总销售额: {data.reduce((sum, row) => sum + row.sales, 0)}</p>
</div>
);
}
</code>
对于数据处理和分析任务,ReportAgent 支持执行 Python 代码:
class CorrAnalyzerAgent(DBAgent):
    """Agent that runs statistical/correlation analysis via sandboxed Python."""

    def __init__(self, datasource, assumed_role=None, preference=None, user_profile=None):
        super().__init__(
            prompt_name="agent/analysis_report_generator",
            datasource=datasource,
            assumed_role=assumed_role,
        )
        # Single tool: remote Python execution inside a package-whitelisted sandbox.
        self.add_tool(self.execute_python)
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
class PythonExecutor:
    """Client for the remote, package-whitelisted Python execution service."""

    def __init__(self, packages: list[str] | None = None):
        # FIX: the default was the shared DEFAULT_PACKAGES list — a mutable
        # default argument; copy per instance so callers can't mutate it.
        self._base_url = BASE_URL
        self._packages = list(DEFAULT_PACKAGES) if packages is None else list(packages)
        # Each instance owns one isolated executor on the service side.
        self._executor_id = self._create_executor(self._packages)
def _create_executor(self, packages: list[str]) -> str:
    """Create an isolated executor instance on the service; return its id.

    Raises:
        Exception: when the service does not answer 201 Created.
    """
    response = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
        timeout=30,  # FIX: previously unbounded — a hung service blocked forever
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create executor: {response.text}")
    return response.json()["executor_id"]
使用 Pickle + Base64 进行 DataFrame 传输:
def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Serialize DataFrames (pickle + base64) and register each one in the
    sandbox's ``_saved_dataframes`` dict.

    FIX: all frames are shipped in a single _execute round trip instead of
    one HTTP call per frame, and the injected snippet now carries its own
    pickle/base64 imports exactly once.
    """
    if not dataframes:
        return  # nothing to transfer
    parts = ["import pickle", "import base64"]
    for df_name, df in dataframes.items():
        payload = base64.b64encode(pickle.dumps(df)).decode()
        parts.append(dedent(f"""\
            df = pickle.loads(base64.b64decode('{payload}'))
            _saved_dataframes['{df_name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """))
    self._execute(code="\n".join(parts))
def execute(self, code: str, variables: dict = {}, dataframes: dict = {}) -> CodeExecutionResponse:
# 1. 注入变量
if variables:
pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
code_inject = f"""
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
self._execute(code=code_inject)
# 2. 注入 DataFrame
if dataframes:
self.send_dataframes(dataframes)
# 3. 执行用户代码
return self._execute(code=code)
def _execute(self, code: str) -> "CodeExecutionResponse":
    """POST `code` to this instance's remote executor and parse the reply.

    Returns a CodeExecutionResponse carrying stdout, error, the executor id
    and any DataFrames the sandbox saved.
    """
    response = requests.post(
        f"{self._base_url}/python/executor/{self._executor_id}/code",
        json={"code": code},
        timeout=60,  # FIX: previously unbounded — a hung sandbox stalled the agent
    )
    response_json = response.json()
    # Deserialize returned DataFrames.
    # NOTE(review): pickle.loads on service output is only acceptable because
    # the executor service is first-party/trusted — never point this client
    # at an untrusted endpoint.
    dataframes = {}
    for df_name, df_data in response_json.get("dataframes", {}).items():
        raw_df = pickle.loads(base64.b64decode(df_data))
        if raw_df:  # skip empty payloads
            dataframes[df_name] = ResponseDataframe(
                df=raw_df.get("df"),
                description=raw_df.get("description"),
                sql=None,
            )
    return CodeExecutionResponse(
        stdout=response_json.get("stdout"),
        error=response_json.get("error"),
        executor_id=response_json.get("executor_id"),
        dataframes=dataframes,
    )
# Example 1: data cleaning inside the sandbox.
code = """
import pandas as pd
# 加载数据
df = load_dataframe('df_raw_sales')
# 数据清洗
df_cleaned = df.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]
# 保存结果
df_id = save_dataframe(df_cleaned, "清洗后的销售数据")
print(f"清洗完成,共 {len(df_cleaned)} 条记录")
"""
result = executor.execute(code)
# FIX: execute() returns a CodeExecutionResponse object, not a dict —
# use attribute access rather than result["stdout"].
print(result.stdout)  # prints: "清洗完成,共 1234 条记录"
# Example 2: descriptive statistics and correlation analysis in the sandbox.
code = """
import pandas as pd
import numpy as np
from scipy import stats
# 加载数据
df = load_dataframe('df_sales')
# 计算统计指标
mean_sales = df['amount'].mean()
median_sales = df['amount'].median()
std_sales = df['amount'].std()
# 相关性分析
corr = df[['amount', 'quantity']].corr()
print(f"平均销售额: {mean_sales:.2f}")
print(f"中位数: {median_sales:.2f}")
print(f"标准差: {std_sales:.2f}")
print(f"相关系数:\\n{corr}")
"""
result = executor.execute(code)
# Example 3: monthly aggregation, with the result saved back to the workspace.
code = """
import pandas as pd
# 加载数据
df = load_dataframe('df_orders')
# 按月汇总
df['month'] = pd.to_datetime(df['date']).dt.to_period('M')
monthly_sales = df.groupby('month').agg({
'amount': 'sum',
'order_id': 'count'
}).reset_index()
monthly_sales.columns = ['月份', '销售额', '订单数']
# 保存结果
df_id = save_dataframe(monthly_sales, "月度销售汇总")
"""
result = executor.execute(code)
每个执行器实例只能使用创建时指定的包:
# Each executor may only import the packages declared at creation time.
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
# Dangerous packages such as requests or os cannot be used.
每个执行器实例独立运行,互不干扰:
# Executor instances run independently of one another.
executor1 = PythonExecutor(packages=["pandas"])
executor2 = PythonExecutor(packages=["numpy"])
# executor1 and executor2 are fully isolated
执行器服务端设置超时限制,防止无限循环:
# Server-side configuration
EXECUTION_TIMEOUT = 30  # 30-second execution timeout
限制内存和 CPU 使用:
# Docker container resource limits
docker run --memory="512m" --cpus="1.0" python-executor
同一会话中复用执行器实例,避免重复创建:
class CorrAnalyzerAgent:
    """Illustration: one executor is created per agent and reused."""

    def __init__(self):
        # Create once, use many times — avoids a service round trip per call.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
执行器内部缓存 DataFrame,避免重复传输:
# First transfer: ship the DataFrame into the executor once.
executor.send_dataframes({"df_1": df})
# Subsequent code can load it directly from the executor's cache.
code = "df = load_dataframe('df_1')"
合并多个小操作为一次执行:
# Merge several small operations into a single execution round trip.
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')
result = pd.merge(df1, df2, on='id')
save_dataframe(result, "合并结果")
"""
AskTable 的 JSX 编译与 Python 沙箱执行系统,通过远程编译隔离、包白名单、执行器实例隔离、超时与容器资源限制、执行器复用与批量执行等技术,实现了安全、高效的代码执行。
这种架构不仅保证了安全性,还为 LLM 生成的代码提供了可靠的执行环境,是 AI 驱动数据分析系统的关键基础设施。