upload real-robot deployment code

This commit is contained in:
yuchen-x
2025-09-23 15:13:22 +08:00
parent 5dcd1ca503
commit f12b478265
130 changed files with 10434 additions and 5 deletions

View File

@@ -0,0 +1,70 @@
# Getting Started
### Code framework
| Module Name | Documentation Link |
| ------------------------- | -------------------------------------------------- |
| robots | [build_robot](./build_robot.md) |
| robot_devices/arm | [add_robot_arm](./add_robot_arm.md) |
| robot_devices/cameras | [add_robot_camera](./add_robot_camera.md) |
| robot_devices/endeffector | [add_robot_endeffector](./add_robot_endeffector.md)|
### Simple Usage (Example code, not executable)
```python
import time
import math
import torch
from unitree_deploy.robot.robot_utils import make_robot
from unitree_deploy.robot_devices.robots_devices_utils import precise_wait
class YourPolicy:
def predict_action(self, observation, policy):
# Logic for predicting action
pass
class UnitreeEnv:
def __init__(self):
self.robot = make_robot(self.robot_type)
if not self.robot.is_connected:
self.robot.connect()
# If disconnection is needed, call disconnect() here
# self.robot.disconnect()
def get_obs(self):
# Get observation
observation = self.robot.capture_observation()
return observation
def step(self, pred_action, t_command_target):
# Execute action
t_cycle_end = time.monotonic() + self.control_dt
t_command_target = t_cycle_end + self.control_dt
action = self.robot.send_action(torch.from_numpy(pred_action), t_command_target)
precise_wait(t_cycle_end)
return action
if __name__ == "__main__":
policy = YourPolicy() # Create policy instance
env = UnitreeEnv() # Create environment instance
t_start = time.monotonic() # Get start time
iter_idx = 0 # Initialize iteration index
control_dt = 1 / 30 # Control loop interval (30Hz)
try:
while True:
t_cycle_end = t_start + (iter_idx + 1) * control_dt # Calculate end time of current cycle
t_command_target = t_cycle_end + control_dt # Calculate command target time
observation = env.get_obs() # Get environment observation
pred_action = policy.predict_action(observation, policy) # Predict action
env.step(pred_action, t_command_target) # Execute action
precise_wait(t_cycle_end) # Wait until cycle end
iter_idx += 1 # Update iteration index
finally:
# Perform cleanup operations on exit (e.g., disconnect robot)
pass

View File

@@ -0,0 +1,76 @@
# How to Build Your Own Arm
### Define your own config for the robot arm (unitree_deploy/robot_devices/arm/config.py)
```python
@ArmConfig.register_subclass("z1") # Register your custom arm wrapper. Here use def __init__(self, config: Z1DualArmConfig):
@dataclass
class Z1ArmConfig(ArmConfig):
port: str
motors: dict[str, tuple[int, str]]
mock: bool = False
init_pose_left: list = None
init_pose_right: list = None
control_dt: float = 1/500.0
# Default parameters go first [parameters that may need to be customized],
# Non-default parameters go later [fixed parameters]
```
### Description of methods in your arm class (unitree_deploy/robot_devices/arm/utils.py)
```python
# Base class for Arm, extensible with required methods
class Arm(Protocol):
def connect(self): ...
def disconnect(self): ...
def motor_names(self): ...
def read_current_motor_q(self): ...
def read_current_arm_q(self): ...
def read_current_arm_dq(self): ...
def write_arm(self): ...
def arm_ik(self): ...
```
How to implement external calls?
Use make_arm_motors_buses_from_configs [based on the config file] to construct the UnitreeRobot class.
Use make_arm_motors_bus [based on arm_type] which is generally used for external module loading.
### Implementation of the arm class (unitree_deploy/robot_devices/arm/.../....py)
```python
# These methods need to be implemented and completed
def connect(self): ...
def disconnect(self): ...
def motor_names(self): ...
# connect() and disconnect() should handle initialization and homing respectively
def read_current_motor_q(self): ...
def read_current_arm_q(self): ...
def read_current_arm_dq(self): ...
# Outputs should be unified as np.ndarray
def write_arm(self): ...
# Write control commands here
def arm_ik(self): ...
# Wrap IK into your own arm class for external calling
# Private/protected properties [for reading motor names, IDs, etc.]
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
```
All arms use threading to implement \_subscribe_motor_state and \_ctrl_motor_state threads for internal reading and writing within the class.

View File

@@ -0,0 +1,66 @@
# How to build your own cameras
### Define your own config for cameras (unitree_deploy/robot_devices/cameras/config.py)
```python
@CameraConfig.register_subclass("opencv") # Define and wrap your own cameras. Here use def __init__(self, config: OpenCVCameraConfig):
@dataclass
class OpenCVCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
OpenCVCameraConfig(0, 30, 640, 480)
OpenCVCameraConfig(0, 60, 640, 480)
OpenCVCameraConfig(0, 90, 640, 480)
OpenCVCameraConfig(0, 30, 1280, 720)
"""
# Define the required camera parameters
camera_index: int
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
rotation: int | None = None
mock: bool = False
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
# Default parameters go first [parameters that need to be customized],
# Non-default parameters go later [fixed parameters]
```
### Description of methods in your cameras class (unitree_deploy/robot_devices/cameras/utils.py)
```python
# Base class for cameras, extensible with required methods
class Camera(Protocol):
def connect(self): ...
def read(self, temporary_color: str | None = None) -> np.ndarray: ... # Single-threaded reading
def async_read(self) -> np.ndarray: ... # Multi-threaded
def disconnect(self): ...
```
How can external modules implement calls? Use **make_cameras_from_configs [based on configuration files]** to construct the `UnitreeRobot` class.
**make_camera [based on camera_type]** is generally used for external module loading.
### Implementation of the `camera` class (unitree_deploy/robot_devices/camera/.../....py)
```python
# These need to be completed, focusing on implementing these two parts
def read(self, temporary_color: str | None = None) -> np.ndarray: ... # Single-threaded reading
def async_read(self) -> np.ndarray: ... # Multi-threaded
```
All cameras use threading to implement `async_read` for internal read and write operations.

View File

@@ -0,0 +1,77 @@
# How to Build Your Own End-Effector [Currently dex_1 and dex_3 are available]
### Define your own config for the end-effector (unitree_deploy/robot_devices/endeffector/config.py)
```python
@EndEffectorConfig.register_subclass("gripper") # Register your custom end-effector wrapper. Here it uses def __init__(self, config: GripperConfig):
@dataclass
class GripperConfig(EndEffectorConfig):
motors: dict[str, tuple[int, str]]
unit_test: bool = False
control_dt: float = 1/200
mock: bool = False
def __post_init__(self):
if self.control_dt < 0.002:
raise ValueError(f"`control_dt` must > 1/500 (got {self.control_dt})")
# Default arguments should be placed first [parameters that may need to be customized],
# Non-default arguments should be placed later [fixed or less important parameters].
```
### Description of methods in your end-effector class (unitree_deploy/robot_devices/endeffector/utils.py)
```python
# Base class for EndEffector, extend with required methods
class EndEffector(Protocol):
def connect(self): ...
def disconnect(self): ...
def motor_names(self): ...
def read_current_endeffector_q(self): ...
def read_current_endeffector_dq(self): ...
def write_endeffector(self): ...
def endeffector_ik(self): ...
```
How to call externally?
Use make_endeffector_motors_buses_from_configs → Construct the UnitreeRobot class based on the config file
Use make_endeffector_motors_bus → Construct based on endeffector_type (typically for external module loading)
### Implementation of your end-effector class (unitree_deploy/robot_devices/endeffector/.../....py)
```python
# These methods need to be implemented and completed
def connect(self): ...
def disconnect(self): ...
def motor_names(self): ...
# connect() and disconnect() should handle initialization and homing respectively
def read_current_endeffector_q(self): ...
def read_current_endeffector_dq(self): ...
# Outputs should be unified as np.ndarray
def write_endeffector(self): ...
# Write control commands here
def arm_ik(self): ...
# Wrap IK into your own arm class, to be called externally
# Private/protected properties
# (for reading motor names, IDs, etc. These will be used in UnitreeRobot class for dataset encapsulation)
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
```
For arms, use threading to implement \_subscribe_gripper_motor_state (thread for reading motor states),\_ctrl_gripper_motor (thread for motor control),Both threads should run internally within the class.

View File

@@ -0,0 +1,140 @@
# Build your own robot
### Add your own config ((unitree_deploy/robot/robot_configs.py))
The base class of robot config is defined as **UnitreeRobotConfig**
```python
class UnitreeRobotConfig(RobotConfig):
cameras: dict[str, CameraConfig] = field(default_factory=lambda: {}) # Corresponding to your own camera
arm: dict[str, ArmConfig] = field(default_factory=lambda: {}) # Corresponding to your own arm
endeffector: dict[str, EndEffectorConfig] = field(default_factory=lambda: {}) # Corresponding to your own end-effector
mock: bool = False # Simulation [To be implemented, for debugging, to check some class definitions and message type formats]
```
Specific example: separately fill in \[name\]:robot_devies → cameras,
arm, endeffector.\
If not provided, they default to empty and will not affect the system.\
(In principle, different robot_devies and different quantities can be
constructed.)
```python
class Z1dual_Dex1_Opencv_RobotConfig(UnitreeRobotConfig):
# Troubleshooting: If one of your IntelRealSense cameras freezes during
# data recording due to bandwidth limit, you might need to plug the camera
# into another USB hub or PCIe card.
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: { # Add corresponding configs for different cameras [name]:OpenCVCameraConfig + required parameters
"cam_high": OpenCVCameraConfig(
camera_index="/dev/video0",
fps=30,
width=640,
height=480,
),
"cam_left_wrist": OpenCVCameraConfig(
camera_index="/dev/video2",
fps=30,
width=640,
height=480,
),
"cam_right_wrist": OpenCVCameraConfig(
camera_index="/dev/video4",
fps=30,
width=640,
height=480,
),
}
)
arm: dict[str, ArmConfig] = field(
default_factory=lambda: {
"z1_dual": Z1DualArmConfig( # Add corresponding configs for different arms [name]:Z1DualArmConfig + required parameters
unit_test = False,
init_pose_left = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
init_pose_right = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
control_dt = 1/500.0,
motors={
# name: (index, model)
"kLeftWaist": [0, "z1-joint"],
"kLeftShoulder": [1, "z1-joint"],
"kLeftElbow": [2, "z1-joint"],
"kLeftForearmRoll": [3, "z1-joint"],
"kLeftWristAngle": [4, "z1-joint"],
"kLeftWristRotate": [5, "z1-joint"],
"kRightWaist": [7, "z1-joint"],
"kRightShoulder": [8, "z1-joint"],
"kRightElbow": [9, "z1-joint"],
"kRightForearmRoll": [10, "z1-joint"],
"kRightWristAngle": [11, "z1-joint"],
"kRightWristRotate": [12, "z1-joint"],
},
),
}
)
endeffector: dict[str, EndEffectorConfig] = field(
default_factory=lambda: {
"gripper": GripperConfig( # Add corresponding configs for different end-effectors [name]:GripperConfig + required parameters
unit_test = False,
unit_test = True,
control_dt = 1/250,
motors={
# name: (index, model)
"kLeftGripper": [0, "z1_gripper-joint"],
"kRightGripper": [1, "z1_gripper-joint"],
},
),
}
)
mock: bool = False
```
---
### robot utils ((unitree_deploy/robot/utils.py))
```python
Implementation of the Robot base class
class Robot(Protocol):
robot_type: str
features: dict
def connect(self): ... # Connect devices (including cameras, arms, end-effectors of robot_devies)
def capture_observation(self): ... # capture_observation (Get current state, including data from camera + arm + end-effector)
def send_action(self, action): ... # send_action (Send action to arm + end-effector actuators, can be used for model inference and data replay)
def disconnect(self): ... # Disconnect devices
```
External calls **make_robot_from_config** and **make_robot** are used in
**control_robot**, to initialize the robot and implement specific
functions.
---
### manipulator ((unitree_deploy/robot/manipulator.py))
UnitreeRobot implements initialization by calling
**UnitreeRobotConfig**.
```python
Several important parts of the implementation
def capture_observation(self): # Get current obs, return { observation.state, observation.images}
def send_action( # Model inference and data replay, receives action + time
self, action: torch.Tensor, t_command_target:float|None = None
) -> torch.Tensor:
# Here we input device data
# Output (arm + end-effector) joint angle positions, end-effector positions, or other data conversion (IK is implemented here!)
# Output is uniformly converted into joint angle positions {"left":arm_joint_points, "roght":arm_joint_points} + {"left":endeffector_joint_points, "roght":endeffector_joint_points}
# Why consider left and right? Because this separates single-arm cases, and different arms and different end-effectors.
# This way, the implementation can work properly.
def convert_data_based_on_robot_type(self, robot_type: str, leader_pos: dict[str, np.ndarray]
) -> None | tuple[dict[str, np.ndarray], dict[str, np.ndarray]]:
```