
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 AI 数据分析场景中,一个 SQL 查询可能耗时数秒,一个 Python 分析脚本可能运行数分钟。如何让用户实时看到执行进度?如何在网络断线后恢复推送?如何在多实例部署下保证事件不丢失?
AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了高性能、高可靠的实时推送系统。
本文将深入剖析这套架构的设计与实现。
短轮询(Polling):
// 客户端每秒轮询一次
setInterval(async () => {
const status = await fetch(`/api/node/${nodeId}/status`);
updateUI(status);
}, 1000);
问题:
长轮询(Long Polling):
# 服务器阻塞等待直到有新事件
async def get_status(node_id):
while True:
if has_new_event(node_id):
return get_event(node_id)
await asyncio.sleep(0.1)
问题:
✅ 实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,不丢失事件 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用现有 Redis,无需额外组件 ✅ 可靠性:事件持久化,支持历史回放
Redis Streams 是 Redis 5.0 引入的数据结构,专为消息队列和事件流设计。
核心优势:
XREAD BLOCK 支持长连接等待新事件XRANGE 支持按 ID 范围查询历史事件为什么不用 Redis Pub/Sub?
为什么不用 Kafka/RabbitMQ?
SSE 是 HTML5 标准,专为服务器到客户端的单向推送设计。
核心优势:
EventSource APILast-Event-ID 自动重连SSE vs WebSocket:
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务器 → 客户端) | 双向 |
| 协议 | HTTP | WebSocket |
| 断点续传 | 原生支持(Last-Event-ID) | 需要自己实现 |
| 浏览器支持 | 所有现代浏览器 | 所有现代浏览器 |
| 复杂度 | 低 | 中 |
AskTable 的场景:
AskTable 的流式执行架构分为三层:
at:canvas:stream:{canvas_id}:{node_id})class NodeStreamStore:
STREAM_TTL_SECONDS = 3600 # stream 完成后保留 1 小时
BLOCK_MS = 2000 # XREAD 阻塞等待时间
def _stream_key(self, canvas_id: str, node_id: str) -> str:
"""生成 Stream Key"""
return f"at:canvas:stream:{canvas_id}:{node_id}"
async def start_stream(self, canvas_id: str, node_id: str):
"""开始新 stream,清除旧数据并创建空 stream"""
key = self._stream_key(canvas_id, node_id)
async with self._redis.pipeline(transaction=True) as pipe:
pipe.delete(key) # 清除旧数据
# 种子消息让 key 立即存在,subscribe 跳过 _init 事件
pipe.xadd(key, {"event": StreamEvent.INIT, "data": ""})
await pipe.execute()
async def append(self, canvas_id: str, node_id: str, event: str, data: str) -> str:
"""追加事件到 stream,返回 stream ID"""
key = self._stream_key(canvas_id, node_id)
stream_id = await self._redis.xadd(
key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)
return stream_id
async def complete(self, canvas_id: str, node_id: str):
"""标记 stream 完成,设置 TTL"""
key = self._stream_key(canvas_id, node_id)
await self._redis.expire(key, self.STREAM_TTL_SECONDS)
关键点:
at:canvas:stream:{canvas_id}:{node_id},每个节点独立 Stream_init 事件让 Stream 立即存在,避免订阅时 key 不存在class StreamEvent(StrEnum):
INIT = "_init" # 种子事件(不推送给客户端)
START = "start" # 任务开始
DELTA = "delta" # 增量数据(如 AI 生成的文本)
DONE = "done" # 任务完成
ERROR = "error" # 任务失败
_TERMINAL_EVENTS = frozenset((StreamEvent.DONE, StreamEvent.ERROR))
事件生命周期:
INIT → START → DELTA × N → DONE/ERROR
async def subscribe(
self, canvas_id: str, node_id: str, from_id: str = "0-0"
) -> AsyncGenerator[tuple[str, str, str], None]:
"""订阅 stream 事件,支持断点续传。
1. XRANGE 重放 from_id 之后的历史消息
2. 如果未遇到终止事件(done/error),切换到 XREAD BLOCK 等待实时消息
3. 遇到 done/error 终止
from_id: "0-0" 从头读取,或传入 Last-Event-ID 实现断点重续
"""
key = self._stream_key(canvas_id, node_id)
# Phase 1: 重放历史(XRANGE)
range_start = f"({from_id}" if from_id != "0-0" else "-"
raw_entries = await self._redis.xrange(key, min=range_start)
last_id = from_id
for stream_id, event, data, terminal in self._iter_entries(raw_entries):
last_id = stream_id
yield (stream_id, event, data)
if terminal:
return
# Phase 2: 实时等待(XREAD BLOCK)
while True:
results = await self._redis.xread(
{key: last_id}, block=self.BLOCK_MS, count=10
)
if not results:
# 超时,检查 stream 是否还存在
if not await self._redis.exists(key):
return
continue
for _stream_name, entries in results:
for stream_id, event, data, terminal in self._iter_entries(entries):
last_id = stream_id
yield (stream_id, event, data)
if terminal:
return
关键点:
XRANGE:重放历史消息(支持断点续传)XREAD BLOCK:阻塞等待新消息(实时推送)({from_id} 跳过 from_id 本身,避免重复推送DONE/ERROR 事件后停止订阅XREAD 超时后检查 Stream 是否还存在断点续传示例:
# 客户端首次连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
print(f"{stream_id}: {event} - {data}")
# 假设在 stream_id="1234567890-0" 时断线
# 客户端重连(传入 Last-Event-ID)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="1234567890-0"):
print(f"{stream_id}: {event} - {data}")
# 从 "1234567890-0" 之后继续推送,无遗漏
@router.get("/canvas/{canvas_id}/node/{node_id}/stream")
async def stream_node_events(canvas_id: str, node_id: str, last_event_id: str = "0-0"):
"""SSE 端点:推送节点执行事件"""
async def event_generator():
try:
async for stream_id, event, data in stream_store.subscribe(
canvas_id, node_id, from_id=last_event_id
):
# SSE 格式:id + event + data
yield f"id: {stream_id}\n"
yield f"event: {event}\n"
yield f"data: {data}\n\n"
except Exception as e:
# 错误事件
yield f"event: error\n"
yield f"data: {json.dumps({'message': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
关键点:
id + event + data + 空行StreamingResponse 支持异步生成器X-Accel-Buffering: no 禁用 Nginx 缓冲客户端代码:
const eventSource = new EventSource(
`/api/canvas/${canvasId}/node/${nodeId}/stream`
);
eventSource.addEventListener("start", (e) => {
console.log("Task started");
});
eventSource.addEventListener("delta", (e) => {
const data = JSON.parse(e.data);
updateUI(data); // 实时更新 UI
});
eventSource.addEventListener("done", (e) => {
console.log("Task completed");
eventSource.close();
});
eventSource.addEventListener("error", (e) => {
console.error("Task failed", e.data);
eventSource.close();
});
// 断线自动重连(浏览器原生支持)
eventSource.onerror = () => {
console.log("Connection lost, reconnecting...");
// EventSource 会自动重连,并传递 Last-Event-ID
};
async def run_agent_task(canvas_id: str, node_id: str, message: str):
"""后台执行 Agent,事件写入 Redis Stream"""
try:
async with node_status_handler(canvas_id, node_id):
# 1. 写入 START 事件
await stream_store.append(canvas_id, node_id, StreamEvent.START, "null")
# 2. 创建 Agent 会话
async with unit_of_work() as db:
fresh_node = await service.get_node(db, canvas_id, node_id)
handler = get_handler(fresh_node.type)
agent = await _create_agent_session(db, fresh_node, handler, canvas_id)
# 3. 流式执行 Agent
async for event in agent.run(message):
await stream_store.append(
canvas_id, node_id, StreamEvent.DELTA, json.dumps(event)
)
# 4. 持久化结果
agent_result = agent.get_result()
streaming_result = handler.format_result(fresh_node, agent_result)
if streaming_result is not None:
merged_data = {**fresh_node.data, **streaming_result.data}
df_id = await persist_node_dataframe(merged_data, fresh_node.title, db)
await service.update_node(
db_session=db,
canvas_id=canvas_id,
node_id=node_id,
data=merged_data,
dataframe_id=df_id,
)
# 5. 写入 DONE 事件
await stream_store.append(canvas_id, node_id, StreamEvent.DONE, "null")
except Exception as e:
# 6. 写入 ERROR 事件
await stream_store.append(
canvas_id, node_id, StreamEvent.ERROR, json.dumps({"message": str(e)[:200]})
)
raise
finally:
# 7. 设置 TTL
await stream_store.complete(canvas_id, node_id)
关键点:
XREAD 批量读取:
results = await self._redis.xread(
{key: last_id}, block=self.BLOCK_MS, count=10
)
count=10:每次最多读取 10 条消息MAXLEN 限制:
stream_id = await self._redis.xadd(
key, {"event": event, "data": data}, maxlen=STREAM_MAXLEN, approximate=True
)
maxlen=10000:限制 Stream 最大长度approximate=True:允许近似裁剪,性能更好TTL 自动清理:
await self._redis.expire(key, self.STREAM_TTL_SECONDS)
事件持久化:
断点续传:
Last-Event-ID 恢复XRANGE 重放历史消息,无遗漏跨实例广播:
幂等性:
# 任务可能运行数分钟
async for event in agent.run(message):
await stream_store.append(canvas_id, node_id, StreamEvent.DELTA, json.dumps(event))
# 每生成一段内容,立即推送
优势:
# 客户端 A 订阅
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
print(f"Client A: {event}")
# 客户端 B 订阅(同时)
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id):
print(f"Client B: {event}")
优势:
# 任务完成后,新客户端连接
async for stream_id, event, data in stream_store.subscribe(canvas_id, node_id, from_id="0-0"):
print(f"{event}: {data}")
# 完整回放所有历史事件
优势:
| 方案 | 实时性 | 断点续传 | 横向扩展 | 部署成本 | 可靠性 |
|---|---|---|---|---|---|
| Redis Streams + SSE | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| WebSocket + Redis Pub/Sub | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| 短轮询 | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 长轮询 | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Kafka + WebSocket | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ |
AskTable 选择 Redis Streams + SSE 的原因:
问题:Nginx 默认会缓冲响应,导致 SSE 延迟。
解决方案:
headers={
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
}
问题:
XREAD BLOCK 超时后,如何判断 Stream 是否还存在?
解决方案:
if not results:
# 超时,检查 stream 是否还存在
if not await self._redis.exists(key):
return # Stream 已删除,停止订阅
continue # Stream 还在,继续等待
问题:Redis Stream ID 格式是
{timestamp}-{sequence},如何正确解析?
解决方案:
XRANGE 和 XREAD 都支持字符串比较问题:客户端可能收到重复事件(如网络抖动)。
解决方案:
stream_idSet 去重const processedIds = new Set();
eventSource.addEventListener("delta", (e) => {
if (processedIds.has(e.lastEventId)) {
return; // 已处理,跳过
}
processedIds.add(e.lastEventId);
updateUI(JSON.parse(e.data));
});
AskTable 的 Canvas 流式执行架构,通过 Redis Streams + SSE + arq 的组合,实现了:
✅ 实时性:毫秒级推送延迟 ✅ 断点续传:网络断线后自动恢复,无遗漏 ✅ 横向扩展:支持多实例部署,事件跨实例广播 ✅ 低成本:复用 Redis,无需额外组件 ✅ 高可靠性:事件持久化,支持历史回放
我们计划将 NodeStreamStore 和 SSE 端点实现 开源,帮助更多团队构建实时推送系统。敬请期待!
相关阅读:
技术交流: