跳到主要内容

拓扑路由集成设计:接入数学建模与 G5 仿真

日期: 2026-03-16(更新: 2026-03-17) 状态: Phase 1 完成 | Phase 2 完成 | Phase 3 完成 | Phase 4 未实现 前置: topology-routing-design.md(topo_routing 模块已实现) 调研依据: docs/research/simulator-survey-astra-sim-simai.md


核心问题

topo_routing 模块已经能回答"c0 到 c5 的路径是什么",但数学建模和 G5 仿真并没有用它。 两侧仍然各自用层级查表(c2c/b2b/r2r/p2p 四级固定参数)获取通信带宽和延迟。

目标:让两侧评估器都通过统一的路由查询接口获取通信参数,使拓扑结构真正影响性能评估结果。


业界参考

系统Analytical 层做法Simulation 层做法
ASTRA-sim多维 α-β 公式,每维独立 bandwidth/latencyNS-3 包级仿真
SimAIbusbw.yaml 按(并行维度 × 集合操作)配置等效带宽NS-3 + SimCCL P2P 分解
Tier6-Model 现状4 级固定 bandwidth + α-β 公式事件驱动仿真 + 层级查表 C2CPhyLink

两者的共同模式:

  1. 通信参数由拓扑决定,而非硬编码
  2. Analytical 层和 Simulation 层共享同一套拓扑理解
  3. Analytical 层用预计算表,Simulation 层用运行时查询

共用层:RoutingTable

数据结构

@dataclass(frozen=True)
class PairCommSpec:
"""两个芯片之间的通信规格"""
bandwidth_gbps: float # 路径瓶颈带宽(聚合值,= per_rail_bw × rail_count)
latency_us: float # 路径总延迟(含交换机转发)
hop_count: int # 跳数(0 = 同芯片)
path_key: str # 瓶颈边的链路类型(c2c/b2b/r2r/p2p 或自定义)
via_switch: bool # 路径是否经过交换机节点
rail_count: int = 1 # 瓶颈链路的并行 rail 数(G5 展开独立链路用)

rail_count 来自路径中带宽最低(瓶颈)边的 EdgeSpec.rail_count。math 侧使用聚合带宽 bandwidth_gbps,G5 侧按 rail_count 展开独立 C2CPhyLink。

RoutingTable 类

class RoutingTable:
"""初始化时预计算全对路由结果,供数学建模和 G5 共用"""

_table: dict[tuple[str, str], PairCommSpec]

@classmethod
def from_network_graph(cls, graph: NetworkGraph) -> RoutingTable:
"""有 NetworkGraph 时:全对 Dijkstra 预计算"""
...

@classmethod
def from_topology_spec(cls, topo: TopologySpecImpl) -> RoutingTable:
"""无 NetworkGraph 时:fallback 到层级查表"""
...

def get(self, src: str, dst: str) -> PairCommSpec:
"""查询两个芯片之间的通信规格,O(1)"""
...

def get_group_bottleneck(self, chip_ids: list[str]) -> PairCommSpec:
"""查询通信组中带宽最低的一对(瓶颈对)"""
...

注:get_group_bottleneck 是实际实现的方法名(对应设计早期草案中的 get_group_worst)。

辅助函数

def _dominant_link_type(edges: list[EdgeSpec]) -> str:
"""返回路径中带宽最低的边的 link_type,作为 path_key"""
return min(edges, key=lambda e: e.bandwidth_gbps).link_type

def _bottleneck_rail_count(edges: list[EdgeSpec]) -> int:
"""返回路径中带宽最低(瓶颈)边的 rail_count"""
return min(edges, key=lambda e: e.bandwidth_gbps).rail_count

def _has_switch_node(nodes: list[str], graph: NetworkGraph) -> bool:
"""路径是否经过任何交换机节点"""
return any(graph.nodes[n].type == "switch" for n in nodes)

文件位置

perfmodel/arch/topo_routing/
├── __init__.py # 导出 RoutingTable, PairCommSpec
├── graph.py # EdgeSpec 含 rail_count / per_rail_bw_gbps
├── graph_builder.py # _apply_links 支持 count 字段(聚合带宽)
├── graph_validator.py # 不变
├── router.py # 不变
└── routing_table.py # RoutingTable + PairCommSpec

性能预估

芯片数芯片对数Dijkstra 次数预计初始化耗时
82828< 1 ms
32496496~ 5 ms
6420162016~ 20 ms
12881288128~ 100 ms

规模在可接受范围。128 芯片以上可考虑延迟初始化或按需缓存。


数学建模侧接入

实现流程

初始化: build_eval_config() → _build_routing_table() → EvalConfig.routing_table

L3 comm_ops.py: _resolve_path_key() → topology_path_key(层级推断,保留为 fallback 信息)
↓ attrs["chip_ids"] = ["c0","c1",...] ← 字符串格式,与 RoutingTable key 一致
_run_math_pipeline(): engine.evaluate(..., routing_table=eval_config.routing_table)

