WALL-OSS Full Deployment Guide
This article provides a systematic introduction to the complete engineering deployment pipeline of WALL-OSS, covering the end-to-end loop from data collection to model training and inference. By following this guide, you will be able to independently run a full WALL-OSS system.
1. Introduction to WALL-OSS
WALL-OSS (World-Action-Language-Learning – Open Source System) is an open-source foundation model for embodied intelligence, proposed by the XSquare Robot team in 2025. Its goal is to close the vision–language–action (VLA) gap and extend the semantic capabilities of large vision-language models (VLMs) into real-world physical interaction.
Technically, WALL-OSS introduces a tightly coupled multimodal architecture (a tightly-coupled MoE structure) that integrates both discrete and continuous action modeling strategies. Through a two-stage training pipeline (Inspiration → Integration), the model gradually unifies semantic reasoning and high-frequency action generation. Its core innovations include:
- Embodied perception-enhanced multimodal pretraining: Large-scale training on unified vision–language–action data to strengthen spatial, causal, and manipulation understanding.
- Unified Cross-Level Chain-of-Thought (Uni-CoT): A single differentiable framework that unifies high-level instruction reasoning, sub-task decomposition, and fine-grained action synthesis, forming a continuous chain from "understanding" to "execution."
- Mixture-of-Experts (MoE) action heads: Dynamically activating experts depending on the task phase and modeling actions in discrete or continuous space to maintain stable VLM priors.
- Two-stage training paradigm:
  - Inspiration stage: Injecting discrete action priors to strengthen spatial understanding and semantic-action alignment.
  - Integration stage: Using flow matching to achieve high-frequency continuous control.

Overall, WALL-OSS is not merely a model but an engineering and scientific paradigm that connects semantic reasoning with physical action, providing a verifiable technical route toward embodied AGI.
2. WALL-OSS Deployment Pipeline
This article does not focus on the internal architecture or algorithmic details of WALL-OSS. Instead, it emphasizes the complete engineering deployment pipeline. We will walk through how to use the open-source Lerobot S101 platform to build an end-to-end workflow consisting of data collection, model finetuning, and inference deployment, including system dependencies, task configuration, model training procedures, and inference service setup. By following this step-by-step guide, readers will understand how WALL-OSS operates on real robots and can quickly reproduce or extend its embodied intelligence capabilities. This article includes the following three parts:
- Data collection using the open-source S101 platform (other platforms can follow the same workflow)
- Model finetuning
- Model deployment and validation
2.1 Data Collection Using the Open-Source S101 Platform
2.1.1 Setting Up and Using the S101 Platform
For setup and calibration of the S101 platform, refer to the official tutorial or the Seeed Studio documentation. Note: This tutorial uses Lerobot 0.3.4 and Lerobot-dataset v2.1. Different versions may behave differently. After assembling and calibrating the robot, test teleoperation using:
lerobot-teleoperate \
--robot.type=so101_follower \
--robot.port=YOUR_PORT \
--robot.cameras=YOUR_CAMERAS \
--robot.id=YOUR_ROBOT_ID \
--teleop.type=so101_leader \
--teleop.port=YOUR_PORT \
--teleop.id=YOUR_ROBOT_ID \
--display_data=true
Camera configurations can be customized. A typical setup uses one top camera and one side or wrist camera.
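For reference, LeRobot's CLI accepts the camera dictionary inline; a hypothetical two-camera setup might look like the following (camera names, device indices, and resolutions are placeholders, so check your own /dev/video* indices):

--robot.cameras="{ top: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}, side: {type: opencv, index_or_path: 2, width: 640, height: 480, fps: 30} }"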
2.1.2 Data Collection
Once teleoperation works, start collecting data:
lerobot-record \
--robot.type=so101_follower \
--robot.port=YOUR_PORT \
--robot.id=YOUR_ROBOT_ID \
--robot.cameras=YOUR_CAMERAS \
--teleop.type=so101_leader \
--teleop.port=YOUR_PORT \
--teleop.id=YOUR_ROBOT_ID \
--dataset.num_episodes=NUM_EPISODES \
--dataset.single_task=YOUR_TASK \
--dataset.push_to_hub=false \
--dataset.episode_time_s=60 \
--dataset.reset_time_s=60 \
--display_data=true \
--dataset.repo_id=YOUR_REPO_ID
dataset.episode_time_s and dataset.reset_time_s should be set longer than the typical duration (2–3×) to avoid premature termination.
During data collection/reset, you may:
- Press ➡️ (Right Arrow) to skip early to the next reset/collection stage
- Press ⬅️ (Left Arrow) to go back if the current collection or reset is problematic
A simple diagram:

Example Data Collection & Reset Flow

[Reset #1] ──▶ [Collect #1] ──▶ [Reset #2] ──▶ [Collect #2] ──▶ [Reset #3] ──▶ [Collect #3]
     ▲               │               ▲               │               ▲               │
     └── ⬅️ Back ────┘               └── ⬅️ Back ────┘               └── ⬅️ Back ────┘

➡️ Forward (Next Stage) advances along the top row.
Example:
- Currently at **Collect #2**
- If data is wrong → press **⬅️** to return to **Reset #2**
- If data is OK → press **➡️** to enter **Reset #3**
To collect large datasets, you can press ESC to stop and resume later with:
--resume=true
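For example, a resumed recording run reuses the same flags as the original command and appends to the existing dataset (a sketch; all values are placeholders):

lerobot-record \
--robot.type=so101_follower \
--robot.port=YOUR_PORT \
--robot.id=YOUR_ROBOT_ID \
--robot.cameras=YOUR_CAMERAS \
--teleop.type=so101_leader \
--teleop.port=YOUR_PORT \
--teleop.id=YOUR_ROBOT_ID \
--dataset.repo_id=YOUR_REPO_ID \
--dataset.num_episodes=NUM_EXTRA_EPISODES \
--dataset.single_task=YOUR_TASK \
--dataset.push_to_hub=false \
--resume=true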
2.1.3 Data Visualization & Replay
Before finetuning, verify data correctness via:
- Visualization, including videos and state/action trajectories. You can examine the data distribution (e.g., the spatial distribution of objects or the action distribution):
python src/lerobot/scripts/visualize_dataset_html.py --repo-id YOUR_REPO_ID
- Trajectory replay, examining the physical repeatability (optional)
lerobot-replay \
--robot.type=so101_follower \
--robot.port=YOUR_PORT \
--robot.id=YOUR_ROBOT_ID \
--dataset.repo_id=YOUR_REPO_ID \
--dataset.episode=0
2.2 Model Finetuning
2.2.1 Environment Setup
Follow the official WALL-OSS repo for installation.
If flash-attn or wallx builds slowly, increase MAX_JOBS (e.g., 16, 32).
For easier customization, install wall-x in editable mode:
MAX_JOBS=16 pip install --no-build-isolation --verbose -e .
2.2.2 Preparing the Dataset
WALL-OSS supports LeRobot datasets natively, so S101-collected data can be used directly. Other datasets must first be converted to LeRobot format.
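If your raw data is in some other format, you can write it into a LeRobot dataset episode by episode. The sketch below is only illustrative and assumes the LeRobot 0.3.x API (`LeRobotDataset.create`, `add_frame`, `save_episode`); verify the exact feature schema and method signatures against your installed version:

from lerobot.datasets.lerobot_dataset import LeRobotDataset

# Hypothetical conversion sketch; feature names and shapes are placeholders.
features = {
    "observation.images.top": {"dtype": "video", "shape": (480, 640, 3), "names": ["height", "width", "channels"]},
    "observation.state": {"dtype": "float32", "shape": (6,), "names": None},
    "action": {"dtype": "float32", "shape": (6,), "names": None},
}
dataset = LeRobotDataset.create(repo_id="YOUR_REPO_ID", fps=30, features=features)

for episode in raw_episodes:  # your own data source (placeholder)
    for frame in episode:
        dataset.add_frame({
            "observation.images.top": frame["image"],
            "observation.state": frame["state"],
            "action": frame["action"],
            "task": "YOUR_TASK",  # some versions pass the task to save_episode() instead
        })
    dataset.save_episode()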
A typical LeRobotDataset v2.1 structure:
.
├── data
│   └── chunk-000
│       ├── episode_000000.parquet
│       └── ...
├── meta
│   ├── episodes.jsonl
│   ├── info.json
│   ├── stats.json
│   └── tasks.jsonl
└── videos
    └── chunk-000
        ├── observation.images.laptop/episode_000000.mp4
        └── ...
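Before moving on, you can also load the dataset in Python for a quick sanity check (a minimal sketch; the module path matches the one patched later in this guide, and the metadata attribute names follow LeRobot dataset v2.1, so verify them for your version):

from lerobot.datasets.lerobot_dataset import LeRobotDataset

ds = LeRobotDataset(repo_id="YOUR_REPO_ID")          # loads from the local cache or the Hub
print(ds.meta.total_episodes, ds.meta.total_frames)  # dataset size
print(list(ds.meta.features))                        # observation.images.*, observation.state, action, ...
sample = ds[0]                                       # one frame as a dict of tensors
print({k: getattr(v, "shape", v) for k, v in sample.items()})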
2.2.3 Download Pretrained Weights & Configure Training
Download FLOW/FAST pretrained models:
hf download x-square-robot/wall-oss-flow --local-dir /path/to/wall-oss-flow
hf download x-square-robot/wall-oss-fast --local-dir /path/to/wall-oss-fast
Clone FAST tokenizer:
git clone https://huggingface.co/physical-intelligence/fast /path/to/fast
Modify the training config (example for S101 single-arm finetuning):
# wall-x/workspace/lerobot_example/config_qact.yml
# Model and paths configuration, for flow
log_name: "lerobot_training_flow"
log_project: "wall-oss_finetuning"
model_type: wall-oss
pretrained_wallx_path: "/path/to/pretrained/wall-oss-flow" # Must set
save_path: "/path/to/workspace/flow_checkpoints" # Must set
use_fast_tokenizer: False # True: train FAST, False: train Flow
action_tokenizer_path: None # Must set if use_fast_tokenizer is true
# # Model and paths configuration
# log_name: "lerobot_training_fast"
# log_project: "wall-oss_finetuning"
# model_type: wall-oss
# pretrained_wallx_path: "/path/to/wall-oss-fast" # Must set
# save_path: "/path/to/workspace/fast_checkpoints" # Must set
# use_fast_tokenizer: True # True: train FAST, False: train Flow
# action_tokenizer_path: "/path/to/fast" # Must set if use_fast_tokenizer is true
# Torch Profile
profile: False
profile_save_path: /path/to/profile/
profile_wait_iters: 10
profile_warmup_iters: 5
profile_active_iters: 2
# Training hyperparameters
num_warmup_steps: 100
num_training_steps: 64000000
learning_rate: 0.00005
min_lr: 0.00005
num_epoch: 100
gradient_accumulation_steps: 32
batch_size_per_gpu: 8
padding_side: left
epoch_save_interval: 10
# Training optimization settings
FSDP2: True
torch_compile: False
# Robot configuration - Define degrees of freedom for each component
dof_config:
shoulder_pan.pos: 1
shoulder_lift.pos: 1
elbow_flex.pos: 1
wrist_flex.pos: 1
wrist_roll.pos: 1
gripper.pos: 1
# Agent proprioception configuration (typically matches DOF config)
agent_pos_config:
shoulder_pan.pos: 1
shoulder_lift.pos: 1
elbow_flex.pos: 1
wrist_flex.pos: 1
wrist_roll.pos: 1
gripper.pos: 1
# # Checkpoint resuming configuration
# resume:
# ckpt: "/path/to/resume_model/"
# load_ckpt_only: true
norm_stats_path: None
enable_customized_robot_config: true
customized_robot_config:
name: "YOUR_REPO_ID"
customized_dof_config:
"shoulder_pan.pos" : 1
"shoulder_lift.pos" : 1
"elbow_flex.pos" : 1
"wrist_flex.pos" : 1
"wrist_roll.pos" : 1
"gripper.pos" : 1
customized_agent_pos_config:
"shoulder_pan.pos" : 1
"shoulder_lift.pos" : 1
"elbow_flex.pos" : 1
"wrist_flex.pos" : 1
"wrist_roll.pos" : 1
"gripper.pos" : 1
# Data configuration
data:
use_lerobot: true
# LeRobot dataset configuration
lerobot_config:
repo_id: "YOUR_REPO_ID"
root: null
episodes: null
image_transforms: null
delta_timestamps: null
tolerance_s: 1e-4
revision: null
force_cache_sync: false
download_videos: true
video_backend: null
action_horizon: 32
train_test_split: 0.95
# Action keys for observation and prediction
obs_action_keys:
- shoulder_pan.pos
- shoulder_lift.pos
- elbow_flex.pos
- wrist_flex.pos
- wrist_roll.pos
- gripper.pos
predict_action_keys:
- shoulder_pan.pos
- shoulder_lift.pos
- elbow_flex.pos
- wrist_flex.pos
- wrist_roll.pos
- gripper.pos
# Image resolution configuration for different camera views
resolution:
face_view: 256
left_wrist_view: 256
right_wrist_view: 256
move1_view: 256
move2_view: 256
top_view: 256
wall_view: 256
multi_modal: 256
Be sure to:
- Adjust `batch_size_per_gpu` based on GPU memory
- Add `YOUR_REPO_ID` to `ACTION_DATASET_NAMES` and `KEY_MAPPINGS` in the WALL-X source code
# wall_x/data/config.py
ACTION_DATASET_NAMES = [
"x2_normal",
"agibotworld_alpha",
"droid",
"fractal",
"bridge_data_v2",
"DobbE",
"RH20T",
"UMI-biarm",
"austin_buds",
"austin_sailor",
"austin_sirius",
"bc_z",
"berkeley_autolab_ur5",
"berkeley_cable_routing",
"berkeley_fanuc_manipulation",
"dlr_edan_shared_control",
"fmb",
"furniture_bench",
"jaco_play",
"nyu_rot",
"stanford_hydra",
"stanford_kuka_multimodal",
"taco_play",
"utaustin_mutex",
"viola",
"physical-intelligence/libero",
"lerobot/aloha_mobile_cabinet",
########################################
# Add your repo id here
"YOUR_REPO_ID",
########################################
]
# wall_x/data/utils.py
KEY_MAPPINGS = {
########################################
# Add your customized key mapping here
"YOUR_REPO_ID": {
"camera": {
"observation.images.top": "face_view", # find a similar view
"observation.images.side": "wall_view", # find a similar view
},
"state": "observation.state",
"action": "action",
},
########################################
"lerobot/aloha_mobile_cabinet": {
"camera": {
"observation.images.cam_high": "face_view",
"observation.images.cam_left_wrist": "left_wrist_view",
"observation.images.cam_right_wrist": "right_wrist_view",
},
"state": "observation.state",
"action": "action",
},
"physical-intelligence/libero": {
"camera": {
"image": "face_view",
"wrist_image": "left_wrist_view",
},
"state": "state",
"action": "actions",
},
}
2.2.4 Compute Dataset Statistics
WALL-OSS normalizes actions using the q01–q99 quantile range to avoid outliers (following the FAST paper). Therefore, we must compute the statistics of the collected dataset before training.
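Conceptually, the q01/q99 statistics drive a quantile-based min-max normalization into [-1, 1], the same transform as the `_normalize` helper in the inference server shown later (where `min_stat` is q01 and `delta` is q99 - q01). A minimal NumPy sketch (array shapes are placeholders):

import numpy as np

def normalize_q01_q99(x, q01, q99):
    """Map values to [-1, 1] using the 1st/99th percentiles, clipping outliers."""
    return np.clip((x - q01) / (q99 - q01) * 2.0 - 1.0, -1.0, 1.0)

actions = np.random.randn(1000, 6)  # placeholder for the dataset's action array
q01, q99 = np.quantile(actions, [0.01, 0.99], axis=0)
normed = normalize_q01_q99(actions, q01, q99)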
Modify scripts/compute_norm_stats.py:
path = "/path/to/workspace/lerobot_example/config_qact.yml"
output_path = "/path/to/save/norm_stats"
Run:
python scripts/compute_norm_stats.py --batch_size 256 --num_workers 2
This process may take a long time; you can increase batch_size and num_workers to speed it up. Afterwards, copy the generated norm_stats.json into the norm_stats_path of your config.
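The resulting config entry would then point at the copied statistics, for example (whether the path should be the file itself or its parent directory may depend on your wall-x version; the generated file is named norm_stats.json):

norm_stats_path: "/path/to/save/norm_stats/norm_stats.json"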
2.2.5 Training
Modify the GPU settings in workspace/lerobot_example/run.sh:
#!/bin/bash
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 # your GPU setting
NUM_GPUS=$(echo $CUDA_VISIBLE_DEVICES | tr ',' '\n' | wc -l)
# print current time
echo "[current time: $(date +'%Y-%m-%d %H:%M:%S')]"
code_dir="/path/to/wall-x"
config_path="/path/to/wall-x/workspace/lerobot_example"
# Pick a random rendezvous port for accelerate (use a fixed one if needed)
export PORT=$((21000 + $RANDOM % 30000))
MASTER_PORT=10239 # fixed 5-digit value, also reused below as the training seed
export LAUNCHER="accelerate launch --num_processes=$NUM_GPUS --main_process_port=$PORT"
export SCRIPT="${code_dir}/train_qact.py"
export SCRIPT_ARGS="--config ${config_path}/config_qact.yml --seed $MASTER_PORT"
echo "Running command: $LAUNCHER $SCRIPT $SCRIPT_ARGS"
$LAUNCHER $SCRIPT $SCRIPT_ARGS
Then run training with:
bash ./workspace/lerobot_example/run.sh
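As a sanity check on the hyperparameters above: the effective batch size per optimizer step is batch_size_per_gpu × NUM_GPUS × gradient_accumulation_steps, i.e. 8 × 8 × 32 = 2048 with the example config on 8 GPUs. If you reduce batch_size_per_gpu to fit GPU memory, consider raising gradient_accumulation_steps to keep the effective batch size comparable.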
You should see logs similar to:
[2025-11-11 18:51:07] epoch 9/ 10 | iter 2894/ 2951 | loss 1.522499 | lr 0.000050 | time_per_step_avg 0.475970s |
(min, max) time across ranks (ms):
interval-time ..................................: (458.56, 502.47)
data-load ......................................: (1.93, 26.09)
forward-compute ................................: (137.14, 162.45)
backward-compute ...............................: (234.47, 258.09)
optimizer ......................................: (0.29, 0.56)
Monitor the training using wandb; for example, the loss curves of a FAST run.
2.2.6 Merge Checkpoints into Final Weights (Optional)
If training used FSDP2, merge weights:
python scripts/merge_sharded_weights.py YOUR_CHECKPOINT_DIR OUTPUT_PATH
Place the following in one folder:
./
- model.safetensors
- processor/*
- config.json (from WALL-OSS-FLOW/FAST)
Now the model is loadable via the from_pretrained function.
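For example, loading the merged folder mirrors how the inference server below instantiates the model (paths are placeholders):

import yaml
import torch
from wall_x.model.qwen2_5_based.modeling_qwen2_5_vl_act import Qwen2_5_VLMoEForAction

with open("/path/to/config_qact.yml") as f:
    config = yaml.safe_load(f)

model = Qwen2_5_VLMoEForAction.from_pretrained(
    "/path/to/merged_model",     # folder with model.safetensors, processor/, config.json
    train_config=config,
    action_tokenizer_path=None,  # "/path/to/fast" for FAST checkpoints
)
model = model.to("cuda").to(torch.bfloat16).eval()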
2.2.7 Open-Loop Evaluation
After training, we can preliminarily evaluate the model on the collected data. Modify scripts/draw_openloop_plot.py:
model_path = "/path/to/flow_or_fast_model"
action_tokenizer_path = None # for flow: None; for fast: "/path/to/fast"
save_dir = "/path/to/your/save/dir"
path = "/path/to/config_qact.yml"
then run (for the single-arm S101, YOUR_ACTION_DIM is 6):
python scripts/draw_openloop_plot.py --origin_action_dim YOUR_ACTION_DIM
You should get results like the following (for a flow model): a plot comparing predicted and ground-truth action trajectories for each joint.
Here, wall-x uses the first episode by default. You can also evaluate on unseen data by modifying both LeRobot and scripts/draw_openloop_plot.py as shown below.
# scripts/draw_openloop_plot.py
dataset = load_test_dataset(config, lerobot_config, seed=42, episode=500) # select an unseen episode
# src/lerobot/datasets/lerobot_dataset.py
# line 498
# add this line
self.episode_mapping = {ep: idx for idx, ep in enumerate(self.episodes)} if self.episodes is not None else None
# line 647-648
# modify to below
if self.episode_mapping is not None:
ep_start = self.episode_data_index["from"][self.episode_mapping[ep_idx]]
ep_end = self.episode_data_index["to"][self.episode_mapping[ep_idx]]
else:
ep_start = self.episode_data_index["from"][ep_idx]
ep_end = self.episode_data_index["to"][ep_idx]
2.3 Model Deployment & Validation
2.3.1 Preparing the Policy Server and Robot Client
Setup:
- Remote inference machine with a single RTX 4090, running a policy server script
- Local machine (CPU-only) to control the robot and send observations, running a robot client script
This tutorial hacks wall-x inference into the Lerobot codebase for convenience. You should customize a policy server and a robot client script for your own robot.
My full wallx_server.py script is posted below.
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Example:
Customized example:
python scripts/wallx_server.py \
--host=0.0.0.0 \
--port=10808 \
--fps=10 \
--inference_latency=0.1 \
--obs_queue_timeout=1
"""
import logging
import pickle # nosec
import threading
import time
from concurrent import futures
from dataclasses import asdict
from pprint import pformat
from queue import Empty, Queue
from lerobot.configs.types import FeatureType, PolicyFeature
import draccus
import grpc
import torch
from PIL import Image
from lerobot.scripts.server.configs import PolicyServerConfig
from lerobot.scripts.server.helpers import (
FPSTracker,
Observation,
TimedAction,
TimedObservation,
get_logger,
raw_observation_to_observation,
)
from lerobot.transport import (
services_pb2, # type: ignore
services_pb2_grpc, # type: ignore
)
from lerobot.transport.utils import receive_bytes_in_chunks
from qwen_vl_utils.vision_process import smart_resize
from wall_x.data.config import X2RDataProcessingConfig
from wall_x.data.utils import get_wallx_normal_text, process_grounding_points, replace_action_token, preprocesser_call, KEY_MAPPINGS, load_norm_stats
import yaml
from wall_x.model.qwen2_5_based.modeling_qwen2_5_vl_act import Qwen2_5_VLMoEForAction
from wall_x.data.load_lerobot_dataset import get_data_configs
from transformers import AutoProcessor
def load_config(config_path):
"""Load configuration from YAML file."""
with open(config_path, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
config["data"]["model_type"] = config.get("model_type")
return config
class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
prefix = "policy_server"
logger = get_logger(prefix)
def __init__(self, config: PolicyServerConfig):
self.config = config
self.shutdown_event = threading.Event()
# FPS measurement
self.fps_tracker = FPSTracker(target_fps=config.fps)
self.observation_queue = Queue(maxsize=1)
self._predicted_timesteps_lock = threading.Lock()
self._predicted_timesteps = set()
self.last_processed_obs = None
# Hack: We fix the attributes here only for wallx
self.device = 'cuda'
self.policy_type = 'wallx'
self.model_path = "/path/to/model"
self.action_tokenizer_path = None # or /path/to/fast
self.config_path = "/path/to/config"
self.task_instruction = "your task name"
self.lerobot_features = None
self.actions_per_chunk = 32
self.policy = None
self.norm_stats = None
# WallX preprocessing configuration
self.data_config = None
self.cam_key_mapping = None
self.repo_id = None
self.frame_index = None # Track frame index for observations
self.processor = None
self.max_length = 768
self.use_fast_tokenizer = False
self.train_action_tokenizer = None
@property
def running(self):
return not self.shutdown_event.is_set()
@property
def policy_image_features(self):
image_features = {
"observation.images.top": PolicyFeature(type=FeatureType.VISUAL, shape=[3, 480, 640]),
"observation.images.side": PolicyFeature(type=FeatureType.VISUAL, shape=[3, 480, 640]),
}
return image_features
def _reset_server(self) -> None:
"""Flushes server state when new client connects."""
# only running inference on the latest observation received by the server
self.shutdown_event.set()
self.observation_queue = Queue(maxsize=1)
with self._predicted_timesteps_lock:
self._predicted_timesteps = set()
# Reset frame index for new episode
self.frame_index = 0
def Ready(self, request, context): # noqa: N802
client_id = context.peer()
self.logger.info(f"Client {client_id} connected and ready")
self._reset_server()
self.shutdown_event.clear()
return services_pb2.Empty()
def SendPolicyInstructions(self, request, context): # noqa: N802
"""Receive policy instructions from the robot client"""
if not self.running:
self.logger.warning("Server is not running. Ignoring policy instructions.")
return services_pb2.Empty()
client_id = context.peer()
policy_specs = pickle.loads(request.data) # nosec
self.lerobot_features = policy_specs.lerobot_features
self.logger.info(
f"Receiving policy instructions from {client_id} | "
f"Policy type: {self.policy_type} | "
f"Actions per chunk: {self.actions_per_chunk} | "
f"Device: {self.device}"
)
# self.lerobot_features = policy_specs.lerobot_features
start = time.perf_counter()
config = load_config(self.config_path)
dataload_config = get_data_configs(config["data"])
lerobot_config = dataload_config.get("lerobot_config", {})
self.repo_id = lerobot_config.get("repo_id", None)
assert self.repo_id is not None, "repo id is required"
norm_stats_path = config.get("norm_stats_path", None)
stats = load_norm_stats(norm_stats_path, self.repo_id)
self.action_min_stat = stats["action"].min.to(self.device)
self.action_delta = stats["action"].delta.to(self.device)
self.state_min_stat = stats["state"].min.to(self.device)
self.state_delta = stats["state"].delta.to(self.device)
# load model from checkpoint
self.policy = Qwen2_5_VLMoEForAction.from_pretrained(
self.model_path, train_config=config, action_tokenizer_path=self.action_tokenizer_path
)
self.policy.eval()
self.policy = self.policy.to(self.device)
self.policy = self.policy.to(torch.bfloat16)
self.predict_mode = "fast" if config.get("use_fast_tokenizer", False) else "diffusion"
self.action_dim = 20 if self.predict_mode == "diffusion" else 6
end = time.perf_counter()
self.logger.info(f"Time taken to put policy on {self.device}: {end - start:.4f} seconds")
# Initialize WallX preprocessing configuration
self.cam_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["camera"]
self.state_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["state"]
self.action_key_mapping = KEY_MAPPINGS.get(self.repo_id, {})["action"]
self.data_config = X2RDataProcessingConfig().update(
train_test_split=dataload_config["train_test_split"],
split_seed=dataload_config["split_seed"],
predict_action_keys=dataload_config["predict_action_keys"],
obs_action_keys=dataload_config["obs_action_keys"],
resolution=dataload_config.get("resolution", None),
priority_order=dataload_config.get("priority_order", None),
)
# Load processor
self.processor = AutoProcessor.from_pretrained(self.model_path, use_fast=True)
self.processor.tokenizer.padding_side = "left"
self.max_length = dataload_config.get("max_length", 768)
self.use_fast_tokenizer = config.get("use_fast_tokenizer", False)
# Load action tokenizer if using fast tokenizer
if self.use_fast_tokenizer:
action_tokenizer_path = self.action_tokenizer_path
if action_tokenizer_path:
self.train_action_tokenizer = AutoProcessor.from_pretrained(
action_tokenizer_path, trust_remote_code=True
)
self.logger.info(f"WallX preprocessing initialized with repo_id: {self.repo_id}")
self.logger.info(f"Resolution config: {self.data_config.resolution}")
self.logger.info(f"Task instruction: {self.task_instruction}")
self.logger.info(f"Max length: {self.max_length}")
return services_pb2.Empty()
def SendObservations(self, request_iterator, context): # noqa: N802
"""Receive observations from the robot client"""
client_id = context.peer()
self.logger.debug(f"Receiving observations from {client_id}")
receive_time = time.time() # comparing timestamps so need time.time()
start_deserialize = time.perf_counter()
received_bytes = receive_bytes_in_chunks(
request_iterator, None, self.shutdown_event, self.logger
) # blocking call while looping over request_iterator
timed_observation = pickle.loads(received_bytes) # nosec
deserialize_time = time.perf_counter() - start_deserialize
self.logger.debug(f"Received observation #{timed_observation.get_timestep()}")
obs_timestep = timed_observation.get_timestep()
self.frame_index = obs_timestep
obs_timestamp = timed_observation.get_timestamp()
# Calculate FPS metrics
fps_metrics = self.fps_tracker.calculate_fps_metrics(obs_timestamp)
self.logger.info(
f"Received observation #{obs_timestep} | "
f"Avg FPS: {fps_metrics['avg_fps']:.2f} | " # fps at which observations are received from client
f"Target: {fps_metrics['target_fps']:.2f} | "
f"One-way latency: {(receive_time - obs_timestamp) * 1000:.2f}ms"
)
self.logger.debug(
f"Server timestamp: {receive_time:.6f} | "
f"Client timestamp: {obs_timestamp:.6f} | "
f"Deserialization time: {deserialize_time:.6f}s"
)
if not self._enqueue_observation(
timed_observation # wrapping a RawObservation
):
self.logger.info(f"Observation #{obs_timestep} has been filtered out")
return services_pb2.Empty()
def GetActions(self, request, context): # noqa: N802
"""Returns actions to the robot client. Actions are sent as a single
chunk, containing multiple actions."""
client_id = context.peer()
self.logger.debug(f"Client {client_id} connected for action streaming")
# Generate action based on the most recent observation and its timestep
try:
getactions_starts = time.perf_counter()
obs = self.observation_queue.get(timeout=self.config.obs_queue_timeout)
self.logger.info(
f"Running inference for observation #{obs.get_timestep()} (must_go: {obs.must_go})"
)
with self._predicted_timesteps_lock:
self._predicted_timesteps.add(obs.get_timestep())
start_time = time.perf_counter()
action_chunk = self._predict_action_chunk(obs)
inference_time = time.perf_counter() - start_time
start_time = time.perf_counter()
actions_bytes = pickle.dumps(action_chunk) # nosec
serialize_time = time.perf_counter() - start_time
# Create and return the action chunk
actions = services_pb2.Actions(data=actions_bytes)
self.logger.info(
f"Action chunk #{obs.get_timestep()} generated | "
f"Total time: {(inference_time + serialize_time) * 1000:.2f}ms"
)
self.logger.debug(
f"Action chunk #{obs.get_timestep()} generated | "
f"Inference time: {inference_time:.2f}s |"
f"Serialize time: {serialize_time:.2f}s |"
f"Total time: {inference_time + serialize_time:.2f}s"
)
time.sleep(
max(0, self.config.inference_latency - max(0, time.perf_counter() - getactions_starts))
) # sleep controls inference latency
return actions
except Empty: # no observation added to queue in obs_queue_timeout
return services_pb2.Empty()
except Exception as e:
import traceback
            self.logger.error(f"Error in GetActions: {e}")
self.logger.error(f"Traceback: {traceback.format_exc()}")
return services_pb2.Empty()
def _obs_sanity_checks(self, obs: TimedObservation, previous_obs: TimedObservation) -> bool:
"""Check if the observation is valid to be processed by the policy"""
with self._predicted_timesteps_lock:
predicted_timesteps = self._predicted_timesteps
if obs.get_timestep() in predicted_timesteps:
self.logger.debug(f"Skipping observation #{obs.get_timestep()} - Timestep predicted already!")
return False
# elif observations_similar(obs, previous_obs, lerobot_features=self.lerobot_features):
# self.logger.debug(
# f"Skipping observation #{obs.get_timestep()} - Observation too similar to last obs predicted!"
# )
# return False
else:
return True
def _enqueue_observation(self, obs: TimedObservation) -> bool:
"""Enqueue an observation if it must go through processing, otherwise skip it.
Observations not in queue are never run through the policy network"""
if (
obs.must_go
or self.last_processed_obs is None
or self._obs_sanity_checks(obs, self.last_processed_obs)
):
last_obs = self.last_processed_obs.get_timestep() if self.last_processed_obs else "None"
self.logger.debug(
f"Enqueuing observation. Must go: {obs.must_go} | Last processed obs: {last_obs}"
)
# If queue is full, get the old observation to make room
if self.observation_queue.full():
# pops from queue
_ = self.observation_queue.get_nowait()
self.logger.debug("Observation queue was full, removed oldest observation")
# Now put the new observation (never blocks as queue is non-full here)
self.observation_queue.put(obs)
return True
return False
def _time_action_chunk(self, t_0: float, action_chunk: list[torch.Tensor], i_0: int) -> list[TimedAction]:
"""Turn a chunk of actions into a list of TimedAction instances,
with the first action corresponding to t_0 and the rest corresponding to
t_0 + i*environment_dt for i in range(len(action_chunk))
"""
return [
TimedAction(timestamp=t_0 + i * self.config.environment_dt, timestep=i_0 + i, action=action)
for i, action in enumerate(action_chunk)
]
def _prepare_observation(self, observation_t: TimedObservation) -> Observation:
"""
Prepare observation, ready for policy inference.
E.g.: To keep the observation sampling rate high (and network packets tiny) we send uint8 [0,255] images from the
client and then convert them to float32 [0,1] images here, before running inference.
For WallX policy, this also applies full preprocessing similar to PreprocessedDataset:
- Vision: Converts images to PIL format, applies resolution constraints, smart_resize
- Text: Generates instruction text with camera views and action tokens
- Action/State: Extracts agent position and prepares action format
"""
# RawObservation from robot.get_observation() - wrong keys, wrong dtype, wrong image shape
observation: Observation = raw_observation_to_observation(
observation_t.get_observation(),
self.lerobot_features,
self.policy_image_features,
self.device,
)
# Apply WallX-style preprocessing (vision + text + action)
observation = self._apply_wallx_preprocessing(observation)
# processed Observation - right keys, right dtype, right image shape, text, etc.
return observation
def _apply_wallx_preprocessing(self, observation: Observation) -> Observation:
"""
Apply WallX-style preprocessing to observation.
This matches the preprocessing done in PreprocessedDataset.__getitem__.
Includes:
- Vision preprocessing (PIL conversion, resolution constraints, smart_resize)
- Text generation (instruction text with camera views and action tokens)
- Agent position extraction
Args:
observation: Observation dict with image tensors
Returns:
Observation dict with preprocessed data ready for WallX inference
"""
# 1. Vision preprocessing
processed_images = []
orig_height, orig_width = None, None
resized_height, resized_width = None, None
# Get image keys from the observation (filter for image observations)
image_keys = [key for key in observation.keys() if 'image' in key.lower()]
for key in image_keys:
if key not in observation:
continue
# Get the image tensor (should be [C, H, W] in float32 [0,1] range)
img_tensor = observation[key]
# Convert from tensor to PIL image
# Permute from [C, H, W] to [H, W, C] and convert to uint8 [0, 255]
if img_tensor.ndim == 4: # If batch dimension exists [B, C, H, W]
img_tensor = img_tensor.squeeze(0)
current_obs = img_tensor.clone().permute(1, 2, 0)
img_pil = Image.fromarray((current_obs * 255).to(torch.uint8).cpu().numpy())
orig_width, orig_height = img_pil.size
# Apply resolution constraints (if config is not -1)
# Map the observation key to the camera view name
cam_view = self.cam_key_mapping.get(key, None)
if cam_view:
target_size = self.data_config.resolution.get(cam_view, -1)
if target_size != -1:
# Maintain aspect ratio logic
if orig_width > orig_height: # Landscape image
new_width = target_size
new_height = int(target_size * orig_height / orig_width)
else: # Portrait image
new_height = target_size
new_width = int(target_size * orig_width / orig_height)
img_pil = img_pil.resize((new_width, new_height))
# Apply smart scaling (qwen logic)
current_width, current_height = img_pil.size
resized_height, resized_width = smart_resize(
current_height,
current_width,
factor=self.data_config.image_factor,
min_pixels=self.data_config.min_pixels,
max_pixels=self.data_config.max_pixels,
)
resized_img = img_pil.resize((resized_width, resized_height))
processed_images.append(resized_img)
# Update the observation with the PIL image
observation[key] = resized_img
# 2. Text preprocessing - generate instruction text
if self.task_instruction is not None and orig_height is not None:
instruction_info = {"instruction": self.task_instruction}
action_chunk_size = self.actions_per_chunk # 33 - 1
# Generate the complete text with instruction and action tokens
complete_text, generate_subtask = get_wallx_normal_text(
instruction_info,
action_chunk_size,
self.frame_index,
self.data_config.priority_order,
self.cam_key_mapping,
generate_subtask_ratio=self.data_config.generate_subtask_ratio,
)
# Process grounding points (adjust coordinates for resized images)
text = process_grounding_points(
complete_text,
orig_height,
orig_width,
resized_height,
resized_width,
self.data_config.model_type
)
observation["text"] = text
observation["image_inputs"] = processed_images
# 3. Extract agent position (observation.state)
# Look for state keys in observation
for key in observation.keys():
if 'state' in key.lower():
observation["agent_pos"] = observation[key]
break
# 4. Track frame index
observation["frame_index"] = self.frame_index
# 5. Apply DataCollator-style processing (normalization, tokenization, etc.)
observation = self._apply_datacollator_processing(observation)
return observation
def _normalize(self, data: torch.Tensor, min_stat: torch.Tensor, delta: torch.Tensor) -> torch.Tensor:
"""Normalize data to [-1, 1] range"""
return torch.clamp((data - min_stat) / delta * 2.0 - 1.0, -1, 1)
def _apply_datacollator_processing(self, observation: Observation) -> Observation:
"""
Apply DataCollator-style processing to observation.
This matches the processing done in DataCollator.__call__.
Includes:
- Agent position normalization and padding to 20 dims
- Action normalization and padding (create dummy action for inference)
- Action token replacement in text
- Tokenization and image processing via preprocesser_call
- MOE token type generation
Args:
observation: Observation dict with preprocessed data
Returns:
Observation dict ready for model inference
"""
additional_inputs = {}
# 1. Process agent position (proprioception)
if "agent_pos" in observation:
agent_pos = observation["agent_pos"]
# Ensure proper shape [1, dim] for single observation
if agent_pos.dim() == 1:
agent_pos = agent_pos.unsqueeze(0)
if agent_pos.dim() == 2:
agent_pos = agent_pos.unsqueeze(1) # [batch, 1, dim]
# Handle NaN values
agent_pos_mask = (~torch.isnan(agent_pos)).float()
agent_pos = torch.nan_to_num(agent_pos, nan=0.0)
# Normalize
agent_pos = self._normalize(agent_pos, self.state_min_stat, self.state_delta)
# Pad to 20 dimensions if needed
if agent_pos.shape[-1] != 20:
padding_size = 20 - agent_pos.shape[-1]
agent_pos = torch.cat(
[
agent_pos,
torch.zeros(agent_pos.shape[0], agent_pos.shape[1], padding_size, device=agent_pos.device)
],
dim=-1
)
agent_pos_mask = torch.cat(
[
agent_pos_mask,
torch.zeros(agent_pos_mask.shape[0], agent_pos_mask.shape[1], padding_size, device=agent_pos_mask.device)
],
dim=-1
)
additional_inputs["proprioception"] = agent_pos
additional_inputs["agent_pos_mask"] = agent_pos_mask
self.logger.debug(f"Agent position shape: {agent_pos.shape}")
self.logger.debug(f"Agent position mask shape: {agent_pos_mask.shape}")
# 2. Create dummy action chunk for inference (will be predicted by model)
# Create dummy normalized action with proper shape [1, 1, action_dim]
action_dim = 6
action = torch.zeros(1, self.actions_per_chunk, action_dim, dtype=torch.float32, device=self.device)
dof_mask = torch.ones(1, self.actions_per_chunk, action_dim, dtype=torch.float32, device=self.device)
action = self._normalize(action, self.action_min_stat, self.action_delta)
# Pad to 20 dimensions if needed
if action.shape[-1] != 20:
padding_size = 20 - action.shape[-1]
action = torch.cat(
[
action,
torch.zeros(action.shape[0], action.shape[1], padding_size, device=action.device)
],
dim=-1
)
dof_mask = torch.cat(
[
dof_mask,
torch.zeros(dof_mask.shape[0], dof_mask.shape[1], padding_size, device=dof_mask.device)
],
dim=-1
)
additional_inputs["action_chunk"] = action
additional_inputs["dof_mask"] = dof_mask
self.logger.debug(f"Action chunk shape: {action.shape}")
self.logger.debug(f"DOF mask shape: {dof_mask.shape}")
# 3. Get image inputs and text
if "image_inputs" in observation:
additional_inputs["image_inputs"] = [observation["image_inputs"]]
if "text" in observation:
additional_inputs["text"] = [observation["text"]]
if "frame_index" in observation:
additional_inputs["frame_index"] = torch.tensor([observation["frame_index"]], dtype=torch.long)
# 4. Replace action tokens in text with dummy actions
additional_inputs["text"] = replace_action_token(
additional_inputs["text"],
additional_inputs["action_chunk"],
self.train_action_tokenizer if self.use_fast_tokenizer else None,
[self.repo_id] * len(additional_inputs["text"]),
additional_inputs["dof_mask"],
)
# 5. Process images and text with processor
inputs = preprocesser_call(
processor=self.processor,
text=additional_inputs.pop("text"),
images=additional_inputs.pop("image_inputs"),
videos=None,
padding=True,
truncation=True,
return_tensors="pt",
max_length=self.max_length,
)
# 6. Create MOE token types (identify action tokens)
action_token_id = self.processor.tokenizer.convert_tokens_to_ids("<|action|>")
additional_inputs["moe_token_types"] = inputs.input_ids == action_token_id
# 7. Merge all inputs
inputs.update(additional_inputs)
# 8. Add dataset name
inputs["dataset_names"] = [self.repo_id] * inputs[
"action_chunk"
].shape[0]
return inputs
    def _get_action_chunk(self, observation: dict[str, torch.Tensor]) -> torch.Tensor:
        """Get an action chunk from the policy. The chunk contains only the
        first actions_per_chunk steps and the first 6 action dimensions."""
observation = {k: v.to(self.device) if isinstance(v, torch.Tensor) else v
for k, v in observation.items()}
with torch.no_grad():
outputs = self.policy(
**observation,
action_dim=self.action_dim,
pred_horizon=self.actions_per_chunk,
mode="predict",
predict_mode=self.predict_mode,
re_generate=True,
)
chunk = outputs["predict_action"].detach().cpu()
if chunk.ndim != 3:
chunk = chunk.unsqueeze(0) # adding batch dimension, now shape is (B, chunk_size, action_dim)
chunk = chunk[:, : self.actions_per_chunk, :6]
return chunk
def _predict_action_chunk(self, observation_t: TimedObservation) -> list[TimedAction]:
"""Predict an action chunk based on an observation"""
inference_starts = time.perf_counter()
"""1. Prepare observation"""
start_time = time.perf_counter()
observation = self._prepare_observation(observation_t)
preprocessing_time = time.perf_counter() - start_time
self.last_processed_obs: TimedObservation = observation_t
self.logger.debug("Finishing observation preparation stage!!!!")
"""2. Get action chunk"""
start_time = time.perf_counter()
action_tensor = self._get_action_chunk(observation)
inference_time = time.perf_counter() - start_time
"""3. Post-inference processing"""
start_time = time.perf_counter()
# Move to CPU before serializing
action_tensor = action_tensor.cpu().squeeze(0)
action_chunk = self._time_action_chunk(
observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep()
)
postprocessing_time = time.perf_counter() - start_time
inference_stops = time.perf_counter()
self.logger.info(
f"Observation {observation_t.get_timestep()} |"
f"Inference time: {1000 * (inference_stops - inference_starts):.2f}ms"
)
# full-process latency breakdown for debugging purposes
        self.logger.debug(
            f"Observation {observation_t.get_timestep()} | "
            f"Preprocessing time: {1000 * preprocessing_time:.2f}ms | "
            f"Inference time: {1000 * inference_time:.2f}ms | "
            f"Postprocessing time: {1000 * postprocessing_time:.2f}ms | "
            f"Total time: {1000 * (inference_stops - inference_starts):.2f}ms"
        )
return action_chunk
def stop(self):
"""Stop the server"""
self._reset_server()
self.logger.info("Server stopping...")
@draccus.wrap()
def serve(cfg: PolicyServerConfig):
"""Start the PolicyServer with the given configuration.
Args:
config: PolicyServerConfig instance. If None, uses default configuration.
"""
logging.info(pformat(asdict(cfg)))
# Create the server instance first
policy_server = PolicyServer(cfg)
# Setup and start gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
services_pb2_grpc.add_AsyncInferenceServicer_to_server(policy_server, server)
server.add_insecure_port(f"{cfg.host}:{cfg.port}")
policy_server.logger.info(f"PolicyServer started on {cfg.host}:{cfg.port}")
server.start()
server.wait_for_termination()
policy_server.logger.info("Server terminated")
if __name__ == "__main__":
serve()
Important: Ensure observation preprocessing matches the wall-x training preprocessing exactly. The robot client uses the existing Lerobot implementation:
src/lerobot/scripts/server/robot_client.py
2.3.2 Deploying on the Real Robot
Start the policy server:
python scripts/wallx_server.py \
--host=0.0.0.0 \
--port=YOUR_PORT \
--fps=30 \
--inference_latency=0.033 \
--obs_queue_timeout=1
Start the robot client:
python src/lerobot/scripts/server/robot_client.py \
--robot.type=so101_follower \
--robot.port=YOUR_ROBOT_PORT \
--robot.id=YOUR_ROBOT_ID \
--robot.cameras=YOUR_CAMERAS \
--task=TASK_NAME \
--server_address=SERVER_IP:PORT \
--policy_type=smolvla \
--pretrained_name_or_path=None \
--policy_device=cuda \
--actions_per_chunk=32 \
--chunk_size_threshold=0. \
--aggregate_fn_name=weighted_average
Here, since I am hacking things directly, both `policy_type` and `pretrained_name_or_path` still need to be set manually, even though the server loads the wall-x model itself.
`chunk_size_threshold` and `aggregate_fn_name` are parameters used by smolvla for asynchronous inference:
- `chunk_size_threshold` specifies the action-queue length (as a ratio of its maximum length) below which an inference request is sent.
- `aggregate_fn_name` determines how to weight action chunks that overlap in time.
You can simply set `chunk_size_threshold=0` to disable asynchronous inference.
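To make the aggregation concrete, here is an illustrative blend of two chunks that overlap in time; this is only a sketch of the idea, not LeRobot's actual weighted_average implementation:

import torch

def aggregate_overlap(old_actions: torch.Tensor, new_actions: torch.Tensor, new_weight: float = 0.7) -> torch.Tensor:
    """Blend actions predicted for the same timesteps by two successive chunks,
    favoring the newer prediction (illustrative only)."""
    return (1.0 - new_weight) * old_actions + new_weight * new_actions

# e.g. chunk A covers timesteps 10-41, chunk B covers 20-51; they overlap on 20-41
overlap_a = torch.randn(22, 6)  # chunk A's actions for timesteps 20-41 (placeholder)
overlap_b = torch.randn(22, 6)  # chunk B's actions for the same timesteps (placeholder)
blended = aggregate_overlap(overlap_a, overlap_b)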
After launching both, you should see the model controlling the robot:
- Smooth successful case
- Struggling but successful case
- Failure case
3. Common Pitfalls & Practical Fixes 🥲
3.1 Hardware Issues
When assembling the S101, joint assembly is critical. Repeated data collection and deployment can wear the interfaces between 3D-printed parts, servo output shafts, and aluminum flanges. Remove support material thoroughly or sand the servo shaft, and remember to recalibrate afterward.
3.2 Data Collection
There is no universal rule for "good data." Quality depends heavily on:
- Task diversity
- Spatial variation
- Object diversity
- Robustness to disturbances
In practice, the best approach is iterative:
- Collect data
- Train model
- Identify failure modes
- Collect more targeted data
3.3 Model Deployment
Although the model is fixed after training, deployment choices affect performance:
- Using half the predicted action chunk may yield smoother results (see the sketch after this list)
- Proper control and inference frequency tuning improves stability
- Environment mismatch between training and deployment may degrade performance
- Debugging is complex because failures may originate from many aspects:
- Data collection
- Training
- Deployment environment
- Script bugs or parameters
- ...
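For instance, "using half the chunk" from the first bullet above can be implemented as a simple receding-horizon loop; `robot` and `get_action_chunk` are hypothetical stand-ins for your client's robot interface and inference call:

import time

def run_half_chunk_loop(robot, get_action_chunk, chunk_size=32, control_fps=30):
    """Execute only the first half of each predicted chunk, then re-infer.
    `robot` and `get_action_chunk` are placeholder dependencies."""
    execute_steps = chunk_size // 2  # re-planning mid-chunk often looks smoother
    while True:
        obs = robot.get_observation()
        chunk = get_action_chunk(obs)  # expected shape: (chunk_size, action_dim)
        for action in chunk[:execute_steps]:
            robot.send_action(action)
            time.sleep(1.0 / control_fps)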
Try to design a systematic validation process. Hypothesize boldly and verify carefully. Just take your time. Good luck!