
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 驱动的数据分析系统中,让 LLM 生成的 Python 代码安全执行是一个核心挑战。直接在主进程中执行用户代码存在巨大的安全风险。AskTable 的 PythonExecutor 采用了远程执行架构,实现了代码的安全隔离执行。
每个执行器实例运行在独立的环境中:
class PythonExecutor:
    def __init__(self, packages: list[str] | None = None):
        """Create a remote, isolated Python executor.

        Args:
            packages: Packages the executor is allowed to use; defaults to
                DEFAULT_PACKAGES when omitted.
        """
        # Fix: the original signature used the shared list DEFAULT_PACKAGES as
        # a mutable default argument; resolve it at call time instead so all
        # instances cannot accidentally share (and mutate) one list object.
        if packages is None:
            packages = DEFAULT_PACKAGES
        self._base_url = BASE_URL
        self._packages = packages
        # Ask the remote service for a fresh, isolated executor instance.
        self._executor_id = self._create_executor(packages)
隔离特性:
只允许使用创建时指定的包:
# Default package allow-list used when the caller does not supply one.
DEFAULT_PACKAGES = ["pandas", "numpy", "scipy"]
# Specify the allowed packages when creating the executor
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
安全优势:
禁止使用的包包括:os、subprocess 等危险包;requests、urllib 等网络包;以及 pickle(内部序列化除外)。代码在远程服务中执行,与主进程完全隔离:
def _execute(self, code: str) -> CodeExecutionResponse:
    """POST `code` to this executor's /code endpoint and return the
    service's JSON payload as-is."""
    endpoint = f"{self._base_url}/python/executor/{self._executor_id}/code"
    return requests.post(endpoint, json={"code": code}).json()
def _create_executor(self, packages: list[str]) -> str:
    """Call POST /python/executor to create an executor and return its id.

    Raises:
        Exception: if the service does not answer 201 Created.
    """
    resp = requests.post(
        f"{self._base_url}/python/executor",
        json={"packages": packages},
    )
    if resp.status_code != 201:
        raise Exception(f"Failed to create executor: {resp.text}")
    return resp.json()["executor_id"]
服务端实现(伪代码):
@app.post("/python/executor")
def create_executor(request: CreateExecutorRequest):
    """Server side (pseudo-code): provision an isolated environment,
    install the requested packages, seed the built-in DataFrame helpers,
    and register the executor under a fresh id."""
    # 1. Provision the sandbox.
    sandbox_id = generate_unique_id()
    sandbox = create_isolated_environment(sandbox_id)

    # 2. Install only allow-listed packages; reject anything else.
    for pkg in request.packages:
        if pkg not in ALLOWED_PACKAGES:
            raise ValueError(f"Package {pkg} is not allowed")
        sandbox.install_package(pkg)

    # 3. Seed the globals backing load_dataframe / save_dataframe.
    sandbox.globals["_saved_dataframes"] = {}
    sandbox.globals["load_dataframe"] = load_dataframe_func
    sandbox.globals["save_dataframe"] = save_dataframe_func

    # 4. Keep the environment addressable for later /code calls.
    executors[sandbox_id] = sandbox
    return {"executor_id": sandbox_id}
def execute(
    self,
    code: str,
    variables: dict[str, Any] | None = None,
    dataframes: dict[str, pd.DataFrame] | None = None,
) -> CodeExecutionResponse:
    """Run `code` in the remote executor, optionally injecting plain
    variables and DataFrames beforehand.

    Args:
        code: User code to execute remotely.
        variables: Optional name -> value mapping injected into the remote
            namespace before `code` runs.
        dataframes: Optional name -> DataFrame mapping shipped to the
            remote executor before `code` runs.

    Returns:
        The CodeExecutionResponse from the final code execution.
    """
    # Fix: the original signature used mutable dict literals ({}) as
    # defaults; use None sentinels instead.
    # 1. Inject variables
    if variables:
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        # Fix: import pickle/base64 inside the remote snippet — the
        # variable-injection path previously used them without importing
        # them (the DataFrame path in send_dataframes already does).
        # NOTE(review): at module-exec scope locals() is globals(), so
        # update() persists — confirm the remote exec semantics.
        code_inject = f"""
import pickle
import base64
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
locals().update(vars_dict)
"""
        self._execute(code=code_inject)
    # 2. Inject DataFrames
    if dataframes:
        self.send_dataframes(dataframes)
    # 3. Execute the user code
    return self._execute(code=code)
