跳转到主要内容

概述

pi0 是一个视觉-语言-动作模型。它把 PaliGemma 风格的图像和语言 prefix 与 Gemma action expert 结合起来。在 PhyAI 中,ws1 路径在单张 GPU 上运行完整的端到端推理循环:编码相机图像和任务文本,缓存 prefix,使用机器人 state 条件化 expert,并通过 flow matching 积分得到 action chunk。 本页描述单卡实现。PI0WS1Scheduler 不包含 tensor parallel、continuous batching 或 preemption。
pi0 和 pi0.5 的 robot state 进入模型方式不同。pi0 保留一个数值形式的 expert-side state token;pi0.5 则把离散化后的 state bins 放进语言 prompt。

架构

PhyAI 的 pi0 路径沿用其他模型集成的 。实现分布在配置、模型模块、runner 和 scheduler 中:
phyai/src/phyai/models/pi0
main_pi0.py
scheduler_ws1_pi0.py
model_runner_pi0.py
modeling_pi0.py
configuration_pi0.py
组件职责
PI0Entry注册 "pi0" engine plugin,构造 PI0Model,加载权重,并创建 scheduler
PI0Config保存 vision、text、expert、action chunk、tokenizer 和相机数量相关的几何配置
PI0Model持有 SigLIP/PaliGemma vision tower、PaliGemma text stack、Gemma expert stack、RoPE 以及 action/time heads
PI0VisionRunner运行 vision tower,并支持 CUDA graph capture
PI0LLMRunner运行 PaliGemma prefix pass,并把 prefix K/V 写入共享 cache
PI0ExpertRunner在每个 flow-matching step 中运行 expert 的 state/action pass
PI0WS1Scheduler在单张 GPU 上编排一次完整的 inference request

模型布局

PI0Model 由三组主要 stack 组成:
Stack默认形状备注
VisionSigLIP,27 层,224×224 图像,14×14 patch生成图像 tokens,并投影到 PaliGemma text width
TextPaliGemma/Gemma,18 层,hidden size 2048处理 image + language prefix,并写入 prefix K/V
ExpertGemma action expert,18 层,hidden size 1024处理一个 state token 和完整 action chunk
顶层配置默认值如下:
字段默认值含义
chunk_size50每次 engine step 返回的 action token 数
max_state_dim32padding 后的 robot state 宽度
max_action_dim32padding 后的 action 宽度
num_inference_steps10Flow-matching Euler steps
tokenizer_max_length48右 padding 后的 PaliGemma task prompt 长度
empty_cameras0num_images = 3 - empty_cameras;pi0 支持 2 或 3 路相机
模型用 params_dtype 控制 language 和 expert stacks 的参数精度。Vision tower 有独立的 vision_params_dtype,默认是 fp32,用于和参考实现对齐。只有当你明确希望 vision 以 bf16 执行时,才设置 PI0Args(vision_params_dtype=torch.bfloat16)

Request contract

PI0Request 是 scheduler 的 canonical input:
字段Shape备注
pixel_values(B, num_images, 3, image_size, image_size)已经 resize 和 normalize 的相机 tensor
input_ids(B, tokenizer_max_length) int64右 padding 后的 PaliGemma token ids
lang_lens(B,) int64每个样本真实的 task prompt 长度
state(B, max_state_dim)数值形式的 robot state,在进入 expert 前完成 padding
noise(B, chunk_size, max_action_dim)None可选的初始 action noise;为 None 时 scheduler 内部采样 Gaussian noise
B 可以是 [1, max_batch_size] 范围内任意值。Scheduler 会在内部把小 batch padding 到 max_batch_size,并在返回前切回 actual_B

Scheduler phases

一次 engine.step(request) 会映射到下面这些 scheduler phase:
Phase工作内容
pi0.vision_loop把相机 tensor 移到 vision dtype,并对真实 batch 中每个样本运行一次 PI0VisionRunner
pi0.lang_pack嵌入 language ids,然后把 image tokens 和 language tokens pack 到每个样本的 prefix buffer
pi0.llm_prefix_plan重置 static caches,并准备 ragged prefix attention metadata
pi0.llm_prefix_fwd运行 PaliGemma text stack,并把 prefix K/V 写入 KVCachePool
pi0.expert_plan准备覆盖 prefix + suffix slots 的 state/action expert attention metadata
pi0.expert_loop初始化或复制 action noise,并运行 flow-matching integration
pi0.expert_steppi0.expert_loop 内部的一次 expert velocity prediction 和 Euler update
Prefix tokens 每个 request 只缓存一次。随后 expert 会 attend 到:
state query  -> prefix + state
action query -> prefix + state + action chunk
这也是为什么 pi0 的 suffix length 是 1 + chunk_size:一个 state token 后面跟着 action tokens。

CUDA graphs

RuntimeConfig(use_cuda_graph=True) 时,pi0 runners 会在 scheduler.setup() 期间捕获 CUDA graph:
Runner捕获形状
PI0VisionRunner(num_images, 3, image_size, image_size)
PI0LLMRunner(max_batch_size * n_per_sample, text_hidden_size)
PI0ExpertRunner固定 max_batch_size 下的 statex_ttime buffers
scheduler.step() 期间,runners 会更新 static graph input buffers 并 replay 已捕获的 graph。Attention metadata 会通过 attention backend 的 capture-aware metadata buffers 在 captured region 外部完成 staging。
如果想在 Nsight Systems 里看到更展开的 trace,可以关闭 CUDA graph:
uv run python benchmark/bench_n_batch_ws1_pi0.py \
    --batch-sizes 4 \
    --no-cuda-graph

运行 pi0

1

准备权重

准备一份 HF-style pi0 PyTorch checkpoint 目录,里面包含 config.jsonmodel.safetensors 文件。随机权重 smoke test 可以省略 --checkpoint
2

