跳转到主要内容

概述

Cosmos3-Nano-Policy-DROID 是 Cosmos3 系列里的 policy model。Cosmos3 本身是面向 Physical AI 的 omnimodal world model;policy 版本接收语言指令和 DROID 机器人平台的视觉 observation,生成用于 manipulation 和 control 的 robot action trajectory。 本页讲的是 Cosmos3 policy 的多卡路径,也就是 cosmos3_policy_wn plugin。它覆盖 policyforward_dynamicsinverse_dynamics 三种 mode:视频 latent 和 action latent 在同一个 denoise loop 里推进,最终返回 action;如果开启 decode_video=True,还会返回 rollout video。 PhyAI 目前在这条路径上支持两类并行:policy transformer 沿 tp 轴做 tensor parallel;当 cfg=2guidance_scale > 1 时,cond / uncond 两个 CFG branch 会被分到两个 TP group 上并行运行。rollout video 的 VAE decode 也会按 rank 做空间 tile 切分,并用 halo overlap 合并边界。

Mode 与输出

三种 mode 的 clean / noisy 规则如下:
ModeClean videoClean action生成目标
policy默认第 0 个 latent frame,或 cond_frame_indexes 指定的帧action chunk,可选 rollout video
forward_dynamics默认第 0 个 latent frame,或 cond_frame_indexes 指定的帧cond_action 全部动作步rollout video
inverse_dynamics默认所有 video latent frame,或 cond_frame_indexes 指定的帧解释 observation transition 的 action chunk
输出始终包含 action。decode_video=True 时,plugin 会额外返回 video latent 和 decoded pixels。
KeyShape / 类型说明
action[B, action_chunk, raw_action_dim]padding tail 已经裁掉
video[B, C, t_lat, h_lat, w_lat]rollout / denoised video latent
pixels[B, 3, T, H, W],可选decode_video=True 且 checkpoint 有 VAE 时返回
Cosmos3 policy 内部 action 宽度默认是 action_dim=64。真实机器人动作宽度由 raw_action_dim 决定,scheduler 会在输出前裁掉 padding tail。

并行拓扑

下面用 TP=4CFG=2world_size=8 作为例子。rank 0-3 是 cond branch 的一个 TP group,rank 4-7 是 uncond branch 的另一个 TP group。每个 denoise step 里,同一 branch 的 4 个 TP rank 会一起跑 transformer forward,不是流水顺序。 Cosmos3 WN TP=4 CFG=2 的 8 卡并行拓扑 P.all_gather(axis="cfg") 使用 engine 初始化时创建的 parallel mesh。ParallelConfig(world_size=cfg_size * tp_size, cfg_size=cfg_size, tp_size=tp_size) 会把 rank 映射成 (cfg_rank, tp_rank);沿 cfg 轴 gather 时,只收集相同 tp_rank、不同 cfg_rank 的 rank。这样每个 TP 分片都能拿到 cond / uncond velocity,再在本地完成 CFG combine。 VAE 的 8 卡切分示意如下,cfg 作为 outer axis,tp 作为 inner axis: Cosmos3 WAN VAE 8 卡空间切分示意

运行路径

1

准备权重和输入

准备一份 Cosmos3-Nano-Policy-DROID checkpoint。如果需要输出 rollout video,checkpoint 里还需要 vae/
/path/to/Cosmos3-Nano-Policy-DROID/
  transformer/
  text_tokenizer/
  scheduler/
  vae/             # decode_video=True 时需要
policyinverse_dynamics 可以输入 observation image 或 video;forward_dynamics 还需要 action JSON。
2

构造多卡 Policy Engine

插件名是 "cosmos3_policy_wn"torchrun --nproc_per_node 必须等于 cfg_size * tp_size
import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import (
    DeviceConfig,
    EngineConfig,
    ParallelConfig,
    RuntimeConfig,
)
from phyai.models.cosmos3.main_cosmos3_policy_wn import Cosmos3PolicyWNArgs

checkpoint_dir = "/path/to/Cosmos3-Nano-Policy-DROID"
local_rank = 0
cfg_size = 1
tp_size = 4

engine = Engine(
    EngineArgs(
        plugin="cosmos3_policy_wn",
        plugin_args=Cosmos3PolicyWNArgs(
            checkpoint_dir=checkpoint_dir,
            flow_shift=10.0,
            use_karras_sigmas=None,
            decode_video=True,
        ),
        config=EngineConfig(
            device=DeviceConfig(
                target=f"cuda:{local_rank}",
                params_dtype=torch.bfloat16,
            ),
            parallel=ParallelConfig(
                world_size=cfg_size * tp_size,
                cfg_size=cfg_size,
                tp_size=tp_size,
            ),
            runtime=RuntimeConfig(use_cuda_graph=False),
        ),
    )
)
3

构造输入处理器

Cosmos3PolicyProcessor 负责 observation resize/pad、prompt tokenization、domain id、action padding 和输出后处理。
from phyai_utils_tools.models.cosmos3 import Cosmos3PolicyProcessor

processor = Cosmos3PolicyProcessor(
    tokenizer_name_or_path=f"{checkpoint_dir}/text_tokenizer",
    height=480,
    width=832,
    num_frames=17,
    mode="policy",
    domain_name="droid_lerobot",
    action_chunk_size=16,
    fps=24.0,
    image_size=480,
    prompt_format="json",
    view_point="ego_view",
    cond_frame_indexes=(0,),
    device=f"cuda:{local_rank}",
    params_dtype=torch.bfloat16,
)

processed = processor.preprocess(
    {
        "images": "/path/to/observation.png",
        "task": "robot picks up the cup",
    }
)
4

构造 Request

Cosmos3ActionRequest 不包含并行信息。并行拓扑来自 engine config;request 只描述这次 policy 请求。
from phyai.models.cosmos3 import Cosmos3ActionRequest, pixel_to_latent_shape

device = f"cuda:{local_rank}"
dtype = torch.bfloat16

request = Cosmos3ActionRequest(
    text_ids=processed.text_ids.to(device),
    text_mask=processed.text_mask.to(device),
    neg_text_ids=processed.neg_text_ids.to(device),
    neg_text_mask=processed.neg_text_mask.to(device),
    video_shape=pixel_to_latent_shape(*processed.video_shape),
    mode=processed.mode,
    domain_id=processed.domain_id,
    action_chunk=processed.action_chunk,
    raw_action_dim=processed.raw_action_dim,
    cond_video_pixels=processed.pixel_values.to(device=device, dtype=dtype),
    cond_action=(
        processed.cond_action.to(device=device, dtype=dtype)
        if processed.cond_action is not None
        else None
    ),
    cond_frame_indexes=processed.cond_frame_indexes,
    fps=24.0,
    num_inference_steps=30,
    guidance_scale=1.0,
    seed=42,
)
5

所有 rank 同时运行

每个 rank 都必须调用 engine.step(request)。scheduler 内部会在 tpcfg 轴上触发 collective,不能只让 rank 0 运行。
result = engine.step(request)
6

只在 rank 0 保存结果

示例脚本只让 rank 0 做 postprocess、action JSON 和 mp4 写入,避免多个进程写同一个文件。
if local_rank == 0:
    output = processor.postprocess(result)
    action = output["action"]
    pixels = output.get("pixels")

运行示例

TP-only 的 4 卡 policy 推理:
torchrun --nproc_per_node=4 examples/cosmos3/run_cosmos3_policy_wn.py \
    --tp 4 \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --image observation.png \
    --prompt "robot picks up the cup" \
    --domain-name droid_lerobot \
    --out .cache/cosmos3_policy_wn
CFG parallel + TP 的 8 卡 policy 推理:
torchrun --nproc_per_node=8 examples/cosmos3/run_cosmos3_policy_wn.py \
    --cfg 2 \
    --tp 4 \
    --guidance-scale 4.0 \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --image observation.png \
    --prompt "robot picks up the cup" \
    --domain-name droid_lerobot \
    --out .cache/cosmos3_policy_wn
Forward dynamics 需要传入 action 文件:
torchrun --nproc_per_node=4 examples/cosmos3/run_cosmos3_policy_wn.py \
    --tp 4 \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --image observation.png \
    --prompt "robot pushes the object forward" \
    --domain-name droid_lerobot \
    --mode forward_dynamics \
    --action-file action.json \
    --out .cache/cosmos3_forward_wn
Inverse dynamics 通常传 observation video,并指定 clean latent frame:
torchrun --nproc_per_node=4 examples/cosmos3/run_cosmos3_policy_wn.py \
    --tp 4 \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --video obs.mp4 \
    --prompt "robot moves the cup to the right" \
    --domain-name droid_lerobot \
    --mode inverse_dynamics \
    --condition-frames 0,1 \
    --out .cache/cosmos3_inverse_wn
--nproc_per_node 必须等于 --cfg * --tp。Policy 示例默认 guidance_scale=1.0,这时 cfg=2 没有收益;只有把 --guidance-scale 设到大于 1,CFG parallel 才有意义。

实现注意

  • decode_video=True 需要 checkpoint 里有 vae/;否则只能返回 action 和 video latent。
  • forward_dynamics 必须提供 cond_action,processor 会负责把原始 action pad 到 action_dim
  • 这条路径仍然是一次处理一个 request 的示例/基线路径,不是 continuous batching scheduler。