def close(self):
    """Destroy the remote executor and release its resources."""
    url = f"{self._base_url}/python/executor/{self._executor_id}"
    response = requests.delete(url)
    if response.status_code != 204:
        raise Exception(f"Failed to close executor: {response.text}")
使用 Pickle + Base64 序列化:
def send_dataframes(self, dataframes: dict[str, pd.DataFrame]) -> None:
    """Ship each DataFrame to the remote executor.

    Each frame is pickled, base64-encoded, and registered remotely in the
    `_saved_dataframes` store under its given name.
    """
    for name, frame in dataframes.items():
        # Serialize the DataFrame for the wire.
        payload = base64.b64encode(pickle.dumps(frame)).decode()
        # Inject it into the remote execution environment.
        snippet = dedent(f"""
            import pickle
            import base64
            df = pickle.loads(base64.b64decode('{payload}'))
            _saved_dataframes['{name}'] = {{
                "df": df,
                "description": "initial dataframe"
            }}
        """)
        self._execute(code=snippet)
为什么使用 Pickle + Base64?
def _execute(self, code: str) -> CodeExecutionResponse:
    """Execute `code` remotely and deserialize any returned DataFrames."""
    url = f"{self._base_url}/python/executor/{self._executor_id}/code"
    payload = requests.post(url, json={"code": code}).json()

    # Rebuild DataFrames from their pickle + base64 wire format.
    frames = {}
    for name, encoded in payload.get("dataframes", {}).items():
        entry = pickle.loads(base64.b64decode(encoded))
        if not entry:
            continue
        frames[name] = ResponseDataframe(
            df=entry.get("df"),
            description=entry.get("description"),
            sql=None,
        )

    return CodeExecutionResponse(
        stdout=payload.get("stdout"),
        error=payload.get("error"),
        executor_id=payload.get("executor_id"),
        dataframes=frames,
    )
执行器提供两个内置函数用于 DataFrame 管理:
def load_dataframe(df_id: str) -> pd.DataFrame:
    """Return the previously saved DataFrame registered under `df_id`."""
    if df_id in _saved_dataframes:
        return _saved_dataframes[df_id]["df"]
    raise ValueError(f"DataFrame {df_id} not found")
def save_dataframe(df: pd.DataFrame, description: str) -> str:
    """Register `df` under a freshly generated id and return that id."""
    df_id = f"df_{generate_id()}"
    entry = {"df": df, "description": description}
    _saved_dataframes[df_id] = entry
    return df_id
