
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 Text-to-SQL 系统中,用户提出问题后,系统如何生成 SQL?生成的 SQL 是否正确?耗时多久?如果出错了如何追踪?AskTable 通过 Q2S(Question-to-SQL)日志系统,记录了从问题到 SQL 的完整转换过程。
用户反馈:"我问了'上个月的销售额',但结果不对"
没有日志时的困境:
有日志时的优势:
上图展示了 Q2S 日志的完整生命周期:从创建(processing)到成功(success)或失败(failed)的状态转换,每个状态都记录了关键信息,确保完整的可追溯性。
class Q2SModel(Base):
    """Log record of one question-to-SQL (Q2S) conversion, stored in ``q2s_logs``."""

    __tablename__ = "q2s_logs"
    # Basic information
    id: str  # q2s_xxx
    project_id: str  # project ID
    datasource_id: str  # datasource ID
    # Input
    question: str  # the user's question
    role_id: str | None  # role ID
    role_variables: dict | None  # role variables
    # Output
    query: dict | None  # generated query (contains the SQL, chart config, etc.)
    err_msg: str | None  # error message
    # Performance metrics
    duration: int  # elapsed time (milliseconds)
    trace_id: str | None  # Langfuse trace ID
    # Status
    status: str  # processing | success | failed
    STATUS_PROCESSING = "processing"
    STATUS_SUCCESS = "success"
    STATUS_FAILED = "failed"
    # Timestamps
    created_at: datetime
    updated_at: datetime
设计亮点:
# 输入
question: str # "上个月的销售额是多少?"
role_id: str # "sales_manager"
role_variables: dict # {"region": "华东"}
# 输出
query: dict # {
# "sql": "SELECT SUM(amount) FROM sales WHERE ...",
# "chart_type": "number",
# "title": "上个月销售额"
# }
用途:
duration: int # 2500 (毫秒)
trace_id: str # Langfuse trace ID
用途:
status: str # processing | success | failed
状态流转:
创建 → processing
processing → success(SQL 生成成功)
processing → failed(SQL 生成失败)
async def create_q2s_log(
    db_session: AsyncSession,
    datasource_id: str,
    question: str,
    role_id: str | None = None,
    role_variables: dict | None = None,
) -> Q2SModel:
    """Insert a new Q2S log in the ``processing`` state and return it.

    The project ID is not a parameter: it is read from the ``project_id_var``
    context variable. The new row gets a generated ``q2s``-prefixed ID and a
    duration of 0.

    Args:
        db_session: Session the new row is added to and flushed through.
        datasource_id: ID of the datasource the question targets.
        question: The user's question.
        role_id: Optional role ID.
        role_variables: Optional role variables.

    Returns:
        The newly created, refreshed ``Q2SModel`` instance.
    """
    initial_fields = {
        "id": gen_id("q2s"),
        "project_id": project_id_var.get(),
        "datasource_id": datasource_id,
        "question": question,
        "role_id": role_id,
        "role_variables": role_variables,
        "duration": 0,
        "status": Q2SModel.STATUS_PROCESSING,
    }
    record = Q2SModel(**initial_fields)

    db_session.add(record)
    # Flush right away so the row is sent to the database before the
    # (potentially slow) SQL generation starts.
    await db_session.flush()
    # Reload the row to pick up the server-side timestamps.
    await db_session.refresh(record)
    return record
设计亮点:
await db_session.flush() # 立即写入
为什么要立即写入?
project_id=project_id_var.get() # 从上下文获取
使用 contextvars:
from contextvars import ContextVar

