WALL-OSS Full Deployment Guide

Community Article Published November 19, 2025

This article provides a systematic introduction to the complete engineering deployment pipeline of WALL-OSS, covering the end-to-end loop from data collection to model training and inference. By following this guide, you will be able to independently run a full WALL-OSS system.


1. Introduction to WALL-OSS

WALL-OSS (World-Action-Language-Learning – Open Source System) is an open-source foundation model for embodied intelligence, proposed by the XSquare Robot team in 2025. It aims to close the gap between vision, language, and action (VLA) and to extend the semantic capabilities of large vision-language models (VLMs) to real-world physical interaction.


Technically, WALL-OSS introduces a tightly coupled multimodal architecture (a tightly coupled MoE structure) that integrates both discrete and continuous action modeling strategies. Through a two-stage training pipeline (Inspiration → Integration), the model gradually unifies semantic reasoning and high-frequency action generation. Its core innovations include:

  • Embodied perception–enhanced multimodal pretraining: Large-scale training on unified vision–language–action data to strengthen spatial, causal, and manipulation understanding.
  • Unified Cross-Level Chain-of-Thought (Uni-CoT): A single differentiable framework that unifies high-level instruction reasoning, sub-task decomposition, and fine-grained action synthesis, forming a continuous chain from "understanding" to "execution."
  • Mixture-of-Experts (MoE) action heads: Dynamically activating experts depending on the task phase and modeling actions in discrete or continuous space to maintain stable VLM priors.
  • Two-stage training paradigm:
    • Inspiration stage: Injecting discrete action priors to strengthen spatial understanding and semantic-action alignment.
    • Integration stage: Using flow matching to achieve high-frequency continuous control.

Overall, WALL-OSS is not merely a model but an engineering and scientific paradigm that connects semantic reasoning with physical action, providing a verifiable technical route toward embodied AGI.

2. WALL-OSS Deployment Pipeline

This article does not focus on the internal architecture or algorithmic details of WALL-OSS. Instead, it covers the complete engineering deployment pipeline. We walk through how to use the open-source LeRobot S101 platform to build an end-to-end workflow of data collection, model finetuning, and inference deployment, including system dependencies, task configuration, training procedures, and inference service setup. By following this step-by-step guide, readers will understand how WALL-OSS operates on real robots and can quickly reproduce or extend its embodied intelligence capabilities. The article has three parts:

  1. Data collection using the open-source S101 platform (other platforms may refer to this workflow)
  2. Model finetuning
  3. Model deployment and validation

2.1 Data Collection Using the Open-Source S101 Platform

2.1.1 Setting Up and Using the S101 Platform

For setup and calibration of the S101 platform, refer to the official tutorial or the Seeed Studio documentation. Note: This tutorial uses LeRobot 0.3.4 and LeRobotDataset v2.1; other versions may behave differently. After assembling and calibrating the robot, test teleoperation using:

lerobot-teleoperate \
    --robot.type=so101_follower \
    --robot.port=YOUR_PORT \
    --robot.cameras=YOUR_CAMERAS \
    --robot.id=YOUR_ROBOT_ID \
    --teleop.type=so101_leader \
    --teleop.port=YOUR_PORT \
    --teleop.id=YOUR_ROBOT_ID \
    --display_data=true

Camera configurations can be customized. A typical setup uses one top camera and one side or wrist camera.
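
The YOUR_CAMERAS placeholder takes a dictionary-like string. The sketch below builds such a string, assuming the OpenCV camera fields used in the LeRobot documentation examples (type, index_or_path, width, height, fps). The helper name, the camera names top and side, and the device indices are illustrative assumptions, so verify the field names against your installed LeRobot version.

```python
def build_cameras_arg(cameras: dict[str, dict]) -> str:
    """Format a camera spec as the value passed to --robot.cameras.

    The field names follow the OpenCV camera examples in the LeRobot docs;
    treat them as an assumption and verify against your LeRobot version.
    """
    entries = []
    for name, spec in cameras.items():
        fields = ", ".join(f"{key}: {value}" for key, value in spec.items())
        entries.append(f"{name}: {{{fields}}}")
    return "{ " + ", ".join(entries) + " }"


# Hypothetical two-camera setup: device indices 0 and 1 are placeholders.
cameras = {
    "top": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30},
    "side": {"type": "opencv", "index_or_path": 1, "width": 640, "height": 480, "fps": 30},
}
print(f'--robot.cameras="{build_cameras_arg(cameras)}"')
```

The 640×480 resolution is chosen to match the image shape that the server script later in this guide declares for observation.images.top and observation.images.side.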


2.1.2 Data Collection

Once teleoperation works, start collecting data:

lerobot-record \
  --robot.type=so101_follower \
  --robot.port=YOUR_PORT \
  --robot.id=YOUR_ROBOT_ID \
  --robot.cameras=YOUR_CAMERAS \
  --teleop.type=so101_leader \
  --teleop.port=YOUR_PORT \
  --teleop.id=YOUR_ROBOT_ID \
  --dataset.num_episodes=NUM_EPISODES \
  --dataset.single_task=YOUR_TASK \
  --dataset.push_to_hub=false \
  --dataset.episode_time_s=60 \
  --dataset.reset_time_s=60 \
  --display_data=true \
  --dataset.repo_id=YOUR_REPO_ID

Set dataset.episode_time_s and dataset.reset_time_s to 2–3× the usual duration of an episode or reset to avoid premature termination. During data collection or reset, you may:

  • Press ➡️ to skip early to the next reset/collection stage
  • Press ⬅️ to go back if the current collection or reset is problematic

A simple diagram:

📊 Example Data Collection & Reset Flow

[Reset #1] ──▶ [Collect #1] ──▶ [Reset #2] ──▶ [Collect #2] ──▶ [Reset #3] ──▶ [Collect #3]
     ▲              ▲              ▲
     │              │              │
    ⬅️ Back        ⬅️ Back        ⬅️ Back
     │              │              │
     └──────────────┴──────────────┴──────────────▶
                       ➡️ Forward (Next Stage)

Example:
- Currently at **Collect #2**
- If data is wrong → press **⬅️** to return to **Reset #2**
- If data is OK → press **➡️** to enter **Reset #3**

To collect large datasets, you can press ESC to stop and resume later with: --resume=true
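
Because a session can be stopped and resumed, it helps to know how long a full run could take. The sketch below works through that arithmetic, assuming every episode and every reset runs to its time limit. The function names, the 2.5× default factor, and the 50-episode figure are illustrative and not part of LeRobot.

```python
def worst_case_session_minutes(num_episodes: int, episode_time_s: float, reset_time_s: float) -> float:
    """Upper bound on recording time if no stage is skipped early."""
    return num_episodes * (episode_time_s + reset_time_s) / 60.0


def suggested_time_limit_s(typical_duration_s: float, factor: float = 2.5) -> float:
    """Scale a typical stage duration by a 2-3x safety factor."""
    if not 2.0 <= factor <= 3.0:
        raise ValueError("factor is expected to be in the 2-3x range")
    return typical_duration_s * factor


# A task that usually takes about 24 s gets a 60 s limit at 2.5x.
print(suggested_time_limit_s(24))              # 60.0
# 50 episodes with 60 s episode and reset limits take at most 100 minutes.
print(worst_case_session_minutes(50, 60, 60))  # 100.0
```

In practice the session is shorter, since pressing ➡️ ends a stage early.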

2.1.3 Data Visualization & Replay

Before finetuning, verify data correctness via:

  1. Visualization of videos, states, and action trajectories. Use it to examine the data distribution (e.g., spatial distribution of objects, action distribution):
python src/lerobot/scripts/visualize_dataset_html.py --repo-id YOUR_REPO_ID
  2. Trajectory replay, to check physical repeatability (optional):
lerobot-replay \
    --robot.type=so101_follower \
    --robot.port=YOUR_PORT \
    --robot.id=YOUR_ROBOT_ID \
    --dataset.repo_id=YOUR_REPO_ID \
    --dataset.episode=0

2.2 Model Finetuning

2.2.1 Environment Setup

Follow the official WALL-OSS repo for installation. If flash-attn or wall-x builds slowly, increase MAX_JOBS (e.g., 16 or 32). For easier customization, install wall-x in editable mode:

MAX_JOBS=16 pip install --no-build-isolation --verbose -e .

2.2.2 Preparing the Dataset

WALL-OSS supports LeRobot datasets natively, so S101-collected data can be used directly. Other datasets must be recorded in, or converted to, the LeRobot format.

A typical LeRobotDataset v2.1 structure:

.
├── data
│   ├── chunk-000
│   │   ├── episode_000000.parquet
│   │   ├── ...
├── meta
│   ├── episodes.jsonl
│   ├── info.json
│   ├── stats.json
│   └── tasks.jsonl
└── videos
    ├── chunk-000
    │   ├── observation.images.laptop/episode_000000.mp4
    │   ├── ...
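
Before training, a quick check that a dataset directory matches this layout can catch incomplete recordings. The sketch below uses only the standard library and checks file presence, not file contents. The function name is hypothetical, and the glob patterns are assumptions taken from the tree above.

```python
from pathlib import Path


def check_lerobot_v21_layout(root: str) -> dict:
    """Check for the files shown in the tree above and count episodes.

    Only file presence is checked; parquet and video contents are not read.
    """
    root_path = Path(root)
    missing = [
        name
        for name in ("episodes.jsonl", "info.json", "stats.json", "tasks.jsonl")
        if not (root_path / "meta" / name).is_file()
    ]
    parquet_files = sorted(root_path.glob("data/chunk-*/episode_*.parquet"))
    video_files = sorted(root_path.glob("videos/chunk-*/*/episode_*.mp4"))

    listed_episodes = 0
    episodes_file = root_path / "meta" / "episodes.jsonl"
    if episodes_file.is_file():
        with episodes_file.open() as f:
            # episodes.jsonl holds one JSON object per non-empty line.
            listed_episodes = sum(1 for line in f if line.strip())

    return {
        "missing_meta": missing,
        "num_parquet": len(parquet_files),
        "num_videos": len(video_files),
        "listed_episodes": listed_episodes,
    }
```

A healthy dataset should report no missing metadata files and the same number of parquet files as listed episodes. With N cameras, the video count is expected to be N times the episode count.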

2.2.3 Download Pretrained Weights & Configure Training

Download FLOW/FAST pretrained models:

hf download x-square-robot/wall-oss-flow --local-dir /path/to/wall-oss-flow
hf download x-square-robot/wall-oss-fast --local-dir /path/to/wall-oss-fast

Clone FAST tokenizer:

git clone https://huggingface.co/physical-intelligence/fast /path/to/fast

Modify the training config (example for S101 single-arm finetuning):

# wall-x/workspace/lerobot_example/config_qact.yml

# Model and paths configuration, for flow
log_name: "lerobot_training_flow"
log_project: "wall-oss_finetuning"
model_type: wall-oss
pretrained_wallx_path: "/path/to/pretrained/wall-oss-flow"  # Must set
save_path: "/path/to/workspace/flow_checkpoints"                # Must set
use_fast_tokenizer: False                       # True: train FAST, False: train Flow
action_tokenizer_path: None         # Must set if use_fast_tokenizer is true

# # Model and paths configuration, for fast
# log_name: "lerobot_training_fast"
# log_project: "wall-oss_finetuning"
# model_type: wall-oss
# pretrained_wallx_path: "/path/to/wall-oss-fast"  # Must set
# save_path: "/path/to/workspace/fast_checkpoints"                # Must set
# use_fast_tokenizer: True                       # True: train FAST, False: train Flow
# action_tokenizer_path: "/path/to/fast"         # Must set if use_fast_tokenizer is true

# Torch Profile
profile: False
profile_save_path: /path/to/profile/
profile_wait_iters: 10
profile_warmup_iters: 5
profile_active_iters: 2

# Training hyperparameters
num_warmup_steps: 100
num_training_steps: 64000000
learning_rate: 0.00005
min_lr: 0.00005
num_epoch: 100
gradient_accumulation_steps: 32
batch_size_per_gpu: 8
padding_side: left
epoch_save_interval: 10

# Training optimization settings
FSDP2: True
torch_compile: False

# Robot configuration - Define degrees of freedom for each component
dof_config:
  shoulder_pan.pos: 1
  shoulder_lift.pos: 1
  elbow_flex.pos: 1
  wrist_flex.pos: 1
  wrist_roll.pos: 1
  gripper.pos: 1

# Agent proprioception configuration (typically matches DOF config)
agent_pos_config:
  shoulder_pan.pos: 1
  shoulder_lift.pos: 1
  elbow_flex.pos: 1
  wrist_flex.pos: 1
  wrist_roll.pos: 1
  gripper.pos: 1

# # Checkpoint resuming configuration
# resume:
#   ckpt: "/path/to/resume_model/"
#   load_ckpt_only: true

norm_stats_path: None

enable_customized_robot_config: true
customized_robot_config:
  name: "YOUR_REPO_ID"
  customized_dof_config:
    "shoulder_pan.pos" : 1
    "shoulder_lift.pos" : 1
    "elbow_flex.pos" : 1
    "wrist_flex.pos" : 1
    "wrist_roll.pos" : 1
    "gripper.pos" : 1

  customized_agent_pos_config:
    "shoulder_pan.pos" : 1
    "shoulder_lift.pos" : 1
    "elbow_flex.pos" : 1
    "wrist_flex.pos" : 1
    "wrist_roll.pos" : 1
    "gripper.pos" : 1

# Data configuration
data:
  use_lerobot: true

  # LeRobot dataset configuration
  lerobot_config:
    repo_id: "YOUR_REPO_ID"
    root: null
    episodes: null
    image_transforms: null
    delta_timestamps: null
    tolerance_s: 1e-4
    revision: null
    force_cache_sync: false
    download_videos: true
    video_backend: null

  action_horizon: 32
  train_test_split: 0.95

  # Action keys for observation and prediction
  obs_action_keys:
    - shoulder_pan.pos
    - shoulder_lift.pos
    - elbow_flex.pos
    - wrist_flex.pos
    - wrist_roll.pos
    - gripper.pos

  predict_action_keys:
    - shoulder_pan.pos
    - shoulder_lift.pos
    - elbow_flex.pos
    - wrist_flex.pos
    - wrist_roll.pos
    - gripper.pos

  # Image resolution configuration for different camera views
  resolution:
    face_view: 256
    left_wrist_view: 256
    right_wrist_view: 256
    move1_view: 256
    move2_view: 256
    top_view: 256
    wall_view: 256
    multi_modal: 256

Be sure to:

  • Adjust batch_size_per_gpu based on GPU memory
  • Add YOUR_REPO_ID to ACTION_DATASET_NAMES and KEY_MAPPINGS in the wall-x source code:
# wall_x/data/config.py
ACTION_DATASET_NAMES = [
    "x2_normal",
    "agibotworld_alpha",
    "droid",
    "fractal",
    "bridge_data_v2",
    "DobbE",
    "RH20T",
    "UMI-biarm",
    "austin_buds",
    "austin_sailor",
    "austin_sirius",
    "bc_z",
    "berkeley_autolab_ur5",
    "berkeley_cable_routing",
    "berkeley_fanuc_manipulation",
    "dlr_edan_shared_control",
    "fmb",
    "furniture_bench",
    "jaco_play",
    "nyu_rot",
    "stanford_hydra",
    "stanford_kuka_multimodal",
    "taco_play",
    "utaustin_mutex",
    "viola",
    "physical-intelligence/libero",
    "lerobot/aloha_mobile_cabinet",
    ########################################
    # Add your repo id here
    "YOUR_REPO_ID",
    ########################################
]
# wall_x/data/utils.py
KEY_MAPPINGS = {
    ########################################
    # Add your customized key mapping here
    "YOUR_REPO_ID": {
        "camera": {
            "observation.images.top": "face_view", # find a similar view
            "observation.images.side": "wall_view", # find a similar view
        },
        "state": "observation.state",
        "action": "action",
    },
    ########################################
    "lerobot/aloha_mobile_cabinet": {
        "camera": {
            "observation.images.cam_high": "face_view",
            "observation.images.cam_left_wrist": "left_wrist_view",
            "observation.images.cam_right_wrist": "right_wrist_view",
        },
        "state": "observation.state",
        "action": "action",
    },
    "physical-intelligence/libero": {
        "camera": {
            "image": "face_view",
            "wrist_image": "left_wrist_view",
        },
        "state": "state",
        "action": "actions",
    },
}
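
A custom KEY_MAPPINGS entry ties dataset camera keys to WALL-OSS view names. The sketch below checks such an entry, assuming that each view name must appear under resolution in the training config and that each camera should map to a distinct view. Both conditions hold in the examples above but are not confirmed requirements, and the function name is hypothetical.

```python
def check_camera_mapping(key_mapping: dict, resolution: dict, dataset_camera_keys: list[str]) -> list[str]:
    """Return problems found in one KEY_MAPPINGS entry."""
    problems = []
    cameras = key_mapping["camera"]

    for dataset_key, view_name in cameras.items():
        if dataset_key not in dataset_camera_keys:
            problems.append(f"{dataset_key} is not a camera key in the dataset")
        if view_name not in resolution:
            problems.append(f"{view_name} has no entry under resolution")

    views = list(cameras.values())
    if len(set(views)) != len(views):
        problems.append("two cameras map to the same view name")
    return problems


# Values copied from the config and mapping shown above.
resolution = {"face_view": 256, "left_wrist_view": 256, "wall_view": 256}
entry = {
    "camera": {
        "observation.images.top": "face_view",
        "observation.images.side": "wall_view",
    },
    "state": "observation.state",
    "action": "action",
}
dataset_cameras = ["observation.images.top", "observation.images.side"]
print(check_camera_mapping(entry, resolution, dataset_cameras))  # []
```

The dataset camera keys are the feature names recorded by lerobot-record, which follow the camera names passed in --robot.cameras.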

2.2.4 Compute Dataset Statistics

WALL-OSS normalizes actions using the 1st–99th percentile range (q1–q99), following the FAST paper, to reduce the influence of outliers. The statistics of the collected dataset must therefore be computed before training. Modify scripts/compute_norm_stats.py:

path = "/path/to/workspace/lerobot_example/config_qact.yml"
output_path = "/path/to/save/norm_stats"

Run:

python scripts/compute_norm_stats.py --batch_size 256 --num_workers 2

This step may take a long time. Adjust batch_size and num_workers to speed it up. Afterwards, set norm_stats_path in your config to the path of the generated norm_stats.json.
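
To illustrate the idea behind q1–q99 normalization, the sketch below computes the two percentiles for one action dimension and maps that range to [-1, 1] with clipping. The target range, the clipping, and all function names are assumptions made for illustration. The exact formula in wall-x may differ, and this is not the code in scripts/compute_norm_stats.py.

```python
def percentile(sorted_values: list[float], q: float) -> float:
    """Linearly interpolated percentile of an ascending list, q in [0, 100]."""
    if not sorted_values:
        raise ValueError("need at least one value")
    position = (len(sorted_values) - 1) * q / 100.0
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] * (1.0 - fraction) + sorted_values[upper] * fraction


def quantile_stats(values: list[float]) -> tuple[float, float]:
    """Return (q01, q99) for one action dimension."""
    ordered = sorted(values)
    return percentile(ordered, 1.0), percentile(ordered, 99.0)


def normalize(value: float, q01: float, q99: float) -> float:
    """Map [q01, q99] to [-1, 1] and clip values outside that range."""
    scaled = 2.0 * (value - q01) / max(q99 - q01, 1e-8) - 1.0
    return max(-1.0, min(1.0, scaled))


def unnormalize(scaled: float, q01: float, q99: float) -> float:
    """Invert normalize() for values that were not clipped."""
    return (scaled + 1.0) / 2.0 * (q99 - q01) + q01


# A single outlier (1000.0) barely moves q99, unlike the maximum.
joint_values = [float(i) for i in range(101)] + [1000.0]
q01, q99 = quantile_stats(joint_values)
print(q01, q99, max(joint_values))
print(normalize(1000.0, q01, q99))  # the outlier is clipped to 1.0
```

With min–max scaling, the single outlier would compress all ordinary values into a small part of the range. Percentile bounds keep the bulk of the data spread across [-1, 1].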

2.2.5 Training

Modify the GPU settings in workspace/lerobot_example/run.sh:

#!/bin/bash
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 # your GPU setting
NUM_GPUS=$(echo $CUDA_VISIBLE_DEVICES | tr ',' '\n' | wc -l)

# print current time
echo "[current time: $(date +'%Y-%m-%d %H:%M:%S')]"

code_dir="/path/to/wall-x"
config_path="/path/to/wall-x/workspace/lerobot_example"

# Pick a random port in the range 21000-50999 for the main process
export PORT=$((21000 + $RANDOM % 30000))

MASTER_PORT=10239 # despite its name, this value is passed as --seed below

export LAUNCHER="accelerate launch --num_processes=$NUM_GPUS --main_process_port=$PORT"

export SCRIPT="${code_dir}/train_qact.py"
export SCRIPT_ARGS="--config ${config_path}/config_qact.yml --seed $MASTER_PORT"

echo "Running command: $LAUNCHER $SCRIPT $SCRIPT_ARGS"

$LAUNCHER $SCRIPT $SCRIPT_ARGS
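
The GPU list in run.sh, together with batch_size_per_gpu and gradient_accumulation_steps from the config, determines how many samples contribute to each optimizer step. The sketch below assumes standard data-parallel semantics (each process draws its own batch, and gradients are accumulated before every optimizer step), which the source does not state explicitly for train_qact.py. The function names are illustrative.

```python
def effective_batch_size(batch_size_per_gpu: int, gradient_accumulation_steps: int, num_gpus: int) -> int:
    """Samples per optimizer step under standard data-parallel training."""
    return batch_size_per_gpu * gradient_accumulation_steps * num_gpus


def accumulation_steps_for(target_batch: int, batch_size_per_gpu: int, num_gpus: int) -> int:
    """Accumulation steps needed to keep a target effective batch size."""
    per_step = batch_size_per_gpu * num_gpus
    if target_batch % per_step != 0:
        raise ValueError("target batch is not a multiple of batch_size_per_gpu * num_gpus")
    return target_batch // per_step


# Values from the example config and run.sh: 8 per GPU, 32 accumulation steps, 8 GPUs.
print(effective_batch_size(8, 32, 8))     # 2048
# Halving the per-GPU batch to fit memory doubles the accumulation steps.
print(accumulation_steps_for(2048, 4, 8))  # 64
```

If you lower batch_size_per_gpu to fit GPU memory, raising gradient_accumulation_steps in proportion keeps the optimization behavior closer to the example settings.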

Then run training with:

bash ./workspace/lerobot_example/run.sh

You should see logs similar to:

[2025-11-11 18:51:07] epoch   9/ 10 | iter   2894/  2951 | loss 1.522499 | lr 0.000050 | time_per_step_avg 0.475970s |
(min, max) time across ranks (ms):
    interval-time ..................................: (458.56, 502.47)
    data-load ......................................: (1.93, 26.09)
    forward-compute ................................: (137.14, 162.45)
    backward-compute ...............................: (234.47, 258.09)
    optimizer ......................................: (0.29, 0.56)
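
If you redirect training output to a file, the summary lines can be parsed for offline plotting. The sketch below is a regular expression written against the single log line shown above. The pattern and function name are assumptions, and the format may change between wall-x versions.

```python
import re
from typing import Optional

# Matches lines such as:
# [2025-11-11 18:51:07] epoch   9/ 10 | iter   2894/  2951 | loss 1.522499 | lr 0.000050 | ...
LOG_PATTERN = re.compile(
    r"epoch\s+(?P<epoch>\d+)/\s*(?P<num_epochs>\d+)\s*\|\s*"
    r"iter\s+(?P<iter>\d+)/\s*(?P<num_iters>\d+)\s*\|\s*"
    r"loss\s+(?P<loss>[\d.]+)\s*\|\s*"
    r"lr\s+(?P<lr>[\d.eE+-]+)\s*\|\s*"
    r"time_per_step_avg\s+(?P<step_time>[\d.]+)s"
)


def parse_training_log_line(line: str) -> Optional[dict]:
    """Extract epoch, iteration, loss, lr, and step time; None if no match."""
    match = LOG_PATTERN.search(line)
    if match is None:
        return None
    return {
        "epoch": int(match["epoch"]),
        "num_epochs": int(match["num_epochs"]),
        "iter": int(match["iter"]),
        "num_iters": int(match["num_iters"]),
        "loss": float(match["loss"]),
        "lr": float(match["lr"]),
        "step_time_s": float(match["step_time"]),
    }


line = (
    "[2025-11-11 18:51:07] epoch   9/ 10 | iter   2894/  2951 | "
    "loss 1.522499 | lr 0.000050 | time_per_step_avg 0.475970s |"
)
print(parse_training_log_line(line))
```

Lines that do not match, such as the per-rank timing breakdown, return None and can be skipped.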

Monitor training with wandb. Below is an example from a FAST run:

[Figure: example wandb dashboard for FAST training]

2.2.6 Merge Checkpoints into Final Weights (Optional)

If training used FSDP2, merge weights:

python scripts/merge_sharded_weights.py YOUR_CHECKPOINT_DIR OUTPUT_PATH

Place the following in one folder:

./
- model.safetensors
- processor/*
- config.json from WALL-OSS-FLOW/FAST

The model can now be loaded with the from_pretrained function.

2.2.7 Open-Loop Evaluation

After training, we can preliminarily evaluate the model on the collected data. Modify scripts/draw_openloop_plot.py:

model_path = "/path/to/flow_or_fast_model"
action_tokenizer_path = None # for flow: None; for fast: "/path/to/fast"
save_dir = "/path/to/your/save/dir"
path = "/path/to/config_qact.yml"

Then run:

python scripts/draw_openloop_plot.py --origin_action_dim YOUR_ACTION_DIM

The script saves open-loop plots to save_dir. An example for a Flow model:

[Figure: open-loop evaluation result for a Flow model]

By default, wall-x evaluates on the first episode. To evaluate on unseen data, modify both scripts/draw_openloop_plot.py and the LeRobot source as follows:

# scripts/draw_openloop_plot.py
dataset = load_test_dataset(config, lerobot_config, seed=42, episode=500) # select an unseen episode
# src/lerobot/datasets/lerobot_dataset.py
# line 498
# add this line
self.episode_mapping = {ep: idx for idx, ep in enumerate(self.episodes)} if self.episodes is not None else None

# line 647-648
# modify to below
if self.episode_mapping is not None:
    ep_start = self.episode_data_index["from"][self.episode_mapping[ep_idx]]
    ep_end = self.episode_data_index["to"][self.episode_mapping[ep_idx]]
else: 
    ep_start = self.episode_data_index["from"][ep_idx]
    ep_end = self.episode_data_index["to"][ep_idx]
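
The reason for this patch is easier to see in isolation. When only a subset of episodes is loaded, the episode index arrays appear to be stored by position in that subset rather than by the original episode id, so the id must be translated first. The sketch below reproduces that logic with plain Python lists. The frame counts are made up, and the real LeRobot structures hold tensors rather than lists.

```python
def build_episode_mapping(episodes):
    """Map dataset episode ids to their position in the loaded subset."""
    return {ep: idx for idx, ep in enumerate(episodes)} if episodes is not None else None


def episode_bounds(episode_data_index, episode_mapping, ep_idx):
    """Return (start, end) frame indices for an episode id."""
    position = episode_mapping[ep_idx] if episode_mapping is not None else ep_idx
    return episode_data_index["from"][position], episode_data_index["to"][position]


# Hypothetical subset: only episodes 500 and 501 are loaded (120 and 90 frames).
episodes = [500, 501]
episode_data_index = {"from": [0, 120], "to": [120, 210]}

mapping = build_episode_mapping(episodes)
print(episode_bounds(episode_data_index, mapping, 500))  # (0, 120)
print(episode_bounds(episode_data_index, mapping, 501))  # (120, 210)

# Without the mapping, episode id 500 is used as a list position and fails.
try:
    episode_bounds(episode_data_index, None, 500)
except IndexError:
    print("IndexError: episode id used as a position")
```

When episodes is None, meaning the whole dataset is loaded, ids and positions coincide and the original indexing is kept.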

The resulting plot looks like this:

[Figure: open-loop evaluation result on an unseen episode]

2.3 Model Deployment & Validation

2.3.1 Preparing the Policy Server and Robot Client

Setup:

  • Remote inference machine with a single RTX 4090, running a policy server script
  • Local machine (CPU-only) to control the robot and send observations, running a robot client script

This tutorial hacks wall-x inference into the LeRobot codebase for convenience. You should customize a policy server and a robot client script for your own robot. My full wallx_server.py script is posted below.

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Example:

Customized example:
python scripts/wallx_server.py \
     --host=0.0.0.0 \
     --port=10808 \
     --fps=10 \
     --inference_latency=0.1 \
     --obs_queue_timeout=1

"""

import logging
import pickle  # nosec
import threading
import time
from concurrent import futures
from dataclasses import asdict
from pprint import pformat
from queue import Empty, Queue
from lerobot.configs.types import FeatureType, PolicyFeature

import draccus
import grpc
import torch
from PIL import Image

from lerobot.scripts.server.configs import PolicyServerConfig
from lerobot.scripts.server.helpers import (
    FPSTracker,
    Observation,
    TimedAction,
    TimedObservation,
    get_logger,
    raw_observation_to_observation,
)
from lerobot.transport import (
    services_pb2,  # type: ignore
    services_pb2_grpc,  # type: ignore
)
from lerobot.transport.utils import receive_bytes_in_chunks
from qwen_vl_utils.vision_process import smart_resize
from wall_x.data.config import X2RDataProcessingConfig
from wall_x.data.utils import get_wallx_normal_text, process_grounding_points, replace_action_token, preprocesser_call, KEY_MAPPINGS, load_norm_stats

import yaml
from wall_x.model.qwen2_5_based.modeling_qwen2_5_vl_act import Qwen2_5_VLMoEForAction
from wall_x.data.load_lerobot_dataset import get_data_configs
from transformers import AutoProcessor

def load_config(config_path):
    """Load configuration from YAML file."""
    with open(config_path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    config["data"]["model_type"] = config.get("model_type")

    return config

class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
    prefix = "policy_server"
    logger = get_logger(prefix)

    def __init__(self, config: PolicyServerConfig):
        self.config = config
        self.shutdown_event = threading.Event()

        # FPS measurement
        self.fps_tracker = FPSTracker(target_fps=config.fps)

        self.observation_queue = Queue(maxsize=1)

        self._predicted_timesteps_lock = threading.Lock()
        self._predicted_timesteps = set()

        self.last_processed_obs = None

        # Hack: We fix the attributes here only for wallx
        self.device = 'cuda'
        self.policy_type = 'wallx'
        self.model_path = "/path/to/model"
        self.action_tokenizer_path = None # or /path/to/fast
        self.config_path = "/path/to/config"
        self.task_instruction = "your task name"
        self.lerobot_features = None
        self.actions_per_chunk = 32
        self.policy = None
        self.norm_stats = None
        
        # WallX preprocessing configuration
        self.data_config = None
        self.cam_key_mapping = None
        self.repo_id = None
        self.frame_index = None  # Track frame index for observations
        self.processor = None
        self.max_length = 768
        self.use_fast_tokenizer = False
        self.train_action_tokenizer = None

    @property
    def running(self):
        return not self.shutdown_event.is_set()

    @property
    def policy_image_features(self):
        image_features = {
            "observation.images.top": PolicyFeature(type=FeatureType.VISUAL, shape=[3, 480, 640]),
            "observation.images.side": PolicyFeature(type=FeatureType.VISUAL, shape=[3, 480, 640]),
        }
        return image_features

    def _reset_server(self) -> None:
        """Flushes server state when new client connects."""
        # only running inference on the latest observation received by the server
        self.shutdown_event.set()
        self.observation_queue = Queue(maxsize=1)

        with self._predicted_timesteps_lock:
            self._predicted_timesteps = set()
        
        # Reset frame index for new episode
        self.frame_index = 0

    def Ready(self, request, context):  # noqa: N802
        client_id = context.peer()
        self.logger.info(f"Client {client_id} connected and ready")
        self._reset_server()
        self.shutdown_event.clear()

        return services_pb2.Empty()

    def SendPolicyInstructions(self, request, context):  # noqa: N802
        """Receive policy instructions from the robot client"""

        if not self.running:
            self.logger.warning("Server is not running. Ignoring policy instructions.")
            return services_pb2.Empty()

        client_id = context.peer()

        policy_specs = pickle.loads(request.data)  # nosec
        self.lerobot_features = policy_specs.lerobot_features

        self.logger.info(
            f"Receiving policy instructions from {client_id} | "
            f"Policy type: {self.policy_type} | "
            f"Actions per chunk: {self.actions_per_chunk} | "
            f"Device: {self.device}"
        )

        # self.lerobot_features = policy_specs.lerobot_features
        start = time.perf_counter()

        config = load_config(self.config_path)
        dataload_config = get_data_configs(config["data"])
        lerobot_config = dataload_config.get("lerobot_config", {})
        self.repo_id = lerobot_config.get("repo_id", None)
        assert self.repo_id is not None, "repo id is required"

        norm_stats_path = config.get("norm_stats_path", None)
        stats = load_norm_stats(norm_stats_path, self.repo_id)
        self.action_min_stat = stats["action"].min.to(self.device)
        self.action_delta = stats["action"].delta.to(self.device)
        self.state_min_stat = stats["state"].min.to(self.device)
        self.state_delta = stats["state"].delta.to(self.device)

        # load model from checkpoint
        self.policy = Qwen2_5_VLMoEForAction.from_pretrained(
            self.model_path, train_config=config, action_tokenizer_path=self.action_tokenizer_path
        )
        self.policy.eval()
        self.policy = self.policy.to(self.device)
        self.policy = self.policy.to(torch.bfloat16)

        self.predict_mode = "fast" if config.get("use_fast_tokenizer", False) else "diffusion"
        self.action_dim = 20 if self.predict_mode == "diffusion" else 6
        
        end = time.perf_counter()

        self.logger.info(f"Time taken to put policy on {self.device}: {end - start:.4f} seconds")
        
        # Initialize WallX preprocessing configuration
        self.cam_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["camera"]
        self.state_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["state"]
        self.action_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["action"]
        
        self.data_config = X2RDataProcessingConfig().update(
            train_test_split=dataload_config["train_test_split"],
            split_seed=dataload_config["split_seed"],
            predict_action_keys=dataload_config["predict_action_keys"],
            obs_action_keys=dataload_config["obs_action_keys"],
            resolution=dataload_config.get("resolution", None),
            priority_order=dataload_config.get("priority_order", None),
        )
        
        # Load processor
        self.processor = AutoProcessor.from_pretrained(self.model_path, use_fast=True)
        self.processor.tokenizer.padding_side = "left"
        self.max_length = dataload_config.get("max_length", 768)
        self.use_fast_tokenizer = config.get("use_fast_tokenizer", False)
        
        # Load action tokenizer if using fast tokenizer
        if self.use_fast_tokenizer:
            action_tokenizer_path = self.action_tokenizer_path
            if action_tokenizer_path:
                self.train_action_tokenizer = AutoProcessor.from_pretrained(
                    action_tokenizer_path, trust_remote_code=True
                )
        
        self.logger.info(f"WallX preprocessing initialized with repo_id: {self.repo_id}")
        self.logger.info(f"Resolution config: {self.data_config.resolution}")
        self.logger.info(f"Task instruction: {self.task_instruction}")
        self.logger.info(f"Max length: {self.max_length}")
        
        return services_pb2.Empty()

    def SendObservations(self, request_iterator, context):  # noqa: N802
        """Receive observations from the robot client"""
        client_id = context.peer()
        self.logger.debug(f"Receiving observations from {client_id}")

        receive_time = time.time()  # comparing timestamps so need time.time()
        start_deserialize = time.perf_counter()
        received_bytes = receive_bytes_in_chunks(
            request_iterator, None, self.shutdown_event, self.logger
        )  # blocking call while looping over request_iterator
        timed_observation = pickle.loads(received_bytes)  # nosec
        deserialize_time = time.perf_counter() - start_deserialize

        self.logger.debug(f"Received observation #{timed_observation.get_timestep()}")

        obs_timestep = timed_observation.get_timestep()
        self.frame_index = obs_timestep
        obs_timestamp = timed_observation.get_timestamp()

        # Calculate FPS metrics
        fps_metrics = self.fps_tracker.calculate_fps_metrics(obs_timestamp)

        self.logger.info(
            f"Received observation #{obs_timestep} | "
            f"Avg FPS: {fps_metrics['avg_fps']:.2f} | "  # fps at which observations are received from client
            f"Target: {fps_metrics['target_fps']:.2f} | "
            f"One-way latency: {(receive_time - obs_timestamp) * 1000:.2f}ms"
        )

        self.logger.debug(
            f"Server timestamp: {receive_time:.6f} | "
            f"Client timestamp: {obs_timestamp:.6f} | "
            f"Deserialization time: {deserialize_time:.6f}s"
        )

        if not self._enqueue_observation(
            timed_observation  # wrapping a RawObservation
        ):
            self.logger.info(f"Observation #{obs_timestep} has been filtered out")

        return services_pb2.Empty()

    def GetActions(self, request, context):  # noqa: N802
        """Returns actions to the robot client. Actions are sent as a single
        chunk, containing multiple actions."""
        client_id = context.peer()
        self.logger.debug(f"Client {client_id} connected for action streaming")

        # Generate action based on the most recent observation and its timestep
        try:
            getactions_starts = time.perf_counter()
            obs = self.observation_queue.get(timeout=self.config.obs_queue_timeout)
            self.logger.info(
                f"Running inference for observation #{obs.get_timestep()} (must_go: {obs.must_go})"
            )

            with self._predicted_timesteps_lock:
                self._predicted_timesteps.add(obs.get_timestep())

            start_time = time.perf_counter()
            action_chunk = self._predict_action_chunk(obs)
            inference_time = time.perf_counter() - start_time

            start_time = time.perf_counter()
            actions_bytes = pickle.dumps(action_chunk)  # nosec
            serialize_time = time.perf_counter() - start_time

            # Create and return the action chunk
            actions = services_pb2.Actions(data=actions_bytes)

            self.logger.info(
                f"Action chunk #{obs.get_timestep()} generated | "
                f"Total time: {(inference_time + serialize_time) * 1000:.2f}ms"
            )

            self.logger.debug(
                f"Action chunk #{obs.get_timestep()} generated | "
                f"Inference time: {inference_time:.2f}s |"
                f"Serialize time: {serialize_time:.2f}s |"
                f"Total time: {inference_time + serialize_time:.2f}s"
            )

            time.sleep(
                max(0, self.config.inference_latency - max(0, time.perf_counter() - getactions_starts))
            )  # sleep controls inference latency

            return actions

        except Empty:  # no observation added to queue in obs_queue_timeout
            return services_pb2.Empty()

        except Exception as e:
            import traceback
            self.logger.error(f"Error in GetActions: {e}")
            self.logger.error(f"Traceback: {traceback.format_exc()}")

            return services_pb2.Empty()

    def _obs_sanity_checks(self, obs: TimedObservation, previous_obs: TimedObservation) -> bool:
        """Check if the observation is valid to be processed by the policy"""
        with self._predicted_timesteps_lock:
            predicted_timesteps = self._predicted_timesteps

        if obs.get_timestep() in predicted_timesteps:
            self.logger.debug(f"Skipping observation #{obs.get_timestep()} - Timestep predicted already!")
            return False

        # elif observations_similar(obs, previous_obs, lerobot_features=self.lerobot_features):
        #     self.logger.debug(
        #         f"Skipping observation #{obs.get_timestep()} - Observation too similar to last obs predicted!"
        #     )
        #     return False

        else:
            return True

    def _enqueue_observation(self, obs: TimedObservation) -> bool:
        """Enqueue an observation if it must go through processing, otherwise skip it.
        Observations not in queue are never run through the policy network"""

        if (
            obs.must_go
            or self.last_processed_obs is None
            or self._obs_sanity_checks(obs, self.last_processed_obs)
        ):
            last_obs = self.last_processed_obs.get_timestep() if self.last_processed_obs else "None"
            self.logger.debug(
                f"Enqueuing observation. Must go: {obs.must_go} | Last processed obs: {last_obs}"
            )

            # If queue is full, get the old observation to make room
            if self.observation_queue.full():
                # pops from queue
                _ = self.observation_queue.get_nowait()
                self.logger.debug("Observation queue was full, removed oldest observation")

            # Now put the new observation (never blocks as queue is non-full here)
            self.observation_queue.put(obs)
            return True

        return False

    def _time_action_chunk(self, t_0: float, action_chunk: list[torch.Tensor], i_0: int) -> list[TimedAction]:
        """Turn a chunk of actions into a list of TimedAction instances,
        with the first action corresponding to t_0 and the rest corresponding to
        t_0 + i*environment_dt for i in range(len(action_chunk))
        """
        return [
            TimedAction(timestamp=t_0 + i * self.config.environment_dt, timestep=i_0 + i, action=action)
            for i, action in enumerate(action_chunk)
        ]

    def _prepare_observation(self, observation_t: TimedObservation) -> Observation:
        """
        Prepare observation, ready for policy inference.
        E.g.: To keep observation sampling rate high (and network packet tiny) we send int8 [0,255] images from the
        client and then convert them to float32 [0,1] images here, before running inference.
        
        For WallX policy, this also applies full preprocessing similar to PreprocessedDataset:
        - Vision: Converts images to PIL format, applies resolution constraints, smart_resize
        - Text: Generates instruction text with camera views and action tokens
        - Action/State: Extracts agent position and prepares action format
        """
        # RawObservation from robot.get_observation() - wrong keys, wrong dtype, wrong image shape
        observation: Observation = raw_observation_to_observation(
            observation_t.get_observation(),
            self.lerobot_features,
            self.policy_image_features,
            self.device,
        )
        
        # Apply WallX-style preprocessing (vision + text + action)
        observation = self._apply_wallx_preprocessing(observation)
        
        # processed Observation - right keys, right dtype, right image shape, text, etc.
        return observation
    
    def _apply_wallx_preprocessing(self, observation: Observation) -> Observation:
        """
        Apply WallX-style preprocessing to observation.
        This matches the preprocessing done in PreprocessedDataset.__getitem__.
        
        Includes:
        - Vision preprocessing (PIL conversion, resolution constraints, smart_resize)
        - Text generation (instruction text with camera views and action tokens)
        - Agent position extraction
        
        Args:
            observation: Observation dict with image tensors
            
        Returns:
            Observation dict with preprocessed data ready for WallX inference
        """
        # 1. Vision preprocessing
        processed_images = []
        orig_height, orig_width = None, None
        resized_height, resized_width = None, None
        
        # Get image keys from the observation (filter for image observations)
        image_keys = [key for key in observation.keys() if 'image' in key.lower()]
        
        for key in image_keys:
            if key not in observation:
                continue
                
            # Get the image tensor (should be [C, H, W] in float32 [0,1] range)
            img_tensor = observation[key]
            
            # Convert from tensor to PIL image
            # Permute from [C, H, W] to [H, W, C] and convert to uint8 [0, 255]
            if img_tensor.ndim == 4:  # If batch dimension exists [B, C, H, W]
                img_tensor = img_tensor.squeeze(0)
            
            current_obs = img_tensor.clone().permute(1, 2, 0)
            img_pil = Image.fromarray((current_obs * 255).to(torch.uint8).cpu().numpy())
            orig_width, orig_height = img_pil.size
            
            # Apply resolution constraints (if config is not -1)
            # Map the observation key to the camera view name
            cam_view = self.cam_key_mapping.get(key, None)
            if cam_view:
                target_size = self.data_config.resolution.get(cam_view, -1)
                
                if target_size != -1:
                    # Maintain aspect ratio logic
                    if orig_width > orig_height:  # Landscape image
                        new_width = target_size
                        new_height = int(target_size * orig_height / orig_width)
                    else:  # Portrait image
                        new_height = target_size
                        new_width = int(target_size * orig_width / orig_height)
                    img_pil = img_pil.resize((new_width, new_height))
            
            # Apply smart scaling (qwen logic)
            current_width, current_height = img_pil.size
            resized_height, resized_width = smart_resize(
                current_height,
                current_width,
                factor=self.data_config.image_factor,
                min_pixels=self.data_config.min_pixels,
                max_pixels=self.data_config.max_pixels,
            )
            resized_img = img_pil.resize((resized_width, resized_height))
            processed_images.append(resized_img)
            
            # Update the observation with the PIL image
            observation[key] = resized_img
        
        # 2. Text preprocessing - generate instruction text
        if self.task_instruction is not None and orig_height is not None:
            instruction_info = {"instruction": self.task_instruction}
            action_chunk_size = self.actions_per_chunk  # 33 - 1
            
            # Generate the complete text with instruction and action tokens
            complete_text, generate_subtask = get_wallx_normal_text(
                instruction_info,
                action_chunk_size,
                self.frame_index,
                self.data_config.priority_order,
                self.cam_key_mapping,
                generate_subtask_ratio=self.data_config.generate_subtask_ratio,
            )
            
            # Process grounding points (adjust coordinates for resized images)
            text = process_grounding_points(
                complete_text, 
                orig_height, 
                orig_width, 
                resized_height, 
                resized_width, 
                self.data_config.model_type
            )
            
            observation["text"] = text
            observation["image_inputs"] = processed_images
        
        # 3. Extract agent position (observation.state)
        # Look for state keys in observation
        for key in observation.keys():
            if 'state' in key.lower():
                observation["agent_pos"] = observation[key]
                break
        
        # 4. Track frame index
        observation["frame_index"] = self.frame_index
        
        # 5. Apply DataCollator-style processing (normalization, tokenization, etc.)
        observation = self._apply_datacollator_processing(observation)
        
        return observation
    
    def _normalize(self, data: torch.Tensor, min_stat: torch.Tensor, delta: torch.Tensor) -> torch.Tensor:
        """Normalize data to [-1, 1] range"""
        return torch.clamp((data - min_stat) / delta * 2.0 - 1.0, -1, 1)

    
    def _apply_datacollator_processing(self, observation: Observation) -> Observation:
        """
        Apply DataCollator-style processing to observation.
        This matches the processing done in DataCollator.__call__.
        
        Includes:
        - Agent position normalization and padding to 20 dims
        - Action normalization and padding (create dummy action for inference)
        - Action token replacement in text
        - Tokenization and image processing via preprocesser_call
        - MOE token type generation
        
        Args:
            observation: Observation dict with preprocessed data
            
        Returns:
            Observation dict ready for model inference
        """
        additional_inputs = {}
        
        # 1. Process agent position (proprioception)
        if "agent_pos" in observation:
            agent_pos = observation["agent_pos"]
            
            # Ensure proper shape [1, dim] for single observation
            if agent_pos.dim() == 1:
                agent_pos = agent_pos.unsqueeze(0)
            if agent_pos.dim() == 2:
                agent_pos = agent_pos.unsqueeze(1)  # [batch, 1, dim]
            
            # Handle NaN values
            agent_pos_mask = (~torch.isnan(agent_pos)).float()
            agent_pos = torch.nan_to_num(agent_pos, nan=0.0)
            
            # Normalize
            agent_pos = self._normalize(agent_pos, self.state_min_stat, self.state_delta)
            
            # Pad to 20 dimensions if needed
            if agent_pos.shape[-1] != 20:
                padding_size = 20 - agent_pos.shape[-1]
                agent_pos = torch.cat(
                    [
                        agent_pos,
                        torch.zeros(agent_pos.shape[0], agent_pos.shape[1], padding_size, device=agent_pos.device)
                    ],
                    dim=-1
                )
                agent_pos_mask = torch.cat(
                    [
                        agent_pos_mask,
                        torch.zeros(agent_pos_mask.shape[0], agent_pos_mask.shape[1], padding_size, device=agent_pos_mask.device)
                    ],
                    dim=-1
                )
            
            additional_inputs["proprioception"] = agent_pos
            additional_inputs["agent_pos_mask"] = agent_pos_mask
            self.logger.debug(f"Agent position shape: {agent_pos.shape}")
            self.logger.debug(f"Agent position mask shape: {agent_pos_mask.shape}")
        
        # 2. Create dummy action chunk for inference (will be predicted by model)
        # Create dummy normalized action with proper shape [1, actions_per_chunk, action_dim]
        action_dim = 6
        action = torch.zeros(1, self.actions_per_chunk, action_dim, dtype=torch.float32, device=self.device)
        dof_mask = torch.ones(1, self.actions_per_chunk, action_dim, dtype=torch.float32, device=self.device)

        action = self._normalize(action, self.action_min_stat, self.action_delta)
        
        # Pad to 20 dimensions if needed
        if action.shape[-1] != 20:
            padding_size = 20 - action.shape[-1]
            action = torch.cat(
                [
                    action,
                    torch.zeros(action.shape[0], action.shape[1], padding_size, device=action.device)
                ],
                dim=-1
            )
            dof_mask = torch.cat(
                [
                    dof_mask,
                    torch.zeros(dof_mask.shape[0], dof_mask.shape[1], padding_size, device=dof_mask.device)
                ],
                dim=-1
            )
        
        additional_inputs["action_chunk"] = action
        additional_inputs["dof_mask"] = dof_mask
        self.logger.debug(f"Action chunk shape: {action.shape}")
        self.logger.debug(f"DOF mask shape: {dof_mask.shape}")
        
        # 3. Get image inputs and text
        if "image_inputs" in observation:
            additional_inputs["image_inputs"] = [observation["image_inputs"]]
        
        if "text" in observation:
            additional_inputs["text"] = [observation["text"]]
        
        if "frame_index" in observation:
            additional_inputs["frame_index"] = torch.tensor([observation["frame_index"]], dtype=torch.long)
        
        # 4. Replace action tokens in text with dummy actions
        additional_inputs["text"] = replace_action_token(
            additional_inputs["text"],
            additional_inputs["action_chunk"],
            self.train_action_tokenizer if self.use_fast_tokenizer else None,
            [self.repo_id] * len(additional_inputs["text"]),
            additional_inputs["dof_mask"],
        )
        
        # 5. Process images and text with processor
        inputs = preprocesser_call(
            processor=self.processor,
            text=additional_inputs.pop("text"),
            images=additional_inputs.pop("image_inputs"),
            videos=None,
            padding=True,
            truncation=True,
            return_tensors="pt",
            max_length=self.max_length,
        )
        
        # 6. Create MOE token types (identify action tokens)
        action_token_id = self.processor.tokenizer.convert_tokens_to_ids("<|action|>")
        additional_inputs["moe_token_types"] = inputs.input_ids == action_token_id
        
        # 7. Merge all inputs
        inputs.update(additional_inputs)
        
        # 8. Add dataset name
        inputs["dataset_names"] = [self.repo_id] * inputs[
            "action_chunk"
        ].shape[0]
        
        return inputs

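    def _get_action_chunk(self, observation: dict[str, torch.Tensor]) -> torch.Tensor:
        """Get an action chunk from the policy. The chunk contains only the first
        `actions_per_chunk` actions and the first 6 action dimensions."""
        # Move tensors to the policy device; leave non-tensor entries (e.g. lists of names) as-is
        observation = {
            k: v.to(self.device) if isinstance(v, torch.Tensor) else v
            for k, v in observation.items()
        }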
        with torch.no_grad():
            outputs = self.policy(
                    **observation,
                    action_dim=self.action_dim,
                    pred_horizon=self.actions_per_chunk,
                    mode="predict",
                    predict_mode=self.predict_mode,
                    re_generate=True,
            )
        chunk = outputs["predict_action"].detach().cpu()
        if chunk.ndim != 3:
            chunk = chunk.unsqueeze(0)  # adding batch dimension, now shape is (B, chunk_size, action_dim)

        chunk = chunk[:, : self.actions_per_chunk, :6]
        return chunk

    def _predict_action_chunk(self, observation_t: TimedObservation) -> list[TimedAction]:
        """Predict an action chunk based on an observation"""
        inference_starts = time.perf_counter()

        # 1. Prepare observation
        start_time = time.perf_counter()
        observation = self._prepare_observation(observation_t)
        preprocessing_time = time.perf_counter() - start_time

        self.last_processed_obs: TimedObservation = observation_t
        self.logger.debug("Finished observation preparation stage")

        # 2. Get action chunk
        start_time = time.perf_counter()
        action_tensor = self._get_action_chunk(observation)
        inference_time = time.perf_counter() - start_time

        # 3. Post-inference processing
        start_time = time.perf_counter()
        # Move to CPU before serializing
        action_tensor = action_tensor.cpu().squeeze(0)

        action_chunk = self._time_action_chunk(
            observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep()
        )
        postprocessing_time = time.perf_counter() - start_time
        inference_stops = time.perf_counter()

        self.logger.info(
            f"Observation {observation_t.get_timestep()} | "
            f"Inference time: {1000 * (inference_stops - inference_starts):.2f}ms"
        )

        # Full-process latency breakdown for debugging purposes.
        # Each *_time variable already holds a duration in seconds,
        # so it is logged directly instead of being subtracted from a timestamp.
        self.logger.debug(
            f"Observation {observation_t.get_timestep()} | "
            f"Preprocessing time: {1000 * preprocessing_time:.2f}ms | "
            f"Inference time: {1000 * inference_time:.2f}ms | "
            f"Postprocessing time: {1000 * postprocessing_time:.2f}ms | "
            f"Total time: {1000 * (inference_stops - inference_starts):.2f}ms"
        )

        return action_chunk

    def stop(self):
        """Stop the server"""
        self._reset_server()
        self.logger.info("Server stopping...")

@draccus.wrap()
def serve(cfg: PolicyServerConfig):
    """Start the PolicyServer with the given configuration.

    Args:
        cfg: PolicyServerConfig instance, parsed from the command line by draccus.
    """
    logging.info(pformat(asdict(cfg)))

    # Create the server instance first
    policy_server = PolicyServer(cfg)

    # Setup and start gRPC server
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    services_pb2_grpc.add_AsyncInferenceServicer_to_server(policy_server, server)
    server.add_insecure_port(f"{cfg.host}:{cfg.port}")

    policy_server.logger.info(f"PolicyServer started on {cfg.host}:{cfg.port}")
    server.start()

    server.wait_for_termination()

    policy_server.logger.info("Server terminated")

if __name__ == "__main__":
    serve() 

Important: Make sure the observation preprocessing on the server matches the wall-x training preprocessing exactly. The robot client uses the existing LeRobot implementation:

src/lerobot/scripts/server/robot_client.py
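As a quick way to check that the server-side preprocessing matches training, the normalization and padding step can be reproduced in isolation. The following is a minimal sketch in plain Python that mirrors the `_normalize` formula and the zero-padding to 20 dimensions shown in the server code above. The function name `normalize_and_pad` and the statistics are hypothetical and chosen only for illustration; real values come from the dataset statistics used during training.

```python
def normalize_and_pad(values, min_stat, delta, target_dim=20):
    """Min-max normalize to [-1, 1], then zero-pad to target_dim with a validity mask."""
    normalized = []
    for x, lo, span in zip(values, min_stat, delta):
        scaled = (x - lo) / span * 2.0 - 1.0
        normalized.append(max(-1.0, min(1.0, scaled)))  # clamp, as in _normalize
    mask = [1.0] * len(normalized)
    pad = target_dim - len(normalized)
    # Padded dimensions are zero in both the values and the mask
    return normalized + [0.0] * pad, mask + [0.0] * pad


# Hypothetical statistics for a 6-DoF arm (not taken from any real dataset).
state = [0.0, 45.0, -90.0, 10.0, 0.0, 50.0]
state_min = [-180.0, -90.0, -90.0, -90.0, -180.0, 0.0]
state_delta = [360.0, 180.0, 90.0, 180.0, 360.0, 100.0]  # max - min per joint

padded, mask = normalize_and_pad(state, state_min, state_delta)
print(padded[:6])
print(len(padded), sum(mask))
```

Feeding the same raw state through this function and through the training pipeline, then comparing the results, is one simple way to catch a mismatch in statistics or dimension order before running on the robot.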

2.3.2 Deploying on the Real Robot

Start the policy server:

python scripts/wallx_server.py \
     --host=0.0.0.0 \
     --port=YOUR_PORT \
     --fps=30 \
     --inference_latency=0.033 \
     --obs_queue_timeout=1
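
To make the `--fps` flag concrete, here is a minimal sketch of how a predicted chunk is spread over time, mirroring `_time_action_chunk` in the server code above. It assumes that `environment_dt` equals `1 / fps`, which this guide does not state explicitly. `TimedStep` and `time_chunk` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class TimedStep:
    """Hypothetical stand-in for the server's TimedAction (action payload omitted)."""
    timestamp: float
    timestep: int


def time_chunk(t_0: float, i_0: int, chunk_len: int, fps: int) -> list[TimedStep]:
    environment_dt = 1.0 / fps  # assumption: one control step per frame
    return [
        TimedStep(timestamp=t_0 + i * environment_dt, timestep=i_0 + i)
        for i in range(chunk_len)
    ]


steps = time_chunk(t_0=100.0, i_0=40, chunk_len=32, fps=30)
print(steps[0], steps[-1])
print(f"chunk covers {steps[-1].timestamp - steps[0].timestamp:.3f} s")
```

Under this assumption, `--fps=30` gives a step of about 0.033 s, the same value passed as `--inference_latency` in the command above, and a 32-action chunk spans roughly one second of motion.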

Start the robot client:

python src/lerobot/scripts/server/robot_client.py \
    --robot.type=so101_follower \
    --robot.port=YOUR_ROBOT_PORT \
    --robot.id=YOUR_ROBOT_ID \
    --robot.cameras=YOUR_CAMERAS \
    --task=TASK_NAME \
    --server_address=SERVER_IP:PORT \
    --policy_type=smolvla \
    --pretrained_name_or_path=None \
    --policy_device=cuda \
    --actions_per_chunk=32 \
    --chunk_size_threshold=0. \
    --aggregate_fn_name=weighted_average

Since I am hacking the server directly, both policy_type and pretrained_name_or_path still have to be set manually on the client. chunk_size_threshold and aggregate_fn_name are the parameters smolvla uses for asynchronous inference.

  • chunk_size_threshold specifies the action queue length (as a ratio of its maximum length) below which an inference request is sent.

  • aggregate_fn_name determines how overlapping actions are combined when a new chunk covers timesteps that are still in the action queue.
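
The two parameters above can be sketched as follows. This is an illustrative approximation, not the LeRobot implementation: the `<=` comparison, the function names, and the 0.3 / 0.7 weights are assumptions, and scalar values stand in for action tensors.

```python
def should_request_inference(queue_len: int, actions_per_chunk: int, threshold: float) -> bool:
    """Request a new chunk once the queue's fill ratio drops to the threshold."""
    return queue_len / actions_per_chunk <= threshold


def merge_chunks(queue: dict, new_chunk: dict, aggregate_fn) -> dict:
    """Merge chunks keyed by timestep; overlapping timesteps are blended."""
    merged = dict(queue)
    for timestep, action in new_chunk.items():
        if timestep in merged:
            merged[timestep] = aggregate_fn(merged[timestep], action)
        else:
            merged[timestep] = action
    return merged


# Hypothetical weights; the real weighted_average may use different ones.
def weighted_average(old: float, new: float) -> float:
    return 0.3 * old + 0.7 * new


queued = {10: 1.0, 11: 1.0}    # scalar "actions" keep the example short
incoming = {11: 2.0, 12: 2.0}  # overlaps with the queue at timestep 11
merged = merge_chunks(queued, incoming, weighted_average)
print(merged)
print(should_request_inference(0, 32, 0.0), should_request_inference(8, 32, 0.0))
```

With a threshold of 0, a request is sent only once the queue is empty, so chunks never overlap and the aggregation function is never applied.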

You can simply set chunk_size_threshold = 0 to disable asynchronous inference. After launching both, you should see the model controlling the robot:

  • Smooth successful case

  • Struggling but successful case

  • Failure case

3. Common Pitfalls & Practical Fixes 🥲

3.1 Hardware Issues

When assembling the SO101, pay close attention to the joints. Repeated data collection and deployment can cause friction between the 3D-printed parts, servo output shafts, and aluminum flanges. Remove support material thoroughly, or sand the servo shaft if needed, and remember to recalibrate afterward.

3.2 Data Collection

There is no universal rule for “good data.” Quality depends heavily on:

  • Task diversity
  • Spatial variation
  • Object diversity
  • Robustness to disturbances

In practice, the best approach is iterative:

  1. Collect data
  2. Train model
  3. Identify failure modes
  4. Collect more targeted data

3.3 Model Deployment

Although the model is fixed after training, deployment choices affect performance:

  • Using half the predicted action chunk may yield smoother results
  • Proper control and inference frequency tuning improves stability
  • Environment mismatch between training and deployment may degrade performance
  • Debugging is complex because failures may originate from many sources:
    • Data collection
    • Training
    • Deployment environment
    • Script bugs or parameters
    • ...
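
The first bullet above, executing only part of each predicted chunk, can be sketched like this. `truncate_chunk` and `keep_ratio` are hypothetical names and are not part of wall-x or LeRobot. The idea is simply that every action in a chunk is predicted from the same observation, so dropping the tail makes the robot re-plan from a fresher observation more often.

```python
def truncate_chunk(actions: list, keep_ratio: float = 0.5) -> list:
    """Keep only the leading fraction of a predicted chunk (at least one action)."""
    if not 0.0 < keep_ratio <= 1.0:
        raise ValueError("keep_ratio must be in (0, 1]")
    keep = max(1, int(len(actions) * keep_ratio))
    return actions[:keep]


chunk = list(range(32))  # stand-in for 32 predicted actions
executed = truncate_chunk(chunk)
print(len(executed), executed[0], executed[-1])
```

The trade-off is that a smaller `keep_ratio` means more inference calls per second, so it needs to be tuned together with the control and inference frequency.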

Try to design a systematic validation process. Hypothesize boldly and verify carefully. Just take your time. Good luck!
