生产级GCN链路支持

issue 676
对应PR

记录范围:d8248cad89945b5830e499c90f5e26f55e588e56473659b22c61f55a0d3bc7a6155153982277daa8

1. 两次主要提交的分工

  • d8248cad 主要补齐 DSL 侧的 gcn() 内建算子、配置项、运行时上下文扩展,以及图遍历到推理输入的组装逻辑。
  • 473659b2 主要补齐 infer 侧的 Python 进程、共享内存队列、上下文池、文件分发和启动脚本,让 DSL 算子可以真正跨进程调用 Python 推理。

2. DSL 侧主链路

  • BuildInSqlFunctionTable 注册 GCN,因此 SQL 可以直接写 CALL gcn().
  • DSLConfigKeys 新增 geaflow.dsl.gcn.hopsgeaflow.dsl.gcn.fanoutgeaflow.dsl.gcn.edge.directiongeaflow.dsl.gcn.vertex.feature.fields.
  • GCN.init() 做完整参数校验:必须运行在 AlgorithmModelRuntimeContext 上,必须开启 geaflow.infer.env.enable=truehops >= 1fanout 只能是 -1 或正数,边方向只能是 IN/OUT/BOTH,特征字段必须存在且是数值标量。
  • GCN.process() 在第一轮初始化 GCNState,构造本地 fragment 后向邻居发送 GCNExpandMessage;后续轮次根据 GCNFragmentMessageGCNExpandMessage 继续展开,并用 expandedRootMessages 防止重复扩散。
  • MergedNeighborhoodCollector 会合并静态边和动态图边,按 src->dst@label 去重,再按 fanout 做采样。
  • GCNState 保存 GCNAccumulator、已展开的根消息集合和动态执行标记。
  • GCNPayloadAssembler 将累计到的子图组装成 Python 侧需要的 payload:center_node_idsampled_nodesnode_featuresedge_indexedge_weight.
  • GCNResultDecoder 读取 Python 返回的 map,并转成 (node_id, embedding, prediction, confidence) 的结果行。

3. 动态图版本

  • GeaFlowAlgorithmAggTraversalFunctionGeaFlowAlgorithmDynamicAggTraversalFunction 会在 userFunction instanceof GCN 时选择对应的 model runtime context。
  • 静态图路径使用 GeaFlowAlgorithmModelRuntimeContext,动态图路径使用 GeaFlowAlgorithmDynamicModelRuntimeContext.
  • 动态上下文会把当前窗口的临时顶点/边合并进 GCN 邻域,保证 GCN 在增量图里也能拿到最新邻居和动态顶点值。
  • GCNfinish() 阶段只依赖 modelContext.infer(payload),因此静态和动态图最终都汇入同一条推理通道。

4. Infer 侧主链路

  • InferContextPool 以配置摘要作为 key,管理同配置下的 InferContext 复用与回收。
  • InferContextLease 负责借用/归还 InferContext,避免重复创建 Python 进程。
  • InferContext 内部创建 DataExchangeContextInferTaskRunImpl,前者负责共享内存队列,后者负责拉起 Python 子进程。
  • DataExchangeContext 通过 mmap 队列和 shutdown hook 管理收发缓冲区,close() 时会标记完成、关闭队列、删除临时文件并释放 native 内存。
  • InferTaskRunImpl 负责拼装 Python 执行命令,设置 PATHPYTHONPATHLD_LIBRARY_PATH,然后启动推理子进程。
  • InferDependencyManagerInferFileUtils 负责把 inferRuntime 资源、requirements.txtmodel.pt、用户 Python 文件复制到作业目录。
  • InferEnvironmentManager 负责首次创建 infer 环境、写 _lock/_finish/_failed 标记、安装 conda 和 Python 依赖。

5. Python 侧主链路

  • infer_server.py 在启动时通过 importlib 加载 TransFormFunctionUDF.py 中指定的 transform class。
  • TorchInferSession 先调用 load_model(model_path),再调用 transform_pre(),最后调用 transform_post(),这意味着模型状态要由 transform class 自己维护。
  • GCNTestTransform 是测试桩,不是真实模型;它只验证 Java 到 Python 的协议和返回格式。
  • model.ptrequirements.txt 是链路样例资源,验证的是文件分发和协议正确性,不是模型精度。

6. 端到端执行图

SQL: CALL gcn()
  -> BuildInSqlFunctionTable
  -> GCN.init / process / finish
  -> AlgorithmModelRuntimeContext.infer(payload)
  -> InferContextPool.borrow()
  -> InferContext
  -> InferTaskRunImpl
  -> infer_server.py
  -> TorchInferSession
  -> 用户 transform class
  -> Python 返回 map
  -> GCNResultDecoder
  -> context.take(result row)

