
In a RAG (Retrieval-Augmented Generation) system, embedding is one of the most common and most time-consuming operations. When you need to generate vectors for tens of thousands of database fields, how do you call the Embedding API efficiently? Through two-level batching and async parallelism, AskTable achieves a roughly 1000x end-to-end speedup.
The most straightforward implementation calls the Embedding API once per text:
```python
import asyncio
from openai import AsyncOpenAI

openai = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def embedding_naive(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API serially, one text per request."""
    embeddings = []
    for text in text_list:
        response = await openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
        )
        embeddings.append(response.data[0].embedding)
    return embeddings
```
Performance problem: far too slow. For 10,000 texts at roughly 50 ms of network round trip per request, the total comes to about 500 seconds, nearly all of it spent waiting on the network.

Comparing the three approaches: serial single calls are the slowest (500 s); serial batching is 100x faster (5 s); parallel batching adds another 10x (0.5 s), for a total 1000x speedup.
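For reference, here is a minimal timing harness one could use to reproduce the comparison. It assumes the three `embedding_*` functions developed in this post; the 10,000 texts are hypothetical:

```python
import asyncio
import time

async def benchmark() -> None:
    # Hypothetical corpus: 10,000 short field descriptions
    texts = [f"column description {i}" for i in range(10_000)]
    # embedding_naive / embedding_batch / embedding_parallel are the
    # three implementations shown in this post
    for fn in (embedding_naive, embedding_batch, embedding_parallel):
        start = time.time()
        await fn(texts)
        print(f"{fn.__name__}: {time.time() - start:.2f}s")

asyncio.run(benchmark())
```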
The OpenAI Embedding API accepts batched input:
```python
# A single text per request
response = await openai.embeddings.create(
    input="hello world",
    model="text-embedding-3-small",
)

# A batch of texts (up to 2,048 per request)
response = await openai.embeddings.create(
    input=["hello", "world", "foo", "bar", ...],  # up to 2,048 items
    model="text-embedding-3-small",
)
```
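One detail worth noting: each item in `response.data` carries an `index` field pointing back to its position in the input list. If you want to be defensive about ordering when zipping embeddings back to their texts, sort by it first (a small optional sketch, not from the original code):

```python
# Defensive ordering: sort response items by their `index` field so the
# extracted embeddings are guaranteed to line up with the input texts.
ordered = sorted(response.data, key=lambda item: item.index)
embeddings = [item.embedding for item in ordered]
```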
Advantages: a single request can carry up to 2,048 texts, so 10,000 texts need only 100 API calls instead of 10,000, cutting network round trips by 100x.

Implementation:
```python
EMBEDDING_BATCH_SIZE = 100  # texts per API request

async def embedding_batch(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API in batches."""
    # Split the texts into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    embeddings = []
    for batch in batches:
        response = await openai.embeddings.create(
            input=batch,
            model="text-embedding-3-small",
        )
        embeddings.extend([emb.embedding for emb in response.data])
    return embeddings
```
Performance gain: API calls drop from 10,000 to 100 and total time from ~500 s to ~5 s. A 100x improvement!
Batching drastically reduces the number of API calls, but the batches still execute one after another. We can send multiple batches in parallel:
```python
EMBEDDING_BATCH_SIZE = 100  # texts per API request
EMBEDDING_CHUNK_SIZE = 100  # batches sent in parallel per round

async def _embedding_batch(batch: list[str]) -> list[list[float]]:
    """Embed a single batch of texts."""
    response = await openai.embeddings.create(
        input=batch,
        model="text-embedding-3-small",
        encoding_format="float",
    )
    return [embedding.embedding for embedding in response.data]

async def embedding_parallel(text_list: list[str]) -> list[list[float]]:
    """Call the Embedding API in parallel batches."""
    # Level 1: split the texts into batches
    batches = [
        text_list[i : i + EMBEDDING_BATCH_SIZE]
        for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
    ]
    embeddings = []
    # Level 2: group batches into chunks, run each chunk in parallel
    for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
        chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
        # Fire off all batches in this chunk concurrently
        tasks = [_embedding_batch(batch) for batch in chunk]
        chunk_results = await asyncio.gather(*tasks)
        # Merge results, preserving input order
        embeddings.extend([
            embedding
            for batch_result in chunk_results
            for embedding in batch_result
        ])
    return embeddings
```
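A usage sketch with hypothetical texts, e.g. embedding descriptions of database fields:

```python
import asyncio

async def main() -> None:
    texts = [f"users.col_{i}: description" for i in range(10_000)]
    vectors = await embedding_parallel(texts)
    # text-embedding-3-small returns 1536-dimensional vectors by default
    print(len(vectors), len(vectors[0]))  # 10000 1536

asyncio.run(main())
```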
Performance gain: another 10x on top of batching, for roughly 1000x end to end! Below is the production implementation in AskTable's AsyncLLMClient:
```python
class AsyncLLMClient:
    @openai_error_wrapper
    async def _embedding_batch(self, batch):
        try:
            response = await self.client_embedding.embeddings.create(
                input=batch,
                model=config.embed_model_name,
                encoding_format="float",
            )
            return [embedding.embedding for embedding in response.data]
        except Exception as e:
            log.error(f"embedding failed: {e}, batch: {batch}")
            raise e

    @openai_error_wrapper
    async def embedding_batch(self, text_list: list[str]) -> list[list[float]]:
        _begin = time.time()
        # Preprocess: replace newlines with spaces
        text_list = [text.replace("\n", " ") for text in text_list]
        # Level 1: split into batches
        batches = [
            text_list[i : i + EMBEDDING_BATCH_SIZE]
            for i in range(0, len(text_list), EMBEDDING_BATCH_SIZE)
        ]
        log.info(f"total batches: {len(batches)}")
        embeddings = []
        # Level 2: run chunks of batches in parallel
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            _begin_chunk = time.time()
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            # Fire off all batches in this chunk concurrently
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            # Merge results
            embeddings.extend([
                embedding
                for batch_result in chunk_results
                for embedding in batch_result
            ])
            log.info(
                f"embedding_chunks: {len(chunk)} batches, "
                f"time: {time.time() - _begin_chunk:.2f}s"
            )
        log.info(
            f"embedding_batch: {len(text_list)} texts, "
            f"time: {time.time() - _begin:.2f}s"
        )
        return embeddings
```
Design highlights:

```python
EMBEDDING_BATCH_SIZE = 100  # level 1: texts per API request
EMBEDDING_CHUNK_SIZE = 100  # level 2: parallel requests per round
```

Why two levels?

- Level 1 (batch): reduces the number of API calls
- Level 2 (chunk): caps the number of concurrent requests
Example:

```python
# 10,000 texts
#   Level 1: 10000 / 100 = 100 batches
#   Level 2: 100 / 100  = 1 chunk
#   Result: 1 round of parallelism, 100 requests in the round

# 100,000 texts
#   Level 1: 100000 / 100 = 1000 batches
#   Level 2: 1000 / 100   = 10 chunks
#   Result: 10 rounds of parallelism, 100 requests per round
```
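The same arithmetic as a tiny self-contained helper (a hypothetical `plan` function, just to make the math executable):

```python
import math

def plan(n_texts: int, batch_size: int = 100, chunk_size: int = 100) -> None:
    """Print how many batches and parallel rounds a corpus requires."""
    n_batches = math.ceil(n_texts / batch_size)
    n_chunks = math.ceil(n_batches / chunk_size)
    print(f"{n_texts} texts -> {n_batches} batches -> "
          f"{n_chunks} round(s) of up to {chunk_size} parallel requests")

plan(10_000)   # 10000 texts -> 100 batches -> 1 round(s) of up to 100 parallel requests
plan(100_000)  # 100000 texts -> 1000 batches -> 10 round(s) of up to 100 parallel requests
```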
```python
text_list = [text.replace("\n", " ") for text in text_list]
```

Why replace newlines? Replacing newlines with spaces was a long-standing recommendation in OpenAI's docs for earlier embedding models, since newline characters could degrade embedding quality; it also normalizes multi-line field descriptions into a single line.
```python
@openai_error_wrapper
async def _embedding_batch(self, batch):
    try:
        response = await self.client_embedding.embeddings.create(...)
        return [embedding.embedding for embedding in response.data]
    except Exception as e:
        log.error(f"embedding failed: {e}, batch: {batch}")
        raise e
```

Error-handling strategy: log the error together with the offending batch (so the bad input can be found later), then re-raise so the failure propagates up through asyncio.gather instead of being silently swallowed.
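If a single failed batch shouldn't abort the whole job, a hedged alternative to this fail-fast strategy is `asyncio.gather(..., return_exceptions=True)`, collecting failures so the caller can retry just those batches. A sketch, not the behavior of the production code above:

```python
import asyncio

async def embed_chunk_tolerant(chunk: list[list[str]]):
    """Run one chunk of batches in parallel, collecting failures instead of
    failing fast. `_embedding_batch` is the per-batch coroutine shown earlier."""
    results = await asyncio.gather(
        *(_embedding_batch(batch) for batch in chunk),
        return_exceptions=True,
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [b for b, r in zip(chunk, results) if isinstance(r, BaseException)]
    return ok, failed  # caller decides whether to retry `failed` or raise
```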
log.info(f"total batches: {len(batches)}")
log.info(
f"embedding_chunks: {len(chunk)} batches, "
f"time: {time.time() - _begin_chunk:.2f}s"
)
log.info(
f"embedding_batch: {len(text_list)} texts, "
f"time: {time.time() - _begin:.2f}s"
)
监控指标:
| Approach | API calls | Concurrency | Total time | Speedup |
|---|---|---|---|---|
| Serial, one text per call | 10000 | 1 | 500s | 1x |
| Serial, batched | 100 | 1 | 5s | 100x |
| Parallel, batched | 100 | 100 | 0.5s | 1000x |
Sample log output:

```
[INFO] total batches: 100
[INFO] embedding_chunks: 100 batches, time: 0.52s
[INFO] embedding_batch: 10000 texts, time: 0.52s
```

Throughput: 10000 / 0.52 ≈ 19,230 texts/s.
```python
# Tune to the API's limits
EMBEDDING_BATCH_SIZE = 100  # OpenAI allows up to 2,048 inputs per request

# Tune by text length
if avg_text_length > 1000:
    EMBEDDING_BATCH_SIZE = 50   # smaller batches for long texts
else:
    EMBEDDING_BATCH_SIZE = 100  # larger batches for short texts

# Tune for API rate limits
EMBEDDING_CHUNK_SIZE = 100  # e.g. OpenAI rate limit: 3000 RPM

# Tune for network bandwidth
if network_bandwidth < 10_000_000:  # 10 Mbps
    EMBEDDING_CHUNK_SIZE = 50
else:
    EMBEDDING_CHUNK_SIZE = 100
```
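An alternative to fixed-size chunks is a semaphore that caps in-flight requests, so a new batch starts as soon as any slot frees up instead of waiting for the slowest batch in each round. A sketch reusing the `_embedding_batch` coroutine from earlier:

```python
import asyncio

MAX_CONCURRENCY = 100

async def embedding_semaphore(batches: list[list[str]]) -> list[list[float]]:
    """Cap concurrent requests with a semaphore instead of fixed chunks."""
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def guarded(batch: list[str]) -> list[list[float]]:
        async with sem:
            return await _embedding_batch(batch)

    results = await asyncio.gather(*(guarded(b) for b in batches))
    return [emb for batch_result in results for emb in batch_result]
```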
```python
async def _embedding_batch_with_retry(self, batch, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self._embedding_batch(batch)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            log.warning(f"Retry {attempt + 1}/{max_retries}: {e}")
            await asyncio.sleep(2 ** attempt)  # exponential backoff
```
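A common tweak (not in the original code) is to add random jitter to the backoff, so that many parallel batches failing at once don't all retry at the same instant:

```python
import asyncio
import random

async def backoff_sleep(attempt: int) -> None:
    # 2^attempt seconds plus up to 1 second of random jitter
    await asyncio.sleep(2 ** attempt + random.uniform(0, 1))
```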
```python
from tqdm import tqdm

async def embedding_batch_with_progress(self, text_list: list[str]):
    batches = [...]  # same batching as above
    embeddings = []
    with tqdm(total=len(batches), desc="Embedding") as pbar:
        for i in range(0, len(batches), EMBEDDING_CHUNK_SIZE):
            chunk = batches[i : i + EMBEDDING_CHUNK_SIZE]
            tasks = [self._embedding_batch(batch) for batch in chunk]
            chunk_results = await asyncio.gather(*tasks)
            embeddings.extend([...])  # merge as above
            pbar.update(len(chunk))
    return embeddings
```
Through two-level batching and async parallelism, AskTable dramatically accelerates embedding: batching cuts API calls by 100x, and parallel chunks add another 10x, for roughly a 1000x end-to-end speedup.

This optimization strategy isn't limited to embedding; it generalizes to any bulk API workload, such as batched LLM calls or batched database queries.

The key insight: batching + parallelism = a performance leap.
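As a closing sketch of that generalization: the two-level pattern reduces to a small generic helper, where `call_batch` stands in for whatever bulk endpoint you are wrapping (hypothetical names throughout):

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def batched_parallel(
    items: list[T],
    call_batch: Callable[[list[T]], Awaitable[list[R]]],
    batch_size: int = 100,
    chunk_size: int = 100,
) -> list[R]:
    """Level 1: group items into batches; level 2: run chunks of batches in parallel."""
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    results: list[R] = []
    for i in range(0, len(batches), chunk_size):
        chunk = batches[i : i + chunk_size]
        chunk_results = await asyncio.gather(*(call_batch(b) for b in chunk))
        for batch_result in chunk_results:
            results.extend(batch_result)
    return results
```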