# G5 Switch + RoutingTable Integration Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

**Goal:** Have the G5 instruction-level simulation use the Python topo_routing `RoutingTable` as its routing data source, replacing CLE, and wire the SwitchModel (VOQ + iSLIP) into the simulation main loop.

**Architecture:** Python topo_routing is the single implementation of the routing algorithms. Before the simulation starts, the Python `RoutingTable` + `NetworkGraph` are extracted once into Rust structs via PyO3 `getattr`. On the Rust side, CLE is retired and `C2CNetwork` is rebuilt from the extracted data, supporting two node kinds: chip and switch. At run time, packets are forwarded hop by hop along the precomputed path; whenever a packet traverses a switch it enters the SwitchModel's dynamic VOQ/iSLIP simulation.
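The hop-by-hop flow above can be sketched cross-language in Python. `forward_hops` and the sample path are hypothetical illustrations; in the real system the path comes from `PairCommSpec.path_nodes` and the switch check from `C2CNetwork::is_switch`:

```python
# Minimal sketch of hop-by-hop forwarding over a precomputed path.
# `path_nodes` and `is_switch` stand in for the data extracted from the
# Python RoutingTable / NetworkGraph.

def forward_hops(path_nodes, is_switch):
    """Yield (from_node, to_node, enters_switch_model) per hop."""
    for src, dst in zip(path_nodes, path_nodes[1:]):
        # A hop into a switch enters the VOQ/iSLIP model; a hop into a
        # chip terminates at that chip's RX pipeline.
        yield src, dst, is_switch(dst)

path = ["c0", "s0", "s2", "s1", "c2"]
hops = list(forward_hops(path, lambda n: n.startswith("s")))
assert hops[0] == ("c0", "s0", True)
assert hops[-1] == ("s1", "c2", False)
```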

Tech Stack: Python 3.11+, Rust (PyO3), perfmodel/arch/topo_routing (Python), g5_rs crate (Rust)

Design doc: docs/design/g5-switch-routing-integration/design.md


## File Layout

### Python-side changes

| File | Responsibility |
| --- | --- |
| perfmodel/arch/topo_routing/routing_table.py | Add `path_nodes` field to `PairCommSpec` |
| perfmodel/arch/topo_routing/graph.py | Clarify `RoutePath` docstring semantics |
| perfmodel/arch/topo_routing/graph_builder.py | Add id validation in `_build_switch_nodes` |
| perfmodel/arch/topo_routing/strategies/ecmp.py | Fix ECMP float epsilon comparison |
| perfmodel/evaluation/g5/sim_engine.py | Add routing parameters to the `simulate()` signature |
| perfmodel/evaluation/g5/pipeline.py | Pass in network_graph + routing_table |

### Rust-side changes

| File | Responsibility |
| --- | --- |
| src/tier6/switch.rs | Fix iSLIP RR, VOQ admission control, ECN marking |
| src/input.rs | New `extract_routing_data()` pulling graph + routes via PyO3 |
| src/tier6/c2c_network.rs | Rewrite: build from routing data, support switch nodes |
| src/tier6/cle.rs | Delete |
| src/tier6/mod.rs | Remove `pub mod cle` |
| src/types.rs | Add `switch_id` to `SwitchTick`, `arrived_at_node` to `C2CLinkDone` |
| src/top/multi_chip.rs | Add `switches` field, rewrite the event loop |
| src/top/event_handlers.rs | Rewrite handle_rc_pack_done / SwitchTick / C2CLinkDone dispatch |
| src/lib.rs | Add routing parameters to the `simulate_multi_chip` signature |

### Test files

| File | Responsibility |
| --- | --- |
| tests/topology_routing/test_path_nodes.py | Tests `PairCommSpec.path_nodes` |
| tests/topology_routing/test_ecmp_epsilon.py | Tests ECMP float comparison |
| tests/topology_routing/test_switch_id_validation.py | Tests switch id validation |
| Rust inline `#[cfg(test)]` | Inline tests in switch.rs / c2c_network.rs / input.rs |

## Task 1: Python — add a path_nodes field to PairCommSpec

Files:

- Modify: perfmodel/arch/topo_routing/routing_table.py:19-42 (PairCommSpec dataclass)
- Modify: perfmodel/arch/topo_routing/routing_table.py:178-192 (from_network_graph construction logic)
- Modify: perfmodel/arch/topo_routing/routing_table.py:162-171 (self-pair entries)
- Modify: perfmodel/arch/topo_routing/routing_table.py:219-234 (from_topology_spec fallback)
- Test: tests/topology_routing/test_path_nodes.py

- [ ] Step 1: Write the failing test

```python
# tests/topology_routing/test_path_nodes.py
"""Tests for the PairCommSpec.path_nodes field."""

from perfmodel.arch.topo_routing.graph_builder import build_network_graph
from perfmodel.arch.topo_routing.routing_table import RoutingTable


def _make_clos_config():
    """Clos topology: 4 chips, 2 ToR switches, 1 spine switch."""
    return {
        "pods": [
            {"count": 1, "racks": [
                {"count": 1, "boards": [
                    {"count": 1, "chips": [{"name": "A", "count": 2}]},
                ]},
                {"count": 1, "boards": [
                    {"count": 1, "chips": [{"name": "A", "count": 2}]},
                ]},
            ]},
        ],
        "network": {
            "switches": [
                {"id": "s0", "type": "tor", "port_count": 32, "forwarding_latency_ns": 150},
                {"id": "s1", "type": "tor", "port_count": 32, "forwarding_latency_ns": 150},
                {"id": "s2", "type": "spine", "port_count": 64, "forwarding_latency_ns": 200},
            ],
            "link_types": {
                "c2c": {"bandwidth_gbps": 448, "latency_us": 0.2},
                "b2b": {"bandwidth_gbps": 400, "latency_us": 1.0},
                "tor2spine": {"bandwidth_gbps": 800, "latency_us": 2.0},
            },
            "links": [
                {"from": "c0", "to": "c1", "type": "c2c"},
                {"from": "c2", "to": "c3", "type": "c2c"},
                {"from": ["c0", "c1"], "to": "s0", "type": "b2b"},
                {"from": ["c2", "c3"], "to": "s1", "type": "b2b"},
                {"from": ["s0", "s1"], "to": "s2", "type": "tor2spine"},
            ],
        },
    }


def test_path_nodes_direct():
    """Same-board direct link: path_nodes holds just the two chips."""
    graph = build_network_graph(_make_clos_config())
    rt = RoutingTable.from_network_graph(graph)
    spec = rt.get("c0", "c1")
    assert spec.path_nodes == ("c0", "c1")
    assert spec.via_switch is False


def test_path_nodes_via_switch():
    """Cross-board via switch: path_nodes includes intermediate switch nodes."""
    graph = build_network_graph(_make_clos_config())
    rt = RoutingTable.from_network_graph(graph)
    spec = rt.get("c0", "c2")
    assert spec.via_switch is True
    assert len(spec.path_nodes) > 2
    # The path starts and ends with chips
    assert spec.path_nodes[0] == "c0"
    assert spec.path_nodes[-1] == "c2"
    # At least one switch sits in between
    assert any(n.startswith("s") for n in spec.path_nodes[1:-1])


def test_path_nodes_self():
    """Same chip: path_nodes is an empty tuple."""
    graph = build_network_graph(_make_clos_config())
    rt = RoutingTable.from_network_graph(graph)
    spec = rt.get("c0", "c0")
    assert spec.path_nodes == ()


def test_path_nodes_symmetry():
    """path_nodes of (A, B) and (B, A) should be reverses of each other."""
    graph = build_network_graph(_make_clos_config())
    rt = RoutingTable.from_network_graph(graph)
    ab = rt.get("c0", "c2")
    ba = rt.get("c2", "c0")
    assert ab.path_nodes == tuple(reversed(ba.path_nodes))
```
- [ ] Step 2: Run the test and confirm it fails

```bash
cd C:/Users/DELL/Documents/code/Tier6-Model
python -m pytest tests/topology_routing/test_path_nodes.py -v
```

Expected: FAIL — PairCommSpec has no `path_nodes` attribute

- [ ] Step 3: Implement — modify PairCommSpec and from_network_graph

In perfmodel/arch/topo_routing/routing_table.py:

1. Add the field to the PairCommSpec dataclass:
```python
@dataclass(frozen=True)
class PairCommSpec:
    """Communication spec between two chips (a condensed routing result).

    Attributes:
        bandwidth_gbps: aggregate bottleneck bandwidth of the path (= per_rail_bw x rail_count)
        latency_us: total path latency (including switch forwarding)
        hop_count: hop count (edge count; 0 means same chip)
        path_key: dominant link type (link_type of the bottleneck edge)
        via_switch: whether the path traverses a switch node
        rail_count: parallel rail count of the bottleneck link; G5 may expand it into independent links
        path_count: number of usable paths (> 1 under ECMP)
        aggregate_bw_gbps: multi-path aggregate bandwidth; 0.0 means not computed
        edge_ids: IDs of the physical edges along the path
        path_nodes: IDs of the nodes along the path (endpoint chips plus intermediate switches)
    """

    bandwidth_gbps: float
    latency_us: float
    hop_count: int
    path_key: str
    via_switch: bool
    rail_count: int = 1
    path_count: int = 1
    aggregate_bw_gbps: float = 0.0
    edge_ids: tuple[str, ...] = ()
    path_nodes: tuple[str, ...] = ()
```
2. In from_network_graph, save path_nodes when building specs.

Self-pair entries (around line 163):

```python
table[(src, src)] = PairCommSpec(
    bandwidth_gbps=float("inf"),
    latency_us=0.0,
    hop_count=0,
    path_key="c2c",
    via_switch=False,
    path_count=1,
    aggregate_bw_gbps=float("inf"),
)
```

Non-self-pair entries (around line 180): add path_nodes when building the spec:

```python
spec = PairCommSpec(
    bandwidth_gbps=primary.bottleneck_bw_gbps,
    latency_us=primary.total_latency_us,
    hop_count=len(primary.edges),
    path_key=_dominant_link_type(primary.edges),
    via_switch=_has_switch(primary.nodes, graph),
    rail_count=_bottleneck_rail_count(primary.edges),
    path_count=len(routes),
    aggregate_bw_gbps=agg_bw,
    edge_ids=tuple(e.id for e in primary.edges),
    path_nodes=tuple(primary.nodes),
)
```

Because `(src, dst)` and `(dst, src)` previously shared one spec, the reverse direction now needs its own copy with the node order reversed:

```python
spec_rev = PairCommSpec(
    bandwidth_gbps=primary.bottleneck_bw_gbps,
    latency_us=primary.total_latency_us,
    hop_count=len(primary.edges),
    path_key=_dominant_link_type(primary.edges),
    via_switch=_has_switch(primary.nodes, graph),
    rail_count=_bottleneck_rail_count(primary.edges),
    path_count=len(routes),
    aggregate_bw_gbps=agg_bw,
    edge_ids=tuple(e.id for e in primary.edges),
    path_nodes=tuple(reversed(primary.nodes)),
)
table[(src, dst)] = spec
table[(dst, src)] = spec_rev
```
3. In from_topology_spec (around line 226), add path_nodes to PairCommSpec.

Self-pair: keep the default empty tuple. Non-self-pair:

```python
spec = PairCommSpec(
    bandwidth_gbps=profile.bandwidth_gbps,
    latency_us=profile.latency_us * max(hops, 1),
    hop_count=hops,
    path_key=path_key,
    via_switch=False,
    path_nodes=(src, dst),
)
table[(src, dst)] = spec
table[(dst, src)] = PairCommSpec(
    bandwidth_gbps=profile.bandwidth_gbps,
    latency_us=profile.latency_us * max(hops, 1),
    hop_count=hops,
    path_key=path_key,
    via_switch=False,
    path_nodes=(dst, src),
)
```
- [ ] Step 4: Run the test and confirm it passes

```bash
python -m pytest tests/topology_routing/test_path_nodes.py -v
```

Expected: 4 PASS

- [ ] Step 5: Run the existing routing tests and confirm no regressions

```bash
python -m pytest tests/topology_routing/ -v
```

Expected: all PASS

- [ ] Step 6: Commit

```bash
git add perfmodel/arch/topo_routing/routing_table.py tests/topology_routing/test_path_nodes.py
git commit -m "feat(topo_routing): add path_nodes to PairCommSpec for G5 integration"
```

## Task 2: Python — fix ECMP float comparison, graph_builder id validation, RoutePath docstring

Files:

- Modify: perfmodel/arch/topo_routing/strategies/ecmp.py:118
- Modify: perfmodel/arch/topo_routing/graph_builder.py:76-93
- Modify: perfmodel/arch/topo_routing/graph.py:42-51
- Test: tests/topology_routing/test_ecmp_epsilon.py
- Test: tests/topology_routing/test_switch_id_validation.py

- [ ] Step 1: Write the ECMP epsilon test

```python
# tests/topology_routing/test_ecmp_epsilon.py
"""Tests for the ECMP floating-point epsilon comparison."""

from perfmodel.arch.topo_routing.graph import EdgeSpec, NetworkGraph, NodeSpec
from perfmodel.arch.topo_routing.strategies.ecmp import ECMPStrategy


def test_ecmp_finds_equal_cost_paths():
    """Two paths of equal cost accumulated in different float orders; ECMP should find both."""
    graph = NetworkGraph()
    for nid in ["a", "b", "c", "d"]:
        graph.add_node(NodeSpec(id=nid, type="chip"))

    # Path a->b->d: 0.1 + 0.2 = 0.3
    graph.add_edge(EdgeSpec(id="e0", src="a", dst="b", bandwidth_gbps=100, latency_us=0.1, link_type="c2c"))
    graph.add_edge(EdgeSpec(id="e1", src="b", dst="d", bandwidth_gbps=100, latency_us=0.2, link_type="c2c"))
    # Path a->c->d: 0.15 + 0.15 = 0.3
    graph.add_edge(EdgeSpec(id="e2", src="a", dst="c", bandwidth_gbps=100, latency_us=0.15, link_type="c2c"))
    graph.add_edge(EdgeSpec(id="e3", src="c", dst="d", bandwidth_gbps=100, latency_us=0.15, link_type="c2c"))

    strategy = ECMPStrategy(max_paths=4)
    routes = strategy.compute_routes(graph, "a", "d")
    assert len(routes) == 2, f"Expected 2 equal-cost paths, got {len(routes)}"
```
- [ ] Step 2: Write the switch id validation test

```python
# tests/topology_routing/test_switch_id_validation.py
"""Tests for the id-field validation in _build_switch_nodes."""

import pytest

from perfmodel.arch.topo_routing.graph_builder import build_network_graph


def test_switch_missing_id_raises_valueerror():
    """A switch definition without an id should raise ValueError, not KeyError."""
    config = {
        "pods": [{"count": 1, "racks": [{"count": 1, "boards": [
            {"count": 1, "chips": [{"name": "A", "count": 1}]},
        ]}]}],
        "network": {
            "switches": [
                {"port_count": 32, "forwarding_latency_ns": 150},  # id missing
            ],
            "links": [],
        },
    }
    with pytest.raises(ValueError, match="missing 'id'"):
        build_network_graph(config)
```
- [ ] Step 3: Run the tests and confirm they fail

```bash
python -m pytest tests/topology_routing/test_ecmp_epsilon.py tests/topology_routing/test_switch_id_validation.py -v
```

Expected: test_ecmp may PASS or FAIL (depends on float precision); test_switch_id FAILs (it currently raises KeyError, not ValueError)

- [ ] Step 4: Fix the float comparison in ecmp.py

In perfmodel/arch/topo_routing/strategies/ecmp.py, change line 118 from:

```python
elif nd == current_dist:
```

to:

```python
elif abs(nd - current_dist) < 1e-9:
```
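For context on why exact equality misses ties: floating-point addition is order-dependent, so two nominally equal path costs can differ in the last bits. A quick Python check using the costs from the test above:

```python
# The two equal-cost paths from the test accumulate to 0.3 in different
# orders, producing bit-different doubles.
cost_a = 0.1 + 0.2    # path a->b->d
cost_b = 0.15 + 0.15  # path a->c->d
assert cost_a != cost_b             # exact comparison misses the tie
assert abs(cost_a - cost_b) < 1e-9  # the epsilon comparison recovers it
```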
- [ ] Step 5: Fix the id validation in graph_builder.py

In the `_build_switch_nodes` function of perfmodel/arch/topo_routing/graph_builder.py, after `for sw in switches_config:` on line 78 and before the port_count check, add the id check:

```python
def _build_switch_nodes(graph: NetworkGraph, switches_config: list[dict]) -> None:
    """Add switch nodes to graph. All required fields must be present."""
    for sw in switches_config:
        if "id" not in sw:
            raise ValueError(f"Switch config missing 'id': {sw}")
        if "port_count" not in sw:
            raise ValueError(f"Switch '{sw['id']}' missing 'port_count'")
        if "forwarding_latency_ns" not in sw:
            raise ValueError(f"Switch '{sw['id']}' missing 'forwarding_latency_ns'")
        graph.add_node(NodeSpec(
            id=sw["id"],
            type="switch",
            switch_tier=sw.get("type"),
            port_count=sw["port_count"],
            forwarding_latency_us=sw["forwarding_latency_ns"] / 1000.0,
        ))
```
- [ ] Step 6: Expand the RoutePath docstring

The RoutePath docstring in perfmodel/arch/topo_routing/graph.py:

```python
@dataclass
class RoutePath:
    """Result of a routing query between two chips.

    Note: hop_latencies_us contains only edge propagation delays.
    Switch forwarding latencies for intermediate nodes are NOT included
    in hop_latencies_us but ARE included in total_latency_us.
    Therefore: sum(hop_latencies_us) <= total_latency_us when the path
    traverses switch nodes.
    """

    nodes: list[str]
    edges: list[EdgeSpec]
    hop_latencies_us: list[float]
    hop_bandwidths_gbps: list[float]
    total_latency_us: float
    bottleneck_bw_gbps: float
    single_flow_model: bool = True
```
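A small numeric sanity check of the documented invariant, with illustrative numbers (not taken from any real topology):

```python
# total_latency_us = edge propagation + switch forwarding at intermediates,
# so sum(hop_latencies_us) is strictly smaller on switched paths.
hop_latencies_us = [1.0, 2.0, 2.0, 1.0]   # edges of c0->s0->s2->s1->c2
switch_forwarding_us = [0.15, 0.2, 0.15]  # forwarding at s0, s2, s1
total_latency_us = sum(hop_latencies_us) + sum(switch_forwarding_us)
assert sum(hop_latencies_us) <= total_latency_us
assert abs(total_latency_us - 6.5) < 1e-9
```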
- [ ] Step 7: Run the tests and confirm they pass

```bash
python -m pytest tests/topology_routing/test_ecmp_epsilon.py tests/topology_routing/test_switch_id_validation.py -v
```

Expected: all PASS

- [ ] Step 8: Run the full routing test suite and confirm no regressions

```bash
python -m pytest tests/topology_routing/ -v
```

Expected: all PASS

- [ ] Step 9: Commit

```bash
git add perfmodel/arch/topo_routing/strategies/ecmp.py perfmodel/arch/topo_routing/graph_builder.py perfmodel/arch/topo_routing/graph.py tests/topology_routing/test_ecmp_epsilon.py tests/topology_routing/test_switch_id_validation.py
git commit -m "fix(topo_routing): ECMP float epsilon, switch id validation, RoutePath docstring"
```

## Task 3: Rust — fix SwitchModel iSLIP fairness

Files:

- Modify: perfmodel/evaluation/g5/src/tier6/switch.rs:161-215

- [ ] Step 1: Add a fairness test to the tests module at the bottom of switch.rs

```rust
#[test]
fn test_islip_round_robin_fairness() {
    // Three input ports all request output port 0.
    // Over three consecutive ticks, a different input port should win
    // each round (RR fairness).
    let config = SwitchConfig {
        port_count: 3,
        priority_count: 1,
        islip_iterations: 1,
        forwarding_latency_ns: 0.0,
        port_bandwidth_gbps: 1000.0, // huge bandwidth; serialization delay negligible
        buffer_capacity_bytes: 1024 * 1024,
        alpha: 8.0,
        ecn_kmin: 0.5,
        ecn_kmax: 0.8,
        frame_size_bytes: 64,
    };
    let mut sw = SwitchModel::new(config);

    let mut selected_inputs = Vec::new();
    for round in 0..3u32 {
        // Each round, the 3 input ports each send one packet to output port 0.
        for ingress in 0..3u32 {
            sw.forward(ingress, 0, 64, "data".to_string(), round as f64 * 100.0);
        }
        let results = sw.tick(round as f64 * 100.0);
        assert_eq!(results.len(), 1, "round {}: should match exactly 1 (single output)", round);
        selected_inputs.push(results[0].ingress_port);
    }
    // Across the 3 rounds, 3 distinct input ports should have been selected.
    selected_inputs.sort();
    assert_eq!(selected_inputs, vec![0, 1, 2],
        "iSLIP RR should cycle through all 3 inputs: got {:?}", selected_inputs);
}
```
- [ ] Step 2: Run the test and confirm it fails

```bash
cd perfmodel/evaluation/g5
cargo test test_islip_round_robin_fairness -- --nocapture
```

Expected: FAIL — the current `islip_accept` does not rotate via RR pointers

- [ ] Step 3: Rewrite islip_schedule / islip_grant / islip_accept

Replace switch.rs from `fn islip_schedule` through the end of `fn islip_accept` (lines 161-215):

```rust
fn islip_schedule(&mut self) -> Vec<(u32, u32, u32)> {
    let n_ports = self.config.port_count;
    let mut matched = Vec::new();
    let mut matched_inputs = std::collections::HashSet::new();
    let mut matched_outputs = std::collections::HashSet::new();

    for iteration in 0..self.config.islip_iterations {
        // Step 1: Request — every unmatched input port with queued data
        // requests the corresponding output port.
        // requests: egress -> [(ingress, priority)]
        let mut requests: HashMap<u32, Vec<(u32, u32)>> = HashMap::new();
        for (&(ingress, egress, prio), q) in &self.voq {
            if q.is_empty()
                || matched_inputs.contains(&ingress)
                || matched_outputs.contains(&egress)
            {
                continue;
            }
            requests.entry(egress).or_default().push((ingress, prio));
        }

        // Step 2: Grant — each output port picks one input, round-robin
        // starting from its grant_pointer.
        // grants: ingress -> [(egress, priority)]
        let mut grants: HashMap<u32, Vec<(u32, u32)>> = HashMap::new();
        for (&egress, candidates) in &requests {
            let start = self.grant_pointers[egress as usize];
            for offset in 0..n_ports {
                let candidate_in = (start + offset) % n_ports;
                if let Some(&(_, prio)) = candidates.iter()
                    .find(|(inp, _)| *inp == candidate_in)
                {
                    grants.entry(candidate_in).or_default().push((egress, prio));
                    break;
                }
            }
        }

        // Step 3: Accept — each input port picks one output, round-robin
        // starting from its accept_pointer.
        for (&ingress, offered) in &grants {
            if matched_inputs.contains(&ingress) {
                continue;
            }
            let start = self.accept_pointers[ingress as usize];
            for offset in 0..n_ports {
                let candidate_out = (start + offset) % n_ports;
                if let Some(&(_, prio)) = offered.iter()
                    .find(|(eg, _)| *eg == candidate_out)
                {
                    if matched_outputs.contains(&candidate_out) {
                        continue;
                    }
                    matched.push((ingress, candidate_out, prio));
                    matched_inputs.insert(ingress);
                    matched_outputs.insert(candidate_out);
                    // Update pointers only in the first iteration (standard iSLIP).
                    if iteration == 0 {
                        self.grant_pointers[candidate_out as usize] = (ingress + 1) % n_ports;
                        self.accept_pointers[ingress as usize] = (candidate_out + 1) % n_ports;
                    }
                    break;
                }
            }
        }
    }

    matched
}
```

Also delete the old `islip_request` and `islip_accept` methods (lines 177-215); their logic is now inlined into the new `islip_schedule`.
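As a cross-check for the Rust scheduler, one grant/accept round can be modeled in Python. This is a hypothetical reference sketch, simplified to a single iteration and a single priority class, with mutable pointer lists standing in for `grant_pointers` / `accept_pointers`:

```python
# Single-iteration iSLIP reference model.
# requests: set of (ingress, egress) pairs that have queued frames.
def islip_round(requests, grant_ptr, accept_ptr, n_ports):
    # Grant: each output scans inputs round-robin from its grant pointer.
    grants = {}  # ingress -> list of granting egresses
    for eg in range(n_ports):
        reqs = {i for (i, e) in requests if e == eg}
        for off in range(n_ports):
            cand = (grant_ptr[eg] + off) % n_ports
            if cand in reqs:
                grants.setdefault(cand, []).append(eg)
                break
    # Accept: each input scans granting outputs round-robin; pointers
    # advance past the matched partner (the desynchronization that gives
    # iSLIP its fairness).
    matched, taken_out = [], set()
    for ing, offered in grants.items():
        for off in range(n_ports):
            cand = (accept_ptr[ing] + off) % n_ports
            if cand in offered and cand not in taken_out:
                matched.append((ing, cand))
                taken_out.add(cand)
                grant_ptr[cand] = (ing + 1) % n_ports
                accept_ptr[ing] = (cand + 1) % n_ports
                break
    return matched

# Three inputs all request output 0: the grant pointer rotates the
# winner 0 -> 1 -> 2 over three rounds, mirroring the Rust test above.
gp, ap = [0] * 3, [0] * 3
winners = []
for _ in range(3):
    winners.append(islip_round({(0, 0), (1, 0), (2, 0)}, gp, ap, 3)[0][0])
assert winners == [0, 1, 2]
```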

- [ ] Step 4: Run the tests and confirm they pass

```bash
cargo test test_islip -- --nocapture
```

Expected: test_islip_round_robin_fairness PASS, test_forward_and_tick PASS, test_classify_priority PASS

- [ ] Step 5: Commit

```bash
git add perfmodel/evaluation/g5/src/tier6/switch.rs
git commit -m "fix(g5/switch): implement true iSLIP RR grant/accept scheduling"
```

## Task 4: Rust — SwitchModel VOQ admission control + ECN marking

Files:

- Modify: perfmodel/evaluation/g5/src/tier6/switch.rs:9-20 (SwitchConfig)
- Modify: perfmodel/evaluation/g5/src/tier6/switch.rs:67-108 (SwitchModel::new, forward)
- Modify: perfmodel/evaluation/g5/src/tier6/switch.rs:111-154 (tick, SwitchForwardResult)
- Modify: perfmodel/evaluation/g5/src/tier6/switch.rs:232-238 (SwitchForwardResult struct)

- [ ] Step 1: Add admission-control and ECN tests to the tests module

```rust
#[test]
fn test_voq_admission_control_drops() {
    let config = SwitchConfig {
        port_count: 2,
        priority_count: 1,
        buffer_capacity_bytes: 200, // tiny buffer
        alpha: 1.0,
        forwarding_latency_ns: 0.0,
        port_bandwidth_gbps: 1000.0,
        islip_iterations: 1,
        ecn_kmin: 0.5,
        ecn_kmax: 0.8,
        frame_size_bytes: 64,
    };
    let mut sw = SwitchModel::new(config);

    // First packet: 150 bytes, should be admitted (150 < 200).
    let ok = sw.forward(0, 1, 150, "op1".to_string(), 0.0);
    assert!(ok, "first packet should be admitted");

    // Second packet: 150 bytes; the 300-byte total exceeds 200, so reject.
    let ok = sw.forward(0, 1, 150, "op2".to_string(), 0.0);
    assert!(!ok, "second packet should be dropped (buffer full)");
}

#[test]
fn test_ecn_marking() {
    let config = SwitchConfig {
        port_count: 2,
        priority_count: 1,
        buffer_capacity_bytes: 1000,
        alpha: 100.0, // large alpha: admission control never triggers
        forwarding_latency_ns: 0.0,
        port_bandwidth_gbps: 1000.0,
        islip_iterations: 1,
        ecn_kmin: 0.3, // marking starts at 30%
        ecn_kmax: 0.6, // always mark above 60%
        frame_size_bytes: 64,
    };
    let mut sw = SwitchModel::new(config);

    // Fill to 70% (700 bytes) — above ecn_kmax.
    sw.forward(0, 1, 700, "op1".to_string(), 0.0);
    let results = sw.tick(0.0);
    assert_eq!(results.len(), 1);
    assert!(results[0].ecn_marked, "should be ECN marked when buffer > kmax");
}
```
- [ ] Step 2: Run the tests and confirm they fail

```bash
cargo test test_voq_admission test_ecn_marking -- --nocapture
```

Expected: FAIL — `forward` currently returns `()` rather than `bool`, and `SwitchForwardResult` has no `ecn_marked`
- [ ] Step 3: Add admission control to forward(), returning bool

```rust
/// Ingress: enqueue the frame into its VOQ.
/// Returns true if enqueued, false if dropped.
pub fn forward(&mut self, ingress_port: u32, egress_port: u32,
               data_bytes: u64, op_id: String, now_ns: f64) -> bool {
    let priority = Self::classify_priority(&op_id);
    let key = (ingress_port, egress_port, priority);

    // Dynamic-threshold admission control.
    let queue_depth = *self.per_queue_bytes.get(&key).unwrap_or(&0);
    let shared_available = self.config.buffer_capacity_bytes.saturating_sub(self.buffer_used);
    let dynamic_quota = (self.config.alpha * shared_available as f64) as u64;
    if queue_depth + data_bytes > dynamic_quota {
        self.drops += 1;
        return false;
    }

    let entry = VOQEntry {
        ingress_port,
        egress_port,
        priority,
        data_bytes,
        op_id,
        enqueue_ns: now_ns,
    };

    self.voq.entry(key).or_default().push_back(entry);
    *self.per_queue_bytes.entry(key).or_insert(0) += data_bytes;
    self.buffer_used += data_bytes;
    true
}
```
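The dynamic-threshold rule can be sanity-checked in isolation. This Python sketch mirrors the numbers in the `test_voq_admission_control_drops` test above:

```python
# Dynamic-threshold (DT) admission: a queue may grow up to
# alpha * (remaining shared buffer).
def admit(queue_depth, data_bytes, buffer_used, capacity, alpha):
    shared_available = max(capacity - buffer_used, 0)
    return queue_depth + data_bytes <= alpha * shared_available

# capacity=200, alpha=1.0: the first 150 B frame fits, the second is
# dropped because its queue already holds 150 B and only 50 B remain.
assert admit(0, 150, 0, 200, 1.0)
assert not admit(150, 150, 150, 200, 1.0)
```

Larger alpha lets a single hot queue borrow more of the shared buffer; alpha shrinks each queue's quota automatically as the buffer fills.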
- [ ] Step 4: Add an ecn_marked field to SwitchForwardResult

```rust
pub struct SwitchForwardResult {
    pub ingress_port: u32,
    pub egress_port: u32,
    pub priority: u32,
    pub data_bytes: u64,
    pub op_id: String,
    pub finish_ns: f64,
    pub ecn_marked: bool,
}
```
- [ ] Step 5: Add ECN marking to the dequeue path in tick()

In tick(), evaluate ECN against the buffer occupancy as observed at dequeue (before the dequeued frame's bytes are released — otherwise a freshly drained buffer would never mark), then build the SwitchForwardResult:

```rust
pub fn tick(&mut self, now_ns: f64) -> Vec<SwitchForwardResult> {
    let matched = self.islip_schedule();
    let mut results = Vec::new();

    for (ingress, egress, prio) in matched {
        let key = (ingress, egress, prio);
        if let Some(q) = self.voq.get_mut(&key) {
            if let Some(entry) = q.pop_front() {
                // ECN decision: deterministic mark above kmin, a
                // simplification of the probabilistic kmin..kmax ramp.
                // Computed before this frame's bytes are released.
                let utilization = if self.config.buffer_capacity_bytes > 0 {
                    self.buffer_used as f64 / self.config.buffer_capacity_bytes as f64
                } else {
                    0.0
                };
                let ecn_marked = utilization >= self.config.ecn_kmin;

                *self.per_queue_bytes.entry(key).or_insert(0) =
                    self.per_queue_bytes[&key].saturating_sub(entry.data_bytes);
                self.buffer_used = self.buffer_used.saturating_sub(entry.data_bytes);

                // port_bandwidth_gbps is bits/ns, so bytes * 8 / bw -> ns.
                let bw = self.config.port_bandwidth_gbps;
                let serialization_ns = if bw > 0.0 {
                    entry.data_bytes as f64 * 8.0 / bw
                } else {
                    0.0
                };

                let port_busy = self.port_busy_until.get(&egress).copied().unwrap_or(0.0);
                let contention_wait = (port_busy - now_ns).max(0.0);

                let total_latency = contention_wait
                    + self.config.forwarding_latency_ns
                    + serialization_ns;
                let finish_ns = now_ns + total_latency;

                self.port_busy_until.insert(egress, now_ns + contention_wait + serialization_ns);
                self.packets_forwarded += 1;

                results.push(SwitchForwardResult {
                    ingress_port: ingress,
                    egress_port: egress,
                    priority: prio,
                    data_bytes: entry.data_bytes,
                    op_id: entry.op_id,
                    finish_ns,
                    ecn_marked,
                });
            }
        }
    }

    results
}
```
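For reference, a full WRED-style ECN scheme ramps the marking probability linearly between kmin and kmax; the sketch above collapses this to a deterministic mark at kmin and beyond. The ramp, as a hypothetical Python helper:

```python
# WRED-style ECN marking probability: 0 below kmin, linear ramp up to 1
# at kmax, 1 above kmax.
def ecn_mark_probability(utilization, kmin, kmax):
    if utilization < kmin:
        return 0.0
    if utilization >= kmax:
        return 1.0
    return (utilization - kmin) / (kmax - kmin)

assert ecn_mark_probability(0.2, 0.3, 0.6) == 0.0   # below kmin: never mark
assert ecn_mark_probability(0.7, 0.3, 0.6) == 1.0   # above kmax: always mark
assert abs(ecn_mark_probability(0.45, 0.3, 0.6) - 0.5) < 1e-9  # midpoint
```

The deterministic variant keeps the simulation reproducible; switching to the ramp would only require drawing against this probability at dequeue time.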
- [ ] Step 6: Run the tests and confirm they pass

```bash
cargo test test_voq_admission test_ecn_marking test_forward_and_tick test_islip -- --nocapture
```

Expected: all PASS

- [ ] Step 7: Commit

```bash
git add perfmodel/evaluation/g5/src/tier6/switch.rs
git commit -m "feat(g5/switch): VOQ admission control + ECN marking"
```

## Task 5: Rust — rework Event types + delete CLE

Files:

- Modify: perfmodel/evaluation/g5/src/types.rs:285-341
- Delete: perfmodel/evaluation/g5/src/tier6/cle.rs
- Modify: perfmodel/evaluation/g5/src/tier6/mod.rs

- [ ] Step 1: Modify the Event enum

In types.rs:

1. Add switch_id to SwitchTick:

```rust
/// Switch iSLIP tick (demand-driven: scheduled when a frame enters a VOQ)
SwitchTick {
    switch_id: String,
},
```
2. Add an arrived_at_node field to C2CLinkDone to distinguish chip/switch:

```rust
/// C2C link transfer complete; the packet has arrived at the next-hop node.
/// arrived_at_node: node ID reached on this hop (chip or switch)
/// final_dst: final destination chip
C2CLinkDone {
    src_chip: u32,
    arrived_at_node: String,
    final_dst: u32,
    slot_idx: u16,
    psn: u16,
    payload_bytes: u32,
    txn_id: u64,
    qp_id: u16,
    vc_id: u32,
},
```

3. Delete the SwitchDone variant (unified into C2CLinkDone plus node-type dispatch).

- [ ] Step 2: Delete cle.rs and update mod.rs

Delete the file perfmodel/evaluation/g5/src/tier6/cle.rs.

In perfmodel/evaluation/g5/src/tier6/mod.rs, remove `pub mod cle;`:

```rust
pub mod c2c_link;
pub mod c2c_network;
pub mod paxi;
pub mod paxi_core;
pub mod rc_link_rx;
pub mod rc_link_tx;
pub mod switch;
```

- [ ] Step 3: Compile and confirm the expected type errors (don't fix the callers yet)

```bash
cargo check 2>&1 | head -50
```

Expected: compile errors in multi_chip.rs, event_handlers.rs, c2c_network.rs, and single_chip.rs, since they reference the old Event variants and the cle module. These are fixed in later tasks.

- [ ] Step 4: Commit (an intermediate state that does not compile is acceptable)

```bash
git add perfmodel/evaluation/g5/src/types.rs perfmodel/evaluation/g5/src/tier6/mod.rs
git rm perfmodel/evaluation/g5/src/tier6/cle.rs
git commit -m "refactor(g5): reshape Event types for switch integration, remove CLE"
```

## Task 6: Rust — rewrite C2CNetwork

Files:

- Rewrite: perfmodel/evaluation/g5/src/tier6/c2c_network.rs

- [ ] Step 1: Rewrite c2c_network.rs

Replace the contents of c2c_network.rs entirely:

```rust
// C2C Network: physical network model built from the Python RoutingTable.
//
// Supports two node kinds: chip and switch.
// Routes are imported from Python's precomputed path_nodes; no routing
// computation happens here.
// Physical link model: serialization delay + contention (busy_until).

use std::collections::HashMap;

/// Physical link (unidirectional)
struct PhyLink {
    bandwidth_bytes_per_ns: f64,
    base_latency_ns: f64,
    busy_until_ns: f64,
}

impl PhyLink {
    fn new(bandwidth_bytes_per_ns: f64, base_latency_ns: f64) -> Self {
        Self { bandwidth_bytes_per_ns, base_latency_ns, busy_until_ns: 0.0 }
    }

    fn transmit(&mut self, wire_bytes: u64, now_ns: f64) -> f64 {
        let serialization_ns = if self.bandwidth_bytes_per_ns > 0.0 {
            wire_bytes as f64 / self.bandwidth_bytes_per_ns
        } else {
            0.0
        };
        let contention_wait = (self.busy_until_ns - now_ns).max(0.0);
        let start_ns = now_ns + contention_wait;
        self.busy_until_ns = start_ns + serialization_ns;
        contention_wait + serialization_ns + self.base_latency_ns
    }
}

/// Node kind
#[derive(Debug, Clone)]
pub enum NodeType {
    Chip { chip_id: u32 },
    Switch,
}

/// Switch node config (used to create a SwitchModel)
#[derive(Debug, Clone)]
pub struct SwitchNodeConfig {
    pub port_count: u32,
    pub forwarding_latency_ns: f64,
}

/// C2C network (built from Python routing data)
pub struct C2CNetwork {
    // Physical links: (src_node_id, dst_node_id) -> PhyLink
    links: HashMap<(String, String), PhyLink>,
    // Precomputed routes: (src_chip, dst_chip) -> [node_id_0, node_id_1, ...]
    routes: HashMap<(u32, u32), Vec<String>>,
    // Node type lookup
    node_types: HashMap<String, NodeType>,
    // chip short id -> chip_id mapping ("c0" -> 0)
    chip_name_to_id: HashMap<String, u32>,
}

pub struct TransmitResult {
    pub arrive_ns: f64,
    pub next_node: String,
}

impl C2CNetwork {
    /// Build the network from extracted routing data.
    ///
    /// nodes: [(id, type_str, chip_id_opt)]
    /// edges: [(src, dst, bandwidth_gbps, latency_us)]
    /// routes: [(src_chip_id, dst_chip_id, [node_id, ...])]
    pub fn build(
        nodes: Vec<(String, String, Option<u32>)>,
        edges: Vec<(String, String, f64, f64)>,
        routes: Vec<(u32, u32, Vec<String>)>,
    ) -> Self {
        let mut node_types = HashMap::new();
        let mut chip_name_to_id = HashMap::new();

        for (id, type_str, chip_id_opt) in &nodes {
            match type_str.as_str() {
                "chip" => {
                    let cid = chip_id_opt.unwrap_or_else(|| {
                        // Extract the number from "c0", "c1", ...
                        id.trim_start_matches('c').parse::<u32>()
                            .unwrap_or_else(|_| panic!("Cannot parse chip_id from '{}'", id))
                    });
                    node_types.insert(id.clone(), NodeType::Chip { chip_id: cid });
                    chip_name_to_id.insert(id.clone(), cid);
                }
                "switch" => {
                    node_types.insert(id.clone(), NodeType::Switch);
                }
                other => panic!("Unknown node type: '{}'", other),
            }
        }

        let mut links = HashMap::new();
        for (src, dst, bw_gbps, lat_us) in edges {
            let bw_bytes_per_ns = bw_gbps / 8.0; // Gbps / 8 = GB/s = bytes/ns
            let lat_ns = lat_us * 1000.0;
            // Bidirectional link
            links.insert(
                (src.clone(), dst.clone()),
                PhyLink::new(bw_bytes_per_ns, lat_ns),
            );
            links.insert(
                (dst.clone(), src.clone()),
                PhyLink::new(bw_bytes_per_ns, lat_ns),
            );
        }

        let mut route_map = HashMap::new();
        for (src_cid, dst_cid, path) in routes {
            route_map.insert((src_cid, dst_cid), path);
        }

        Self { links, routes: route_map, node_types, chip_name_to_id }
    }

    /// Transmit data over a physical link.
    pub fn transmit(
        &mut self, from_node: &str, to_node: &str, wire_bytes: u64, now_ns: f64,
    ) -> Option<TransmitResult> {
        let key = (from_node.to_string(), to_node.to_string());
        let link = self.links.get_mut(&key)?;
        let latency = link.transmit(wire_bytes, now_ns);
        Some(TransmitResult {
            arrive_ns: now_ns + latency,
            next_node: to_node.to_string(),
        })
    }

    /// Route lookup: given (src_chip, dst_chip, current node), return the
    /// next-hop node ID.
    pub fn next_hop(&self, src_chip: u32, dst_chip: u32, current_node: &str) -> Option<String> {
        let path = self.routes.get(&(src_chip, dst_chip))?;
        let pos = path.iter().position(|n| n == current_node)?;
        if pos + 1 < path.len() {
            Some(path[pos + 1].clone())
        } else {
            None // already at the destination
        }
    }

    /// Get the full path.
    pub fn get_path(&self, src_chip: u32, dst_chip: u32) -> Option<&Vec<String>> {
        self.routes.get(&(src_chip, dst_chip))
    }

    /// Is this node a switch?
    pub fn is_switch(&self, node_id: &str) -> bool {
        matches!(self.node_types.get(node_id), Some(NodeType::Switch))
    }

    /// chip_id of a node (chip nodes only).
    pub fn chip_id_of(&self, node_name: &str) -> Option<u32> {
        match self.node_types.get(node_name) {
            Some(NodeType::Chip { chip_id }) => Some(*chip_id),
            _ => None,
        }
    }

    /// chip_id -> node_name
    pub fn chip_name(&self, chip_id: u32) -> Option<&str> {
        self.chip_name_to_id.iter()
            .find(|(_, &id)| id == chip_id)
            .map(|(name, _)| name.as_str())
    }

    /// IDs of all switch nodes.
    pub fn switch_ids(&self) -> Vec<String> {
        self.node_types.iter()
            .filter(|(_, t)| matches!(t, NodeType::Switch))
            .map(|(id, _)| id.clone())
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_direct_chip_routing() {
        let nodes = vec![
            ("c0".into(), "chip".into(), Some(0u32)),
            ("c1".into(), "chip".into(), Some(1)),
        ];
        let edges = vec![
            ("c0".into(), "c1".into(), 448.0, 0.2),
        ];
        let routes = vec![
            (0, 1, vec!["c0".into(), "c1".into()]),
            (1, 0, vec!["c1".into(), "c0".into()]),
        ];
        let mut net = C2CNetwork::build(nodes, edges, routes);

        let next = net.next_hop(0, 1, "c0").unwrap();
        assert_eq!(next, "c1");

        let result = net.transmit("c0", "c1", 1000, 0.0).unwrap();
        assert!(result.arrive_ns > 0.0);
    }

    #[test]
    fn test_via_switch_routing() {
        let nodes = vec![
            ("c0".into(), "chip".into(), Some(0u32)),
            ("s0".into(), "switch".into(), None),
            ("c1".into(), "chip".into(), Some(1)),
        ];
        let edges = vec![
            ("c0".into(), "s0".into(), 400.0, 1.0),
            ("s0".into(), "c1".into(), 400.0, 1.0),
        ];
        let routes = vec![
            (0, 1, vec!["c0".into(), "s0".into(), "c1".into()]),
        ];
        let net = C2CNetwork::build(nodes, edges, routes);

        assert!(!net.is_switch("c0"));
        assert!(net.is_switch("s0"));

        let hop1 = net.next_hop(0, 1, "c0").unwrap();
        assert_eq!(hop1, "s0");
        let hop2 = net.next_hop(0, 1, "s0").unwrap();
        assert_eq!(hop2, "c1");
    }

    #[test]
    fn test_link_contention() {
        let nodes = vec![
            ("c0".into(), "chip".into(), Some(0u32)),
            ("c1".into(), "chip".into(), Some(1)),
        ];
        let edges = vec![("c0".into(), "c1".into(), 448.0, 0.2)];
        let routes = vec![(0, 1, vec!["c0".into(), "c1".into()])];
        let mut net = C2CNetwork::build(nodes, edges, routes);

        let r1 = net.transmit("c0", "c1", 1000, 0.0).unwrap();
        let r2 = net.transmit("c0", "c1", 1000, 0.0).unwrap();
        assert!(r2.arrive_ns > r1.arrive_ns, "second transfer should see contention");
    }
}
```
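A quick unit check for the `PhyLink` bandwidth conversion: Gbps is bits/ns, so dividing by 8 yields bytes/ns, and 1 GB/s equals 1 byte/ns. The numbers below match the c2c link type used in the tests:

```python
# 448 Gbps -> 56 bytes/ns; a 1000 B frame serializes in ~17.86 ns and
# arrives after serialization plus the 0.2 us base latency.
bw_gbps = 448.0
bw_bytes_per_ns = bw_gbps / 8.0            # 56 bytes/ns
serialization_ns = 1000 / bw_bytes_per_ns  # ~17.86 ns
total_ns = serialization_ns + 0.2 * 1000.0 # + base latency in ns
assert abs(bw_bytes_per_ns - 56.0) < 1e-9
assert 217.0 < total_ns < 218.0
```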
- [ ] Step 2: Build and test

```bash
cargo test c2c_network -- --nocapture
```

Expected: 3 PASS (if cle reference errors remain, temporarily comment out the callers' `use cle` lines)

- [ ] Step 3: Commit

```bash
git add perfmodel/evaluation/g5/src/tier6/c2c_network.rs
git commit -m "refactor(g5): rewrite C2CNetwork to use pre-computed routes with switch support"
```

## Task 7: Rust — add routing data extraction to input.rs

Files:

- Modify: perfmodel/evaluation/g5/src/input.rs

- [ ] Step 1: Add the routing extraction struct and function at the bottom of input.rs

```rust
// ---------------------------------------------------------------------------
// Routing data extraction (from the Python NetworkGraph + RoutingTable)
// ---------------------------------------------------------------------------

/// Routing data extracted from Python (once, before the simulation starts)
pub struct ExtractedRoutingData {
    /// (id, type_str "chip"/"switch", chip_id for chip nodes)
    pub nodes: Vec<(String, String, Option<u32>)>,
    /// (src_id, dst_id, bandwidth_gbps, latency_us)
    pub edges: Vec<(String, String, f64, f64)>,
    /// (src_chip_id, dst_chip_id, path_node_ids)
    pub routes: Vec<(u32, u32, Vec<String>)>,
    /// switch configs: (switch_id, port_count, forwarding_latency_ns)
    pub switch_configs: Vec<(String, u32, f64)>,
}

/// Extract routing data from the Python NetworkGraph + RoutingTable.
pub fn extract_routing_data(
    network_graph: &Bound<'_, PyAny>,
    routing_table: &Bound<'_, PyAny>,
) -> PyResult<ExtractedRoutingData> {
    // --- Extract nodes ---
    let py_nodes = network_graph.getattr("nodes")?; // dict[str, NodeSpec]
    let mut nodes = Vec::new();
    let mut switch_configs = Vec::new();

    for item in py_nodes.call_method0("values")?.try_iter()? {
        let node = item?;
        let id: String = node.getattr("id")?.extract()?;
        let node_type: String = node.getattr("type")?.extract()?;

        let chip_id: Option<u32> = if node_type == "chip" {
            // Extract the number from "c0", "c1", ...
            id.trim_start_matches('c').parse::<u32>().ok()
        } else {
            None
        };

        if node_type == "switch" {
            let port_count: u32 = node.getattr("port_count")?
                .extract::<Option<u32>>()?.unwrap_or(32);
            let fwd_lat_us: f64 = node.getattr("forwarding_latency_us")?.extract()?;
            switch_configs.push((id.clone(), port_count, fwd_lat_us * 1000.0));
        }

        nodes.push((id, node_type, chip_id));
    }

    // --- Extract edges ---
    let py_edges = network_graph.getattr("edges")?; // dict[str, EdgeSpec]
    let mut edges = Vec::new();

    for item in py_edges.call_method0("values")?.try_iter()? {
        let edge = item?;
        let src: String = edge.getattr("src")?.extract()?;
        let dst: String = edge.getattr("dst")?.extract()?;
        let bw: f64 = edge.getattr("bandwidth_gbps")?.extract()?;
        let lat: f64 = edge.getattr("latency_us")?.extract()?;
        edges.push((src, dst, bw, lat));
    }

    // --- Extract routes ---
    let py_table = routing_table.getattr("_table")?; // dict[(str,str), PairCommSpec]
    let mut routes = Vec::new();

    for item in py_table.call_method0("items")?.try_iter()? {
        let pair = item?;
        let key = pair.get_item(0)?;
        let spec = pair.get_item(1)?;

        let src_name: String = key.get_item(0)?.extract()?;
        let dst_name: String = key.get_item(1)?.extract()?;

        // Skip self-pairs. Both directions of every other pair are
        // extracted, because their path_nodes differ (reversed).
        if src_name == dst_name {
            continue;
        }

        let path_nodes: Vec<String> = spec.getattr("path_nodes")?.extract()?;
        if path_nodes.is_empty() {
            continue;
        }

        // Extract the chip_id from the node name.
        let src_id = src_name.trim_start_matches('c').parse::<u32>()
            .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>(
                format!("Cannot parse chip_id from '{}'", src_name)
            ))?;
        let dst_id = dst_name.trim_start_matches('c').parse::<u32>()
            .map_err(|_| PyErr::new::<pyo3::exceptions::PyValueError, _>(
                format!("Cannot parse chip_id from '{}'", dst_name)
            ))?;

        routes.push((src_id, dst_id, path_nodes));
    }

    Ok(ExtractedRoutingData { nodes, edges, routes, switch_configs })
}
```
  • Step 2: Compile check
cargo check 2>&1 | grep "extract_routing"

Expected: no errors (the function compiles correctly but is not yet called)

  • Step 3: Commit
git add perfmodel/evaluation/g5/src/input.rs
git commit -m "feat(g5/input): add extract_routing_data for PyO3 route extraction"

Task 8: Rust — lib.rs signature change + multi_chip.rs integration

Files:

  • Modify: perfmodel/evaluation/g5/src/lib.rs:64-103
  • Modify: perfmodel/evaluation/g5/src/top/multi_chip.rs (large-scale rewrite)
  • Modify: perfmodel/evaluation/g5/src/top/event_handlers.rs (rewrite key handlers)
  • Modify: perfmodel/evaluation/g5/src/top/single_chip.rs (adapt to new Event variants)

This is the largest task; it contains the core integration logic.

  • Step 1: Update the simulate_multi_chip signature in lib.rs
/// Multi-chip simulation entry point
#[pyfunction]
#[pyo3(signature = (chips_dict, program, network_graph, routing_table, cancel_check=None))]
fn simulate_multi_chip<'py>(
py: Python<'py>,
chips_dict: &Bound<'py, PyAny>,
program: &Bound<'py, PyAny>,
network_graph: &Bound<'py, PyAny>,
routing_table: &Bound<'py, PyAny>,
cancel_check: Option<&Bound<'py, PyAny>>,
) -> PyResult<(Bound<'py, PyList>, Bound<'py, PyDict>)> {
let chips = input::parse_chip_map(chips_dict)?;
let routing_data = input::extract_routing_data(network_graph, routing_table)?;
let multi_program = input::parse_multi_chip_program(program)?;

let cancel_flag = Arc::new(AtomicBool::new(false));
if let Some(cb) = cancel_check {
let result = cb.call0()?;
if result.is_truthy()? {
cancel_flag.store(true, Ordering::Relaxed);
}
}

let sim = top::multi_chip::MultiChipSim::from_routing_data(chips, routing_data);
let cancel_ref = cancel_flag.clone();
let result = py.allow_threads(move || sim.simulate(&multi_program, &cancel_ref));

let records_py = output::sim_records_to_pylist(py, &result.records)?;
let stats_py = output::stats_to_pydict(py, result.stats.dump())?;
Ok((records_py, stats_py))
}
  • Step 2: Rewrite the MultiChipSim constructor + add the switches field

In multi_chip.rs, replace the MultiChipSim struct and constructor:

use crate::input::ExtractedRoutingData;
use crate::tier6::switch::{SwitchModel, SwitchConfig as SwitchModelConfig};

pub struct MultiChipSim {
chips: HashMap<u32, ChipSpec>,
routing_data: ExtractedRoutingData,
}

impl MultiChipSim {
pub fn from_routing_data(
chips: HashMap<u32, ChipSpec>,
routing_data: ExtractedRoutingData,
) -> Self {
Self { chips, routing_data }
}
}

Delete the old TopologyInput struct, MultiChipSim::new(), resolve_links(), and resolve_link_for_pair().

  • Step 3: Rewrite network creation and switch instantiation in simulate()

In the simulate() method, replace the create_c2c_network-based network creation and instantiate the switches:

    pub fn simulate(&self, program: &MultiChipProgram, cancel_flag: &AtomicBool) -> MultiSimResult {
// ... kernel, cores, cdma_units initialization unchanged ...

// Build C2CNetwork from the extracted routing data
let mut c2c_net = C2CNetwork::build(
self.routing_data.nodes.clone(),
self.routing_data.edges.clone(),
self.routing_data.routes.clone(),
);

// Create SwitchModel instances
let mut switches: HashMap<String, SwitchModel> = HashMap::new();
for (sw_id, port_count, fwd_lat_ns) in &self.routing_data.switch_configs {
let config = SwitchModelConfig {
port_count: *port_count,
forwarding_latency_ns: *fwd_lat_ns,
..SwitchModelConfig::default()
};
switches.insert(sw_id.clone(), SwitchModel::new(config));
}

// PAXI bridges
let mut paxis = self.create_paxis(&chip_ids);

// event-driven main loop ...
  • Step 4: Rewrite C2CLinkDone / SwitchTick dispatch in the event loop

In the match event block of simulate():

                Event::RcPackDone { chip_id, slot_idx, dst, wire_bytes, qp_id } => {
handle_rc_pack_done(
chip_id, slot_idx, dst, wire_bytes, qp_id, now,
&mut paxis, &mut c2c_net, &mut kernel,
);
}

Event::C2CLinkDone { src_chip, arrived_at_node, final_dst,
slot_idx, psn, payload_bytes, txn_id, qp_id, vc_id } => {
if c2c_net.is_switch(&arrived_at_node) {
// Packet arrived at a switch: enqueue into VOQ, schedule SwitchTick
handle_switch_ingress(
src_chip, &arrived_at_node, final_dst,
slot_idx, psn, payload_bytes, txn_id, qp_id, vc_id, now,
&mut switches, &mut c2c_net, &mut kernel,
);
} else {
let arrived_chip = c2c_net.chip_id_of(&arrived_at_node).unwrap_or(final_dst);
if arrived_chip == final_dst {
// Reached the final destination
handle_c2c_done(
src_chip, arrived_chip, slot_idx, psn,
payload_bytes, txn_id, qp_id, vc_id, now,
&mut paxis, &mut c2c_net, &mut kernel,
);
} else {
// Relay chip: forward to the next hop
handle_chip_relay(
src_chip, arrived_chip, final_dst,
slot_idx, psn, payload_bytes, txn_id, qp_id, vc_id, now,
&mut c2c_net, &mut kernel,
);
}
}
}

Event::SwitchTick { ref switch_id } => {
handle_switch_tick(
switch_id, now,
&mut switches, &mut c2c_net, &mut kernel,
);
}
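The dispatch decision in the C2CLinkDone arm above can be expressed as a small pure function. This is a sketch; switch_ids and chip_id_of are hypothetical stand-ins for c2c_net.is_switch and c2c_net.chip_id_of:

```python
# Sketch of the C2CLinkDone dispatch: given the arrived-at node, decide
# which handler runs. Tables stand in for the C2CNetwork lookups.

def dispatch(arrived_at_node: str, final_dst: int,
             switch_ids: set[str], chip_id_of: dict[str, int]) -> str:
    if arrived_at_node in switch_ids:
        return "switch_ingress"      # enqueue into VOQ, schedule SwitchTick
    arrived_chip = chip_id_of.get(arrived_at_node, final_dst)
    if arrived_chip == final_dst:
        return "c2c_done"            # reached the final destination
    return "chip_relay"              # intermediate chip, forward again
```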
  • Step 5: Implement the new event handler functions

Add to event_handlers.rs:

/// Packet arrived at a switch: enqueue into VOQ, schedule SwitchTick
pub fn handle_switch_ingress(
src_chip: u32, switch_id: &str, final_dst: u32,
slot_idx: u16, psn: u16, payload_bytes: u32, txn_id: u64, qp_id: u16, vc_id: u32, now: f64,
switches: &mut HashMap<String, SwitchModel>,
c2c_net: &mut C2CNetwork,
kernel: &mut SimKernel,
) {
let switch = match switches.get_mut(switch_id) {
Some(sw) => sw,
None => return,
};

// Determine ingress/egress ports (simplified: modulo-map src_chip and the
// next-hop chip_id onto port_count)
let ingress_port = src_chip % switch.config().port_count;
let next = c2c_net.next_hop(src_chip, final_dst, switch_id);
let egress_port = if let Some(ref next_node) = next {
c2c_net.chip_id_of(next_node).unwrap_or(0) % switch.config().port_count
} else {
0
};

let op_id = format!("{}_{}_{}_{}", src_chip, final_dst, psn, txn_id);
let admitted = switch.forward(ingress_port, egress_port, payload_bytes as u64, op_id, now);

if admitted {
// Schedule a SwitchTick (assumes the switch has no pending tick yet)
kernel.schedule_at(
(now + switch.time_slot_ns()) as u64,
Event::SwitchTick { switch_id: switch_id.to_string() },
);
}
// Dropped packets need no handling here (Go-Back-N retransmission recovers them)
}

/// SwitchTick: run iSLIP scheduling and forward dequeued packets
pub fn handle_switch_tick(
switch_id: &str, now: f64,
switches: &mut HashMap<String, SwitchModel>,
c2c_net: &mut C2CNetwork,
kernel: &mut SimKernel,
) {
let switch = match switches.get_mut(switch_id) {
Some(sw) => sw,
None => return,
};

let results = switch.tick(now);

for result in results {
// Parse src_chip and final_dst out of op_id
let parts: Vec<&str> = result.op_id.split('_').collect();
if parts.len() < 4 { continue; }
let src_chip: u32 = parts[0].parse().unwrap_or(0);
let final_dst: u32 = parts[1].parse().unwrap_or(0);
let psn: u16 = parts[2].parse().unwrap_or(0);
let txn_id: u64 = parts[3].parse().unwrap_or(0);

// Send from the switch to the next hop
let next = match c2c_net.next_hop(src_chip, final_dst, switch_id) {
Some(n) => n,
None => continue,
};

if let Some(tx_result) = c2c_net.transmit(switch_id, &next, result.data_bytes, result.finish_ns) {
kernel.schedule_at(tx_result.arrive_ns as u64, Event::C2CLinkDone {
src_chip,
arrived_at_node: next,
final_dst,
slot_idx: 0, // the switch does not carry slot_idx
psn,
payload_bytes: result.data_bytes as u32,
txn_id,
qp_id: 0,
vc_id: 0,
});
}
}

// Keep scheduling ticks while packets are still pending
if switch.has_pending() {
kernel.schedule_at(
(now + switch.time_slot_ns()) as u64,
Event::SwitchTick { switch_id: switch_id.to_string() },
);
}
}

/// Chip relay: packet arrived at an intermediate chip, keep forwarding
pub fn handle_chip_relay(
src_chip: u32, relay_chip: u32, final_dst: u32,
slot_idx: u16, psn: u16, payload_bytes: u32, txn_id: u64, qp_id: u16, vc_id: u32, now: f64,
c2c_net: &mut C2CNetwork,
kernel: &mut SimKernel,
) {
let relay_name = match c2c_net.chip_name(relay_chip) {
Some(n) => n.to_string(),
None => return,
};
let next = match c2c_net.next_hop(src_chip, final_dst, &relay_name) {
Some(n) => n,
None => return,
};
let wire_bytes = payload_bytes as u64 + super::multi_chip::ACK_WIRE_BYTES;
if let Some(result) = c2c_net.transmit(&relay_name, &next, wire_bytes, now) {
kernel.schedule_at(result.arrive_ns as u64, Event::C2CLinkDone {
src_chip,
arrived_at_node: next,
final_dst,
slot_idx, psn, payload_bytes, txn_id, qp_id, vc_id,
});
}
}
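The op_id string is the only channel carrying routing metadata through the switch, so the format built in handle_switch_ingress and the parse in handle_switch_tick must stay in sync. A minimal round-trip sketch (field order taken from the handlers above):

```python
# Round-trip sketch of the op_id encoding used by handle_switch_ingress
# ("{src}_{dst}_{psn}_{txn}") and parsed back in handle_switch_tick.

def encode_op_id(src_chip: int, final_dst: int, psn: int, txn_id: int) -> str:
    return f"{src_chip}_{final_dst}_{psn}_{txn_id}"

def decode_op_id(op_id: str):
    parts = op_id.split("_")
    if len(parts) < 4:
        return None  # malformed id: the Rust handler skips these
    return tuple(int(p) for p in parts[:4])
```

Keeping encode and decode next to each other (or behind one helper type) avoids the silent desync that a free-floating format string invites.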
  • Step 6: Update handle_rc_pack_done to use the new routing
pub fn handle_rc_pack_done(
chip_id: u32, slot_idx: u16, dst: u32, wire_bytes: u64, qp_id: u16, now: f64,
paxis: &mut HashMap<u32, PAXIBridge>,
c2c_net: &mut C2CNetwork,
kernel: &mut SimKernel,
) {
let (psn, payload_bytes, txn_id, vc_id) = {
let paxi = &paxis[&chip_id];
let slot = paxi.tx.get_slot(slot_idx);
(slot.psn, slot.payload_bytes, slot.txn_id, slot.vc_id)
};

// Route lookup: find the next hop from the src chip
let src_name = match c2c_net.chip_name(chip_id) {
Some(n) => n.to_string(),
None => return,
};
let next = match c2c_net.next_hop(chip_id, dst, &src_name) {
Some(n) => n,
None => return,
};

if let Some(result) = c2c_net.transmit(&src_name, &next, wire_bytes, now) {
kernel.schedule_at(result.arrive_ns as u64, Event::C2CLinkDone {
src_chip: chip_id,
arrived_at_node: next,
final_dst: dst,
slot_idx, psn, payload_bytes, txn_id, qp_id, vc_id,
});
}

// Release the TX datapath
if let Some(paxi) = paxis.get_mut(&chip_id) {
if let Some(evt) = paxi.on_tx_done(now) {
kernel.schedule_at(evt.0 as u64, evt.1);
}
if paxi.tx.should_arm_timeout(dst, qp_id) {
let timeout_ns = now + paxi.tx.retry_timeout_ns();
let timer_id = kernel.schedule_timer(timeout_ns as u64, Event::RetryTimeout {
chip_id, dst, qp_id,
});
paxi.tx.arm_timeout(dst, qp_id, timer_id);
}
}
}
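handle_rc_pack_done arms a retry timer, and handle_switch_ingress relies on Go-Back-N to recover packets the switch VOQ rejects. A minimal, hypothetical sketch of the window semantics being assumed (cumulative ACKs; on timeout, the whole outstanding window is retransmitted in order):

```python
# Minimal Go-Back-N window sketch (hypothetical; illustrates the recovery
# the plan relies on when a packet is dropped at a switch).

class GoBackNWindow:
    def __init__(self, window: int):
        self.window = window
        self.base = 0        # oldest unacked PSN
        self.next_psn = 0    # next PSN to send

    def can_send(self) -> bool:
        return self.next_psn - self.base < self.window

    def send(self) -> int:
        psn, self.next_psn = self.next_psn, self.next_psn + 1
        return psn

    def ack(self, psn: int) -> None:
        # cumulative ACK: everything up to and including psn is delivered
        self.base = max(self.base, psn + 1)

    def on_timeout(self) -> list[int]:
        # retransmit every outstanding PSN, in order
        return list(range(self.base, self.next_psn))
```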
  • Step 7: Update reverse ACK/Credit transmission in handle_c2c_done

ACK/NAK and CreditReturn must be transmitted by node name rather than chip_id:

pub fn handle_c2c_done(
src_chip: u32, dst_chip: u32, _slot_idx: u16,
psn: u16, payload_bytes: u32, txn_id: u64, qp_id: u16, vc_id: u32, now: f64,
paxis: &mut HashMap<u32, PAXIBridge>,
c2c_net: &mut C2CNetwork,
kernel: &mut SimKernel,
) {
let rx_result = match paxis.get_mut(&dst_chip) {
Some(paxi) => paxi.receive_packet(src_chip, qp_id, psn, payload_bytes, vc_id, txn_id, now),
None => return,
};

let dst_name = c2c_net.chip_name(dst_chip).unwrap_or("").to_string();
let src_name = c2c_net.chip_name(src_chip).unwrap_or("").to_string();

for (t, evt) in rx_result.events {
match &evt {
Event::AckArrived { .. } | Event::CreditReturn { .. } => {
// ACK returns over the reverse link (simplified: use the physical link directly)
let arrive = if let Some(r) = c2c_net.transmit(&dst_name, &src_name, ACK_WIRE_BYTES, t) {
r.arrive_ns as u64
} else {
t as u64
};
kernel.schedule_at(arrive, evt);
}
_ => {
kernel.schedule_at(t as u64, evt);
}
}
}
}
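All of these handlers assume c2c_net.transmit returns an arrival time derived from the edge attributes (bandwidth_gbps, latency_us). A hedged sketch of that timing model, ignoring any link occupancy/queuing the real Rust C2CNetwork may add:

```python
# Hedged sketch of the per-link timing the handlers assume transmit()
# provides: serialization at bandwidth_gbps plus fixed propagation latency.
# 1 Gbps = 1 bit/ns, so bits / bandwidth_gbps is already in ns.

def transmit_arrive_ns(start_ns: float, wire_bytes: int,
                       bandwidth_gbps: float, latency_us: float) -> float:
    serialization_ns = wire_bytes * 8.0 / bandwidth_gbps
    return start_ns + serialization_ns + latency_us * 1000.0
```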
  • Step 8: Adapt single_chip.rs

In single_chip.rs, update the match arms that reference the old SwitchDone / SwitchTick variants to the new signatures:

                | Event::SwitchTick { .. }
| Event::C2CLinkDone { .. }
  • Step 9: Add public config() and time_slot_ns() accessors to SwitchModel

In switch.rs:

    pub fn config(&self) -> &SwitchConfig {
&self.config
}

pub fn time_slot_ns(&self) -> f64 {
self.time_slot_ns
}
  • Step 10: Compile cleanly
cargo check

Expected: no errors

  • Step 11: Run all Rust tests
cargo test

Expected: all PASS

  • Step 12: Commit
git add perfmodel/evaluation/g5/src/
git commit -m "feat(g5): integrate RoutingTable + SwitchModel into multi-chip simulation"

Task 9: Python — adapt sim_engine.py + pipeline.py to the new signature

Files:

  • Modify: perfmodel/evaluation/g5/sim_engine.py

  • Modify: perfmodel/evaluation/g5/pipeline.py

  • Step 1: Update sim_engine.py

"""Event-driven simulation scheduler (G5 instruction-level simulation mode)

The simulation kernel is provided by g5_rs (Rust), invoked via PyO3.
The external API (G5SimEngine.simulate) is unchanged.

Sync mechanism (4 engines):
tiu_sync_id: written with cmd_id when a TIU op completes
tdma_sync_id: written with cmd_id when a DMA op completes
sdma_sync_id: written with cmd_id when an SDMA op completes
hau_sync_id: written with cmd_id when an HAU op completes

Dependency resolution:
TIU: cmd.cmd_id_dep <= tdma_sync_id (fixed dependency on DMA)
DMA: cmd.cmd_id_dep <= tiu_sync_id (fixed dependency on TIU)
SDMA: cmd.dep_engine names the dependency engine ("tiu"/"hau"/"sdma"/"tdma")
HAU: cmd.dep_engine names the dependency engine ("tiu"/"tdma")
"""

from __future__ import annotations

from typing import Any, Callable

import g5_rs as _g5_rs

from perfmodel.arch.chip import ChipSpecImpl
from perfmodel.arch.topo_routing.graph import NetworkGraph
from perfmodel.arch.topo_routing.routing_table import RoutingTable
from perfmodel.mapping.g5.program import CoreProgram


class G5SimEngine:
"""G5 event-driven simulation engine (Rust backend)

Supports single-core and multi-core simulation.
"""

def __init__(self, chip: ChipSpecImpl) -> None:
self._chip = chip
self._last_stats: dict[str, Any] = {}

def simulate(
self,
program: CoreProgram,
cancel_check: Callable[[], bool] | None = None,
) -> list[_g5_rs.SimRecord]:
"""Single-chip simulation

Args:
program: multi-core program
cancel_check: cancellation check callable

Returns:
list of SimRecord
"""
self._last_stats = {}

if not program.cores:
return []

records, stats = _g5_rs.simulate_single_chip(self._chip, program, cancel_check)
self._last_stats = dict(stats)
return list(records)

def simulate_multi_chip(
self,
chips: dict[int, ChipSpecImpl],
program: CoreProgram,
network_graph: NetworkGraph,
routing_table: RoutingTable,
cancel_check: Callable[[], bool] | None = None,
) -> list[_g5_rs.SimRecord]:
"""Multi-chip simulation

Args:
chips: chip_id -> ChipSpecImpl mapping
program: multi-chip program
network_graph: topology graph (chip + switch nodes)
routing_table: precomputed routing table
cancel_check: cancellation check callable

Returns:
list of SimRecord
"""
self._last_stats = {}

records, stats = _g5_rs.simulate_multi_chip(
chips, program, network_graph, routing_table, cancel_check,
)
self._last_stats = dict(stats)
return list(records)

def get_stats(self) -> dict[str, Any]:
"""Return statistics from the most recent simulation run"""
return dict(self._last_stats)
  • Step 2: Update pipeline.py to pass routing data through
"""G5 instruction-level simulation pipeline wrapper

DistributedModel -> G5InstructionEmitter -> G5SimEngine -> G5ResultAdapter -> EngineResult

Chains the L3.g5 and L4.g5 components into a single entry point, used by the G5 route in engine.py.
"""

from __future__ import annotations

from typing import Any, Callable

from perfmodel.arch.chip import ChipSpecImpl
from perfmodel.arch.topo_routing.graph import NetworkGraph
from perfmodel.arch.topo_routing.routing_table import RoutingTable
from perfmodel.mapping.common.plan.distributed_model import DistributedModel
from perfmodel.mapping.g5.instruction_emitter import G5InstructionEmitter
from perfmodel.evaluation.common.metrics import EngineResult
from perfmodel.evaluation.g5.sim_engine import G5SimEngine
from perfmodel.evaluation.g5.adapter import G5ResultAdapter


def run_g5_pipeline(
dist_model: DistributedModel,
chip: ChipSpecImpl,
network_graph: NetworkGraph | None = None,
routing_table: RoutingTable | None = None,
progress_callback: Callable[[float], None] | None = None,
cancel_check: Callable[[], bool] | None = None,
) -> EngineResult:
"""G5 instruction-level simulation pipeline

Args:
dist_model: distributed model (produced by ParallelismPlanner)
chip: chip spec
network_graph: topology graph (required for multi-chip simulation)
routing_table: precomputed routing table (required for multi-chip simulation)
progress_callback: progress callback (0.0 ~ 1.0)
cancel_check: cancellation check callable

Returns:
EngineResult
"""
# L3.g5: DistributedOp -> CoreProgram
emitter = G5InstructionEmitter(chip)
program = emitter.emit(dist_model.ops)

if progress_callback:
progress_callback(0.5)

# L4.g5: CoreProgram -> SimRecord[]
engine = G5SimEngine(chip)
records = engine.simulate(program, cancel_check=cancel_check)
stats = engine.get_stats()

if progress_callback:
progress_callback(0.8)

# L4.g5: SimRecord[] -> EngineResult
adapter = G5ResultAdapter(chip)
result = adapter.convert(records, stats=stats)

if progress_callback:
progress_callback(1.0)

return result
  • Step 3: Commit
git add perfmodel/evaluation/g5/sim_engine.py perfmodel/evaluation/g5/pipeline.py
git commit -m "feat(g5): update Python API to pass NetworkGraph + RoutingTable to Rust"

Task 10: End-to-end integration test

Files:

  • Modify: docs/research/comm-benchmarks/scripts/test_rust_math_compare.py (或新建测试)

  • Step 1: Write the end-to-end test script

Create a new file alongside the existing test_rust_math_compare.py:

# docs/research/comm-benchmarks/scripts/test_switch_integration.py
"""End-to-end: run G5 multi-chip simulation over a Clos topology (with switches)"""

import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[4]
sys.path.insert(0, str(PROJECT_ROOT))

from perfmodel.arch.chip import ChipSpecImpl
from perfmodel.arch.dma import CDMASpec
from perfmodel.arch.topo_routing.graph_builder import build_network_graph
from perfmodel.arch.topo_routing.routing_table import RoutingTable
from perfmodel.mapping.g5.program import CommOp, CoreProgram, MultiChipProgram
import g5_rs


def make_chips(n):
cdma = CDMASpec(
bandwidth_per_cdma_gbps=400.0,
startup_latency_ns=800.0,
efficiency=0.95,
dies_per_chip=1,
cdmas_per_die=1,
threads_per_cdma=8,
max_outstanding=128,
)
return {i: ChipSpecImpl(
name=f"chip_{i}", core_count=1,
frequency_ghz=1.0, tiu_frequency_ghz=1.0,
dma_engines={"cdma": cdma},
noc_config={"mesh_cols": 1, "mesh_rows": 1, "latency_ns": 50.0},
) for i in range(n)}


def make_clos_topo():
"""4 chips, 2 ToR, 1 spine; same structure as clos-2tier-4chip.yaml"""
return {
"pods": [
{"count": 1, "racks": [
{"count": 1, "boards": [
{"count": 1, "chips": [{"name": "A", "count": 2}]},
]},
{"count": 1, "boards": [
{"count": 1, "chips": [{"name": "A", "count": 2}]},
]},
]},
],
"network": {
"switches": [
{"id": "s0", "type": "tor", "port_count": 32, "forwarding_latency_ns": 150},
{"id": "s1", "type": "tor", "port_count": 32, "forwarding_latency_ns": 150},
{"id": "s2", "type": "spine", "port_count": 64, "forwarding_latency_ns": 200},
],
"link_types": {
"c2c": {"bandwidth_gbps": 448, "latency_us": 0.2},
"b2b": {"bandwidth_gbps": 400, "latency_us": 1.0},
"tor2spine": {"bandwidth_gbps": 800, "latency_us": 2.0},
},
"links": [
{"from": "c0", "to": "c1", "type": "c2c"},
{"from": "c2", "to": "c3", "type": "c2c"},
{"from": ["c0", "c1"], "to": "s0", "type": "b2b"},
{"from": ["c2", "c3"], "to": "s1", "type": "b2b"},
{"from": ["s0", "s1"], "to": "s2", "type": "tor2spine"},
],
},
}


def test_clos_allreduce():
"""AllReduce over the 4-chip Clos topology; verifies the simulation runs to completion"""
chips = make_chips(4)
config = make_clos_topo()
graph = build_network_graph(config)
rt = RoutingTable.from_network_graph(graph)

# Verify the routing table contains switch paths
spec_02 = rt.get("c0", "c2")
assert spec_02.via_switch, "c0->c2 should go via switch"
print(f"c0->c2 path: {spec_02.path_nodes}")

program = MultiChipProgram(
chip_programs={i: CoreProgram(cores=[]) for i in range(4)},
comm_schedule=[CommOp(
op_id="ar", comm_type="allreduce",
participants=[0, 1, 2, 3], data_bytes=1024 * 1024,
algorithm="ring", source_op_id="bench",
)],
)

records, stats = g5_rs.simulate_multi_chip(chips, program, graph, rt)
print(f"[PASS] Clos AllReduce: {len(records)} records, "
f"total_time={stats.get('kernel.total_sim_time_ns', 0):.0f} ns")
assert len(records) > 0, "Should produce simulation records"


if __name__ == "__main__":
test_clos_allreduce()
print("[PASS] All switch integration tests passed")
  • Step 2: Run the end-to-end test
cd C:/Users/DELL/Documents/code/Tier6-Model
python docs/research/comm-benchmarks/scripts/test_switch_integration.py

Expected: [PASS] All switch integration tests passed

  • Step 3: Run the existing comparison test to confirm no regression
python docs/research/comm-benchmarks/scripts/test_rust_math_compare.py

Note: this test may need adapting to the new simulate_multi_chip signature (passing graph + routing_table instead of topology_dict). If it fails, update make_topo() to build a NetworkGraph + RoutingTable instead.

  • Step 4: Commit
git add docs/research/comm-benchmarks/scripts/test_switch_integration.py
git commit -m "test(g5): end-to-end switch integration test with Clos topology"

Task 11: Cargo build verification + full regression

  • Step 1: Full Rust build and tests
cd perfmodel/evaluation/g5
cargo build --release
cargo test

Expected: build succeeds, all Rust tests PASS

  • Step 2: Full Python test suite
cd C:/Users/DELL/Documents/code/Tier6-Model
python -m pytest tests/ -v --timeout=60

Expected: all PASS

  • Step 3: Final commit
git add -A
git commit -m "chore: final verification pass for G5 switch routing integration"