
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在企业级数据分析场景中,数据库往往包含数百甚至上千张表,每张表又有几十个字段。当用户用自然语言提问时,如何快速找到相关的表和字段?传统的关键词匹配往往力不从心,而 AskTable 通过 Qdrant 向量检索实现了真正的语义理解。
假设用户问:"上个月华东地区的销售情况如何?"
关键词匹配的问题:
销售相关字段可能叫 revenue、sales_amount、order_total;地区相关字段可能叫 region、area、province。
暴力方案的问题:
AskTable 使用 Qdrant 向量数据库实现语义检索:
上图展示了从元数据向量化到检索的完整流程:左侧是离线的向量化过程,右侧是在线的检索过程。通过向量相似度计算,系统能在毫秒级从海量字段中找到最相关的数据。
AskTable 将数据库元数据(表名、字段名、描述)转换为向量并存储到 Qdrant:
async def create(meta: MetaAdmin):
    """Vectorize datasource metadata and upsert it into the project's collection.

    - Gets or creates the per-project collection in the vector DB.
    - Builds one document per field that has a non-empty description and
      uploads the batch with deterministic IDs, making the upload an
      idempotent upsert.
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    await create_collection_if_not_exist(collection_name)
    docs = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                # Only fields with a real (non-blank) description carry
                # semantic signal worth embedding.
                if field.curr_desc and field.curr_desc.strip():
                    assert table.curr_desc is not None
                    assert meta.datasource_id is not None
                    metadata = {
                        "datasource_id": meta.datasource_id,
                        "schema_name": schema.name,
                        "table_name": table.name,
                        "field_name": field.name,
                        "type": "curr_desc",
                        "value": field.curr_desc,
                    }
                    docs.append({
                        # Combine table + field descriptions (space-separated)
                        # so the embedding carries the table context.
                        "page_content": f"{table.curr_desc} {field.curr_desc}",
                        "metadata": metadata,
                        # Deterministic ID: re-uploading the same field
                        # overwrites the old point instead of duplicating it.
                        "id": _gen_id_by_field(
                            meta.datasource_id,
                            schema.name,
                            table.name,
                            field.name,
                        ),
                    })
    await upload_batch(collection_name, docs, with_id=True)
设计亮点:
"page_content": table.curr_desc + field.curr_desc
为什么要组合表描述和字段描述?
amount 单独看很模糊,但 订单表.订单金额 就很清晰def _gen_id_by_field(
datasource_id: str, schema_name: str, table_name: str, field_name: str
) -> str:
"""
根据datasource_id, schema_name, table_name, field_name生成唯一ID
使用 uuid5 来生成基于输入字符串的 UUID
"""
parts = [datasource_id, schema_name, table_name, field_name]
input_string = "-".join(parts)
return str(uuid.uuid5(uuid.NAMESPACE_DNS, input_string))
为什么使用 UUID5?
metadata = {
"datasource_id": meta.datasource_id,
"schema_name": schema.name,
"table_name": table.name,
"field_name": field.name,
"type": "curr_desc",
"value": field.curr_desc,
}
元数据的作用:
当用户提问时,AskTable 使用向量检索找到最相关的字段:
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """Retrieve the most relevant metadata entries from the vector DB.

    Scoping is done either by ``meta`` (an explicit whitelist of field IDs,
    used for permission filtering) or, when no meta is given, by ``ds_id``.

    Returns the unpacked Qdrant points, or an empty list when the project
    collection does not exist yet.
    """
    project_id = project_id_var.get()
    collection_name = PROJECT_COLLECTION_PREFIX + project_id
    db = get_vector_db()
    if not await db.collection_exists(collection_name):
        log.info(f"集合 {collection_name} 不存在,跳过查询。")
        return []
    # Embed all sub-queries in a single batch call.
    query_embeddings = await embed_docs(subqueries)
    # Build the scoping filter. Renamed from `filter` to avoid shadowing
    # the builtin of the same name.
    if meta and meta.datasource_id:
        # Whitelist mode: restrict the search to the exact point IDs the
        # caller may see (same deterministic IDs generated at index time).
        allow_ids = [
            _gen_id_by_field(meta.datasource_id, schema.name, table.name, field.name)
            for schema in meta.schemas.values()
            for table in schema.tables.values()
            for field in table.fields.values()
        ]
        query_filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
    else:
        # Otherwise scope by the datasource ID stored in each point's payload.
        query_filter = Filter(
            must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
        )
    # One search request per sub-query, executed as a single batch round-trip.
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=query_filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )
    # Flatten the per-query hit lists before unpacking.
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)
设计亮点:
query_embeddings = await embed_docs(subqueries)
search_queries = [
SearchRequest(vector=embedding, ...)
for embedding in query_embeddings
]
results = await db.search_batch(...)
为什么使用多查询?
用户问题:"上个月华东地区的销售情况如何?"
可以分解为多个子查询:
每个子查询独立检索,然后合并结果,提高召回率。
if meta and meta.datasource_id:
# 只在用户有权限的字段中搜索
allow_ids = [...]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
安全性保障:
score_threshold=threshold # 默认 0.4
为什么需要阈值?
AskTable 实现了高效的批量 Embedding 处理:
async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
    """Embed `text_list`, splitting it into API-sized batches and running
    up to EMBEDDING_CHUNK_SIZE batch requests concurrently per chunk."""
    started = time.time()
    # Newlines are flattened to spaces before embedding.
    cleaned = [text.replace("\n", " ") for text in text_list]
    # Slice the texts into per-request batches.
    batches = [
        cleaned[start : start + EMBEDDING_BATCH_SIZE]
        for start in range(0, len(cleaned), EMBEDDING_BATCH_SIZE)
    ]
    log.info(f"total batches: {len(batches)}")
    embeddings: list[list[float]] = []
    # Fire off at most EMBEDDING_CHUNK_SIZE batch requests at a time.
    for offset in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk_started = time.time()
        chunk = batches[offset : offset + EMBEDDING_CHUNK_SIZE]
        chunk_results = await asyncio.gather(
            *(self._embedding_batch(batch) for batch in chunk)
        )
        for batch_result in chunk_results:
            embeddings.extend(batch_result)
        log.info(
            f"embedding_chunks: {len(chunk)} batches, "
            f"time: {time.time() - chunk_started:.2f}s"
        )
    log.info(
        f"embedding_batch: {len(cleaned)} texts, "
        f"time: {time.time() - started:.2f}s"
    )
    return embeddings
性能优化策略:
EMBEDDING_BATCH_SIZE = 100 # 每个 API 请求的文本数
EMBEDDING_CHUNK_SIZE = 100 # 并行请求数
为什么需要两级?
tasks = [self._embedding_batch(batch) for batch in chunk]
chunk_results = await asyncio.gather(*tasks)
性能提升:
text_list = [text.replace("\n", " ") for text in text_list]
为什么要替换换行符?
背景:
检索流程:
子查询分解:
向量检索(每个子查询 top_k=12):
product.product_line, sales.category, dim_product.line_name
finance.gross_margin, sales.profit_rate, report.margin_pct
orders.order_date, sales.sale_date, fact_sales.date_key
结果合并去重:
product, sales, finance
传给 LLM:
背景:
实现方案:
# 每个租户一个独立的 collection
collection_name = f"meta_{project_id}"
# 检索时按 datasource_id 过滤
filter = Filter(
must=[FieldCondition(key="datasource_id", match=MatchValue(value=ds_id))]
)
优势:
背景:
实现方案:
# 根据用户角色获取可访问的元数据
accessible_meta = role.get_accessible_meta(datasource)
# 只在可访问的字段中检索
allow_ids = [
_gen_id_by_field(...)
for field in accessible_meta.all_fields()
]
filter = Filter(must=[HasIdCondition(has_id=allow_ids)])
安全保障:
# 按项目隔离 collection
collection_name = PROJECT_COLLECTION_PREFIX + project_id
优势:
# 使用确定性 ID 支持幂等更新
id = _gen_id_by_field(datasource_id, schema_name, table_name, field_name)
docs.append({"id": id, "page_content": ..., "metadata": ...})
await upload_batch(collection_name, docs, with_id=True)
优势:
async def delete_by_field_names(ds_id: str, fields_names: list[tuple[str, str, str]]):
    """Delete the vectors of the given fields.

    Each entry of ``fields_names`` is a (schema_name, table_name, field_name)
    tuple; a point is deleted when it matches the datasource AND all three
    name components. Entries are OR-ed together via the outer ``should``.

    NOTE(review): ``db`` and ``collection_name`` are not defined in this
    snippet — presumably resolved the same way as elsewhere (module scope
    or elided setup lines); confirm against the full source.
    """
    per_field_filters = []
    for schema_name, table_name, field_name in fields_names:
        per_field_filters.append(
            Filter(
                must=[
                    FieldCondition(key="datasource_id", match=MatchValue(value=ds_id)),
                    FieldCondition(key="field_name", match=MatchValue(value=field_name)),
                    FieldCondition(key="table_name", match=MatchValue(value=table_name)),
                    FieldCondition(key="schema_name", match=MatchValue(value=schema_name)),
                ]
            )
        )
    delete_filter = Filter(should=per_field_filters)
    await db.delete(collection_name, delete_filter)
优势:
推荐:
# 组合表描述和字段描述
page_content = f"{table.curr_desc} {field.curr_desc}"
不推荐:
# 只用字段名
page_content = field.name # 语义信息太少
# 只用字段描述
page_content = field.curr_desc # 缺少表的上下文
# 精确查询场景(如报表生成)
threshold = 0.6 # 高阈值,只返回高度相关的字段
# 探索性查询场景(如数据探索)
threshold = 0.3 # 低阈值,返回更多可能相关的字段
# 默认场景
threshold = 0.4 # 平衡准确率和召回率
# 简单查询
top_k = 5 # 减少噪音
# 复杂查询
top_k = 20 # 提高召回率
# 默认
top_k = 12 # 经验值
高质量元数据的特征:
示例:
# 好的描述
table.curr_desc = "订单表,记录所有客户订单信息"
field.curr_desc = "订单金额,单位:元,不含税"
# 差的描述
table.curr_desc = "orders" # 没有业务含义
field.curr_desc = "amount" # 信息不足
| 特性 | Qdrant | Pinecone | Milvus | Weaviate |
|---|---|---|---|---|
| 开源 | ✅ | ❌ | ✅ | ✅ |
| 本地部署 | ✅ | ❌ | ✅ | ✅ |
| 过滤性能 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 批量搜索 | ✅ | ✅ | ✅ | ✅ |
| Python SDK | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
| 文档质量 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
Qdrant 的优势:
AskTable 通过 Qdrant 向量检索实现了高效的数据库元数据检索:
这种架构不仅适用于 Text-to-SQL,也可以推广到其他需要从大量结构化数据中检索的场景,如:
通过向量检索,我们可以让 AI 真正"理解"数据,而不仅仅是匹配关键词。