
企业微信

飞书
选择您喜欢的方式加入群聊

扫码添加咨询专家
在 Canvas 画布中,节点之间存在复杂的依赖关系:Chart 节点依赖 Data 节点的查询结果,Python 节点依赖多个父节点的 DataFrame。当一个节点更新时,如何智能地刷新依赖它的子节点?如何避免重复计算?如何保证执行顺序的正确性?
AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了高效、可靠的节点执行管理。
本文将深入剖析这套引擎的设计与实现。
Canvas 中的节点形成一个 有向无环图(DAG):
加载图表中...
依赖关系:
手动刷新:
全量刷新:
✅ 智能刷新:自动识别需要刷新的节点 ✅ 增量计算:只刷新受影响的节点 ✅ 正确顺序:按依赖关系顺序执行 ✅ 并发执行:无依赖的节点可以并行执行 ✅ 缓存复用:未变更的节点复用缓存结果
拓扑排序是对 DAG 的一种线性排序,使得对于每条有向边
u → v,节点 u 都排在节点 v 之前。
示例:
依赖关系:A → C, A → D, B → D, D → E 拓扑排序结果:A, B, D, C, E(或 B, A, D, C, E)
def topological_sort(nodes: list[NodeModel]) -> list[NodeModel]: """ 拓扑排序节点列表(Kahn 算法) 仅考虑节点列表内部的依赖关系,外部依赖不参与排序。 """ if not nodes: return [] # 1. 构建图和入度表 node_ids = {n.id for n in nodes} in_degree: dict[str, int] = {n.id: 0 for n in nodes} graph: dict[str, list[str]] = defaultdict(list) for node in nodes: for dep_id in node.dependencies or []: if dep_id in node_ids: # 只考虑内部依赖 graph[dep_id].append(node.id) in_degree[node.id] += 1 # 2. 将入度为 0 的节点加入队列 queue = deque(nid for nid, deg in in_degree.items() if deg == 0) sorted_ids: list[str] = [] # 3. BFS 遍历 while queue: nid = queue.popleft() sorted_ids.append(nid) # 移除该节点后,更新子节点的入度 for child in graph[nid]: in_degree[child] -= 1 if in_degree[child] == 0: queue.append(child) # 4. 返回排序后的节点列表 id_to_node = {n.id: n for n in nodes} return [id_to_node[nid] for nid in sorted_ids if nid in id_to_node]
关键点:
示例:
# 节点依赖关系 nodes = [ Node(id="A", dependencies=[]), Node(id="B", dependencies=[]), Node(id="C", dependencies=["A"]), Node(id="D", dependencies=["A", "B"]), Node(id="E", dependencies=["D"]), ] # 拓扑排序结果 sorted_nodes = topological_sort(nodes) # 结果:[A, B, C, D, E] 或 [B, A, C, D, E]
class NodeStatus: PENDING = "pending" # 待执行 STREAMING = "streaming" # 执行中 SUCCESS = "success" # 执行成功 ERROR = "error" # 执行失败
状态转移:
加载图表中...
@asynccontextmanager async def node_status_handler(canvas_id: str, node_id: str): """ 并发安全的节点状态管理。 状态转移使用独立事务立即 commit。 """ # 1. 获取节点锁(原子操作) await db.acquire_node_lock( canvas_id, node_id, expected_statuses=[NodeStatus.PENDING, NodeStatus.SUCCESS, NodeStatus.ERROR], processing_status=NodeStatus.STREAMING, ) log.info(f"Node {node_id}: -> streaming") try: yield # 执行节点任务 except Exception as e: # 2. 执行失败,释放锁并设置 ERROR 状态 await db.release_node_lock(canvas_id, node_id, NodeStatus.ERROR, str(e)[:200]) log.error(f"Node {node_id} failed: {e}") raise else: # 3. 执行成功,释放锁并设置 SUCCESS 状态 await db.release_node_lock(canvas_id, node_id, NodeStatus.SUCCESS) log.info(f"Node {node_id}: streaming -> success")
关键点:
acquire_node_lock 使用数据库行锁(FOR UPDATE)acquire_node_lock 实现:
async def acquire_node_lock( canvas_id: str, node_id: str, expected_statuses: list[str], processing_status: str, ): """原子获取节点锁""" async with unit_of_work() as db: # 使用 FOR UPDATE 锁住节点行 node = ( await db.execute( select(NodeModel) .filter(NodeModel.canvas_id == canvas_id, NodeModel.id == node_id) .with_for_update() ) ).scalar_one() # 检查状态是否符合预期 if node.status not in expected_statuses: raise errors.ConflictError(f"Node {node_id} is {node.status}, cannot acquire lock") # 更新状态为 processing node.status = processing_status await db.commit()
何时需要刷新节点?
节点自身变更:
依赖节点变更:
手动触发:
def find_affected_nodes( all_nodes: list[NodeModel], changed_node_id: str ) -> list[NodeModel]: """找出受某个节点变更影响的所有子孙节点""" # 1. 构建依赖图 graph: dict[str, list[str]] = defaultdict(list) for node in all_nodes: for dep_id in node.dependencies or []: graph[dep_id].append(node.id) # 2. BFS 遍历所有子孙节点 affected_ids = set() queue = deque([changed_node_id]) while queue: nid = queue.popleft() for child_id in graph[nid]: if child_id not in affected_ids: affected_ids.add(child_id) queue.append(child_id) # 3. 返回受影响的节点(按拓扑排序) affected_nodes = [n for n in all_nodes if n.id in affected_ids] return topological_sort(affected_nodes)
示例:
# 节点依赖关系:A → C, A → D, B → D, D → E # 当节点 A 变更时 affected = find_affected_nodes(all_nodes, "A") # 结果:[C, D, E](按拓扑排序) # 当节点 D 变更时 affected = find_affected_nodes(all_nodes, "D") # 结果:[E]
async def load_parent_context( db: AsyncSession, canvas_id: str, dependency_ids: list[str], sample_rows: int = 5, ) -> list[dict]: """加载父节点上下文(复用缓存的 DataFrame)""" parent_contexts = [] for dep_id in dependency_ids: parent = await service.get_node(db, canvas_id, dep_id) # 检查父节点状态 if parent.status != "success": continue # 跳过未成功的父节点 # 检查是否有缓存的 DataFrame if not parent.dataframe_id: continue # 跳过无数据的父节点 # 从 Parquet 文件读取 DataFrame project_id = project_id_var.get() try: raw_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}") except FileNotFoundError: continue # Parquet 文件缺失 # 构建上下文 records = raw_df.to_dict(orient="records") context = { "id": parent.id, "description": parent.description, "dataframe": { "columns": raw_df.columns.tolist(), "data": records[:sample_rows], # 采样数据 "sample_data": records[:sample_rows], }, } parent_contexts.append(context) return parent_contexts
关键点:
async def batch_refresh_nodes(canvas_id: str, node_ids: list[str]): """批量刷新节点(按拓扑排序并发执行)""" async with unit_of_work() as db: # 1. 获取所有节点 all_nodes = await service.get_nodes(db, canvas_id) # 2. 找出需要刷新的节点 nodes_to_refresh = [n for n in all_nodes if n.id in node_ids] # 3. 拓扑排序 sorted_nodes = topological_sort(nodes_to_refresh) # 4. 按层级分组(同一层级可以并发执行) levels = group_by_level(sorted_nodes, all_nodes) # 5. 逐层执行 for level_nodes in levels: # 并发执行同一层级的节点 tasks = [ execute_node(canvas_id, node.id) for node in level_nodes ] await asyncio.gather(*tasks, return_exceptions=True)
def group_by_level( sorted_nodes: list[NodeModel], all_nodes: list[NodeModel] ) -> list[list[NodeModel]]: """将节点按依赖层级分组""" # 1. 构建依赖图 node_map = {n.id: n for n in all_nodes} # 2. 计算每个节点的层级 levels: dict[str, int] = {} for node in sorted_nodes: # 节点的层级 = max(父节点层级) + 1 max_parent_level = -1 for dep_id in node.dependencies or []: if dep_id in levels: max_parent_level = max(max_parent_level, levels[dep_id]) levels[node.id] = max_parent_level + 1 # 3. 按层级分组 level_groups: dict[int, list[NodeModel]] = defaultdict(list) for node in sorted_nodes: level_groups[levels[node.id]].append(node) # 4. 返回按层级排序的分组 return [level_groups[i] for i in sorted(level_groups.keys())]
示例:
# 节点依赖关系:A → C, A → D, B → D, D → E # 层级分组结果 levels = group_by_level(sorted_nodes, all_nodes) # 结果: # Level 0: [A, B] # 无依赖,可以并发执行 # Level 1: [C, D] # 依赖 Level 0,可以并发执行 # Level 2: [E] # 依赖 Level 1
# 用户修改了 Data 节点 A 的 SQL 查询 await refresh_node(canvas_id="canvas-123", node_id="A") # 系统自动识别受影响的节点 affected_nodes = find_affected_nodes(all_nodes, "A") # 结果:[C, D, E] # 按拓扑排序执行 for node in affected_nodes: await execute_node(canvas_id, node.id)
# 用户选中多个节点并点击"批量刷新" await batch_refresh_nodes(canvas_id="canvas-123", node_ids=["A", "B", "D"]) # 系统按层级并发执行 # Level 0: [A, B] 并发执行 # Level 1: [D] 等待 A 和 B 完成后执行
# 节点 A 执行成功,DataFrame 缓存到 Parquet await persist_node_dataframe(node_a, df_a) # 节点 C 执行时,直接读取节点 A 的缓存 parent_contexts = await load_parent_context(db, canvas_id, ["A"]) # 无需重新执行节点 A
优化前:
# 串行执行所有节点 for node in sorted_nodes: await execute_node(canvas_id, node.id) # 总耗时 = sum(每个节点的耗时)
优化后:
# 按层级并发执行 for level_nodes in levels: await asyncio.gather(*[execute_node(canvas_id, n.id) for n in level_nodes]) # 总耗时 = sum(每层的最大耗时)
效果:
优化前:
# 每次执行子节点时,重新执行父节点 parent_df = await execute_parent_node(parent_id)
优化后:
# 直接读取父节点的缓存 parent_df = await dataframe_client.read(f"{project_id}/{parent.dataframe_id}")
效果:
优化前:
# 传递完整 DataFrame 给 AI(可能数万行) context = {"dataframe": {"data": raw_df.to_dict(orient="records")}}
优化后:
# 只传递采样数据(5 行) context = {"dataframe": {"sample_data": records[:5]}}
效果:
AskTable 的 Canvas 依赖图引擎,通过 拓扑排序 + 状态机 + 增量计算 的组合,实现了:
✅ 智能刷新:自动识别受影响的节点 ✅ 增量计算:只刷新必要的节点,复用缓存 ✅ 正确顺序:拓扑排序保证执行顺序 ✅ 并发执行:按层级并发,提升性能 ✅ 并发安全:状态机 + 行锁保证一致性
相关阅读:
技术交流: