跳转到主要内容

概述

PI05Processor 位于 phyai_utils_tools.models.pi05。它负责把真实机器人侧的数据转换成 PI05Request 需要的 canonical tensor,也负责把模型输出的 action chunk 转回数据集真实 action 维度。 PhyAI 的 pi0.5 scheduler 不做图像 resize、文本 tokenization、state 离散化或 action unnormalize。这些逻辑都在 processor 里完成:
阶段输入输出
preprocessimagestaskstatePI05ProcessedInputs(pixel_values, input_ids, lang_lens)
engine.stepPI05Request(B, chunk_size, max_action_dim)
postprocessraw action chunk(B, chunk_size, action_dim)
pi05_base 的公开 checkpoint 里 normalizer 的 features 为空,所以默认 state/action normalize 是 no-op。若你的 lerobot checkpoint 带有 dataset stats,from_pretrained 会加载这些 stats sidecar,并在 pre/postprocess 中使用。

输入规范

preprocess 接收一个 transition dict。常用字段如下:
字段类型备注
imageslist[torch.Tensor]torch.Tensor每路相机是 (B, C, H, W);也可以传 stacked (B, num_images, C, H, W)
tasklist[str]str每个 batch 样本一条任务文本
statetorch.Tensor(B, state_dim),pi0.5 prompt 使用 [-1, 1] 范围内的状态值
输出的 PI05ProcessedInputs 字段可以直接映射到 PI05Request
字段Shape备注
pixel_values(B, num_images, C, image_size, image_size)默认 num_images=3C=3image_size=224
input_ids(B, tokenizer_max_length) int64默认 tokenizer 是 google/paligemma-3b-pt-224,右侧 padding
lang_lens(B,) int64每条 prompt 的真实 token 长度
图像会先按比例 resize 到正方形并 padding,再 stack 成 scheduler 需要的 (B, num_images, C, H, W)normalize_pixels=True 时,processor 会把 [0, 1] 像素映射到 [-1, 1]

从 checkpoint 构造

如果你的 checkpoint 目录里有 lerobot 格式的 policy_preprocessor.jsonpolicy_postprocessor.json,优先用 from_pretrained。这条路径会保留 checkpoint 里记录的 processor step、normalizer 配置和 stats sidecar,同时补上 PhyAI 推理侧需要的 vision resize 和 action slice。
from pathlib import Path

import torch

from phyai_utils_tools.models.pi05 import PI05Processor

processor = PI05Processor.from_pretrained(
    Path("/path/to/pi05_base"),
    image_size=224,
    num_channels=3,
    num_images=3,
    action_dim=7,
    device="cuda",
    params_dtype=torch.bfloat16,
)
这条构造路径的行为:
  • 加载 policy_preprocessor.jsonpolicy_postprocessor.json
  • 给 tokenizer step 注入 HuggingFace tokenizer 对象。
  • 把 preprocess 的 device_processor 指向 device,让模型输入落在推理设备上。
  • postprocess 默认按 checkpoint 配置返回 CPU tensor;pi05_base 的 postprocessor 就是这样配置的。
  • 在 loaded preprocessor 前追加 resize / optional pixel normalize。
  • 在 loaded postprocessor 后追加 SliceActionStep(action_dim=action_dim)

手工构造

没有 processor json,或者只想用 pi05_base 默认行为时,可以直接构造 PI05Processor
import torch

from phyai_utils_tools.models.pi05 import PI05Processor

processor = PI05Processor(
    image_size=224,
    num_channels=3,
    num_images=3,
    tokenizer_max_length=200,
    action_dim=7,
    device="cuda",
    params_dtype=torch.bfloat16,
)
手工构造的 preprocess pipeline 顺序是:
1

Resize cameras

ResizeWithPadStep 读取 images,校验相机数量和通道数,把每路相机 resize/pad 到 image_size × image_size
2

Normalize state

NormalizerStep 根据 dataset_statsPI05_NORM_MAP 处理 state。没有 stats 时是 no-op。
3

Build prompt

StateTokenizerPrepareStepstate 离散成 256 个 bins,并组装成 Task: <task>, State: <bins>;\nAction:
4

Tokenize

TokenizerStep 使用 PaliGemma tokenizer,把 prompt 编码成 input_idslang_lens
5

Move tensors

DeviceStep 把 tensor 移到 device,并把浮点 tensor 转成 params_dtype
postprocess pipeline 先做 action unnormalize,再按 action_dim 裁掉模型内部 padding 的 action 维度,最后把结果放回 CPU。

和 Engine 串起来

下面的例子展示 raw 相机、任务文本和 state 如何经过 PI05Processor 变成 PI05Request,再交给 Engine 推理。
from pathlib import Path

import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.pi05.configuration_pi05 import PI05Config
from phyai.models.pi05.main_pi05 import PI05Args
from phyai.models.pi05.scheduler_ws1_pi05 import PI05Request
from phyai.utils import load_config
from phyai_utils_tools.models.pi05 import PI05Processor

checkpoint_dir = Path("/path/to/pi05_base")
cfg = load_config(checkpoint_dir, PI05Config)
device = torch.device("cuda")
dtype = torch.bfloat16
batch_size = 1
action_dim = 7

processor = PI05Processor.from_pretrained(
    checkpoint_dir,
    image_size=cfg.vision.image_size,
    num_channels=cfg.vision.num_channels,
    num_images=3,
    action_dim=action_dim,
    device=device,
    params_dtype=dtype,
)

engine = Engine(
    EngineArgs(
        plugin="pi05",
        plugin_args=PI05Args(
            checkpoint_dir=checkpoint_dir,
            max_batch_size=batch_size,
        ),
        config=EngineConfig(
            device=DeviceConfig(target="cuda", params_dtype=dtype),
            runtime=RuntimeConfig(use_cuda_graph=True),
        ),
    )
)

try:
    raw = {
        "images": [
            torch.rand(batch_size, 3, 480, 640, device=device),
            torch.rand(batch_size, 3, 480, 640, device=device),
            torch.rand(batch_size, 3, 480, 640, device=device),
        ],
        "task": ["pick up the cup"],
        "state": torch.rand(batch_size, 7, device=device) * 2 - 1,
    }

    processed = processor.preprocess(raw)
    request = PI05Request(
        pixel_values=processed.pixel_values,
        input_ids=processed.input_ids,
        lang_lens=processed.lang_lens,
    )

    raw_actions = engine.step(request)
    actions = processor.postprocess(raw_actions)
    print(actions.shape)
finally:
    engine.close()
如果只想测 engine 本身的 latency,可以跳过 processor,直接构造已经 resize/tokenize 好的 PI05Requestexamples/pi05/run_pi05.py --raw 就是这条路径。

保存和加载

手工构造的 processor 可以保存成 lerobot 兼容的 json:
processor.save_pretrained("/tmp/pi05_processor")
保存结果包含:
文件内容
policy_preprocessor.jsonnormalizer、pi0.5 prompt step、tokenizer、device step
policy_postprocessor.jsonunnormalizer、device step
*.safetensors仅当 normalizer / unnormalizer 有 stats 时生成
PhyAI 侧的 vision resize、optional pixel normalize 和 action slice 不写进 json;它们在 PI05Processor.from_pretrained(...) 里根据构造参数重新补上。这和 lerobot 的边界一致:图像 resize 和 action 裁剪属于推理侧模型胶水,不属于 checkpoint json 的通用 processor core。

常见问题

images shape 不匹配

num_imagesnum_channels 必须和 processor 构造参数一致。默认 pi05_base 是 3 路 RGB 相机,所以 list 输入需要 3 个 (B, 3, H, W) tensor,stacked 输入需要 (B, 3, 3, H, W)

state 是否必须传

StateTokenizerPrepareStep 支持没有 state 的路径;此时 prompt 只包含任务文本,不包含 state bins。但 pi0.5 的常规机器人推理路径应传入 proprioceptive state。

action 输出为什么回到 CPU

PI05Processor.from_pretrained 不会覆盖 checkpoint postprocessor 里的 device_processorpi05_base 的 postprocessor 配置会把 action 返回 CPU,便于交给机器人控制侧或后续评估代码。

tokenizer 是否会联网

默认 tokenizer 名是 google/paligemma-3b-pt-224。如果本地 HuggingFace cache 没有这个 tokenizer,首次构造 processor 可能会触发下载。离线环境下可以传入已经准备好的 tokenizer 对象:
processor = PI05Processor(
    tokenizer=my_tokenizer,
    image_size=224,
    num_images=3,
    tokenizer_max_length=200,
)