生产级GCN链路支持
记录范围:d8248cad89945b5830e499c90f5e26f55e588e56 与 473659b22c61f55a0d3bc7a6155153982277daa8。
1. 两次主要提交的分工
d8248cad主要补齐 DSL 侧的gcn()内建算子、配置项、运行时上下文扩展,以及图遍历到推理输入的组装逻辑。473659b2主要补齐 infer 侧的 Python 进程、共享内存队列、上下文池、文件分发和启动脚本,让 DSL 算子可以真正跨进程调用 Python 推理。
2. DSL 侧主链路
BuildInSqlFunctionTable注册GCN,因此 SQL 可以直接写CALL gcn().DSLConfigKeys新增geaflow.dsl.gcn.hops、geaflow.dsl.gcn.fanout、geaflow.dsl.gcn.edge.direction、geaflow.dsl.gcn.vertex.feature.fields.GCN.init()做完整参数校验:必须运行在AlgorithmModelRuntimeContext上,必须开启geaflow.infer.env.enable=true,hops >= 1,fanout只能是-1或正数,边方向只能是IN/OUT/BOTH,特征字段必须存在且是数值标量。GCN.process()在第一轮初始化GCNState,构造本地 fragment 后向邻居发送GCNExpandMessage;后续轮次根据GCNFragmentMessage和GCNExpandMessage继续展开,并用expandedRootMessages防止重复扩散。MergedNeighborhoodCollector会合并静态边和动态图边,按src->dst@label去重,再按fanout做采样。GCNState保存GCNAccumulator、已展开的根消息集合和动态执行标记。GCNPayloadAssembler将累计到的子图组装成 Python 侧需要的 payload:center_node_id、sampled_nodes、node_features、edge_index、edge_weight.GCNResultDecoder读取 Python 返回的 map,并转成(node_id, embedding, prediction, confidence)的结果行。
3. 动态图版本
GeaFlowAlgorithmAggTraversalFunction和GeaFlowAlgorithmDynamicAggTraversalFunction会在userFunction instanceof GCN时选择对应的 model runtime context。- 静态图路径使用
GeaFlowAlgorithmModelRuntimeContext,动态图路径使用GeaFlowAlgorithmDynamicModelRuntimeContext. - 动态上下文会把当前窗口的临时顶点/边合并进 GCN 邻域,保证
GCN在增量图里也能拿到最新邻居和动态顶点值。 GCN在finish()阶段只依赖modelContext.infer(payload),因此静态和动态图最终都汇入同一条推理通道。
4. Infer 侧主链路
InferContextPool以配置摘要作为 key,管理同配置下的InferContext复用与回收。InferContextLease负责借用/归还InferContext,避免重复创建 Python 进程。InferContext内部创建DataExchangeContext和InferTaskRunImpl,前者负责共享内存队列,后者负责拉起 Python 子进程。DataExchangeContext通过 mmap 队列和 shutdown hook 管理收发缓冲区,close()时会标记完成、关闭队列、删除临时文件并释放 native 内存。InferTaskRunImpl负责拼装 Python 执行命令,设置PATH、PYTHONPATH、LD_LIBRARY_PATH,然后启动推理子进程。InferDependencyManager和InferFileUtils负责把inferRuntime资源、requirements.txt、model.pt、用户 Python 文件复制到作业目录。InferEnvironmentManager负责首次创建 infer 环境、写_lock/_finish/_failed标记、安装 conda 和 Python 依赖。
5. Python 侧主链路
infer_server.py在启动时通过importlib加载TransFormFunctionUDF.py中指定的 transform class。TorchInferSession先调用load_model(model_path),再调用transform_pre(),最后调用transform_post(),这意味着模型状态要由 transform class 自己维护。GCNTestTransform是测试桩,不是真实模型;它只验证 Java 到 Python 的协议和返回格式。model.pt和requirements.txt是链路样例资源,验证的是文件分发和协议正确性,不是模型精度。
6. 端到端执行图
SQL: CALL gcn()
-> BuildInSqlFunctionTable
-> GCN.init / process / finish
-> AlgorithmModelRuntimeContext.infer(payload)
-> InferContextPool.borrow()
-> InferContext
-> InferTaskRunImpl
-> infer_server.py
-> TorchInferSession
-> 用户 transform class
-> Python 返回 map
-> GCNResultDecoder
-> context.take(result row)
7. 关键数据结构
GCNFragmentMessage:单个顶点 fragment,包含rootId、vertexId、特征向量、边记录和动态执行标记。GCNExpandMessage:消息扩展控制体,包含rootId、depth和当前路径,避免回路。GCNEdgeRecord:统一边身份,用于去重和 payload 构造。GCNAccumulator:收集当前 root 的节点特征和边集合。GCNState:算法状态壳,承载 accumulator 和扩散状态。
8. 测试覆盖
GCNTest覆盖了参数校验、邻域合并、状态恢复、消息扩散、payload 组装等核心逻辑。GQLAlgorithmTest增加了gql_algorithm_gcn.sql的端到端查询入口。gql_algorithm_gcn.sql构造了最小连通图,显式开启 infer env,并通过CALL gcn()验证整条链路。
9. 备注
- 这条链路是典型的
dsl-only入口,用户只需要写 SQL,不需要单独接外部 API. - 生产环境里,infer bootstrap 相关配置应视为高风险输入,尤其是安装脚本、模型文件和 Python 依赖来源。
双实例异步缓冲模型热更新
1. 1edb86e2…f6daca38 提交记录
这一段提交把 GCN 从“算子原型”补成了可跑的 infer 链路,重点是把 DSL 侧、runtime 侧、infer 侧串起来。
- DSL/runtime 侧开始支持模型化算子:新增
AlgorithmModelRuntimeContext相关实现,GCN通过 model context 读取动态顶点值、动态图边,并在finish()阶段调用infer(). GeaFlowAlgorithmAggTraversalFunction与GeaFlowAlgorithmDynamicAggTraversalFunction会在GCN场景下切到 model runtime context,保证静态图和动态图都能走同一条推理通道。- 推理上下文增加了池化和租约:
InferContextPool、InferContextLease、INFER_CONTEXT_POOL_MAX_SIZE、INFER_CONTEXT_POOL_BORROW_TIMEOUT_SEC,避免每个顶点都重新拉起 Python 进程。 InferContext、DataExchangeContext做了可关闭、可复用、可破损回收的生命周期管理,解决共享内存队列和子进程清理问题。- 文件准备链路补全了
model.pt、requirements.txt、Python runtime 资源的分发,同时InferFileUtils增加了 classpath fallback,避免运行时找不到用户 jar 时直接失败。 InferEnvironmentManager和install-infer-env.sh补了环境创建、conda 安装、锁文件和失败标记逻辑,确保 infer 环境只创建一次。inferSession.py改成先load_model()再执行transform_pre()/transform_post(),和TransFormFunctionUDF.py的协议保持一致。setup.py增加 Cython 缺失时的 fallback 编译路径,减少构建环境对 Python 包版本的硬依赖。PortUtil、TypeCastUtil、FrameworkConfigKeys、DSLConfigKeys这类公共基础设施同步补了 GCN 和 infer 所需能力,比如端口回退、数组转字符串、GCN 参数和 infer 池参数。
这段提交的最终效果是:SQL CALL gcn() 能把图邻域采样结果打包成 payload,经共享内存送到 Python 推理进程,再把返回结果解码回 DSL 输出行。
11. 热更新实现记录
这里的热更新不是在同一个 Python 进程里原地替换模型,而是通过“新配置 -> 新上下文 -> 旧上下文回收”的方式实现运行时切换。
InferContextPool用配置摘要生成池 key,配置变化会自然落到新的PoolEntry,避免旧的InferContext继续承载新模型请求。InferContext在推理失败时会把自己标成broken,并立刻停止 Python 子进程,防止坏上下文被复用。InferContextLease.close()会把上下文按broken状态归还给池,broken的上下文不会回到 idle 队列,而是直接关闭。InferContext.close()会先markFinished(),再关闭共享内存、数据桥和任务进程,保证热更新期间旧请求能尽快收敛退出。DataExchangeContext新增了markFinished()、shutdown hook 和幂等释放逻辑,解决切换上下文时共享内存和 native 内存泄漏的问题。InferEnvironmentManager通过_lock、_finish、_failed文件控制环境创建状态,保证热更新场景下不会重复安装或并发踩踏。inferSession.py采用“构造时load_model(),运行时只做transform_pre/transform_post”的模式,模型文件更新后只要新上下文重启,新的权重就会生效。
整体上,这套热更新策略是“按配置和上下文粒度切换”,不是在线 patch 进程内对象。