拓扑路由集成设计:接入数学建模与 G5 仿真
日期: 2026-03-16(更新: 2026-03-17) 状态: Phase 1 完成 | Phase 2 完成 | Phase 3 完成 | Phase 4 未实现 前置: topology-routing-design.md(topo_routing 模块已实现) 调研依据: docs/research/simulator-survey-astra-sim-simai.md
核心问题
topo_routing 模块已经能回答"c0 到 c5 的路径是什么",但数学建模和 G5 仿真并没有用它。 两侧仍然各自用层级查表(c2c/b2b/r2r/p2p 四级固定参数)获取通信带宽和延迟。
目标:让两侧评估器都通过统一的路由查询接口获取通信参数,使拓扑结构真正影响性能评估结果。
业界参考
| 系统 | Analytical 层做法 | Simulation 层做法 |
|---|---|---|
| ASTRA-sim | 多维 α-β 公式,每维独立 bandwidth/latency | NS-3 包级仿真 |
| SimAI | busbw.yaml 按(并行维度 × 集合操作)配置等效带宽 | NS-3 + SimCCL P2P 分解 |
| Tier6-Model 现状 | 4 级固定 bandwidth + α-β 公式 | 事件驱动仿真 + 层级查表 C2CPhyLink |
两者的共同模式:
- 通信参数由拓扑决定,而非硬编码
- Analytical 层和 Simulation 层共享同一套拓扑理解
- Analytical 层用预计算表,Simulation 层用运行时查询
共用层:RoutingTable
数据结构
@dataclass(frozen=True)
class PairCommSpec:
"""两个芯片之间的通信规格"""
bandwidth_gbps: float # 路径瓶颈带宽(聚合值,= per_rail_bw × rail_count)
latency_us: float # 路径总延迟(含交换机转发)
hop_count: int # 跳数(0 = 同芯片)
path_key: str # 瓶颈边的链路类型(c2c/b2b/r2r/p2p 或自定义)
via_switch: bool # 路径是否经过交换机节点
rail_count: int = 1 # 瓶颈链路的并行 rail 数(G5 展开独立链路用)
rail_count 来自路径中带宽最低(瓶颈)边的 EdgeSpec.rail_count。math 侧使用聚合带宽 bandwidth_gbps,G5 侧按 rail_count 展开独立 C2CPhyLink。
RoutingTable 类
class RoutingTable:
"""初始化时预计算全对路由结果,供数学建模和 G5 共用"""
_table: dict[tuple[str, str], PairCommSpec]
@classmethod
def from_network_graph(cls, graph: NetworkGraph) -> RoutingTable:
"""有 NetworkGraph 时:全对 Dijkstra 预计算"""
...
@classmethod
def from_topology_spec(cls, topo: TopologySpecImpl) -> RoutingTable:
"""无 NetworkGraph 时:fallback 到层级查表"""
...
def get(self, src: str, dst: str) -> PairCommSpec:
"""查询两个芯片之间的通信规格,O(1)"""
...
def get_group_bottleneck(self, chip_ids: list[str]) -> PairCommSpec:
"""查询通信组中带宽最低的一对(瓶颈对)"""
...
注:get_group_bottleneck 是实际实现的方法名(对应设计早期草案中的 get_group_worst)。
辅助函数
def _dominant_link_type(edges: list[EdgeSpec]) -> str:
"""返回路径中带宽最低的边的 link_type,作为 path_key"""
return min(edges, key=lambda e: e.bandwidth_gbps).link_type
def _bottleneck_rail_count(edges: list[EdgeSpec]) -> int:
"""返回路径中带宽最低(瓶颈)边的 rail_count"""
return min(edges, key=lambda e: e.bandwidth_gbps).rail_count
def _has_switch_node(nodes: list[str], graph: NetworkGraph) -> bool:
"""路径是否经过任何交换机节点"""
return any(graph.nodes[n].type == "switch" for n in nodes)
文件位置
perfmodel/arch/topo_routing/
├── __init__.py # 导出 RoutingTable, PairCommSpec
├── graph.py # EdgeSpec 含 rail_count / per_rail_bw_gbps
├── graph_builder.py # _apply_links 支持 count 字段(聚合带宽)
├── graph_validator.py # 不变
├── router.py # 不变
└── routing_table.py # RoutingTable + PairCommSpec
性能预估
| 芯片数 | 芯片对数 | Dijkstra 次数 | 预计初始化耗时 |
|---|---|---|---|
| 8 | 28 | 28 | < 1 ms |
| 32 | 496 | 496 | ~ 5 ms |
| 64 | 2016 | 2016 | ~ 20 ms |
| 128 | 8128 | 8128 | ~ 100 ms |
规模在可接受范围。128 芯片以上可考虑延迟初始化或按需缓存。
数学建模侧接入
实现流程
初始化: build_eval_config() → _build_routing_table() → EvalConfig.routing_table
↓
L3 comm_ops.py: _resolve_path_key() → topology_path_key(层级推断,保留为 fallback 信息)
↓ attrs["chip_ids"] = ["c0","c1",...] ← 字符串格式,与 RoutingTable key 一致
_run_math_pipeline(): engine.evaluate(..., routing_table=eval_config.routing_table)
↓
EvaluationEngine: evaluator.evaluate(..., routing_table=routing_table)
↓
L4 CommEvaluator: 用 chip_ids 查 RoutingTable → 直接使用精确带宽/延迟
↓ fallback: routing_table 为 None 或 chip_ids 不足时,从 hardware dict 按 path_key 查
各文件实现
EvalConfig 持有 RoutingTable
# perfmodel/entry/eval_config.py
@dataclass
class EvalConfig:
...
routing_table: RoutingTable | None = None # 已实现
build_eval_config() 中,当拓扑配置包含 network 段时自动构建:
def _build_routing_table(topology_config: dict[str, Any]) -> RoutingTable | None:
if "network" not in topology_config:
return None
graph = build_network_graph(topology_config)
return RoutingTable.from_network_graph(graph)
EvaluationEngine 传入 routing_table
# perfmodel/evaluation/math/engine.py
def evaluate(self, ..., routing_table=None) -> EngineResult:
...
step_metrics = evaluator.evaluate(
...,
routing_table=routing_table,
)
_build_attrs() 对 COMM op 把 chip_ids 序列化为字符串格式,与 RoutingTable key 对齐:
attrs["chip_ids"] = json.dumps([f"c{cid}" for cid in op.chip_ids])
# 产出: '["c0","c1","c8","c9"]',而非整数列表
_run_math_pipeline 传入 routing_table
# perfmodel/entry/engine.py
engine_result = engine.evaluate(
...
routing_table=eval_config.routing_table,
)
CommEvaluator 使用 routing_table
# perfmodel/evaluation/math/evaluators/comm.py
def evaluate(self, ..., routing_table=None) -> StepMetrics:
chip_ids_raw = attrs.get("chip_ids", "[]")
chip_ids = json.loads(chip_ids_raw) if chip_ids_raw else []
if routing_table is not None and len(chip_ids) >= 2:
pair_spec = routing_table.get_group_bottleneck(chip_ids)
intra_bw = pair_spec.bandwidth_gbps * 1e9
inter_bw = pair_spec.bandwidth_gbps * 1e9
else:
# fallback: 从 hardware dict 按 path_key 查
...
注意:comm_ops.py 中的 _resolve_path_key() 未修改,仍保留层级推断逻辑,结果存入 topology_path_key。当 routing_table 不可用时,CommEvaluator fallback 到 path_key 查 hardware dict,行为与之前完全一致。
G5 仿真侧接入
已完成流程
初始化: EvalConfig.routing_table(同一个,与数学建模共用)
↓
MultiChipSim.__init__(routing_table=routing_table)
↓
_resolve_pair_configs(chip_ids, link_configs, topology, routing_table):
routing_table.get(f"c{src}", f"c{dst}") → PairCommSpec
↓ C2CLinkConfig(bandwidth_gbps=spec.bandwidth_gbps, base_latency_ns=spec.latency_us*1000)
↓ 每对芯片创建 C2CPhyLink(参数来自路由结果)
具体实现
# perfmodel/evaluation/g5/top/multi_chip.py
def _resolve_pair_configs(chip_ids, link_configs, topology, routing_table=None):
for src in chip_ids:
for dst in chip_ids:
if src == dst:
continue
if routing_table is not None:
try:
spec = routing_table.get(f"c{src}", f"c{dst}")
pair_configs[(src, dst)] = C2CLinkConfig(
bandwidth_gbps=spec.bandwidth_gbps,
base_latency_ns=spec.latency_us * 1000,
)
continue
except KeyError:
pass # fallback
# Fallback: 按 chip_board_map 层级推断
...
架构不变性
- 保持"每对芯片直连 C2CPhyLink"的模型,不做多跳仿真
- C2CPhyLink 的
_busy_until_ns争用机制继续生效 - fallback(chip_board_map 层级推断)保留,供无
network段的旧拓扑使用
_get_link_configs 格式支持
同时支持新格式(network.link_types)和旧格式(interconnect.links):
def _get_link_configs(self) -> dict[str, C2CLinkConfig]:
# 优先:新格式 network.link_types
network = self._topology.get("network")
if network is not None:
return self._get_link_configs_from_network(network)
# Fallback:旧格式 interconnect.links
interconnect = self._topology.get("interconnect")
if interconnect is None:
raise ValueError("Missing 'network' or 'interconnect' in topology config")
return self._get_link_configs_from_interconnect(interconnect)
远期:多跳仿真(可选,不在本次范围)
如果未来需要建模中间节点争用:
- 只为物理相邻节点创建 C2CPhyLink
- 数据沿 RoutePath 逐跳传输,经过每个中间节点
- 中间交换机节点增加转发延迟和端口争用
- 这是 SimAI NS-3 后端的简化版
多流竞争因子(未实现,Phase 4)
问题
当多个通信组同时使用同一条链路时,实际可用带宽被均分。当前模型假设每个流独享链路带宽。
方案
在 RoutingTable 基础上,增加链路使用计数:
class ContentionModel:
"""统计每条链路被多少个通信流共享"""
def estimate_contention(
self,
graph: NetworkGraph,
comm_groups: list[list[str]], # 每个通信组的芯片列表
) -> dict[str, float]:
"""返回每条边的争用因子(concurrent_flows 数)"""
edge_usage: dict[str, int] = defaultdict(int)
for group in comm_groups:
for i, src in enumerate(group):
for dst in group[i+1:]:
route = dijkstra_route(graph, src, dst)
for edge in route.edges:
edge_usage[edge.id] += 1
return {eid: count for eid, count in edge_usage.items()}
通信评估器可选择性地使用:
effective_bw = bottleneck_bw / contention_factor
优先级
这是精度增强项,需要调用层能提供所有并发通信组的列表,当前架构中此信息不容易拿到。ASTRA-sim Analytical 后端的 congestion_aware 模式做的就是这件事。
Switch 拓扑的 direct 算法(未实现,Phase 4)
背景
参考 ASTRA-sim:Switch 拓扑用 direct 算法(单步全连接,不需要 Ring 的 2(N-1) 步展开)。
判断逻辑
当路由结果的 via_switch=True 时,意味着通信组内的芯片通过交换机互联,可以使用 direct(单步)通信:
Ring AllReduce: T = 2(N-1) × (α + M/(N×β)) # 多步
Direct AllReduce: T = 2 × (α + M/β) # 单步(通过交换机聚合)
适用条件
- 通信组内所有芯片对的路由都经过同一个交换机(星型拓扑)
- 交换机支持 in-network aggregation(or 简化假设:交换机提供全带宽交叉)
实现位置
在 CommEvaluator 中根据 via_switch 标记选择公式。PairCommSpec.via_switch 字段已就绪,但 CommEvaluator 尚未使用它(始终走 Ring/Tree 公式)。
实施步骤
Phase 1:RoutingTable 共用层(已完成)
- 新建
perfmodel/arch/topo_routing/routing_table.py - 实现
PairCommSpec(含rail_count)和RoutingTable - 实现
from_network_graph()和from_topology_spec()两个构造方法 -
graph.pyEdgeSpec 加rail_count/per_rail_bw_gbps -
graph_builder.py_apply_links支持count字段(聚合带宽) - 单元测试:全对路由正确性、对称性、multi-rail 透传
- 在
__init__.py导出
Phase 2:数学建模侧接入(已完成)
-
EvalConfig增加routing_table字段 -
build_eval_config()中构建 RoutingTable -
CommEvaluator加routing_table=None参数,有 chip_ids 时查 RoutingTable -
EvaluationEngine.evaluate()加routing_table=None参数并传给 evaluator -
_build_attrs()把 chip_ids 序列化为字符串["c0","c1"],与 RoutingTable key 对齐 -
_run_math_pipeline()传入eval_config.routing_table - 集成测试:
_build_attrschip_ids 格式验证 +evaluate签名验证
Phase 3:G5 仿真侧接入(已完成)
-
MultiChipSim.__init__接收routing_table参数 -
_get_link_configs支持新格式network.link_types+ 旧格式interconnect.linksfallback -
_resolve_pair_configs优先查 RoutingTable,fallback 到 chip_board_map 层级推断 - 集成测试:C2CPhyLink 参数与路由结果一致
Phase 4:精度增强(未实现)
- Switch direct 算法:
via_switch时 CommEvaluator 切换公式 - 多流竞争因子:
ContentionModel实现 - 多维 AllReduce 分相位计算
验收标准
Phase 1(已完成)
- RoutingTable 对所有现有拓扑配置正确构建
-
get(src, dst)返回的 bandwidth/latency 与dijkstra_route()结果一致 -
get_group_bottleneck()返回通信组中带宽最低的一对 -
rail_count从瓶颈边正确透传到 PairCommSpec - 无 NetworkGraph 时 fallback 到层级查表,结果与现有行为一致
Phase 2(已完成)
- EvalConfig.routing_table 在有 network 段时自动构建
- CommEvaluator 接口支持 routing_table 参数
- EvaluationEngine.evaluate() 接受并传递 routing_table
- _build_attrs() 产出字符串格式 chip_ids(与 RoutingTable key 一致)
- _run_math_pipeline() 把 eval_config.routing_table 传入 engine
- 无
network段的旧配置不受影响(routing_table=None,fallback 到 hardware dict)
Phase 3(已完成)
- G5 仿真中每对 C2CPhyLink 的参数来自 RoutingTable
- 同 board 芯片对的带宽高于跨 rack 芯片对
- 无
network段时 fallback 到现有行为
Phase 4(未实现)
- Switch 拓扑下 AllReduce 延迟显著低于 Ring 拓扑(direct 算法生效)
- 多流竞争场景下有效带宽降低(拥塞因子生效)