
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
本文将通过多个实战案例,展示如何使用 AskTable API 构建各种自定义应用。每个案例都包含完整的代码示例和详细说明。
构建一个简单的 Web 应用,让用户可以用自然语言查询数据库。
技术栈:
安装依赖:
pip install fastapi uvicorn requests python-dotenv
创建 main.py:
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()

# CORS configuration.
# Credentials are disabled on purpose: the CORS specification does not allow
# a wildcard origin to be combined with credentialed requests, and the
# endpoints in this file authenticate to AskTable with a server-side API key
# rather than browser cookies. If cookies are ever needed, replace "*" with
# an explicit list of trusted origins before enabling credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# AskTable configuration, read from the environment.
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

# Headers sent with every AskTable request made by this module.
headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json",
}
class QueryRequest(BaseModel):
    """Request body accepted by ``POST /api/query``."""

    # Natural-language question that is forwarded to AskTable.
    question: str
class QueryResponse(BaseModel):
    """Response body returned by ``POST /api/query``."""

    # Question text as echoed back by AskTable.
    question: str
    # SQL statement reported by AskTable for the question.
    sql: str
    # Natural-language answer.
    answer: str
    # Result rows, taken from the upstream ``dataframe.data`` field.
    data: list
@app.post("/api/query", response_model=QueryResponse)
async def query_data(request: QueryRequest):
    """Answer a natural-language question through the AskTable q2a endpoint.

    Args:
        request: Request body carrying the user's question.

    Returns:
        A ``QueryResponse`` built from the upstream ``question``, ``sql``,
        ``answer`` and ``dataframe.data`` fields.

    Raises:
        HTTPException: With the upstream status code when AskTable replies
            with an HTTP error, or with status 500 for any other failure
            (timeout, connection error, unexpected response shape).
    """
    try:
        # Bounded wait so that a stalled upstream cannot hang the request
        # indefinitely.
        response = requests.post(
            f"{ASKTABLE_BASE_URL}/single-turn/q2a",
            headers=headers,
            json={
                "datasource_id": DATASOURCE_ID,
                "question": request.question,
            },
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()
        return QueryResponse(
            question=result["question"],
            sql=result["sql"],
            answer=result["answer"],
            data=result["dataframe"]["data"],
        )
    except requests.exceptions.HTTPError as e:
        # The error body is not guaranteed to be a JSON object (it may be an
        # HTML gateway page, for instance). An exception raised here would
        # escape both handlers, so fall back to the generic message instead.
        try:
            detail = e.response.json().get("detail", "API 错误")
        except (ValueError, AttributeError):
            detail = "API 错误"
        raise HTTPException(
            status_code=e.response.status_code,
            detail=detail,
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/datasources")
async def get_datasources():
    """Return the datasource list exactly as AskTable reports it.

    Returns:
        The decoded JSON body of ``GET {ASKTABLE_BASE_URL}/datasources``.

    Raises:
        HTTPException: With the upstream status code when AskTable replies
            with an HTTP error (matching ``/api/query``), or with status 500
            for any other failure.
    """
    try:
        # Bounded wait so that a stalled upstream cannot hang the request.
        response = requests.get(
            f"{ASKTABLE_BASE_URL}/datasources",
            headers=headers,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        # Surface the real upstream status (401, 404, ...) instead of
        # reporting every upstream error as a 500.
        raise HTTPException(
            status_code=e.response.status_code,
            detail=str(e),
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Serve the API with uvicorn on all interfaces, port 8000, when this
    # file is executed directly.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)
创建 .env 文件:
ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here
运行后端:
python main.py
创建 React 应用:
npx create-react-app data-query-app --template typescript
cd data-query-app
npm install axios
创建 src/App.tsx:
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';
/** Shape of the JSON body returned by the backend's POST /api/query. */
type QueryResult = {
  /** Question text echoed back by the backend. */
  question: string;
  /** SQL statement generated for the question. */
  sql: string;
  /** Natural-language answer. */
  answer: string;
  /** Result rows; each row is an array of cell values. */
  data: any[][];
};
/**
 * Single-page query form: posts the question to the backend and renders the
 * answer, the generated SQL and the result rows.
 */
function App() {
  const [question, setQuestion] = useState('');
  const [result, setResult] = useState<QueryResult | null>(null);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState('');

  // A question made only of whitespace is never sent to the backend.
  const trimmedQuestion = question.trim();

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    // Pressing Enter in the input submits the form even while the button is
    // disabled, so the guard is repeated here.
    if (!trimmedQuestion) {
      return;
    }
    setLoading(true);
    setError('');
    setResult(null);
    try {
      // Typing the response keeps `response.data` checked against
      // QueryResult instead of being `any`.
      const response = await axios.post<QueryResult>(
        'http://localhost:8000/api/query',
        { question: trimmedQuestion }
      );
      setResult(response.data);
    } catch (err: any) {
      setError(err.response?.data?.detail || '查询失败');
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="App">
      <header className="App-header">
        <h1>数据查询助手</h1>
        <form onSubmit={handleSubmit}>
          <input
            type="text"
            value={question}
            onChange={(e) => setQuestion(e.target.value)}
            placeholder="输入你的问题,例如:本月销售额"
            disabled={loading}
          />
          <button type="submit" disabled={loading || !trimmedQuestion}>
            {loading ? '查询中...' : '查询'}
          </button>
        </form>
        {error && <div className="error">{error}</div>}
        {result && (
          <div className="result">
            <h2>查询结果</h2>
            <div className="answer">
              <strong>答案:</strong>
              <p>{result.answer}</p>
            </div>
            <div className="sql">
              <strong>生成的 SQL:</strong>
              <pre>{result.sql}</pre>
            </div>
            {result.data.length > 0 && (
              <div className="data">
                <strong>数据:</strong>
                <table>
                  <tbody>
                    {result.data.map((row, i) => (
                      <tr key={i}>
                        {row.map((cell, j) => (
                          <td key={j}>{cell}</td>
                        ))}
                      </tr>
                    ))}
                  </tbody>
                </table>
              </div>
            )}
          </div>
        )}
      </header>
    </div>
  );
}

export default App;
运行前端:
npm start
创建一个 Slack 机器人,让团队成员可以在 Slack 中用自然语言查询数据。
功能:
安装依赖:
pip install slack-bolt requests python-dotenv
创建 slack_bot.py:
import os
import requests
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from dotenv import load_dotenv
load_dotenv()
# Slack configuration: the Bolt app is created with the bot token from the
# environment.
app = App(token=os.getenv("SLACK_BOT_TOKEN"))

# AskTable configuration, read from the environment.
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

# Headers sent with every AskTable request made by this module.
headers = {}
headers["Authorization"] = f"Bearer {ASKTABLE_API_KEY}"
headers["Content-Type"] = "application/json"
def query_asktable(question: str) -> dict:
    """Send ``question`` to the AskTable q2a endpoint.

    Returns:
        The decoded JSON response on success. On any failure a dict of the
        form ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        endpoint = f"{ASKTABLE_BASE_URL}/single-turn/q2a"
        payload = {"datasource_id": DATASOURCE_ID, "question": question}
        reply = requests.post(endpoint, headers=headers, json=payload, timeout=30)
        reply.raise_for_status()
    except Exception as exc:
        return {"error": str(exc)}
    else:
        try:
            return reply.json()
        except Exception as exc:
            return {"error": str(exc)}
def _format_table(columns, rows, max_rows=10):
    """Render ``columns`` and the first ``max_rows`` rows as pipe-separated text."""
    header = " | ".join(str(column) for column in columns)
    lines = [header, "-" * len(header)]
    lines.extend(" | ".join(str(cell) for cell in row) for row in rows[:max_rows])
    return "\n".join(lines) + "\n"


def _mrkdwn_section(text):
    """Wrap ``text`` in a Slack Block Kit section using mrkdwn formatting."""
    return {"type": "section", "text": {"type": "mrkdwn", "text": text}}


@app.event("app_mention")
def handle_mention(event, say):
    """Answer a message that mentions the bot.

    The text after the first ``>`` (the end of the mention token) is treated
    as the question, sent to AskTable, and the question, answer, SQL and up
    to ten data rows are posted back as blocks.

    Args:
        event: Slack event payload; only ``event["text"]`` is read.
        say: Bolt helper used to post messages to the channel.
    """
    # Extract the question by dropping the leading mention token.
    text = event["text"]
    question = (text.split(">", 1)[1] if ">" in text else text).strip()

    # A bare mention carries no question; ask for one instead of querying
    # the API with an empty string (mirrors the /query command).
    if not question:
        say("请提供查询问题,例如:@机器人 本月销售额")
        return

    # Tell the user the query is running.
    say(f"正在查询:_{question}_")

    result = query_asktable(question)
    if "error" in result:
        say(f"❌ 查询失败:{result['error']}")
        return

    blocks = [
        _mrkdwn_section(f"*问题:*\n{result['question']}"),
        _mrkdwn_section(f"*答案:*\n{result['answer']}"),
        _mrkdwn_section(f"*SQL:*\n```{result['sql']}```"),
    ]

    # Append the data table when the response carries rows.
    if result.get("dataframe") and result["dataframe"].get("data"):
        table_text = _format_table(
            result["dataframe"]["columns"],
            result["dataframe"]["data"],
        )
        blocks.append(_mrkdwn_section(f"*数据:*\n```{table_text}```"))

    # `text` is the plain-text fallback used for notifications and for
    # clients that cannot render blocks.
    say(blocks=blocks, text=str(result["answer"]))
@app.command("/query")
def handle_query_command(ack, command, say):
    """Handle the ``/query`` slash command.

    The command is acknowledged first; the command text is then used as the
    question and the answer plus SQL are posted back. An empty command text
    produces a usage hint instead.
    """
    ack()
    question = command["text"]
    if question:
        say(f"正在查询:_{question}_")
        outcome = query_asktable(question)
        if "error" not in outcome:
            say(f"*答案:*\n{outcome['answer']}\n\n*SQL:*\n```{outcome['sql']}```")
        else:
            say(f"❌ 查询失败:{outcome['error']}")
    else:
        say("请提供查询问题,例如:/query 本月销售额")
if __name__ == "__main__":
    # Start the bot in Socket Mode using the app-level token; a missing
    # SLACK_APP_TOKEN raises KeyError here.
    SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()
配置 .env:
SLACK_BOT_TOKEN=xoxb-your-bot-token
SLACK_APP_TOKEN=xapp-your-app-token
ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here
运行机器人:
python slack_bot.py
所需的 Slack 权限范围(Bot Token Scopes):app_mentions:read、chat:write、commands。
创建一个定时任务,每天自动生成数据报告并发送邮件。
功能:
安装依赖:
pip install requests schedule jinja2 python-dotenv
创建 report_generator.py:
import os
import requests
import schedule
import time
from datetime import datetime
from jinja2 import Template
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
from dotenv import load_dotenv
load_dotenv()
# AskTable configuration, read from the environment.
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

# Mail configuration.
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
# Comma-separated recipient list. An unset variable yields an empty list
# instead of crashing at import time, and whitespace around addresses (as in
# "a@x.com, b@y.com") as well as empty entries are dropped.
REPORT_RECIPIENTS = [
    address.strip()
    for address in os.getenv("REPORT_RECIPIENTS", "").split(",")
    if address.strip()
]

# Headers sent with every AskTable request made by this module.
headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json",
}
def query_asktable(question: str) -> dict:
    """Send ``question`` to the AskTable q2a endpoint.

    Returns:
        The decoded JSON response on success. On any failure a dict with the
        keys ``error`` (the message) and ``question`` (the original question)
        is returned instead of raising.
    """
    try:
        endpoint = f"{ASKTABLE_BASE_URL}/single-turn/q2a"
        body = {"datasource_id": DATASOURCE_ID, "question": question}
        reply = requests.post(endpoint, headers=headers, json=body, timeout=30)
        reply.raise_for_status()
        parsed = reply.json()
    except Exception as exc:
        return {"error": str(exc), "question": question}
    return parsed
# Jinja2 source of the HTML report. It lives at module level so that
# generate_report() stays short; the text is rendered with autoescaping on.
_REPORT_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
h1 {
color: #333;
border-bottom: 2px solid #4CAF50;
padding-bottom: 10px;
}
.metric {
background: #f5f5f5;
padding: 15px;
margin: 10px 0;
border-radius: 5px;
}
.metric h3 {
margin-top: 0;
color: #4CAF50;
}
.answer {
font-size: 18px;
font-weight: bold;
color: #333;
}
.sql {
background: #f0f0f0;
padding: 10px;
border-radius: 3px;
font-family: monospace;
font-size: 12px;
overflow-x: auto;
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 10px;
}
th, td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #4CAF50;
color: white;
}
.error {
color: red;
}
</style>
</head>
<body>
<h1>每日数据报告</h1>
<p>生成时间:{{ report_time }}</p>
{% for result in results %}
<div class="metric">
<h3>{{ result.question }}</h3>
{% if result.error %}
<p class="error">查询失败:{{ result.error }}</p>
{% else %}
<p class="answer">{{ result.answer }}</p>
{% if result.dataframe and result.dataframe.data %}
<table>
<thead>
<tr>
{% for col in result.dataframe.columns %}
<th>{{ col }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in result.dataframe.data[:10] %}
<tr>
{% for cell in row %}
<td>{{ cell }}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
<details>
<summary>查看 SQL</summary>
<div class="sql">{{ result.sql }}</div>
</details>
{% endif %}
</div>
{% endfor %}
</body>
</html>
"""


def generate_report():
    """Query the daily metrics, render them as HTML and e-mail the report.

    Each question is sent to AskTable through ``query_asktable``; failed
    queries are kept in the report and shown as errors. The rendered HTML is
    passed to ``send_email``. Progress is printed to stdout.
    """
    print(f"[{datetime.now()}] 开始生成报告...")

    # Questions that make up the report, in display order.
    questions = [
        "昨日销售额",
        "昨日订单数",
        "昨日新增用户数",
        "昨日销量前10的产品",
        "昨日各地区销售额",
    ]

    # Run the queries one by one.
    results = []
    for question in questions:
        print(f" 查询: {question}")
        results.append(query_asktable(question))

    # Answers, column names, cell values, SQL and error messages come from
    # the API / database, so they are HTML-escaped on output. Without
    # autoescape a value containing markup would be injected into the e-mail
    # verbatim.
    template = Template(_REPORT_TEMPLATE, autoescape=True)
    html_content = template.render(
        report_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        results=results,
    )

    # Deliver the report.
    send_email(html_content)
    print(f"[{datetime.now()}] 报告生成完成")
def send_email(html_content: str):
    """Send ``html_content`` as an HTML e-mail to ``REPORT_RECIPIENTS``.

    The message is delivered over SMTP with STARTTLS and a login. Any
    failure is printed to stdout and otherwise ignored, so this function
    never raises.
    """
    try:
        today = datetime.now().strftime('%Y-%m-%d')
        recipients = ", ".join(REPORT_RECIPIENTS)

        message = MIMEMultipart('alternative')
        for header_name, header_value in (
            ('Subject', f"每日数据报告 - {today}"),
            ('From', SMTP_USER),
            ('To', recipients),
        ):
            message[header_name] = header_value
        message.attach(MIMEText(html_content, 'html'))

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as smtp:
            smtp.starttls()
            smtp.login(SMTP_USER, SMTP_PASSWORD)
            smtp.send_message(message)

        print(f" 邮件已发送到: {recipients}")
    except Exception as exc:
        print(f" 邮件发送失败: {exc}")
# Register the report job to run every day at 09:00.
schedule.every().day.at("09:00").do(generate_report)

if __name__ == "__main__":
    recipient_list = ', '.join(REPORT_RECIPIENTS)
    print("数据报告生成器已启动")
    print(f"将在每天 09:00 生成报告并发送到: {recipient_list}")

    # For a one-off test run, uncomment the next line:
    # generate_report()

    # Poll the scheduler once a minute, forever.
    while True:
        schedule.run_pending()
        time.sleep(60)
配置 .env:
ASKTABLE_API_KEY=your_api_key_here
DATASOURCE_ID=your_datasource_id_here
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your_email@gmail.com
SMTP_PASSWORD=your_app_password
REPORT_RECIPIENTS=recipient1@example.com,recipient2@example.com
运行报告生成器:
python report_generator.py
创建一个实时数据看板,展示关键业务指标。
技术栈:
创建 dashboard_api.py:
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
import requests
import asyncio
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()

# CORS configuration.
# Credentials are disabled on purpose: the CORS specification does not allow
# a wildcard origin to be combined with credentialed requests, and the
# endpoints in this file authenticate to AskTable with a server-side API key
# rather than browser cookies. If cookies are ever needed, replace "*" with
# an explicit list of trusted origins before enabling credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# AskTable configuration, read from the environment.
ASKTABLE_API_KEY = os.getenv("ASKTABLE_API_KEY")
ASKTABLE_BASE_URL = "https://api.asktable.com/api/v1"
DATASOURCE_ID = os.getenv("DATASOURCE_ID")

# Headers sent with every AskTable request made by this module.
headers = {
    "Authorization": f"Bearer {ASKTABLE_API_KEY}",
    "Content-Type": "application/json",
}
def query_metric(question: str) -> dict:
    """Query one metric through the AskTable q2a endpoint.

    Returns:
        The decoded JSON response on success. On any failure a dict of the
        form ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        endpoint = f"{ASKTABLE_BASE_URL}/single-turn/q2a"
        body = {"datasource_id": DATASOURCE_ID, "question": question}
        reply = requests.post(endpoint, headers=headers, json=body, timeout=30)
        reply.raise_for_status()
        parsed = reply.json()
    except Exception as exc:
        return {"error": str(exc)}
    return parsed
@app.get("/api/metrics")
async def get_metrics():
    """Query every dashboard metric and return the results keyed by question.

    ``query_metric`` uses the blocking ``requests`` library, so each call is
    run in the event loop's default thread pool. This keeps the loop (and
    with it the WebSocket endpoint) responsive, and lets the metrics be
    fetched concurrently rather than one after another.

    Returns:
        A dict mapping each metric question, in the order listed below, to
        the dict returned by ``query_metric`` for it.
    """
    metrics = [
        "今日销售额",
        "今日订单数",
        "今日新增用户数",
        "本月销售额",
        "本月订单数",
    ]
    loop = asyncio.get_running_loop()
    # query_metric converts every failure into an {"error": ...} dict, so
    # gather() does not need return_exceptions here. Results come back in
    # the same order as `metrics`.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, query_metric, metric) for metric in metrics)
    )
    return dict(zip(metrics, results))
@app.websocket("/ws/metrics")
async def websocket_metrics(websocket: WebSocket):
    """Push the full metrics dict to a WebSocket client every 30 seconds.

    The loop runs until sending fails or any other error occurs; the error
    is printed and the socket is then closed.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await websocket.accept()
    try:
        while True:
            # Fetch the current metrics.
            metrics = await get_metrics()
            # Push them to the client.
            await websocket.send_json(metrics)
            # Refresh every 30 seconds.
            await asyncio.sleep(30)
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        # When the client has already disconnected (the usual way out of
        # the loop above), closing again raises RuntimeError; that is
        # expected and must not surface as a server error.
        try:
            await websocket.close()
        except RuntimeError:
            pass
if __name__ == "__main__":
    # Serve the dashboard API with uvicorn on all interfaces, port 8000,
    # when this file is executed directly.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)
创建 Dashboard.tsx:
import React, { useEffect, useState } from 'react';
import { Chart as ChartJS, ArcElement, Tooltip, Legend } from 'chart.js';
import { Doughnut } from 'react-chartjs-2';
// Register the Chart.js pieces imported above.
ChartJS.register(ArcElement, Tooltip, Legend);

/**
 * Metrics payload received over the WebSocket: one entry per metric
 * question. A failed query carries `error`; a successful one carries
 * `answer` and, optionally, `dataframe`.
 */
interface Metrics {
  [key: string]: {
    // Optional because failed queries only provide `error`.
    answer?: string;
    // Read by the Dashboard component; without this declaration
    // `result.error` does not type-check.
    error?: string;
    dataframe?: {
      data: any[][];
    };
  };
}
/**
 * Live dashboard: listens on the metrics WebSocket and renders one card per
 * metric together with the time of the last update.
 */
function Dashboard() {
  const [metrics, setMetrics] = useState<Metrics>({});
  const [lastUpdate, setLastUpdate] = useState<Date>(new Date());

  // Open the WebSocket once on mount and close it on unmount.
  useEffect(() => {
    const socket = new WebSocket('ws://localhost:8000/ws/metrics');

    socket.onmessage = ({ data }) => {
      setMetrics(JSON.parse(data));
      setLastUpdate(new Date());
    };
    socket.onerror = (err) => {
      console.error('WebSocket 错误:', err);
    };

    return () => {
      socket.close();
    };
  }, []);

  // One card per metric: the error text when the query failed, the answer
  // otherwise.
  const cards = Object.keys(metrics).map((question) => {
    const result = metrics[question];
    return (
      <div key={question} className="metric-card">
        <h3>{question}</h3>
        {result.error ? (
          <p className="error">{result.error}</p>
        ) : (
          <p className="value">{result.answer}</p>
        )}
      </div>
    );
  });

  return (
    <div className="dashboard">
      <h1>实时数据看板</h1>
      <p>最后更新: {lastUpdate.toLocaleTimeString()}</p>
      <div className="metrics-grid">{cards}</div>
    </div>
  );
}

export default Dashboard;
通过这些实战案例,我们展示了如何使用 AskTable API 构建各种自定义应用:
AskTable API 的灵活性让你可以将 AI 数据分析能力集成到任何应用场景中,为用户提供自然语言查询数据的能力。