# Example: send raw data, run LLM-generated cleaning code, read back the result.
executor = PythonExecutor(packages=["pandas", "numpy"])
# Send the raw data
executor.send_dataframes({"df_raw": raw_df})
# Run the cleaning code (the string below executes remotely; its Chinese
# comments and strings are part of the payload and must stay as-is)
code = """
import pandas as pd
import numpy as np
# 加载数据
df = load_dataframe('df_raw')
# 数据清洗
df_cleaned = df.copy()
df_cleaned = df_cleaned.dropna()
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'])
df_cleaned = df_cleaned[df_cleaned['amount'] > 0]
# 保存结果
df_id = save_dataframe(df_cleaned, "清洗后的数据")
print(f"清洗完成,共 {len(df_cleaned)} 条记录")
print(f"DataFrame ID: {df_id}")
"""
result = executor.execute(code)
print(result["stdout"])
# Output: 清洗完成,共 1234 条记录
#         DataFrame ID: df_abc123
# Fetch the cleaned data back from the response
cleaned_df = result["dataframes"]["df_abc123"]["df"]
# Example: statistical analysis (descriptive stats, correlation, t-test).
executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])
# The string below executes remotely; keep its contents byte-identical.
code = """
import pandas as pd
import numpy as np
from scipy import stats
# 加载数据
df = load_dataframe('df_sales')
# 基础统计
print("=== 基础统计 ===")
print(f"平均值: {df['amount'].mean():.2f}")
print(f"中位数: {df['amount'].median():.2f}")
print(f"标准差: {df['amount'].std():.2f}")
# 相关性分析
print("\\n=== 相关性分析 ===")
corr = df[['amount', 'quantity', 'price']].corr()
print(corr)
# 假设检验
print("\\n=== 假设检验 ===")
group_a = df[df['region'] == 'A']['amount']
group_b = df[df['region'] == 'B']['amount']
t_stat, p_value = stats.ttest_ind(group_a, group_b)
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")
"""
result = executor.execute(code)
print(result["stdout"])
code = """
import pandas as pd
# 加载数据
df = load_dataframe('df_orders')
# 时间序列聚合
df['date'] = pd.to_datetime(df['date'])
df['month'] = df['date'].dt.to_period('M')
monthly = df.groupby('month').agg({
'order_id': 'count',
'amount': 'sum',
'customer_id': 'nunique'
}).reset_index()
monthly.columns = ['月份', '订单数', '销售额', '客户数']
# 保存结果
df_id = save_dataframe(monthly, "月度汇总")
print(f"汇总完成,共 {len(monthly)} 个月")
"""
result = executor.execute(code)
monthly_df = result["dataframes"][list(result["dataframes"].keys())[0]]["df"]
# 注入变量
variables = {
"threshold": 1000,
"start_date": "2024-01-01",
"end_date": "2024-12-31"
}
code = """
import pandas as pd
df = load_dataframe('df_sales')
# 使用注入的变量
df_filtered = df[
(df['amount'] > threshold) &
(df['date'] >= start_date) &
(df['date'] <= end_date)
]
print(f"筛选条件: amount > {threshold}, date in [{start_date}, {end_date}]")
print(f"筛选后记录数: {len(df_filtered)}")
df_id = save_dataframe(df_filtered, "筛选后的数据")
"""
result = executor.execute(code, variables=variables)
# Server-side allow-list of installable packages.
ALLOWED_PACKAGES = [
    "pandas", "numpy", "scipy",
    "scikit-learn", "statsmodels",
    # Forbidden: os, subprocess, requests, socket, etc.
]
# Server-side configuration
EXECUTION_TIMEOUT = 30  # 30-second timeout
@app.post("/python/executor/{executor_id}/code")
async def execute_code(executor_id: str, request: ExecuteCodeRequest):
    # Bound execution time so runaway user code cannot hold the worker;
    # a timeout is reported as a structured error rather than an exception.
    try:
        result = await asyncio.wait_for(
            execute_in_env(executor_id, request.code),
            timeout=EXECUTION_TIMEOUT
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Execution timeout"}
# Docker container resource limits
docker run \
  --memory="512m" \
  --cpus="1.0" \
  --network="none" \
  python-executor
# Read-only filesystem (writable tmpfs only, no exec, no suid)
docker run \
  --read-only \
  --tmpfs /tmp:rw,noexec,nosuid,size=100m \
  python-executor
class DataAnalysisAgent:
    """Thin agent wrapper that reuses a single PythonExecutor across runs."""

    def __init__(self):
        # Create the executor once; creation is the expensive step.
        self.executor = PythonExecutor(packages=["pandas", "numpy", "scipy"])

    def analyze(self, df: pd.DataFrame):
        # Ship the frame, then run the (module-level) analysis code on it.
        self.executor.send_dataframes({"df": df})
        return self.executor.execute(analysis_code)
# Send once
executor.send_dataframes({"df_1": df1, "df_2": df2})
# Later code can use the frames directly — no need to resend
code1 = "df = load_dataframe('df_1'); ..."
code2 = "df = load_dataframe('df_2'); ..."
# Merge several operations into a single round-trip execution
code = """
df1 = load_dataframe('df_1')
df2 = load_dataframe('df_2')
# 操作 1
result1 = df1.groupby('region')['sales'].sum()
# 操作 2
result2 = df2.merge(df1, on='id')
# 操作 3
final = result2[result2['amount'] > 1000]
save_dataframe(final, "最终结果")
"""
# Reuse HTTP connections with a pooled session
# NOTE(review): HTTPAdapter lives in requests.adapters — confirm it is imported.
session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
session.mount('http://', adapter)
# Check the structured error field before consuming stdout.
result = executor.execute(code)
if result["error"]:
    print(f"执行错误: {result['error']}")
else:
    print(f"执行成功: {result['stdout']}")
# DataFrame serialization can fail (unpicklable objects, payload size).
try:
    executor.send_dataframes({"df": df})
except Exception as e:
    print(f"DataFrame 序列化失败: {e}")
# Client-side timeout guard.
# NOTE(review): the execute() shown earlier takes no `timeout` parameter — confirm.
try:
    result = executor.execute(code, timeout=30)
except TimeoutError:
    print("代码执行超时")
AskTable 的 PythonExecutor 通过远程执行架构,实现了安全、高效的 Python 代码执行:
这种架构不仅保证了安全性,还为 LLM 生成的数据分析代码提供了可靠的执行环境。