EvaluationEngine: evaluator.evaluate(..., routing_table=routing_table)

L4 CommEvaluator: 用 chip_ids 查 RoutingTable → 直接使用精确带宽/延迟
↓ fallback: routing_table 为 None 或 chip_ids 不足时,从 hardware dict 按 path_key 查

各文件实现

EvalConfig 持有 RoutingTable

# perfmodel/entry/eval_config.py
@dataclass
class EvalConfig:
...
routing_table: RoutingTable | None = None # 已实现

build_eval_config() 中,当拓扑配置包含 network 段时自动构建:

def _build_routing_table(topology_config: dict[str, Any]) -> RoutingTable | None:
if "network" not in topology_config:
return None
graph = build_network_graph(topology_config)
return RoutingTable.from_network_graph(graph)

EvaluationEngine 传入 routing_table

# perfmodel/evaluation/math/engine.py
def evaluate(self, ..., routing_table=None) -> EngineResult:
...
step_metrics = evaluator.evaluate(
...,
routing_table=routing_table,
)

_build_attrs() 对 COMM op 把 chip_ids 序列化为字符串格式,与 RoutingTable key 对齐:

attrs["chip_ids"] = json.dumps([f"c{cid}" for cid in op.chip_ids])
# 产出: '["c0","c1","c8","c9"]',而非整数列表

_run_math_pipeline 传入 routing_table

# perfmodel/entry/engine.py
engine_result = engine.evaluate(
...
routing_table=eval_config.routing_table,
)

CommEvaluator 使用 routing_table

# perfmodel/evaluation/math/evaluators/comm.py
def evaluate(self, ..., routing_table=None) -> StepMetrics:
chip_ids_raw = attrs.get("chip_ids", "[]")
chip_ids = json.loads(chip_ids_raw) if chip_ids_raw else []

if routing_table is not None and len(chip_ids) >= 2:
pair_spec = routing_table.get_group_bottleneck(chip_ids)
intra_bw = pair_spec.bandwidth_gbps * 1e9
inter_bw = pair_spec.bandwidth_gbps * 1e9
else:
# fallback: 从 hardware dict 按 path_key 查
...

注意comm_ops.py 中的 _resolve_path_key() 未修改,仍保留层级推断逻辑,结果存入 topology_path_key。当 routing_table 不可用时,CommEvaluator fallback 到 path_key 查 hardware dict,行为与之前完全一致。


G5 仿真侧接入

已完成流程

初始化: EvalConfig.routing_table(同一个,与数学建模共用)

MultiChipSim.__init__(routing_table=routing_table)

_resolve_pair_configs(chip_ids, link_configs, topology, routing_table):
routing_table.get(f"c{src}", f"c{dst}") → PairCommSpec
↓ C2CLinkConfig(bandwidth_gbps=spec.bandwidth_gbps, base_latency_ns=spec.latency_us*1000)
↓ 每对芯片创建 C2CPhyLink(参数来自路由结果)

具体实现

# perfmodel/evaluation/g5/top/multi_chip.py
def _resolve_pair_configs(chip_ids, link_configs, topology, routing_table=None):
for src in chip_ids:
for dst in chip_ids:
if src == dst:
continue
if routing_table is not None:
try:
spec = routing_table.get(f"c{src}", f"c{dst}")
pair_configs[(src, dst)] = C2CLinkConfig(
bandwidth_gbps=spec.bandwidth_gbps,
base_latency_ns=spec.latency_us * 1000,
)
continue
except KeyError:
pass # fallback
# Fallback: 按 chip_board_map 层级推断
...

架构不变性

  • 保持"每对芯片直连 C2CPhyLink"的模型,不做多跳仿真
  • C2CPhyLink 的 _busy_until_ns 争用机制继续生效
  • fallback(chip_board_map 层级推断)保留,供无 network 段的旧拓扑使用

同时支持新格式(network.link_types)和旧格式(interconnect.links):

def _get_link_configs(self) -> dict[str, C2CLinkConfig]:
# 优先:新格式 network.link_types
network = self._topology.get("network")
if network is not None:
return self._get_link_configs_from_network(network)
# Fallback:旧格式 interconnect.links
interconnect = self._topology.get("interconnect")
if interconnect is None:
raise ValueError("Missing 'network' or 'interconnect' in topology config")
return self._get_link_configs_from_interconnect(interconnect)

远期:多跳仿真(可选,不在本次范围)

如果未来需要建模中间节点争用:

  • 只为物理相邻节点创建 C2CPhyLink
  • 数据沿 RoutePath 逐跳传输,经过每个中间节点
  • 中间交换机节点增加转发延迟和端口争用
  • 这是 SimAI NS-3 后端的简化版

多流竞争因子(未实现,Phase 4)

问题

当多个通信组同时使用同一条链路时,实际可用带宽被均分。当前模型假设每个流独享链路带宽。

方案

在 RoutingTable 基础上,增加链路使用计数

class ContentionModel:
"""统计每条链路被多少个通信流共享"""

