
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在元数据检索中,单纯的向量检索或全文检索都有局限性。向量检索擅长语义理解但可能漏掉精确匹配,全文检索擅长精确匹配但不理解语义。AskTable 通过双重检索策略,结合两者优势,实现了更准确的元数据检索。
场景:用户问"北京地区的订单"
# 向量检索
query_embedding = embed("北京地区的订单")
results = vector_search(query_embedding)
# 可能返回:
# - region 字段(语义相关)
# - area 字段(语义相关)
# - province 字段(语义相关)
# 但可能漏掉:
# - 值为"北京"的具体记录
问题:
场景:用户问"销售情况"
# 全文检索
results = fulltext_search("销售情况")
# 可能返回:
# - 包含"销售"的字段
# - 包含"情况"的字段
# 但可能漏掉:
# - revenue 字段(英文,语义相关但关键词不匹配)
# - amount 字段(语义相关但关键词不匹配)
问题:
结合两者:
# 1. 向量检索:找相关字段
field_results = vector_search(["销售额", "订单金额"])
# 返回:sales.amount, orders.revenue
# 2. 全文检索:找具体值
value_results = fulltext_search(["北京", "华东"])
# 返回:region='北京', area='华东'
# 3. 合并结果
final_results = merge(field_results, value_results)
上图展示了双重检索的完整流程:将用户问题分解为语义概念和具体值,分别使用向量检索和全文检索,最后合并结果。这种策略既能理解语义(销售额≈revenue),又能精确匹配(北京="北京")。
优势:
async def retrieve_entities(
    meta: MetaAdmin,
    datasource: DataSourceAdmin,
    subqueries: list[str],  # semantic sub-queries, e.g. ["销售额", "地区"]
    keywords: list[str],  # literal keywords, e.g. ["北京", "2024"]
) -> MetaAdmin:
    """Dual retrieval: vector search for fields + full-text search for values.

    Args:
        meta: full metadata to filter down to the hit tables.
        datasource: datasource providing both retrieval backends.
        subqueries: semantic concepts, fed to vector search.
        keywords: concrete values, fed to full-text search.

    Returns:
        Metadata restricted to the tables hit by either retrieval,
        with retrieved context (sample values) attached.

    Raises:
        errors.NoDataToQuery: when neither retrieval produced any entity.
    """
    values: list[RetrievedMetaEntity] = []
    # 1. Vector search: semantically relevant fields.
    fields = await datasource.retrieve_fields_by_question(subqueries)
    log.info(f"retrieved {len(fields)} possible relevant fields")
    # 2. Full-text search: concrete values. Degrades gracefully when the
    #    value index is not configured or no keywords were extracted.
    if not config.aisearch_host or not config.aisearch_master_key:
        log.warning("Value index is not enabled, skipping value retrieval")
    elif not keywords:
        log.warning("No keywords provided, skipping value retrieval")
    else:
        values = await datasource.retrieve_values_by_question(keywords)
        log.info(f"retrieved {len(values)} possible relevant values")
    # 3. Merge both result sets (dedup per field, keep best score).
    entities = _merge_values_fields(values + fields)
    # Fail fast: previously this check ran only after filtering and
    # context-attachment; with no entities there is nothing to do.
    if len(entities) == 0:
        raise errors.NoDataToQuery()
    # 4. Keep only the tables that were hit.
    hit_tables = meta.filter_tables_by_names(
        list({(entity["schema_name"], entity["table_name"]) for entity in entities})
    )
    # 5. Attach retrieved context (e.g. sample values) to the hit metadata.
    _add_context_to_meta(meta=hit_tables, entities=entities)
    return hit_tables