构造 Engine

插件名是 "pi0"。Engine 负责 setup、可选权重加载、runner setup 和 CUDA graph capture。
import torch
from pathlib import Path

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.pi0.main_pi0 import PI0Args

engine = Engine(
    EngineArgs(
        plugin="pi0",
        plugin_args=PI0Args(
            checkpoint_dir=Path("/path/to/pi0_pytorch"),
            max_batch_size=4,
            vision_params_dtype=torch.float32,
        ),
        config=EngineConfig(
            device=DeviceConfig(target="cuda", params_dtype=torch.bfloat16),
            runtime=RuntimeConfig(use_cuda_graph=True),
        ),
    )
)
max_batch_size 会固定 captured graph 的形状。如果需要不同的最大 batch,需要重建 engine。
3

构造请求

使用 PI0Processor 把原始机器人观测转换为模型需要的 tensor。Processor 位于 engine 之外,属于 phyai-utils-tools
from phyai.models.pi0.scheduler_ws1_pi0 import PI0Request
from phyai_utils_tools.models.pi0 import PI0Processor

processor = PI0Processor(
    image_size=224,
    num_channels=3,
    num_images=3,
    tokenizer_max_length=48,
    max_state_dim=32,
    action_dim=7,
    device="cuda",
    params_dtype=torch.bfloat16,
)

processed = processor.preprocess(
    {
        "images": [cam0, cam1, cam2],
        "task": ["pick up the object"],
        "state": state,
    }
)

request = PI0Request(
    pixel_values=processed.pixel_values,
    input_ids=processed.input_ids,
    lang_lens=processed.lang_lens,
    state=processed.state,
)
4

运行一步

actions = engine.step(request)  # (B, chunk_size, max_action_dim)
如果构造 processor 时传入了 action_dim,可以调用 processor.postprocess(actions) 裁掉 padding 的 action 宽度;当存在 dataset stats 时,它也会 unnormalize actions。
5

关闭 Engine

engine.close()

端到端示例

examples/pi0/run_pi0.py 同时覆盖 raw request 路径和 processor-backed request 路径:
uv run python examples/pi0/run_pi0.py \
    --checkpoint /path/to/pi0_pytorch \
    --batch-size 1
随机权重 smoke test 可以省略 --checkpoint
uv run python examples/pi0/run_pi0.py --raw --batch-size 1
当 checkpoint 使用一个 empty camera 时,传入 --num-images 2
uv run python examples/pi0/run_pi0.py \
    --checkpoint /path/to/pi0_pytorch \
    --num-images 2

Benchmarking 和 profiling

benchmark/bench_n_batch_ws1_pi0.py 可以扫 batch size,也可以为 Nsight Systems 打开一个短 profile window:
uv run python benchmark/bench_n_batch_ws1_pi0.py \
    --batch-sizes 1 2 4 \
    --n-warmup 5 \
    --n-timed 30 \
    --result-file ./pi0_ws1_results.jsonl
Nsight Systems capture:
nsys profile \
    --capture-range=cudaProfilerApi \
    --capture-range-end=stop \
    -o ./prof/pi0_ws1 \
    uv run python benchmark/bench_n_batch_ws1_pi0.py \
        --batch-sizes 4 \
        --profile-backend nsys \
        --profile-start-step 5 \
        --profile-num-steps 3
只有当你明确想测 bf16 vision latency 时,才设置 --vision-dtype bfloat16。默认会让 vision tower 保持 fp32。

当前限制

  • 这条路径只支持单卡。
  • max_batch_size 在 engine 构造时固定。
  • Vision tower 会对真实 batch 中每个样本 replay 一次。
  • Scheduler 期望输入已经完成 preprocess。图像 resize、tokenization、state padding 和 action unnormalize 属于 PI0Processor 的职责。
  • CUDA graph capture 的形状是固定的。修改相机数量、图像尺寸、tokenizer 长度或最大 batch 后,需要重建 engine。

完整代码

from pathlib import Path

import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.pi0.configuration_pi0 import PI0Config
from phyai.models.pi0.main_pi0 import PI0Args
from phyai.models.pi0.scheduler_ws1_pi0 import PI0Request
from phyai.utils import load_config

CHECKPOINT_DIR = Path("/path/to/pi0_pytorch")
BATCH_SIZE = 1

cfg = load_config(CHECKPOINT_DIR, PI0Config)
device = torch.device("cuda")
dtype = torch.bfloat16

engine = Engine(
    EngineArgs(
        plugin="pi0",
        plugin_args=PI0Args(
            checkpoint_dir=CHECKPOINT_DIR,
            max_batch_size=BATCH_SIZE,
            vision_params_dtype=torch.float32,
        ),
        config=EngineConfig(
            device=DeviceConfig(target="cuda", params_dtype=dtype),
            runtime=RuntimeConfig(use_cuda_graph=True),
        ),
    )
)

try:
    input_ids = torch.zeros(
        BATCH_SIZE, cfg.tokenizer_max_length, dtype=torch.int64, device=device
    )
    input_ids[:, 0] = 2

    request = PI0Request(
        pixel_values=torch.rand(
            BATCH_SIZE,
            cfg.num_images,
            cfg.vision.num_channels,
            cfg.vision.image_size,
            cfg.vision.image_size,
            dtype=torch.float32,
            device=device,
        ),
        input_ids=input_ids,
        lang_lens=torch.ones(BATCH_SIZE, dtype=torch.int64, device=device),
        state=torch.rand(BATCH_SIZE, cfg.max_state_dim, dtype=dtype, device=device),
    )

    actions = engine.step(request)
    print(f"action chunk shape={tuple(actions.shape)}, dtype={actions.dtype}")
finally:
    engine.close()