def estimate_contention(
self,
graph: NetworkGraph,
comm_groups: list[list[str]], # 每个通信组的芯片列表
) -> dict[str, float]:
"""返回每条边的争用因子(concurrent_flows 数)"""
edge_usage: dict[str, int] = defaultdict(int)
for group in comm_groups:
for i, src in enumerate(group):
for dst in group[i+1:]:
route = dijkstra_route(graph, src, dst)
for edge in route.edges:
edge_usage[edge.id] += 1
return {eid: count for eid, count in edge_usage.items()}

通信评估器可选择性地使用:

effective_bw = bottleneck_bw / contention_factor

优先级

这是精度增强项,需要调用层能提供所有并发通信组的列表,当前架构中此信息不容易拿到。ASTRA-sim Analytical 后端的 congestion_aware 模式做的就是这件事。


Switch 拓扑的 direct 算法(未实现,Phase 4)

背景

参考 ASTRA-sim:Switch 拓扑用 direct 算法(单步全连接,不需要 Ring 的 2(N-1) 步展开)。

判断逻辑

当路由结果的 via_switch=True 时,意味着通信组内的芯片通过交换机互联,可以使用 direct(单步)通信:

Ring AllReduce:    T = 2(N-1) × (α + M/(N×β))    # 多步
Direct AllReduce: T = 2 × (α + M/β) # 单步(通过交换机聚合)

适用条件

  • 通信组内所有芯片对的路由都经过同一个交换机(星型拓扑)
  • 交换机支持 in-network aggregation(or 简化假设:交换机提供全带宽交叉)

实现位置

CommEvaluator 中根据 via_switch 标记选择公式。PairCommSpec.via_switch 字段已就绪,但 CommEvaluator 尚未使用它(始终走 Ring/Tree 公式)。


实施步骤

Phase 1:RoutingTable 共用层(已完成)

  1. 新建 perfmodel/arch/topo_routing/routing_table.py
  2. 实现 PairCommSpec(含 rail_count)和 RoutingTable
  3. 实现 from_network_graph()from_topology_spec() 两个构造方法
  4. graph.py EdgeSpec 加 rail_count / per_rail_bw_gbps
  5. graph_builder.py _apply_links 支持 count 字段(聚合带宽)
  6. 单元测试:全对路由正确性、对称性、multi-rail 透传
  7. __init__.py 导出

Phase 2:数学建模侧接入(已完成)

  1. EvalConfig 增加 routing_table 字段
  2. build_eval_config() 中构建 RoutingTable
  3. CommEvaluatorrouting_table=None 参数,有 chip_ids 时查 RoutingTable
  4. EvaluationEngine.evaluate()routing_table=None 参数并传给 evaluator
  5. _build_attrs() 把 chip_ids 序列化为字符串 ["c0","c1"],与 RoutingTable key 对齐
  6. _run_math_pipeline() 传入 eval_config.routing_table
  7. 集成测试:_build_attrs chip_ids 格式验证 + evaluate 签名验证

Phase 3:G5 仿真侧接入(已完成)

  1. MultiChipSim.__init__ 接收 routing_table 参数
  2. _get_link_configs 支持新格式 network.link_types + 旧格式 interconnect.links fallback
  3. _resolve_pair_configs 优先查 RoutingTable,fallback 到 chip_board_map 层级推断
  4. 集成测试:C2CPhyLink 参数与路由结果一致

Phase 4:精度增强(未实现)

  1. Switch direct 算法:via_switch 时 CommEvaluator 切换公式
  2. 多流竞争因子:ContentionModel 实现
  3. 多维 AllReduce 分相位计算

验收标准

Phase 1(已完成)

  • RoutingTable 对所有现有拓扑配置正确构建
  • get(src, dst) 返回的 bandwidth/latency 与 dijkstra_route() 结果一致
  • get_group_bottleneck() 返回通信组中带宽最低的一对
  • rail_count 从瓶颈边正确透传到 PairCommSpec
  • 无 NetworkGraph 时 fallback 到层级查表,结果与现有行为一致

Phase 2(已完成)

  • EvalConfig.routing_table 在有 network 段时自动构建
  • CommEvaluator 接口支持 routing_table 参数
  • EvaluationEngine.evaluate() 接受并传递 routing_table
  • _build_attrs() 产出字符串格式 chip_ids(与 RoutingTable key 一致)
  • _run_math_pipeline() 把 eval_config.routing_table 传入 engine
  • network 段的旧配置不受影响(routing_table=None,fallback 到 hardware dict)

Phase 3(已完成)

  • G5 仿真中每对 C2CPhyLink 的参数来自 RoutingTable
  • 同 board 芯片对的带宽高于跨 rack 芯片对
  • network 段时 fallback 到现有行为

Phase 4(未实现)

  • Switch 拓扑下 AllReduce 延迟显著低于 Ring 拓扑(direct 算法生效)
  • 多流竞争场景下有效带宽降低(拥塞因子生效)