
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
AskTable 提供了完整的 RESTful API,让你可以将 AI 数据分析能力集成到自己的应用中。本文将系统介绍 AskTable API 的使用方法,帮助你快速上手。
Base URL:
https://api.asktable.com/api/v1
认证方式:
Authorization: Bearer YOUR_API_KEY响应格式:
application/jsonAskTable API 按功能分为以下几类:
| 分类 | 说明 | 主要端点 |
|---|---|---|
| 认证 | API Key 管理 | |
| 数据源 | 数据源 CRUD、连接测试 | |
| 元数据 | 表结构、字段管理 | |
| 索引 | 向量索引、值索引 | |
| 聊天 | 多轮对话 | |
| 单轮查询 | 问题转 SQL/答案/图表 | |
| Canvas | 画卷和节点管理 | |
| 角色权限 | 角色和策略管理 | , |
| 训练数据 | 训练样本管理 | |
| 机器人 | 聊天机器人配置 | |
API Key 权限范围:
priv_admin:管理员权限,可以管理数据源、角色、策略等priv_asker:查询权限,可以发起查询、创建聊天等priv_visitor:访客权限,只能访问指定的聊天Python 示例:
import requests
API_KEY = "your_api_key_here"
BASE_URL = "https://api.asktable.com/api/v1"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
response = requests.get(f"{BASE_URL}/datasources", headers=headers)
print(response.json())
cURL 示例:
curl -X GET "https://api.asktable.com/api/v1/datasources" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json"
API 使用标准 HTTP 状态码:
| 状态码 | 说明 |
|---|---|
| 200 | 成功 |
| 201 | 创建成功 |
| 204 | 删除成功(无内容) |
| 400 | 请求参数错误 |
| 401 | 未认证(API Key 无效) |
| 403 | 权限不足 |
| 404 | 资源不存在 |
| 500 | 服务器错误 |
错误响应格式:
{
"detail": "错误描述信息"
}
端点:
POST /datasources
请求体:
{
"name": "我的 MySQL 数据库",
"engine": "mysql",
"access_config": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb"
}
}
支持的数据库类型:
mysql, postgresql, sqlite, sqlserver, oracleclickhouse, snowflake, bigquery, redshiftmongodb, elasticsearchexcel, csv, parquetPython 示例:
def create_datasource(name, engine, access_config):
response = requests.post(
f"{BASE_URL}/datasources",
headers=headers,
json={
"name": name,
"engine": engine,
"access_config": access_config
}
)
return response.json()
# 创建 MySQL 数据源
ds = create_datasource(
name="Production DB",
engine="mysql",
access_config={
"host": "db.example.com",
"port": 3306,
"user": "readonly",
"password": "secret",
"database": "sales"
}
)
print(f"数据源 ID: {ds['id']}")
端点:
POST /datasources/test-connection
请求体:
{
"engine": "mysql",
"access_config": {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb"
}
}
响应:
{
"success": true,
"message": "连接成功"
}
端点:
GET /datasources
查询参数:
name:按名称过滤(可选)page:页码(默认 1)size:每页数量(默认 50)响应:
{
"items": [
{
"id": "ds_abc123",
"name": "Production DB",
"engine": "mysql",
"meta_status": "success",
"schema_count": 1,
"table_count": 15,
"field_count": 120,
"created_at": "2024-01-01T00:00:00Z"
}
],
"total": 1,
"page": 1,
"size": 50
}
端点:
GET /datasources/{datasource_id}
响应:
{
"id": "ds_abc123",
"name": "Production DB",
"engine": "mysql",
"access_config": {
"host": "db.example.com",
"port": 3306,
"user": "readonly",
"database": "sales"
},
"meta_status": "success",
"schema_count": 1,
"table_count": 15,
"field_count": 120,
"sample_questions": [
"本月销售额",
"销量前10的产品"
],
"created_at": "2024-01-01T00:00:00Z",
"modified_at": "2024-01-01T00:00:00Z"
}
端点:
PATCH /datasources/{datasource_id}
请求体:
{
"name": "新名称",
"desc": "数据源描述",
"sample_questions": [
"本月销售额",
"销量前10的产品"
]
}
端点:
DELETE /datasources/{datasource_id}
响应:204 No Content
端点:
POST /datasources/{datasource_id}/meta/sync
说明:从数据库同步表结构到 AskTable
响应:
{
"job_id": "job_xyz789",
"status": "running"
}
端点:
GET /datasources/{datasource_id}/meta
响应:
{
"schemas": {
"public": {
"name": "public",
"tables": {
"orders": {
"name": "orders",
"description": "订单表",
"fields": {
"id": {
"name": "id",
"data_type": "INTEGER",
"description": "订单ID",
"visibility": true
},
"amount": {
"name": "amount",
"data_type": "DECIMAL",
"description": "订单金额",
"visibility": true
}
}
}
}
}
}
}
端点:
PATCH /datasources/{datasource_id}/meta/tables/{table_name}
请求体:
{
"description": "客户订单记录,包含订单基本信息、金额、状态等"
}
端点:
PATCH /datasources/{datasource_id}/meta/tables/{table_name}/fields/{field_name}
请求体:
{
"description": "订单状态:pending(待处理)、paid(已支付)、shipped(已发货)、completed(已完成)、cancelled(已取消)",
"visibility": true
}
端点:
PUT /datasources/{datasource_id}/meta
请求体:
{
"schemas": {
"public": {
"tables": {
"orders": {
"description": "订单表",
"fields": {
"status": {
"description": "订单状态",
"visibility": true
}
}
}
}
}
}
}
端点:
POST /single-turn/q2s
请求体:
{
"datasource_id": "ds_abc123",
"question": "上周华东地区的销售额",
"role_id": "role_xyz",
"role_variables": {
"user_department": "销售部"
},
"parameterize": false
}
响应:
{
"id": "q2s_123",
"question": "上周华东地区的销售额",
"sql": "SELECT SUM(amount) as total_sales\nFROM sales\nWHERE region = '华东'\n AND order_date >= DATE_SUB(CURDATE(), INTERVAL 1 WEEK)\n AND order_date < CURDATE()",
"status": "success",
"timing": {
"total_duration": 2.5,
"llm_duration": 1.8
}
}
Python 示例:
def question_to_sql(datasource_id, question, role_id=None):
response = requests.post(
f"{BASE_URL}/single-turn/q2s",
headers=headers,
json={
"datasource_id": datasource_id,
"question": question,
"role_id": role_id
}
)
return response.json()
# 使用示例
result = question_to_sql(
datasource_id="ds_abc123",
question="本月销售额"
)
print(f"生成的 SQL: {result['sql']}")
端点:
POST /single-turn/q2a
请求体:
{
"datasource_id": "ds_abc123",
"question": "上周华东地区的销售额"
}
响应:
{
"id": "q2a_123",
"question": "上周华东地区的销售额",
"sql": "SELECT SUM(amount) as total_sales FROM ...",
"answer": "上周华东地区的销售额为 1,234,567 元",
"dataframe": {
"columns": ["total_sales"],
"data": [[1234567]]
},
"status": "success"
}
端点:
POST /single-turn/q2w
请求体:
{
"datasource_id": "ds_abc123",
"question": "各地区销售额对比"
}
响应:
{
"id": "q2w_123",
"question": "各地区销售额对比",
"sql": "SELECT region, SUM(amount) as total FROM sales GROUP BY region",
"chart_code": "const data = load_dataframe('df_abc');\n<BarChart data={data} x=\"region\" y=\"total\" />",
"dataframe": {
"columns": ["region", "total"],
"data": [
["华东", 1000000],
["华北", 800000],
["华南", 900000]
]
},
"status": "success"
}
端点:
POST /chats
请求体:
{
"bot_id": "bot_abc123"
}
响应:
{
"id": "chat_xyz789",
"bot_id": "bot_abc123",
"status": "active",
"created_at": "2024-01-01T00:00:00Z"
}
端点:
POST /chats/{chat_id}/messages
请求体:
{
"question": "本月销售额"
}
响应:
{
"id": "msg_123",
"chat_id": "chat_xyz789",
"role": "assistant",
"content": {
"text": "本月销售额为 2,345,678 元",
"sql": "SELECT SUM(amount) FROM orders WHERE ...",
"dataframe": {
"columns": ["total"],
"data": [[2345678]]
}
},
"created_at": "2024-01-01T00:00:00Z"
}
端点:
GET /chats/{chat_id}
响应:
{
"id": "chat_xyz789",
"bot_id": "bot_abc123",
"status": "active",
"messages": [
{
"id": "msg_1",
"role": "human",
"content": {"text": "本月销售额"},
"created_at": "2024-01-01T00:00:00Z"
},
{
"id": "msg_2",
"role": "assistant",
"content": {
"text": "本月销售额为 2,345,678 元",
"sql": "SELECT SUM(amount) FROM orders WHERE ..."
},
"created_at": "2024-01-01T00:00:01Z"
}
]
}
端点:
DELETE /chats/{chat_id}
响应:204 No Content
端点:
POST /canvas
请求体:
{
"name": "销售分析",
"datasource_id": "ds_abc123"
}
响应:
{
"id": "canvas_123",
"name": "销售分析",
"datasource_id": "ds_abc123",
"created_at": "2024-01-01T00:00:00Z"
}
端点:
POST /canvas/{canvas_id}/nodes
请求体:
{
"type": "data",
"question": "本月销售额",
"parent_ids": []
}
响应:
{
"id": "node_123",
"type": "data",
"question": "本月销售额",
"status": "pending",
"created_at": "2024-01-01T00:00:00Z"
}
端点:
POST /canvas/{canvas_id}/nodes/{node_id}/run
响应:Server-Sent Events (SSE) 流
data: {"type": "text", "text": "正在搜索相关表..."}
data: {"type": "text", "text": "找到表: sales"}
data: {"type": "tool_use", "tool": "execute_sql", "args": {...}}
data: {"type": "tool_result", "result": {...}}
data: {"type": "complete", "data": {...}}
Python 示例(流式接收):
import sseclient
def run_node_stream(canvas_id, node_id):
response = requests.post(
f"{BASE_URL}/canvas/{canvas_id}/nodes/{node_id}/run",
headers=headers,
stream=True
)
client = sseclient.SSEClient(response)
for event in client.events():
data = json.loads(event.data)
print(f"事件类型: {data['type']}")
if data['type'] == 'complete':
return data['data']
# 使用示例
result = run_node_stream("canvas_123", "node_456")
print(f"节点结果: {result}")
端点:
GET /canvas/{canvas_id}/nodes/{node_id}
响应:
{
"id": "node_123",
"type": "data",
"question": "本月销售额",
"status": "completed",
"data": {
"sql": "SELECT SUM(amount) FROM orders WHERE ...",
"description": "本月销售额",
"dataframe": {
"columns": ["total"],
"data": [[2345678]]
}
},
"created_at": "2024-01-01T00:00:00Z",
"completed_at": "2024-01-01T00:00:05Z"
}
端点:
POST /canvas/{canvas_id}/nodes/batch-refresh
请求体:
{
"node_ids": ["node_1", "node_2", "node_3"]
}
说明:按拓扑顺序刷新多个节点
端点:
POST /roles
请求体:
{
"name": "销售部角色",
"variables": [
{
"name": "user_department",
"type": "string",
"required": true
}
]
}
端点:
POST /policies
请求体:
{
"name": "销售部数据访问策略",
"role_id": "role_123",
"datasource_id": "ds_abc123",
"rules": [
{
"table": "sales",
"row_filter": "department = '{{user_department}}'"
}
]
}
在查询时传入
role_id 和 role_variables:
result = question_to_sql(
datasource_id="ds_abc123",
question="本月销售额",
role_id="role_123",
role_variables={
"user_department": "销售部"
}
)
生成的 SQL 会自动添加行级过滤:
SELECT SUM(amount)
FROM sales
WHERE department = '销售部' -- 自动添加
AND order_date >= '2024-01-01'
端点:
POST /training
请求体:
{
"datasource_id": "ds_abc123",
"question": "本月 GMV",
"sql": "SELECT SUM(amount) as gmv FROM orders WHERE order_date >= DATE_TRUNC('month', CURRENT_DATE)"
}
端点:
GET /training?datasource_id=ds_abc123
响应:
{
"items": [
{
"id": "train_123",
"question": "本月 GMV",
"sql": "SELECT SUM(amount) as gmv FROM ...",
"created_at": "2024-01-01T00:00:00Z"
}
]
}
端点:
DELETE /training/{training_id}
def safe_api_call(func, *args, **kwargs):
try:
response = func(*args, **kwargs)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
print("API Key 无效或已过期")
elif e.response.status_code == 403:
print("权限不足")
elif e.response.status_code == 404:
print("资源不存在")
else:
print(f"API 错误: {e.response.json()}")
raise
except requests.exceptions.RequestException as e:
print(f"网络错误: {e}")
raise
# 使用示例
result = safe_api_call(
requests.post,
f"{BASE_URL}/single-turn/q2s",
headers=headers,
json={"datasource_id": "ds_123", "question": "本月销售额"}
)
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def api_call_with_retry(url, **kwargs):
response = requests.post(url, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
# 使用示例
result = api_call_with_retry(
f"{BASE_URL}/single-turn/q2s",
json={"datasource_id": "ds_123", "question": "本月销售额"}
)
import asyncio
import aiohttp
async def batch_query(questions):
async with aiohttp.ClientSession() as session:
tasks = []
for question in questions:
task = session.post(
f"{BASE_URL}/single-turn/q2s",
headers=headers,
json={
"datasource_id": "ds_123",
"question": question
}
)
tasks.append(task)
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]
# 使用示例
questions = [
"本月销售额",
"本月订单数",
"本月新增用户数"
]
results = asyncio.run(batch_query(questions))
for result in results:
print(f"{result['question']}: {result['sql']}")
from functools import lru_cache
import hashlib
@lru_cache(maxsize=100)
def cached_query(datasource_id, question):
response = requests.post(
f"{BASE_URL}/single-turn/q2s",
headers=headers,
json={
"datasource_id": datasource_id,
"question": question
}
)
return response.json()
# 使用示例
result1 = cached_query("ds_123", "本月销售额") # API 调用
result2 = cached_query("ds_123", "本月销售额") # 从缓存返回
from asktable import AskTable
# 初始化客户端
client = AskTable(api_key="YOUR_API_KEY")
# 创建数据源
ds = client.datasources.create(
name="My Database",
engine="mysql",
access_config={
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb"
}
)
# 问题转 SQL
result = client.query.q2s(
datasource_id=ds.id,
question="本月销售额"
)
print(result.sql)
# 问题转答案
result = client.query.q2a(
datasource_id=ds.id,
question="本月销售额"
)
print(result.answer)
import { AskTable } from '@asktable/sdk';
// 初始化客户端
const client = new AskTable({
apiKey: 'YOUR_API_KEY'
});
// 创建数据源
const ds = await client.datasources.create({
name: 'My Database',
engine: 'mysql',
accessConfig: {
host: 'localhost',
port: 3306,
user: 'root',
password: 'password',
database: 'mydb'
}
});
// 问题转 SQL
const result = await client.query.q2s({
datasourceId: ds.id,
question: '本月销售额'
});
console.log(result.sql);
AskTable API 提供了完整的功能,让你可以:
通过 API,你可以将 AskTable 的 AI 数据分析能力无缝集成到自己的应用中,为用户提供自然语言查询数据的能力。