# Context variable that carries the current project ID.
project_id_var: ContextVar[str] = ContextVar("project_id")
# Set it at the start of the request
project_id_var.set(current_project_id)
# It can then be read from anywhere
project_id = project_id_var.get()
优势:
async def update_q2s_log(
    db_session: AsyncSession,
    q2s_id: str,
    query: dict | None = None,
    err_msg: str | None = None,
    duration: int = 0,
    trace_id: str | None = None,
) -> Q2SModel:
    """Record the outcome of a Q2S conversion on an existing log row.

    A truthy ``err_msg`` marks the log as failed and stores the message
    truncated to 128 characters; otherwise the log is marked successful and
    ``query`` is stored.

    Args:
        db_session: Session used to load and flush the row.
        q2s_id: ID of the log to update.
        query: Generated query to store on success.
        err_msg: Error message; when truthy the log is marked failed.
        duration: Elapsed time in milliseconds.
        trace_id: Langfuse trace ID. When ``None`` the value already stored
            on the row is kept.

    Returns:
        The updated, refreshed ``Q2SModel`` instance.

    Raises:
        Whatever ``Result.scalar_one()`` raises when the ID does not match
        exactly one row.
    """
    # Load the log row
    q2s_log = (
        await db_session.execute(select(Q2SModel).filter_by(id=q2s_id))
    ).scalar_one()

    # Update the status
    if err_msg:
        q2s_log.status = Q2SModel.STATUS_FAILED
        q2s_log.err_msg = err_msg[:128]  # cap the stored length
    else:
        q2s_log.query = query
        q2s_log.status = Q2SModel.STATUS_SUCCESS

    # Update the performance metrics
    q2s_log.duration = duration
    # Only write the trace ID when the caller supplies one, so an omitted
    # argument does not wipe a trace ID that was already recorded on the row.
    if trace_id is not None:
        q2s_log.trace_id = trace_id

    await db_session.flush()
    await db_session.refresh(q2s_log)  # reload the timestamps
    return q2s_log
设计亮点:
q2s_log.err_msg = err_msg[:128] # 限制长度
为什么要截断?
if err_msg:
    q2s_log.status = Q2SModel.STATUS_FAILED
    # Same 128-character cap as in update_q2s_log
    q2s_log.err_msg = err_msg[:128]
else:
    q2s_log.query = query
    q2s_log.status = Q2SModel.STATUS_SUCCESS
清晰的状态管理:
async def get_q2s_logs(
    db_session: AsyncSession,
    datasource_id=None
) -> Page[Q2SModel]:
    """Return one page of the current project's Q2S logs, newest first.

    The project ID is read from the ``project_id_var`` context variable.
    When ``datasource_id`` is truthy the result is further restricted to that
    datasource. Pagination is delegated to ``paginate``.
    """
    criteria = {"project_id": project_id_var.get()}
    if datasource_id:
        criteria["datasource_id"] = datasource_id

    newest_first = (
        select(Q2SModel)
        .filter_by(**criteria)
        .order_by(Q2SModel.created_at.desc())
    )
    return await paginate(db_session, newest_first)
设计亮点:
from fastapi_pagination import Page
from fastapi_pagination.ext.sqlalchemy import paginate
# 自动处理分页参数(page, size)
return await paginate(db_session, stmt)
返回格式:
{
"items": [...],
"total": 1000,
"page": 1,
"size": 20,
"pages": 50
}
# 按项目过滤
filter_by(project_id=project_id)
# 按数据源过滤
filter_by(datasource_id=datasource_id)
# 按时间排序
order_by(Q2SModel.created_at.desc())
用户反馈:"查询结果不对"
诊断流程:
# 1. 查询用户的 Q2S 日志
logs = await get_q2s_logs(db_session, datasource_id=user_datasource_id)
# 2. 找到问题日志
problem_log = logs.items[0]
# 3. 查看详细信息
print(f"问题:{problem_log.question}")
print(f"生成的 SQL:{problem_log.query['sql']}")
print(f"错误信息:{problem_log.err_msg}")
print(f"Trace ID:{problem_log.trace_id}")
# 4. 在 Langfuse 中查看完整 trace
# 包括:Prompt、LLM 响应、工具调用、中间结果
发现问题:
# 问题:上个月的销售额
# SQL:SELECT SUM(amount) FROM sales WHERE date >= '2024-02-01'
# 错误:没有结束日期,查询了 2 月至今的数据
修复方案:
# 优化 Prompt,明确时间范围处理
# 修改后 SQL:
# SELECT SUM(amount) FROM sales
# WHERE date >= '2024-02-01' AND date < '2024-03-01'
目标:找出慢查询,优化性能
分析流程:
# 1. 查询耗时超过 5 秒的日志
slow_logs = await db_session.execute(
select(Q2SModel)
.filter(Q2SModel.duration > 5000)
.order_by(Q2SModel.duration.desc())
.limit(100)
)
# 2. 分析慢查询模式
for log in slow_logs:
print(f"问题:{log.question}")
print(f"耗时:{log.duration}ms")
print(f"SQL:{log.query['sql']}")
print("---")
# 3. 发现问题
# - 某些查询没有使用索引
# - 某些查询扫描了全表
# - 某些查询 JOIN 了过多表
# 4. 优化方案
# - 添加索引
# - 优化 SQL 生成逻辑
# - 限制 JOIN 数量
目标:监控 SQL 生成质量
监控指标:
# Pseudo-code: count / avg / group_by stand for aggregations over q2s_logs.
# 1. Success rate (expressed as a percentage, 0-100)
success_rate = (
    count(status == 'success') / count(total)
) * 100
# 2. Average duration
avg_duration = avg(duration)
# 3. Error distribution
error_distribution = group_by(err_msg).count()
# 4. Most frequent questions
popular_questions = group_by(question).count().order_by(desc).limit(10)
告警规则:
# Success rate below 90%
# success_rate is a percentage (0-100), so the threshold is 90, not 0.9.
if success_rate < 90:
    send_alert("Q2S success rate is low")