7. 关键数据结构

  • GCNFragmentMessage:单个顶点 fragment,包含 rootIdvertexId、特征向量、边记录和动态执行标记。
  • GCNExpandMessage:消息扩展控制体,包含 rootIddepth 和当前路径,避免回路。
  • GCNEdgeRecord:统一边身份,用于去重和 payload 构造。
  • GCNAccumulator:收集当前 root 的节点特征和边集合。
  • GCNState:算法状态壳,承载 accumulator 和扩散状态。

8. 测试覆盖

  • GCNTest 覆盖了参数校验、邻域合并、状态恢复、消息扩散、payload 组装等核心逻辑。
  • GQLAlgorithmTest 增加了 gql_algorithm_gcn.sql 的端到端查询入口。
  • gql_algorithm_gcn.sql 构造了最小连通图,显式开启 infer env,并通过 CALL gcn() 验证整条链路。

9. 备注

  • 这条链路是典型的 dsl-only 入口,用户只需要写 SQL,不需要单独接外部 API.
  • 生产环境里,infer bootstrap 相关配置应视为高风险输入,尤其是安装脚本、模型文件和 Python 依赖来源。

双实例异步缓冲模型热更新

对应PR

1. 1edb86e2…f6daca38 提交记录

这一段提交把 GCN 从“算子原型”补成了可跑的 infer 链路,重点是把 DSL 侧、runtime 侧、infer 侧串起来。

  • DSL/runtime 侧开始支持模型化算子:新增 AlgorithmModelRuntimeContext 相关实现,GCN 通过 model context 读取动态顶点值、动态图边,并在 finish() 阶段调用 infer().
  • GeaFlowAlgorithmAggTraversalFunctionGeaFlowAlgorithmDynamicAggTraversalFunction 会在 GCN 场景下切到 model runtime context,保证静态图和动态图都能走同一条推理通道。
  • 推理上下文增加了池化和租约:InferContextPoolInferContextLeaseINFER_CONTEXT_POOL_MAX_SIZEINFER_CONTEXT_POOL_BORROW_TIMEOUT_SEC,避免每个顶点都重新拉起 Python 进程。
  • InferContextDataExchangeContext 做了可关闭、可复用、可破损回收的生命周期管理,解决共享内存队列和子进程清理问题。
  • 文件准备链路补全了 model.ptrequirements.txt、Python runtime 资源的分发,同时 InferFileUtils 增加了 classpath fallback,避免运行时找不到用户 jar 时直接失败。
  • InferEnvironmentManagerinstall-infer-env.sh 补了环境创建、conda 安装、锁文件和失败标记逻辑,确保 infer 环境只创建一次。
  • inferSession.py 改成先 load_model() 再执行 transform_pre() / transform_post(),和 TransFormFunctionUDF.py 的协议保持一致。
  • setup.py 增加 Cython 缺失时的 fallback 编译路径,减少构建环境对 Python 包版本的硬依赖。
  • PortUtilTypeCastUtilFrameworkConfigKeysDSLConfigKeys 这类公共基础设施同步补了 GCN 和 infer 所需能力,比如端口回退、数组转字符串、GCN 参数和 infer 池参数。

这段提交的最终效果是:SQL CALL gcn() 能把图邻域采样结果打包成 payload,经共享内存送到 Python 推理进程,再把返回结果解码回 DSL 输出行。

11. 热更新实现记录

这里的热更新不是在同一个 Python 进程里原地替换模型,而是通过“新配置 -> 新上下文 -> 旧上下文回收”的方式实现运行时切换。

  • InferContextPool 用配置摘要生成池 key,配置变化会自然落到新的 PoolEntry,避免旧的 InferContext 继续承载新模型请求。
  • InferContext 在推理失败时会把自己标成 broken,并立刻停止 Python 子进程,防止坏上下文被复用。
  • InferContextLease.close() 会把上下文按 broken 状态归还给池,broken 的上下文不会回到 idle 队列,而是直接关闭。
  • InferContext.close() 会先 markFinished(),再关闭共享内存、数据桥和任务进程,保证热更新期间旧请求能尽快收敛退出。
  • DataExchangeContext 新增了 markFinished()、shutdown hook 和幂等释放逻辑,解决切换上下文时共享内存和 native 内存泄漏的问题。
  • InferEnvironmentManager 通过 _lock_finish_failed 文件控制环境创建状态,保证热更新场景下不会重复安装或并发踩踏。
  • inferSession.py 采用“构造时 load_model(),运行时只做 transform_pre/transform_post”的模式,模型文件更新后只要新上下文重启,新的权重就会生效。

整体上,这套热更新策略是“按配置和上下文粒度切换”,不是在线 patch 进程内对象。