设计亮点:
subqueries: list[str] # 语义查询:["销售额", "订单金额", "地区"]
keywords: list[str] # 关键词查询:["北京", "华东", "2024"]
为什么要分解?
subqueries:用于向量检索,找相关字段
keywords:用于全文检索,找具体值
示例:
# 用户问题:"北京地区 2024 年的销售额"
# 分解为:
subqueries = ["销售额", "地区", "年份"] # 语义概念
keywords = ["北京", "2024"] # 具体值
if not config.aisearch_host or not config.aisearch_master_key:
log.warning("Value index is not enabled, skipping value retrieval")
elif not keywords:
log.warning("No keywords provided, skipping value retrieval")
else:
values = await datasource.retrieve_values_by_question(keywords)
优雅降级:
async def retrieve_fields_by_question(
    self, subqueries: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Vector search for fields relevant to the semantic sub-queries.

    Args:
        subqueries: semantic sub-queries to embed and search with.
        accessible_meta: optional metadata scope; when given, restricts
            retrieval to the metadata the caller may access.

    Returns:
        Retrieved field entities.
    """
    # BUG FIX: both branches previously issued the identical call,
    # silently discarding `accessible_meta`. Forward it so the scope
    # actually takes effect (the qdrant retrieve accepts a `meta` scope —
    # TODO confirm retrieve_meta exposes it under this keyword).
    if accessible_meta:
        return await retrieve_meta(
            subqueries=subqueries, ds_id=self.id, meta=accessible_meta
        )
    return await retrieve_meta(subqueries=subqueries, ds_id=self.id)
调用 Qdrant:
# 在 meta/qdrant.py 中
async def retrieve(
    subqueries: list[str],
    ds_id: str,
    meta: MetaAdmin | None = None,
    top_k: int = 12,
    threshold: float = 0.4,
):
    """Vector-search datasource metadata in Qdrant.

    Args:
        subqueries: semantic sub-queries to embed and search for.
        ds_id: datasource id — presumably used to build the search
            `filter`; the filter construction is elided from this
            excerpt (TODO confirm).
        meta: optional accessible-metadata scope; not used in the
            visible code (likely consumed by the elided filter logic).
        top_k: maximum hits returned per sub-query.
        threshold: minimum similarity score for a hit to be kept.

    Returns:
        Unpacked entities from all matching Qdrant points.
    """
    # 1. Embed all sub-queries in a single batch call.
    query_embeddings = await embed_docs(subqueries)
    # 2. One search request per embedding.
    #    NOTE(review): `filter` and `collection_name` are not defined in
    #    this excerpt — presumably built from ds_id/meta in elided code.
    search_queries = [
        SearchRequest(
            vector=embedding,
            filter=filter,
            limit=top_k,
            score_threshold=threshold,
            with_payload=True,
        )
        for embedding in query_embeddings
    ]
    # 3. Execute all searches as one batch round-trip.
    results = await db.search_batch(
        collection_name=collection_name,
        requests=search_queries,
    )
    # 4. Flatten the per-query hit lists and unpack the payloads.
    hits = [point for result in results for point in result]
    return unpack_qdrant_points(hits=hits)
返回格式:
[
{
"id": "uuid-1",
"payload": {
"schema_name": "public",
"table_name": "sales",
"field_name": "amount",
"type": "curr_desc",
"value": "销售金额,单位:元"
},
"score": 0.85
},
...
]
async def retrieve_values_by_question(
    self, keywords: list[str], accessible_meta=None
) -> list[RetrievedMetaEntity]:
    """Full-text search for concrete field values matching the keywords.

    Walks every indexed field of the datasource's metadata and queries
    the value index for values containing any of the keywords.

    Args:
        keywords: literal keywords, e.g. ["北京", "2024"].
        accessible_meta: unused in the visible code; kept for signature
            parity with retrieve_fields_by_question.

    Returns:
        Flat list of retrieved value entities across all indexed fields.
    """
    meta = self.brain_meta

    async def _query_values(schema_name, table_name, field_name):
        """Query one field's distinct values matching the keywords."""
        # NOTE(review): query_values is synchronous, so the gather below
        # does not actually overlap the searches — confirm whether an
        # async client or thread offload was intended.
        return query_values(
            self.id,
            schema_name,
            table_name,
            field_name,
            keywords,
        )

    # Only indexed (high-cardinality) fields carry a value index.
    tasks = []
    for schema in meta.schemas.values():
        for table in schema.tables.values():
            for field in table.fields.values():
                if field.is_index:
                    tasks.append(
                        _query_values(schema.name, table.name, field.name)
                    )
    # Run all per-field queries.
    results = await asyncio.gather(*tasks)
    # Flatten the per-field result lists.
    return [item for sublist in results for item in sublist]
全文检索实现:
def query_values(
    datasource_id: str,
    schema_name: str,
    table_name: str,
    field_name: str,
    keywords: list[str],
) -> list[RetrievedMetaEntity]:
    """Query concrete values from Azure AI Search / Elasticsearch.

    Args:
        datasource_id: id of the datasource the field belongs to.
        schema_name / table_name / field_name: fully qualify the field.
        keywords: literal keywords; any match qualifies (OR semantics).

    Returns:
        Value entities with the field coordinates in the payload and the
        search engine's relevance score.
    """

    def _quote(value: str) -> str:
        # OData string literals escape a single quote by doubling it;
        # without this, a name containing ' breaks (or injects into)
        # the filter expression.
        return value.replace("'", "''")

    # Build the query: free-text OR over keywords, exact filter on the field.
    query = {
        "search": " OR ".join(keywords),  # "北京 OR 上海 OR 广州"
        "filter": f"datasource_id eq '{_quote(datasource_id)}' and "
        f"schema_name eq '{_quote(schema_name)}' and "
        f"table_name eq '{_quote(table_name)}' and "
        f"field_name eq '{_quote(field_name)}'",
        "top": 10,
    }
    # Execute against the value index.
    results = search_client.search(**query)
    # Normalize hits into RetrievedMetaEntity-shaped dicts.
    return [
        {
            "id": result["id"],
            "payload": {
                "schema_name": schema_name,
                "table_name": table_name,
                "field_name": field_name,
                "type": "value",
                "value": result["value"],
            },
            "score": result["@search.score"],
        }
        for result in results
    ]
返回格式:
[
{
"id": "value-1",
"payload": {
"schema_name": "public",
"table_name": "sales",
"field_name": "region",
"type": "value",
"value": "北京"
},
"score": 0.95
},
...
]
def _merge_values_fields(entities: list[RetrievedMetaEntity]) -> list[dict]:
"""
合并字段检索和值检索的结果
"""
# 按表分组
table_entities = {}
for entity in entities:
table_key = (entity["payload"]["schema_name"], entity["payload"]["table_name"])
if table_key not in table_entities:
table_entities[table_key] = []
table_entities[table_key].append(entity)
# 去重和排序
merged = []
for table_key, table_entities_list in table_entities.items():
# 按 score 排序
sorted_entities = sorted(
table_entities_list,
key=lambda x: x["score"],
reverse=True
)
# 去重(同一字段只保留最高分)
seen_fields = set()
for entity in sorted_entities:
field_key = (
entity["payload"]["schema_name"],
entity["payload"]["table_name"],
entity["payload"]["field_name"]
)
if field_key not in seen_fields:
merged.append(entity["payload"])
seen_fields.add(field_key)
return merged
合并策略:
def _add_context_to_meta(meta: MetaAdmin, entities: list[dict]):
"""
将检索到的上下文添加到元数据中
"""
for entity in entities:
schema_name = entity["schema_name"]
table_name = entity["table_name"]
field_name = entity["field_name"]
# 找到对应的字段
if schema := meta.schemas.get(schema_name):
if table := schema.tables.get(table_name):
if field := table.fields.get(field_name):
# 添加检索上下文
if entity["type"] == "value":
# 添加示例值
field.sample_data = entity["value"]
elif entity["type"] == "curr_desc":
# 已有描述,不需要添加
pass
上下文类型:
用途:
用户问题:"北京地区的销售额"
检索过程:
# 1. 分解查询
subqueries = ["销售额", "地区"]
keywords = ["北京"]
# 2. 向量检索字段
field_results = [
{"table": "sales", "field": "amount", "score": 0.85},
{"table": "sales", "field": "region", "score": 0.80},
]
# 3. 全文检索值
value_results = [
{"table": "sales", "field": "region", "value": "北京", "score": 0.95},
]
# 4. 合并结果
# 表:sales
# 字段:amount(销售金额), region(地区)
# 示例值:region='北京'
# 5. 生成 SQL
# SELECT SUM(amount) FROM sales WHERE region = '北京'
用户问题:"2024 年 1 月的订单数量"
检索过程:
# 1. 分解查询
subqueries = ["订单数量", "时间", "年份", "月份"]
keywords = ["2024", "1月", "01"]
# 2. 向量检索字段
field_results = [
{"table": "orders", "field": "order_count", "score": 0.90},
{"table": "orders", "field": "created_at", "score": 0.85},
{"table": "orders", "field": "order_date", "score": 0.82},
]
# 3. 全文检索值
value_results = [
{"table": "orders", "field": "order_date", "value": "2024-01-15", "score": 0.92},
{"table": "orders", "field": "order_date", "value": "2024-01-20", "score": 0.91},
]
# 4. 合并结果
# 表:orders
# 字段:order_count, order_date
# 示例值:order_date='2024-01-15'
# 5. 生成 SQL
# SELECT COUNT(*) FROM orders
# WHERE order_date >= '2024-01-01' AND order_date < '2024-02-01'
# 并行执行向量检索和全文检索
field_task = datasource.retrieve_fields_by_question(subqueries)
value_task = datasource.retrieve_values_by_question(keywords)
fields, values = await asyncio.gather(field_task, value_task)
# 缓存常见查询的结果
cache_key = f"{datasource_id}:{hash(tuple(subqueries))}:{hash(tuple(keywords))}"
if cache_key in cache:
return cache[cache_key]
results = await retrieve_entities(...)
cache[cache_key] = results
return results
# 只在索引字段上进行全文检索
for field in table.fields.values():
if field.is_index: # 高基数字段
tasks.append(_query_values(...))
# ✅ 好的分解
subqueries = ["销售额", "地区"] # 语义概念
keywords = ["北京"] # 具体值
# ❌ 差的分解
subqueries = ["北京地区的销售额"] # 混在一起
keywords = []
# 设置合理的阈值
vector_threshold = 0.4 # 向量检索阈值
fulltext_threshold = 0.5 # 全文检索阈值
# 限制结果数量
top_k = 12 # 每个查询最多返回 12 个结果
# 全文检索不可用时,只用向量检索
if not fulltext_available:
log.warning("Fulltext search not available, using vector search only")
return await vector_search_only(subqueries)
AskTable 通过双重检索策略,结合向量检索和全文检索的优势:
通过双重检索,我们实现了:
这种策略不仅适用于元数据检索,也可以推广到其他需要结合语义理解和精确匹配的场景。