# Average duration above 3 seconds
if avg_duration > 3000:
    send_alert("Q2S average duration is high")
# Sudden spike in the error rate
if error_rate > last_hour_error_rate * 2:
    send_alert("Q2S error rate spike")
目标:了解用户如何使用系统
分析维度:
# 1. Most frequently asked questions
SELECT question, COUNT(*) as count
FROM q2s_logs
GROUP BY question
ORDER BY count DESC
LIMIT 20
# 2. Query patterns per role
SELECT role_id, COUNT(*) as count, AVG(duration) as avg_duration
FROM q2s_logs
GROUP BY role_id
# 3. Distribution of queries over the hours of the day
SELECT HOUR(created_at) as hour, COUNT(*) as count
FROM q2s_logs
GROUP BY hour
ORDER BY hour
# 4. Usage per datasource
SELECT datasource_id, COUNT(*) as count
FROM q2s_logs
GROUP BY datasource_id
ORDER BY count DESC
# Periodically purge old logs
async def cleanup_old_logs(days=90):
    """Delete every Q2S log created more than ``days`` days ago.

    ``db_session`` is not a parameter; it is resolved from the enclosing
    scope. The cutoff is computed from ``datetime.now()``, and no commit is
    issued here.
    """
    retention = timedelta(days=days)
    is_expired = Q2SModel.created_at < datetime.now() - retention
    purge_stmt = delete(Q2SModel).where(is_expired)
    await db_session.execute(purge_stmt)
def mask_sensitive_data(question: str) -> str:
    """Mask ID-card numbers and phone numbers in a user question.

    Args:
        question: The raw question text.

    Returns:
        The text with every 18-character ID-card number (17 digits followed
        by a digit or ``X``/``x``) replaced by 18 asterisks and every
        remaining run of 11 digits replaced by 11 asterisks.
    """
    # Mask ID-card numbers first. If the 11-digit phone rule ran first it
    # would consume the leading 11 digits of an ID and leave its last 7
    # characters in clear text. The final check character may be X.
    question = re.sub(r'\d{17}[\dXx]', '*' * 18, question)
    # Mask phone numbers
    question = re.sub(r'\d{11}', '*' * 11, question)
    return question
# Mask the question when creating the log
q2s_log = Q2SModel(
    question=mask_sensitive_data(question),
    ...
)
# Record the Q2S ID in Langfuse
from langfuse import Langfuse

langfuse = Langfuse()
trace = langfuse.trace(
    name="q2s",
    metadata={"q2s_id": q2s_log.id}
)
# Record the trace ID on the Q2S log
q2s_log.trace_id = trace.id
优势:
AskTable 的 Q2S 日志系统提供了完整的查询追踪能力:
通过 Q2S 日志,我们可以:
日志不仅是调试工具,更是产品优化的数据基础。