> ## Documentation Index
> Fetch the complete documentation index at: https://phyai.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# PI0.5 Processors

> 使用 lerobot 格式兼容的 pi0.5 preprocessor 和 postprocessor

# 概述

`PI05Processor` 位于 `phyai_utils_tools.models.pi05`。它负责把真实机器人侧的数据转换成 `PI05Request` 需要的 canonical tensor，也负责把模型输出的 action chunk 转回数据集真实 action 维度。

PhyAI 的 pi0.5 scheduler 不做图像 resize、文本 tokenization、state 离散化或 action unnormalize。这些逻辑都在 processor 里完成：

| 阶段            | 输入                      | 输出                                                        |
| ------------- | ----------------------- | --------------------------------------------------------- |
| `preprocess`  | `images`、`task`、`state` | `PI05ProcessedInputs(pixel_values, input_ids, lang_lens)` |
| `engine.step` | `PI05Request`           | `(B, chunk_size, max_action_dim)`                         |
| `postprocess` | raw action chunk        | `(B, chunk_size, action_dim)`                             |

<Note>
  `pi05_base` 的公开 checkpoint 里 normalizer 的 `features` 为空，所以默认 state/action normalize 是 no-op。若你的 lerobot checkpoint 带有 dataset stats，`from_pretrained` 会加载这些 stats sidecar，并在 pre/postprocess 中使用。
</Note>

# 输入规范

`preprocess` 接收一个 transition dict。常用字段如下：

| 字段       | 类型                                    | 备注                                                           |
| -------- | ------------------------------------- | ------------------------------------------------------------ |
| `images` | `list[torch.Tensor]` 或 `torch.Tensor` | 每路相机是 `(B, C, H, W)`；也可以传 stacked `(B, num_images, C, H, W)` |
| `task`   | `list[str]` 或 `str`                   | 每个 batch 样本一条任务文本                                            |
| `state`  | `torch.Tensor`                        | `(B, state_dim)`，pi0.5 prompt 使用 `[-1, 1]` 范围内的状态值           |

输出的 `PI05ProcessedInputs` 字段可以直接映射到 `PI05Request`：

| 字段             | Shape                                        | 备注                                                     |
| -------------- | -------------------------------------------- | ------------------------------------------------------ |
| `pixel_values` | `(B, num_images, C, image_size, image_size)` | 默认 `num_images=3`、`C=3`、`image_size=224`               |
| `input_ids`    | `(B, tokenizer_max_length)` int64            | 默认 tokenizer 是 `google/paligemma-3b-pt-224`，右侧 padding |
| `lang_lens`    | `(B,)` int64                                 | 每条 prompt 的真实 token 长度                                 |

图像会先按比例 resize 到正方形并 padding，再 stack 成 scheduler 需要的 `(B, num_images, C, H, W)`。`normalize_pixels=True` 时，processor 会把 `[0, 1]` 像素映射到 `[-1, 1]`。

# 从 checkpoint 构造

如果你的 checkpoint 目录里有 lerobot 格式的 `policy_preprocessor.json` 和 `policy_postprocessor.json`，优先用 `from_pretrained`。这条路径会保留 checkpoint 里记录的 processor step、normalizer 配置和 stats sidecar，同时补上 PhyAI 推理侧需要的 vision resize 和 action slice。

```python theme={null}
from pathlib import Path

import torch

from phyai_utils_tools.models.pi05 import PI05Processor

processor = PI05Processor.from_pretrained(
    Path("/path/to/pi05_base"),
    image_size=224,
    num_channels=3,
    num_images=3,
    action_dim=7,
    device="cuda",
    params_dtype=torch.bfloat16,
)
```

这条构造路径的行为：

* 加载 `policy_preprocessor.json` 和 `policy_postprocessor.json`。
* 给 tokenizer step 注入 HuggingFace tokenizer 对象。
* 把 preprocess 的 `device_processor` 指向 `device`，让模型输入落在推理设备上。
* postprocess 默认按 checkpoint 配置返回 CPU tensor；`pi05_base` 的 postprocessor 就是这样配置的。
* 在 loaded preprocessor 前追加 resize / optional pixel normalize。
* 在 loaded postprocessor 后追加 `SliceActionStep(action_dim=action_dim)`。

# 手工构造

没有 processor json，或者只想用 `pi05_base` 默认行为时，可以直接构造 `PI05Processor`：

```python theme={null}
import torch

from phyai_utils_tools.models.pi05 import PI05Processor

processor = PI05Processor(
    image_size=224,
    num_channels=3,
    num_images=3,
    tokenizer_max_length=200,
    action_dim=7,
    device="cuda",
    params_dtype=torch.bfloat16,
)
```

手工构造的 preprocess pipeline 顺序是：

<Steps>
  <Step title="Resize cameras">
    `ResizeWithPadStep` 读取 `images`，校验相机数量和通道数，把每路相机 resize/pad 到 `image_size × image_size`。
  </Step>

  <Step title="Normalize state">
    `NormalizerStep` 根据 `dataset_stats` 和 `PI05_NORM_MAP` 处理 `state`。没有 stats 时是 no-op。
  </Step>

  <Step title="Build prompt">
    `StateTokenizerPrepareStep` 把 `state` 离散成 256 个 bins，并组装成 `Task: <task>, State: <bins>;\nAction: `。
  </Step>

  <Step title="Tokenize">
    `TokenizerStep` 使用 PaliGemma tokenizer，把 prompt 编码成 `input_ids` 和 `lang_lens`。
  </Step>

  <Step title="Move tensors">
    `DeviceStep` 把 tensor 移到 `device`，并把浮点 tensor 转成 `params_dtype`。
  </Step>
</Steps>

postprocess pipeline 先做 action unnormalize，再按 `action_dim` 裁掉模型内部 padding 的 action 维度，最后把结果放回 CPU。

# 和 Engine 串起来

下面的例子展示 raw 相机、任务文本和 state 如何经过 `PI05Processor` 变成 `PI05Request`，再交给 `Engine` 推理。

```python theme={null}
from pathlib import Path

import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.pi05.configuration_pi05 import PI05Config
from phyai.models.pi05.main_pi05 import PI05Args
from phyai.models.pi05.scheduler_ws1_pi05 import PI05Request
from phyai.utils import load_config
from phyai_utils_tools.models.pi05 import PI05Processor

checkpoint_dir = Path("/path/to/pi05_base")
cfg = load_config(checkpoint_dir, PI05Config)
device = torch.device("cuda")
dtype = torch.bfloat16
batch_size = 1
action_dim = 7

processor = PI05Processor.from_pretrained(
    checkpoint_dir,
    image_size=cfg.vision.image_size,
    num_channels=cfg.vision.num_channels,
    num_images=3,
    action_dim=action_dim,
    device=device,
    params_dtype=dtype,
)

engine = Engine(
    EngineArgs(
        plugin="pi05",
        plugin_args=PI05Args(
            checkpoint_dir=checkpoint_dir,
            max_batch_size=batch_size,
        ),
        config=EngineConfig(
            device=DeviceConfig(target="cuda", params_dtype=dtype),
            runtime=RuntimeConfig(use_cuda_graph=True),
        ),
    )
)

try:
    raw = {
        "images": [
            torch.rand(batch_size, 3, 480, 640, device=device),
            torch.rand(batch_size, 3, 480, 640, device=device),
            torch.rand(batch_size, 3, 480, 640, device=device),
        ],
        "task": ["pick up the cup"],
        "state": torch.rand(batch_size, 7, device=device) * 2 - 1,
    }

    processed = processor.preprocess(raw)
    request = PI05Request(
        pixel_values=processed.pixel_values,
        input_ids=processed.input_ids,
        lang_lens=processed.lang_lens,
    )

    raw_actions = engine.step(request)
    actions = processor.postprocess(raw_actions)
    print(actions.shape)
finally:
    engine.close()
```

<Tip>
  如果只想测 engine 本身的 latency，可以跳过 processor，直接构造已经 resize/tokenize 好的 `PI05Request`。`examples/pi05/run_pi05.py --raw` 就是这条路径。
</Tip>

# 保存和加载

手工构造的 processor 可以保存成 lerobot 兼容的 json：

```python theme={null}
processor.save_pretrained("/tmp/pi05_processor")
```

保存结果包含：

| 文件                          | 内容                                                 |
| --------------------------- | -------------------------------------------------- |
| `policy_preprocessor.json`  | normalizer、pi0.5 prompt step、tokenizer、device step |
| `policy_postprocessor.json` | unnormalizer、device step                           |
| `*.safetensors`             | 仅当 normalizer / unnormalizer 有 stats 时生成           |

PhyAI 侧的 vision resize、optional pixel normalize 和 action slice 不写进 json；它们在 `PI05Processor.from_pretrained(...)` 里根据构造参数重新补上。这和 lerobot 的边界一致：图像 resize 和 action 裁剪属于推理侧模型胶水，不属于 checkpoint json 的通用 processor core。

# 常见问题

## `images` shape 不匹配

`num_images` 和 `num_channels` 必须和 processor 构造参数一致。默认 `pi05_base` 是 3 路 RGB 相机，所以 list 输入需要 3 个 `(B, 3, H, W)` tensor，stacked 输入需要 `(B, 3, 3, H, W)`。

## `state` 是否必须传

`StateTokenizerPrepareStep` 支持没有 `state` 的路径；此时 prompt 只包含任务文本，不包含 state bins。但 pi0.5 的常规机器人推理路径应传入 proprioceptive state。

## action 输出为什么回到 CPU

`PI05Processor.from_pretrained` 不会覆盖 checkpoint postprocessor 里的 `device_processor`。`pi05_base` 的 postprocessor 配置会把 action 返回 CPU，便于交给机器人控制侧或后续评估代码。

## tokenizer 是否会联网

默认 tokenizer 名是 `google/paligemma-3b-pt-224`。如果本地 HuggingFace cache 没有这个 tokenizer，首次构造 processor 可能会触发下载。离线环境下可以传入已经准备好的 tokenizer 对象：

```python theme={null}
processor = PI05Processor(
    tokenizer=my_tokenizer,
    image_size=224,
    num_images=3,
    tokenizer_max_length=200,
)
```
