跳转到主要内容

概述

Cosmos3 的 policy 路径不是“从文字生成视频”的那条路。它更像一个会看、会想、也会试着动手的世界模型:给它一帧或一段 observation,再给一句任务描述,它可以预测接下来该怎么动;给它动作,它可以推一段可能发生的未来;给它一段已经发生的 transition,它也可以反过来猜中间用了什么动作。 这页默认使用 Cosmos3-Nano-Policy-DROID。如果目标是出 action,不要直接拿通用 Cosmos3-Nano 顶上;T2V/T2AV 生成路径仍然看 /zh/models/cosmos/ws1,那是另一条入口。 本页讲的是 ws1,也就是单卡路径。它覆盖三个 mode:
Mode输入输出
policyobservation image/video + promptaction chunk,可选 rollout video
forward_dynamicsobservation + prompt + 已知 actionrollout video,同时保留 action 输出
inverse_dynamicsobservation video + prompt解释这段 transition 的 action chunk
examples/cosmos3/run_cosmos3_policy.py 已经接好了这三个 mode。脚本里 decode_video=True 是打开的,所以只要 scheduler 返回 pixels,就会保存 rollout mp4;action 总会保存成 JSON。

架构

policy 路径使用 cosmos3_policy plugin。它和 T2V/T2AV 生成路径共用 Cosmos3 transformer,但 request 里多了 action latent、domain id 和 mode。视频和动作在同一个 denoise loop 里被推进,只是每个 mode 对“哪些是 clean 条件、哪些要生成”的定义不同。
phyai/src/phyai/models/cosmos3
main_cosmos3_policy.py
scheduler_ws1_cosmos3_policy.py
model_runner_policy_cosmos3.py
model_runner_vae_cosmos3.py
modeling_cosmos3.py
vae_wan.py
sampler_unipc.py
主要组件:
组件职责
Cosmos3PolicyEntry加载 transformer;当 decode_video=True 时额外加载 VAE
Cosmos3PolicyScheduler根据 mode 组织 video/action 的 clean mask 和 noised mask,运行 UniPC
Cosmos3ActionRunner调用 policy transformer,输出 video velocity 和 action velocity
Cosmos3PolicyProcessor处理 observation、prompt、action padding、domain id、输出 action 后处理

三种 mode 怎么理解

policy

policy 是最像机器人控制的一条路:你给它 observation 和任务,它预测 action chunk。默认情况下,第一帧 observation 是 clean 条件,后面的 video latent 和全部 action latent 都从噪声里生成。 适合问:“看到当前画面,要做什么?”

forward_dynamics

forward_dynamics 给模型一个 observation,再给一段已知 action,让它推演 rollout video。这里 action 是 clean 条件,video 是要生成的目标。 适合问:“如果这样动,接下来会发生什么?” 这个 mode 必须传 --action-file

inverse_dynamics

inverse_dynamics 反过来:给一段 observation video,让模型推断能解释这段变化的 action chunk。默认整段 video 都是 clean 条件,action 从噪声里恢复。 适合问:“从 A 到 B,中间大概做了什么动作?”

输入规范

Cosmos3PolicyProcessor.preprocess() 接收一个 dict。脚本会把 CLI 参数整理成这个 dict:
raw_input = {
    "images": observation,
    "task": prompt,
    "cond_action": action,  # 仅 forward_dynamics 需要
}
支持的原始输入:
字段类型备注
images图片路径、PIL image、numpy array、torch tensor,或这些对象的 list单图变成 1 帧;list 作为多帧 observation
task / promptstrlist[str]任务文本;list 时取第一条
cond_action / actionlist、numpy array 或 torch.Tensorforward_dynamics 需要
domain_name / domain_idstrint可覆盖 processor 构造参数
modestr可覆盖 processor 构造参数
图像会被转成 (1, 3, T, H, W),数值范围 [-1, 1]。如果传的是 --video,脚本会读取前 action_chunk_size + 1 帧;如果视频太短,会重复最后一帧补齐。

Domain 和 action 维度

Cosmos3 的 action 输出有两个宽度:
名称含义
action_dim模型内部动作宽度,默认 64
raw_action_dim机器人 embodiment 的真实动作宽度
processor 会把 conditioning action padding 到 action_dim,engine 输出后再 slice 回 raw_action_dim 常见 domain:
domain_namedomain_idraw_action_dim
bridge_orig_lerobot710
droid_lerobot810
agibotworld1529
fractal2010
如果你传的是整数 domain_id,processor 没法从名字推断 raw_action_dim,这时需要显式传 --raw-action-dim

运行路径

1

准备权重

准备一份 Cosmos3-Nano-Policy-DROID checkpoint。policy 路径至少需要:
/path/to/Cosmos3-Nano-Policy-DROID/
  transformer/
  text_tokenizer/
  scheduler/
  vae/             # decode_video=True 时需要
2

构造 Engine

插件名是 "cosmos3_policy"。示例脚本里 decode_video=True,所以会加载 VAE,并在输出里带上 decoded rollout pixels。
import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.cosmos3.main_cosmos3_policy import Cosmos3PolicyArgs

checkpoint_dir = "/path/to/Cosmos3-Nano-Policy-DROID"

engine = Engine(
    EngineArgs(
        plugin="cosmos3_policy",
        plugin_args=Cosmos3PolicyArgs(
            checkpoint_dir=checkpoint_dir,
            flow_shift=10.0,
            use_karras_sigmas=None,
            decode_video=True,
        ),
        config=EngineConfig(
            device=DeviceConfig(target="cuda", params_dtype=torch.bfloat16),
            runtime=RuntimeConfig(use_cuda_graph=False),
        ),
    )
)
use_karras_sigmas=None 表示从 checkpoint 的 scheduler config 读取;示例也支持显式传 false 走 linear-flow + flow_shift
3

构造 Processor

Cosmos3PolicyProcessor 会处理 observation resize/pad、prompt tokenization、action padding、domain id 解析,以及输出 action 的 slice / 可选反归一化。
import torch

from phyai_utils_tools.models.cosmos3 import Cosmos3PolicyProcessor

processor = Cosmos3PolicyProcessor(
    tokenizer_name_or_path=f"{checkpoint_dir}/text_tokenizer",
    height=480,
    width=832,
    num_frames=17,
    mode="policy",
    domain_name="droid_lerobot",
    action_chunk_size=16,
    fps=24.0,
    image_size=480,
    prompt_format="json",
    view_point="ego_view",
    cond_frame_indexes=(0,),
    device="cuda",
    params_dtype=torch.bfloat16,
)
4

Preprocess 输入

processed = processor.preprocess(
    {
        "images": "/path/to/observation.png",
        "task": "robot picks up the cup",
    }
)
processed.video_shape 是像素尺寸 (T, H, W),给 request 前要用 pixel_to_latent_shape 转成 latent grid。
5

构造 Request

from phyai.models.cosmos3 import Cosmos3ActionRequest, pixel_to_latent_shape

request = Cosmos3ActionRequest(
    text_ids=processed.text_ids.to("cuda"),
    text_mask=processed.text_mask.to("cuda"),
    neg_text_ids=processed.neg_text_ids.to("cuda"),
    neg_text_mask=processed.neg_text_mask.to("cuda"),
    video_shape=pixel_to_latent_shape(*processed.video_shape),
    mode=processed.mode,
    domain_id=processed.domain_id,
    action_chunk=processed.action_chunk,
    raw_action_dim=processed.raw_action_dim,
    cond_video_pixels=processed.pixel_values.to(
        device="cuda", dtype=torch.bfloat16
    ),
    cond_action=(
        processed.cond_action.to(device="cuda", dtype=torch.bfloat16)
        if processed.cond_action is not None
        else None
    ),
    cond_frame_indexes=processed.cond_frame_indexes,
    fps=24.0,
    num_inference_steps=30,
    guidance_scale=1.0,
    seed=42,
)
6

Step 和后处理

result = engine.step(request)
output = processor.postprocess(result)
action = output["action"]
pixels = output.get("pixels")
action 一定会返回,shape 是 (1, action_chunk, raw_action_dim)。如果 engine 是 decode_video=Truepixels 也会返回,范围 [0, 1]

脚本示例

Policy

单图 observation,预测 action:
uv run python examples/cosmos3/run_cosmos3_policy.py \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --image observation.png \
    --prompt "robot picks up the cup" \
    --domain-name droid_lerobot \
    --out .cache/cosmos3_policy_out
输出:
文件内容
.cache/cosmos3_policy_out_action.jsonaction chunk
.cache/cosmos3_policy_out.mp4rollout video,如果返回了 decoded pixels

Forward dynamics

给定 action,生成 rollout video:
uv run python examples/cosmos3/run_cosmos3_policy.py \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --image observation.png \
    --prompt "robot pushes the object forward" \
    --domain-name droid_lerobot \
    --mode forward_dynamics \
    --action-file action.json \
    --out .cache/cosmos3_forward_out
action.json 支持两种格式:
{
  "shape": [2, 10],
  "dtype": "float32",
  "data": [
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    [0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
  ]
}
或:
{
  "action_chunks": [
    [
      [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
      [0.1, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
    ]
  ]
}
上面只是文件结构示意,数值要换成你的真实动作。DROID 的默认 raw_action_dim10;如果步数少于 action_chunk_size,processor 会重复最后一步补齐。

Inverse dynamics

给一段 observation video,反推 action:
uv run python examples/cosmos3/run_cosmos3_policy.py \
    --checkpoint /path/to/Cosmos3-Nano-Policy-DROID \
    --video obs.mp4 \
    --prompt "robot moves the cup to the right" \
    --domain-name droid_lerobot \
    --mode inverse_dynamics \
    --condition-frames 0,1 \
    --out .cache/cosmos3_inverse_out
如果不传 --condition-frames,脚本默认单图使用 0,视频使用 0,1

输出后处理

Cosmos3PolicyProcessor.postprocess() 做三件事:
  • 从 tensor 或 dict 里取出 action
  • 把 action slice 到 raw_action_dim
  • 如果传了 action_stats_path,把 action 反归一化回物理单位。
支持的反归一化方式:
action_normalization需要的 stats 字段
meanstdmeanstd
minmaxminmax
quantileq01q99
quantile_rotglobal_raw.q01global_raw.q99
没有 action_stats_path 时,action 保持模型输出的归一化尺度。

当前限制

  • 当前脚本一次处理一个请求,主要用于验证路径和跑样例,不是服务端调度器。
  • action/policy 示例默认用 DROID policy checkpoint 和 droid_lerobot。如果换 embodiment,需要同时换到匹配的 policy 权重、domain 和 action stats。
  • decode_video=True 会加载 VAE 并保存 rollout video;如果只关心 action latency,可以在代码里关闭它。
  • forward_dynamics 必须提供 action 文件;processor 会裁剪或 repeat 最后一帧补到 action_chunk_size
  • domain_name 不能解析出 raw_action_dim 时,需要显式传 --raw-action-dim
  • CUDA graph 目前还不是这条路径的核心优化点;代码里也留着后续优化空间。先把语义跑通,再谈吞吐。

完整代码

import torch

from phyai.engine import Engine, EngineArgs
from phyai.engine_config import DeviceConfig, EngineConfig, RuntimeConfig
from phyai.models.cosmos3 import Cosmos3ActionRequest, pixel_to_latent_shape
from phyai.models.cosmos3.main_cosmos3_policy import Cosmos3PolicyArgs
from phyai_utils_tools.models.cosmos3 import Cosmos3PolicyProcessor

checkpoint_dir = "/path/to/Cosmos3-Nano-Policy-DROID"
device = "cuda"
dtype = torch.bfloat16

engine = Engine(
    EngineArgs(
        plugin="cosmos3_policy",
        plugin_args=Cosmos3PolicyArgs(
            checkpoint_dir=checkpoint_dir,
            flow_shift=10.0,
            use_karras_sigmas=None,
            decode_video=True,
        ),
        config=EngineConfig(
            device=DeviceConfig(target=device, params_dtype=dtype),
            runtime=RuntimeConfig(use_cuda_graph=False),
        ),
    )
)

try:
    processor = Cosmos3PolicyProcessor(
        tokenizer_name_or_path=f"{checkpoint_dir}/text_tokenizer",
        height=480,
        width=832,
        num_frames=17,
        mode="policy",
        domain_name="droid_lerobot",
        action_chunk_size=16,
        fps=24.0,
        image_size=480,
        prompt_format="json",
        view_point="ego_view",
        cond_frame_indexes=(0,),
        device=device,
        params_dtype=dtype,
    )

    processed = processor.preprocess(
        {
            "images": "/path/to/observation.png",
            "task": "robot picks up the cup",
        }
    )
    request = Cosmos3ActionRequest(
        text_ids=processed.text_ids.to(device),
        text_mask=processed.text_mask.to(device),
        neg_text_ids=processed.neg_text_ids.to(device),
        neg_text_mask=processed.neg_text_mask.to(device),
        video_shape=pixel_to_latent_shape(*processed.video_shape),
        mode=processed.mode,
        domain_id=processed.domain_id,
        action_chunk=processed.action_chunk,
        raw_action_dim=processed.raw_action_dim,
        cond_video_pixels=processed.pixel_values.to(device=device, dtype=dtype),
        cond_action=(
            processed.cond_action.to(device=device, dtype=dtype)
            if processed.cond_action is not None
            else None
        ),
        cond_frame_indexes=processed.cond_frame_indexes,
        fps=24.0,
        num_inference_steps=30,
        guidance_scale=1.0,
        seed=42,
    )

    result = engine.step(request)
    output = processor.postprocess(result)
    print(output["action"].shape)
finally:
    engine.close()