L4 Evaluation -- Math Path: Evaluation Engine
Overview
The L4 Math path performs performance evaluation with an algebraic cost model. It is responsible for:
- Unified performance evaluation over ExecPlan + HardwareSpec + TopologySpec
- Switching evaluation precision by Granularity (Chip/Core)
- Routing by OpType to sub-evaluators (Compute/Comm/Fallback)
- Per-step latency breakdown and bottleneck attribution
- Emitting aggregate metrics such as TTFT/TPOT/TPS/MFU/MBU
- Providing the precise-evaluation entry point (PreciseTileEvaluator) for the L3/math TilingPlanner
Out of scope: no partitioning, layout, or scheduling, and no new communication nodes are inserted.
Module List
| Module | Responsibility |
|---|---|
| math/engine.py | EvaluationEngine (unified entry point) |
| common/metrics.py | HardwareSpec, StepMetrics, Aggregates, EngineResult |
| math/cost_models/base.py | BaseCostModel |
| math/cost_models/chip.py | ChipCostModel (Roofline) |
| math/cost_models/core.py | CoreCostModel (multi-core parallelism) |
| math/cost_models/comm_protocol.py | CommProtocolCostModel |
| math/evaluators/compute.py | ComputeEvaluator |
| math/evaluators/comm.py | CommEvaluator |
| math/evaluators/precise/ | PreciseTileEvaluator (precise evaluation) |
| math/calibration.py | Calibration |
Architecture
ExecPlan + hardware dict (merge_specs output)
                  |
                  v
          EvaluationEngine
                  |
      +-----------+-----------+
      v           v           v
 OpTypeRouter  CostModelRegistry  Calibration
 (type routing) (granularity selection) (optional calibration)
      |           |           |
      +-----------+-----------+
                  |
                  v
       StepMetrics + Aggregates
                  |
                  v
            EngineResult
EvaluationEngine
Interface
def evaluate(
    self,
    exec_plan: ExecPlan,
    distributed_model: DistributedModel,
    hardware: dict[str, float],
    granularity: Granularity = Granularity.CHIP,
    calibration: CalibrationConfig | None = None,
    output_tokens: int = 1,
    prefill_ops: set[str] | None = None,  # set of op_ids in the prefill phase
    deployment_config: dict[str, object] | None = None,
    is_prefill: bool = True,  # affects TTFT/TPOT computation
) -> EngineResult
Evaluation Flow
- Input validation: check that the timeline is non-empty and the hardware fields are complete
- Model selection: pick the CostModel (Chip/Core) by granularity
- Timeline traversal: for each event:
  - Look up the Op definition (from DistributedModel.op_map)
  - Type routing: op_type -> Evaluator (compute/comm/fallback)
  - Step timing: t_compute, t_comm, t_wait
  - Record the bottleneck (COMPUTE_BOUND / BW_BOUND / LATENCY_BOUND)
- MoE TBO overlap: when enable_tbo=True, MoE dispatch/combine overlaps with adjacent compute
- Ring Attention overlap: when enable_ring_attention=True and tp > 1, Attention compute overlaps with communication
- Metric aggregation: sum times -> TPS/TTFT/TPOT/MFU/MBU
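The traversal and routing steps above can be sketched as follows. This is a minimal, self-contained illustration: the `StepMetrics` stub, `COMM_OPS` set, and `evaluate_timeline` helper are hypothetical simplifications, not the actual module API.

```python
from dataclasses import dataclass

@dataclass
class StepMetrics:
    op_id: str
    t_compute: float = 0.0  # ms
    t_comm: float = 0.0     # ms

# Hypothetical routing table: which op types go to which evaluator
COMM_OPS = {"allreduce", "allgather", "all2all", "p2p"}
COMPUTE_OPS = {"matmul", "softmax", "layernorm", "attention", "embedding", "lmhead"}

def route(op_type: str) -> str:
    """Return the evaluator kind that handles this op type."""
    if op_type in COMM_OPS:
        return "comm"
    if op_type in COMPUTE_OPS:
        return "compute"
    return "fallback"

def evaluate_timeline(timeline: list[dict]) -> list[StepMetrics]:
    """Walk events, route each op, and collect per-step metrics (stub timings)."""
    steps = []
    for event in timeline:
        kind = route(event["op_type"])
        if kind == "compute":
            steps.append(StepMetrics(event["op_id"], t_compute=event.get("t", 0.0)))
        elif kind == "comm":
            steps.append(StepMetrics(event["op_id"], t_comm=event.get("t", 0.0)))
        else:  # fallback: no cost model, counted as zero time
            steps.append(StepMetrics(event["op_id"]))
    return steps
```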
Three-Level Overlap Model
| Level | Location | Description |
|---|---|---|
| Tile level | PreciseTileEvaluator | compute vs DMA (compute_dma_overlap_rate) |
| Layer level | _apply_ring_attn_overlap | Attention compute vs communication; requires tp > 1 |
| Model level | _apply_moe_compute_overlap | MoE dispatch/combine vs adjacent compute; requires enable_tbo=True |
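A common way to express the layer- and model-level cases is to let a fraction of communication hide behind compute. The formula below is a generic overlap model, assumed for illustration rather than taken from the actual `_apply_ring_attn_overlap` / `_apply_moe_compute_overlap` implementations:

```python
def apply_overlap(t_compute: float, t_comm: float, overlap_rate: float) -> float:
    """Serial time with a fraction of the shorter phase hidden behind the longer one.

    overlap_rate = 0.0 -> fully serial: t_compute + t_comm
    overlap_rate = 1.0 -> fully hidden: max(t_compute, t_comm)
    """
    hidden = overlap_rate * min(t_compute, t_comm)
    return t_compute + t_comm - hidden
```

At overlap_rate = 1.0 this reduces to max(t_compute, t_comm), which matches the intuition that perfectly overlapped communication is free whenever compute is the longer phase.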
Cost Models
ChipCostModel
Treats the chip as a black box and applies the Roofline model:
def estimate_compute(op_type, local_shape, hardware) -> float:
    flops = estimate_flops(op_type, local_shape)
    bytes = estimate_bytes(op_type, local_shape)
    t_compute = flops / (compute_tflops * 1e12)
    t_memory = bytes / (memory_bw_gbps * 1e9)
    return max(t_compute, t_memory)  # Roofline
def estimate_comm(comm_bytes, path_key, participants, hardware) -> float:
    bw_gbps = hardware[f"{path_key}_bandwidth_gbps"]  # no default value!
    ring_factor = 2 * (participants - 1) / participants
    return comm_bytes * ring_factor / (bw_gbps * 1e9)
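A runnable instantiation of the Roofline estimate for a single matmul, assuming FLOPs = 2*M*N*K and traffic = inputs + output at a given dtype width. The helper name and its flop/byte formulas are illustrative stand-ins for the module's actual estimate_flops/estimate_bytes:

```python
def roofline_matmul_time_s(m: int, n: int, k: int, compute_tflops: float,
                           memory_bw_gbps: float, dtype_bytes: int = 2) -> float:
    """Roofline time for C[M,N] = A[M,K] @ B[K,N]: max of compute and memory time."""
    flops = 2 * m * n * k                            # multiply-accumulate count
    traffic = (m * k + k * n + m * n) * dtype_bytes  # read A, read B, write C
    t_compute = flops / (compute_tflops * 1e12)
    t_memory = traffic / (memory_bw_gbps * 1e9)
    return max(t_compute, t_memory)
```

For a large square matmul the compute term dominates; for a skinny GEMV-like shape (m = 1) the memory term wins, which is exactly the distinction the Roofline max() captures.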
CoreCostModel
Models multi-core parallelism and the SRAM hierarchy:
def estimate_compute(op_type, local_shape, hardware) -> float:
    data_to_sram_ratio = total_bytes / total_sram_bytes
    efficiency = 0.9 if data_to_sram_ratio <= 1 else 0.7 if data_to_sram_ratio <= 2 else 0.5
    t_compute = flops / (tflops * 1e12 * efficiency)
    t_memory = bytes / (memory_bw * 1e9)
    return max(t_compute, t_memory)
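The SRAM-pressure tiers can be isolated as a small helper. This sketch mirrors the tiered thresholds above; the efficiency numbers are the ones quoted in this document, not independently validated:

```python
def sram_efficiency(total_bytes: int, total_sram_bytes: int) -> float:
    """Compute-efficiency discount as working-set pressure on SRAM grows."""
    ratio = total_bytes / total_sram_bytes
    if ratio <= 1:   # working set fits in SRAM
        return 0.9
    if ratio <= 2:   # moderate spilling
        return 0.7
    return 0.5       # heavy spilling to off-chip memory
```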
CommProtocolSpec
Communication protocol parameters (loaded from the topology YAML comm_params section):
@dataclass
class CommProtocolSpec:
    sync_lat_us: float = 0.0         # synchronization latency
    bw_utilization: float = 0.95     # bandwidth utilization
    cpu_fetch_delay_us: float = 0.0  # CPU fetch delay
    moe_topk: float = 8.0            # number of activated MoE TopK experts
    prefill_topk_factor: float = 8/128  # TopK factor for the prefill phase
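Building the spec from a parsed comm_params mapping might look like this. The dataclass fields are copied from above; `spec_from_comm_params` is a hypothetical helper, not necessarily the module's actual loader:

```python
from dataclasses import dataclass, fields

@dataclass
class CommProtocolSpec:
    sync_lat_us: float = 0.0
    bw_utilization: float = 0.95
    cpu_fetch_delay_us: float = 0.0
    moe_topk: float = 8.0
    prefill_topk_factor: float = 8 / 128

def spec_from_comm_params(comm_params: dict) -> CommProtocolSpec:
    """Keep only known fields; unknown keys in the YAML are silently ignored."""
    known = {f.name for f in fields(CommProtocolSpec)}
    return CommProtocolSpec(**{k: v for k, v in comm_params.items() if k in known})
```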
CommProtocolCostModel
Precise communication protocol modeling (DS_TPU convention):
class CommProtocolCostModel:
    def allreduce(tp, comm_bytes, comm_protocol) -> (latency_us, comm_size):
        """Hierarchical AllReduce:
        - tp in {8, 16, 32}: 3 stages (intra-board ring + inter-board + intra-board broadcast)
        - otherwise: standard ring allreduce
        """
    def allgather(tp, comm_bytes, comm_protocol) -> (latency_us, comm_size)
    def reducescatter(tp, comm_bytes, comm_protocol) -> (latency_us, comm_size)
    def dispatch(moe_tp, ep, comm_bytes, ...) -> (latency_us, comm_size)
    def combine(moe_tp, ep, comm_bytes, ...) -> (latency_us, comm_size)
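For the non-hierarchical branch, a standard ring allreduce estimate combining the protocol parameters above might look like the sketch below. The exact DS_TPU formula is not reproduced here; this only illustrates how ring_factor, bw_utilization, and sync_lat_us compose:

```python
def ring_allreduce_latency_us(tp: int, comm_bytes: float, bw_gbps: float,
                              sync_lat_us: float = 0.0,
                              bw_utilization: float = 0.95) -> float:
    """Ring allreduce: each chip moves 2*(N-1)/N of the buffer over its link."""
    if tp <= 1:
        return 0.0  # nothing to reduce across
    ring_factor = 2 * (tp - 1) / tp
    transfer_s = comm_bytes * ring_factor / (bw_gbps * 1e9 * bw_utilization)
    return transfer_s * 1e6 + sync_lat_us
```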
Evaluators
ComputeEvaluator
Handles compute ops: matmul, softmax, layernorm, attention, embedding, lmhead
def evaluate(op_id, op_type, local_shape, attrs, hardware, cost_model) -> StepMetrics:
    t_compute = cost_model.estimate_compute(op_type, local_shape, hardware)
    # If the TilingPlanner already produced a precise value, use it directly
    if "t_compute_ms" in attrs:
        t_compute = float(attrs["t_compute_ms"])
    return StepMetrics(t_compute=t_compute, ...)
CommEvaluator
Handles communication ops: allreduce, allgather, all2all, p2p
def evaluate(...) -> StepMetrics:
    intra_bw = hardware["c2c_bandwidth_gbps"] * 1e9
    inter_bw = hardware["b2b_bandwidth_gbps"] * 1e9
    if comm_type == "allreduce":
        latency_us, comm_size = model.allreduce(tp, comm_bytes, protocol)
    elif comm_type == "all2all":
        if "dispatch" in reason:
            latency_us, comm_size = model.dispatch(...)
        elif "combine" in reason:
            latency_us, comm_size = model.combine(...)
PreciseTileEvaluator
Provides precise evaluation for the L3/math TilingPlanner:
class PreciseTileEvaluator:
    def evaluate_tile(op, tile_config, chip) -> dict:
        """Computes precise traffic / utilization rate / execution time
        - MatMul: enumerates loop orders (mnk/nkm/mkn) and takes the minimum traffic
        - Attention: FlashAttention-2 style Q/K/V buffering
        - Elementwise: memory-bound (2 * elements * dtype_bytes)
        """
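The MatMul loop-order enumeration can be illustrated with a deliberately coarse traffic model: the operand that is invariant under the innermost tile loop stays resident in SRAM, while the other two are re-streamed on every innermost sweep. This is a sketch of the enumerate-and-take-minimum idea, not the evaluator's exact traffic accounting:

```python
def matmul_tile_traffic(m, n, k, tm, tn, tk, dtype_bytes=2):
    """Traffic (bytes) per loop order for C[M,N] = A[M,K] @ B[K,N] with tile
    sizes (tm, tn, tk), under the keep-innermost-invariant-resident model."""
    mt, nt, kt = -(-m // tm), -(-n // tn), -(-k // tk)  # tile counts (ceil div)
    traffic = {
        # innermost k: C tile resident per (m,n); A and B streamed K-deep each time
        "mnk": mt * nt * (k * tm + k * tn) + m * n,
        # innermost n: A tile resident per (m,k); B and C streamed N-wide each time
        "mkn": mt * kt * (n * tk + n * tm) + m * k,
        # innermost m: B tile resident per (n,k); A and C streamed M-deep each time
        "nkm": nt * kt * (m * tk + m * tn) + k * n,
    }
    return {order: elems * dtype_bytes for order, elems in traffic.items()}

def best_loop_order(m, n, k, tm, tn, tk):
    """Pick the loop order with minimum traffic, as the evaluator does."""
    traffic = matmul_tile_traffic(m, n, k, tm, tn, tk)
    return min(traffic, key=traffic.get)
```

For square shapes all three orders tie; for a reduction-heavy shape (small M, N and large K), keeping the C tile resident (mnk) avoids repeatedly re-reading partial sums and wins.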
StepMetrics and Aggregates
StepMetrics (per op)
@dataclass
class StepMetrics:
    op_id: str
    t_compute: float = 0.0  # compute time (ms)
    t_comm: float = 0.0     # communication time (ms)
    t_wait: float = 0.0     # wait time (ms)
    t_total: float = 0.0    # total time (ms)
    bottleneck_tag: BottleneckTag = UNKNOWN
    flops: int = 0
    bytes_read: int = 0
    bytes_write: int = 0
    meta: dict = field(default_factory=dict)
Aggregates (end-to-end)
@dataclass
class Aggregates:
    ttft: float  # Time To First Token (ms)
    tpot: float  # Time Per Output Token (ms)
    tps: float   # Tokens Per Second
    mfu: float   # Model FLOPS Utilization
    mbu: float   # Memory Bandwidth Utilization
    memory_peak: int  # peak memory (bytes)
    total_time: float  # total time (ms)
    total_compute_time: float
    total_comm_time: float
    total_wait_time: float
    total_flops: int
    total_bytes: int
    num_steps: int
    bottleneck_summary: dict[str, int]
Aggregation Formulas
# MFU
peak_flops = hardware["compute_tflops"] * 1e12
achieved_flops = total_flops / (total_time / 1000)
mfu = achieved_flops / peak_flops
# MBU
peak_bw = hardware["memory_bandwidth_gbps"] * 1e9
achieved_bw = total_bytes / (total_time / 1000)
mbu = achieved_bw / peak_bw
# TPS = output_tokens / decode_time_s
# TTFT = prefill_time (ms)
# TPOT = decode_time / output_tokens (ms)
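Putting the formulas together as one runnable helper. The hardware keys match the ones used above; the split into prefill/decode time inputs is an assumption about how the engine sums its step metrics:

```python
def compute_aggregates(total_flops: int, total_bytes: int, total_time_ms: float,
                       prefill_time_ms: float, decode_time_ms: float,
                       output_tokens: int, hardware: dict[str, float]) -> dict:
    """MFU/MBU/TTFT/TPOT/TPS from summed step metrics. No hardware defaults."""
    total_time_s = total_time_ms / 1000
    peak_flops = hardware["compute_tflops"] * 1e12     # FLOP/s
    peak_bw = hardware["memory_bandwidth_gbps"] * 1e9  # B/s
    return {
        "mfu": (total_flops / total_time_s) / peak_flops,
        "mbu": (total_bytes / total_time_s) / peak_bw,
        "ttft": prefill_time_ms,                       # ms
        "tpot": decode_time_ms / output_tokens,        # ms per token
        "tps": output_tokens / (decode_time_ms / 1000),  # tokens per second
    }
```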
No-Defaults Rule
When reading parameters from the hardware dict, no module may fall back to a default value:
# [FAIL]
bw_gbps = hardware.get("c2c_bandwidth_gbps", 400.0)
# [PASS] a missing key raises KeyError, failing loudly
bw_gbps = hardware["c2c_bandwidth_gbps"]
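One way to enforce the rule up front is a validation pass at engine entry, so a missing key fails at evaluate() rather than deep inside a cost model. `require_hardware_keys` is a hypothetical helper shown only to illustrate the fail-loudly behavior:

```python
def require_hardware_keys(hardware: dict[str, float],
                          required: tuple[str, ...]) -> None:
    """Fail loudly at evaluation entry instead of at first use."""
    missing = [key for key in required if key not in hardware]
    if missing:
        raise KeyError(f"hardware dict missing required keys: {missing}")
```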
Calibration (optional)
@dataclass
class CalibrationConfig:
    effective_bw_factor: float = 1.0  # effective bandwidth factor (0-1)
    congestion_factor: float = 1.0    # congestion factor (>=1)
    startup_overhead_ms: float = 0.0  # startup overhead (ms)
    overlap_efficiency: float = 1.0   # overlap efficiency (0-1)
    compute_efficiency: float = 1.0   # compute efficiency (0-1)
Correction formulas:
t_compute = t_compute / compute_efficiency
t_comm = t_comm * congestion_factor / effective_bw_factor + startup_overhead_ms
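Applied to a single step, the correction formulas are direct to express. The dataclass mirrors the fields above; `calibrate_step` is an illustrative wrapper, not necessarily the module's function name:

```python
from dataclasses import dataclass

@dataclass
class CalibrationConfig:
    effective_bw_factor: float = 1.0
    congestion_factor: float = 1.0
    startup_overhead_ms: float = 0.0
    overlap_efficiency: float = 1.0
    compute_efficiency: float = 1.0

def calibrate_step(t_compute_ms: float, t_comm_ms: float,
                   cal: CalibrationConfig) -> tuple[float, float]:
    """Apply the correction formulas to one step's raw estimates (ms in, ms out)."""
    t_compute = t_compute_ms / cal.compute_efficiency
    t_comm = (t_comm_ms * cal.congestion_factor / cal.effective_bw_factor
              + cal.startup_overhead_ms)
    return t_compute, t_comm
```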