第一次完整测例跑完

This commit is contained in:
2026-01-18 00:30:10 +08:00
parent ca15cc593b
commit 25c6fc04db
180 changed files with 29305 additions and 0 deletions

View File

@@ -0,0 +1,105 @@
import time
import numpy as np
import pinocchio as pin
from unitree_deploy.robot.robot_configs import g1_motors
from unitree_deploy.robot_devices.arm.configs import G1ArmConfig
from unitree_deploy.robot_devices.arm.utils import make_arm_motors_buses_from_configs
from unitree_deploy.robot_devices.robots_devices_utils import precise_wait
if __name__ == "__main__":
    # ============== Arm Configuration ==============
    def g1_dual_arm_default_factory():
        """Build the arm-config mapping for the G1 dual-arm (14-DoF) setup."""
        return {
            "g1": G1ArmConfig(
                init_pose=np.zeros(14),
                motors=g1_motors,
                mock=False,
            ),
        }

    # ==============================================
    # Initialize and connect to the robotic arm
    arm = make_arm_motors_buses_from_configs(g1_dual_arm_default_factory())
    for name in arm:
        arm[name].connect()
    time.sleep(1.5)
    print("✅ Arm connected. Waiting to start...")

    # Define initial target poses for left and right arms
    L_tf_target = pin.SE3(
        pin.Quaternion(1, 0, 0, 0),
        np.array([0.25, +0.25, 0.1]),
    )
    R_tf_target = pin.SE3(
        pin.Quaternion(1, 0, 0, 0),
        np.array([0.25, -0.25, 0.1]),
    )

    rotation_speed = 0.005  # Rotation speed in radians per iteration

    # Motion parameters
    control_dt = 1 / 50  # Control cycle duration (20 ms)
    step = 0
    max_step = 240
    initial_data_received = True  # Used to switch from drive to schedule mode

    # Wait for user input to start the motion loop
    user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
    if user_input.lower() == "s":
        try:
            while True:
                # Define timing for the control cycle
                t_cycle_end = time.monotonic() + control_dt
                t_command_target = t_cycle_end + control_dt

                # Triangle-wave motion: move "forward" for the first 120 steps,
                # then reverse for the remaining 120.
                direction = 1 if step <= 120 else -1
                angle = rotation_speed * (step if step <= 120 else (240 - step))
                cos_half_angle = np.cos(angle / 2)
                sin_half_angle = np.sin(angle / 2)
                L_quat = pin.Quaternion(cos_half_angle, 0, sin_half_angle, 0)  # rotation about the Y axis
                R_quat = pin.Quaternion(cos_half_angle, 0, 0, sin_half_angle)  # rotation about the Z axis
                delta_l = np.array([0.001, 0.001, 0.001]) * direction
                delta_r = np.array([0.001, -0.001, 0.001]) * direction
                L_tf_target.translation += delta_l
                R_tf_target.translation += delta_r
                L_tf_target.rotation = L_quat.toRotationMatrix()
                R_tf_target.rotation = R_quat.toRotationMatrix()

                # Solve inverse kinematics for the arm
                for name in arm:
                    sol_q, sol_tauff = arm[name].arm_ik(L_tf_target.homogeneous, R_tf_target.homogeneous)
                    print(f"Arm {name} solution: q={sol_q}, tauff={sol_tauff}")
                    # Determine command mode: drive to the first waypoint, then
                    # schedule subsequent waypoints.
                    cmd_target = "drive_to_waypoint" if initial_data_received else "schedule_waypoint"
                    # Send joint target command to arm
                    arm[name].write_arm(
                        q_target=sol_q,
                        tauff_target=sol_tauff,  # Optional: send torque feedforward
                        # write_arm expects a perf_counter-based deadline; convert from monotonic
                        time_target=t_command_target - time.monotonic() + time.perf_counter(),
                        # BUG FIX: was hardcoded to "schedule_waypoint", so the
                        # computed cmd_target (drive-to-waypoint on the first
                        # command) was never used.
                        cmd_target=cmd_target,
                    )

                # Update step and reset after full cycle
                step = (step + 1) % (max_step + 1)
                initial_data_received = False

                # Wait until end of control cycle
                precise_wait(t_cycle_end)
        except KeyboardInterrupt:
            # Handle Ctrl+C to safely disconnect
            print("\n🛑 Ctrl+C detected. Disconnecting arm...")
            for name in arm:
                arm[name].disconnect()
            print("✅ Arm disconnected. Exiting.")

View File

@@ -0,0 +1,91 @@
import math
import time
import numpy as np
import pinocchio as pin
from unitree_deploy.real_unitree_env import make_real_env
from unitree_deploy.utils.rerun_visualizer import RerunLogger, flatten_images, visualization_data
from unitree_deploy.utils.rich_logger import log_info
from unitree_deploy.utils.trajectory_generator import sinusoidal_gripper_motion
if __name__ == "__main__":
    motion_period = 2.0  # Gripper sinusoid period [s]
    motion_amplitude = 0.99  # Gripper sinusoid amplitude

    rerun_logger = RerunLogger()

    env = make_real_env(robot_type="g1_dex1", dt=1 / 30)
    env.connect()

    # Define initial target poses for left and right arms
    L_tf_target = pin.SE3(
        pin.Quaternion(1, 0, 0, 0),
        np.array([0.25, +0.25, 0.1]),
    )
    R_tf_target = pin.SE3(
        pin.Quaternion(1, 0, 0, 0),
        np.array([0.25, -0.25, 0.1]),
    )

    rotation_speed = 0.005  # Rotation speed in radians per iteration

    # Motion parameters
    step = 0
    max_step = 240

    # Wait for user input to start the motion loop
    user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
    if user_input.lower() == "s":
        try:
            idx = 0  # Frame index for logging
            # NOTE(review): there is no explicit wait in this loop — pacing is
            # assumed to come from env.step() (env was created with dt=1/30);
            # confirm against the env implementation.
            while True:
                # Triangle-wave motion: forward for the first 120 steps, then back.
                direction = 1 if step <= 120 else -1
                angle = rotation_speed * (step if step <= 120 else (240 - step))
                cos_half_angle = np.cos(angle / 2)
                sin_half_angle = np.sin(angle / 2)
                L_quat = pin.Quaternion(cos_half_angle, 0, sin_half_angle, 0)  # rotation about the Y axis
                R_quat = pin.Quaternion(cos_half_angle, 0, 0, sin_half_angle)  # rotation about the Z axis
                delta_l = np.array([0.001, 0.001, 0.001]) * direction
                delta_r = np.array([0.001, -0.001, 0.001]) * direction
                L_tf_target.translation += delta_l
                R_tf_target.translation += delta_r
                L_tf_target.rotation = L_quat.toRotationMatrix()
                R_tf_target.rotation = R_quat.toRotationMatrix()

                # Solve inverse kinematics for each arm bus
                for arm_name in env.robot.arm:
                    arm_sol_q, arm_sol_tauff = env.robot.arm[arm_name].arm_ik(
                        L_tf_target.homogeneous, R_tf_target.homogeneous
                    )

                # Sinusoidal open/close target for the gripper
                gripper_target_q = sinusoidal_gripper_motion(
                    period=motion_period, amplitude=motion_amplitude, current_time=time.perf_counter()
                )

                # Full action = arm joint targets followed by gripper targets
                action = np.concatenate([arm_sol_q, gripper_target_q], axis=0)
                step_type, reward, _, observation = env.step(action)

                idx += 1
                visualization_data(idx, flatten_images(observation), observation["qpos"], arm_sol_q, rerun_logger)

                # Update step and reset after full cycle
                step = (step + 1) % (max_step + 1)
        except KeyboardInterrupt:
            # Handle Ctrl+C to safely disconnect
            log_info("\n🛑 Ctrl+C detected. Disconnecting arm...")
            env.close()

View File

@@ -0,0 +1,81 @@
import math
import time
import numpy as np
import pinocchio as pin
from unitree_deploy.robot.robot_configs import z1_motors
from unitree_deploy.robot_devices.arm.utils import make_arm_motors_bus
from unitree_deploy.robot_devices.robots_devices_utils import precise_wait
from unitree_deploy.utils.trajectory_generator import generate_rotation, sinusoidal_gripper_motion
if __name__ == "__main__":
    # ============== Arm Configuration ==============
    arm_type = "z1"
    arm_kwargs = {
        "arm_type": arm_type,
        "init_pose": [0.00623, 1.11164, -0.77531, -0.32167, -0.005, 0.0, 0.0],  # Initial joint pose
        "motors": z1_motors,
    }
    # ==============================================

    # Initialize and connect to the robotic arm
    arm = make_arm_motors_bus(**arm_kwargs)
    arm.connect()
    time.sleep(2)
    print("✅ Arm connected. Waiting to start...")

    # Define arm initial target poses
    arm_tf_target = pin.SE3(pin.Quaternion(1, 0, 0, 0), np.array([0.2, 0, 0.4]))

    # Motion parameters
    rotation_speed = 0.01  # Rotation speed (rad per step)
    control_dt = 1 / 30  # Control cycle duration (~33 ms)
    step = 0
    max_step = 240  # Full motion cycle

    # Wait for user input to start the motion loop
    user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
    if user_input.lower() == "s":
        try:
            current_time = math.pi / 2  # Phase offset for the gripper sinusoid
            while True:
                # Define timing for the control cycle
                t_cycle_end = time.monotonic() + control_dt
                t_command_target = t_cycle_end + control_dt

                # Generate target rotation and translation
                L_quat, R_quat, delta_l, delta_r = generate_rotation(step, rotation_speed, max_step)
                arm_tf_target.translation += delta_l
                # R_quat / delta_r target a right arm and are unused on this single-arm setup
                arm_tf_target.rotation = L_quat.toRotationMatrix()

                # Solve inverse kinematics for the arm
                arm_sol_q, arm_sol_tauff = arm.arm_ik(arm_tf_target.homogeneous)

                # Generate sinusoidal motion for the gripper, shifted down by 1
                # (presumably into the gripper's command range — confirm)
                target_gripper = (
                    sinusoidal_gripper_motion(period=4.0, amplitude=0.99, current_time=current_time) - 1
                )
                # Append the gripper target to the arm joint solution
                target_arm = np.concatenate((arm_sol_q, target_gripper), axis=0)

                arm.write_arm(
                    q_target=target_arm,
                    # tauff_target=arm_sol_tauff,  # Optional: send torque feedforward
                    # write_arm expects a perf_counter-based deadline; convert from monotonic
                    time_target=t_command_target - time.monotonic() + time.perf_counter(),
                    cmd_target="schedule_waypoint",
                )

                # Update step and reset after full cycle
                step = (step + 1) % (max_step + 1)
                current_time += control_dt

                # Wait until end of control cycle
                precise_wait(t_cycle_end)
        except KeyboardInterrupt:
            # Handle Ctrl+C to safely disconnect
            print("\n🛑 Ctrl+C detected. Disconnecting arm...")
            arm.disconnect()
            print("✅ Arm disconnected. Exiting.")

View File

@@ -0,0 +1,112 @@
import time
import numpy as np
import pinocchio as pin
from unitree_deploy.robot_devices.arm.configs import Z1DualArmConfig
from unitree_deploy.robot_devices.arm.utils import make_arm_motors_buses_from_configs
from unitree_deploy.robot_devices.robots_devices_utils import precise_wait
from unitree_deploy.utils.trajectory_generator import generate_rotation
if __name__ == "__main__":
    # ============== Arm Configuration ==============
    def z1_dual_arm_single_config_factory():
        """Build the config for two Z1 arms driven as a single dual-arm bus."""
        return {
            "z1_dual": Z1DualArmConfig(
                left_robot_ip="127.0.0.1",
                left_robot_port1=8073,
                left_robot_port2=8074,
                right_robot_ip="127.0.0.1",
                right_robot_port1=8071,
                right_robot_port2=8072,
                init_pose_left=[0, 0, 0, 0, 0, 0],
                init_pose_right=[0, 0, 0, 0, 0, 0],
                control_dt=1 / 250.0,
                motors={
                    # name: (index, model)
                    "kLeftWaist": [0, "z1-joint"],
                    "kLeftShoulder": [1, "z1-joint"],
                    "kLeftElbow": [2, "z1-joint"],
                    "kLeftForearmRoll": [3, "z1-joint"],
                    "kLeftWristAngle": [4, "z1-joint"],
                    "kLeftWristRotate": [5, "z1-joint"],
                    "kRightWaist": [7, "z1-joint"],
                    "kRightShoulder": [8, "z1-joint"],
                    "kRightElbow": [9, "z1-joint"],
                    "kRightForearmRoll": [10, "z1-joint"],
                    "kRightWristAngle": [11, "z1-joint"],
                    "kRightWristRotate": [12, "z1-joint"],
                },
            ),
        }

    # ==============================================
    # Initialize and connect to the robotic arm
    arm = make_arm_motors_buses_from_configs(z1_dual_arm_single_config_factory())
    for name in arm:
        arm[name].connect()
    time.sleep(1.5)
    print("✅ Arm connected. Waiting to start...")

    # Define initial target poses for left and right arms
    L_tf_target = pin.SE3(pin.Quaternion(1, 0, 0, 0), np.array([0.2, 0, 0.4]))
    R_tf_target = pin.SE3(pin.Quaternion(1, 0, 0, 0), np.array([0.2, 0, 0.3]))

    # Motion parameters
    rotation_speed = 0.01  # Rotation speed (rad per step)
    control_dt = 1 / 30  # Control cycle duration (~33 ms)
    step = 0
    max_step = 240  # Full motion cycle
    initial_data_received = True  # Used to switch from drive to schedule mode

    # Wait for user input to start the motion loop
    user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
    if user_input.lower() == "s":
        try:
            while True:
                # Define timing for the control cycle
                t_cycle_end = time.monotonic() + control_dt
                t_command_target = t_cycle_end + control_dt

                # Generate target rotation and translation
                L_quat, R_quat, delta_l, delta_r = generate_rotation(step, rotation_speed, max_step)
                # Apply translation deltas to target pose
                L_tf_target.translation += delta_l
                R_tf_target.translation += delta_r
                # Apply rotation to target pose
                L_tf_target.rotation = L_quat.toRotationMatrix()
                R_tf_target.rotation = R_quat.toRotationMatrix()

                # Solve IK and send the command in a single pass per bus.
                # BUG FIX: previously two separate loops — the write loop used
                # whichever sol_q survived the IK loop, so with more than one
                # bus every bus would receive only the LAST bus's solution.
                for name in arm:
                    sol_q, sol_tauff = arm[name].arm_ik(L_tf_target.homogeneous, R_tf_target.homogeneous)
                    # Determine command mode: drive to the first waypoint, then schedule
                    cmd_target = "drive_to_waypoint" if initial_data_received else "schedule_waypoint"
                    # Send joint target command to arm
                    arm[name].write_arm(
                        q_target=sol_q,
                        # tauff_target=sol_tauff,  # Optional: send torque feedforward
                        # write_arm expects a perf_counter-based deadline; convert from monotonic
                        time_target=t_command_target - time.monotonic() + time.perf_counter(),
                        cmd_target=cmd_target,
                    )

                # Update step and reset after full cycle
                step = (step + 1) % (max_step + 1)
                initial_data_received = False

                # Wait until end of control cycle
                precise_wait(t_cycle_end)
        except KeyboardInterrupt:
            # Handle Ctrl+C to safely disconnect
            print("\n🛑 Ctrl+C detected. Disconnecting arm...")
            for name in arm:
                arm[name].disconnect()
            print("✅ Arm disconnected. Exiting.")

View File

@@ -0,0 +1,65 @@
import math
import time
import numpy as np
import pinocchio as pin
from unitree_deploy.real_unitree_env import make_real_env
from unitree_deploy.utils.rerun_visualizer import RerunLogger, flatten_images, visualization_data
from unitree_deploy.utils.rich_logger import log_info
from unitree_deploy.utils.trajectory_generator import generate_rotation, sinusoidal_gripper_motion
if __name__ == "__main__":
    rerun_logger = RerunLogger()

    env = make_real_env(robot_type="z1_realsense", dt=1 / 30)
    env.connect()

    # Define the arm's initial target pose
    arm_tf_target = pin.SE3(pin.Quaternion(1, 0, 0, 0), np.array([0.2, 0, 0.4]))

    # Motion parameters
    rotation_speed = 0.01  # Rotation speed (rad per step)
    control_dt = 1 / 30  # Control cycle duration (~33 ms)
    step = 0
    max_step = 240  # Full motion cycle

    # Wait for user input to start the motion loop
    user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
    if user_input.lower() == "s":
        try:
            current_time = math.pi / 2  # Phase offset for the gripper sinusoid
            idx = 0  # Frame index for logging
            # NOTE(review): there is no explicit wait in this loop — pacing is
            # assumed to come from env.step() (env created with dt=1/30); confirm.
            while True:
                # Generate target rotation and translation
                L_quat, R_quat, delta_l, delta_r = generate_rotation(step, rotation_speed, max_step)
                arm_tf_target.translation += delta_l
                # R_quat / delta_r target a right arm and are unused on this single-arm setup
                arm_tf_target.rotation = L_quat.toRotationMatrix()

                # Solve inverse kinematics for each arm bus
                for arm_name in env.robot.arm:
                    arm_sol_q, arm_sol_tauff = env.robot.arm[arm_name].arm_ik(arm_tf_target.homogeneous)

                # Generate sinusoidal motion for the gripper, shifted down by 1
                # (presumably into the gripper's command range — confirm)
                target_gripper = (
                    sinusoidal_gripper_motion(period=4.0, amplitude=0.99, current_time=current_time) - 1
                )
                # Append the gripper target to the arm joint solution
                target_arm = np.concatenate((arm_sol_q, target_gripper), axis=0)

                step_type, reward, _, observation = env.step(target_arm)
                idx += 1
                visualization_data(idx, flatten_images(observation), observation["qpos"], target_arm, rerun_logger)

                # Update step and reset after full cycle
                current_time += control_dt
                step = (step + 1) % (max_step + 1)
        except KeyboardInterrupt:
            # Handle Ctrl+C to safely disconnect
            log_info("\n🛑 Ctrl+C detected. Disconnecting arm...")
            env.close()

View File

@@ -0,0 +1,64 @@
import time
import cv2
import numpy as np
import torch
from tqdm import tqdm
from unitree_deploy.robot_devices.cameras.configs import ImageClientCameraConfig
from unitree_deploy.robot_devices.cameras.utils import make_cameras_from_configs
from unitree_deploy.utils.rich_logger import log_success
# ============================From configs============================
def run_camera():
    """Connect to the image-client cameras, warm them up, and show a live
    side-by-side view of all streams until 'q' is pressed."""

    # ============================From configs============================
    def image_client_default_factory():
        # One image-client bus exposing the head camera and both wrist cameras.
        return {
            "imageclient": ImageClientCameraConfig(
                head_camera_type="opencv",
                head_camera_id_numbers=[4],
                head_camera_image_shape=[480, 1280],  # Head camera resolution
                wrist_camera_type="opencv",
                wrist_camera_id_numbers=[0, 2],
                wrist_camera_image_shape=[480, 640],  # Wrist camera resolution
                aspect_ratio_threshold=2.0,
                fps=30,
                mock=False,
            ),
        }

    # ===========================================
    cameras = make_cameras_from_configs(image_client_default_factory())
    print(cameras)
    for name in cameras:
        cameras[name].connect()
        log_success(f"Connecting {name} cameras.")

    # Warm-up: read and discard frames while the sensors settle
    for _ in tqdm(range(20), desc="Camera warming up"):
        for name in cameras:
            cameras[name].async_read()
        time.sleep(1 / 30)

    while True:
        images = {}
        for name in cameras:
            output = cameras[name].async_read()
            # async_read may return a single frame or a dict of named frames.
            # FIX: the numpy arrays were previously round-tripped through
            # torch.from_numpy(...).numpy() for no effect — use them directly.
            if isinstance(output, dict):
                images.update(output)
            else:
                images[name] = output
        # Promote grayscale frames to 3 channels so hstack shapes match
        image_list = [np.stack([img] * 3, axis=-1) if img.ndim == 2 else img for img in images.values()]
        stacked_image = np.hstack(image_list)
        cv2.imshow("Stacked Image", stacked_image)
        if (cv2.waitKey(1) & 0xFF) == ord("q"):
            cv2.destroyAllWindows()
            break


if __name__ == "__main__":
    run_camera()

View File

@@ -0,0 +1,66 @@
import time
import cv2
import numpy as np
from unitree_deploy.robot_devices.cameras.configs import IntelRealSenseCameraConfig
from unitree_deploy.robot_devices.cameras.utils import make_cameras_from_configs
from unitree_deploy.utils.rich_logger import log_success
def run_camera():
    """Stream every configured RealSense camera and tile the frames in one window."""

    # ===========================================
    def intelrealsense_camera_default_factory():
        # Two RealSense cameras identified by serial number, same stream settings.
        return {
            "cam_high": IntelRealSenseCameraConfig(
                serial_number="044122071036",
                fps=30,
                width=640,
                height=480,
            ),
            "cam_wrist": IntelRealSenseCameraConfig(
                serial_number="419122270615",
                fps=30,
                width=640,
                height=480,
            ),
        }

    # ===========================================
    cameras = make_cameras_from_configs(intelrealsense_camera_default_factory())
    for name, cam in cameras.items():
        cam.connect()
        log_success(f"Connecting {name} cameras.")

    # Warm-up: read and discard frames while the sensors settle
    for _ in range(20):
        for cam in cameras.values():
            cam.async_read()
        time.sleep(1 / 30)

    while True:
        labeled_frames = []
        for name, cam in cameras.items():
            frame = cam.async_read()
            if frame is None:
                continue
            # Convert color order and overlay the camera name for identification
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            cv2.putText(frame, name, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            labeled_frames.append(frame)
        if labeled_frames:
            # Tile the frames two per row into a single canvas
            rows = [np.hstack(labeled_frames[i : i + 2]) for i in range(0, len(labeled_frames), 2)]
            cv2.imshow("All Cameras", np.vstack(rows))
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    cv2.destroyAllWindows()


if __name__ == "__main__":
    run_camera()

View File

@@ -0,0 +1,95 @@
import time
import cv2
import numpy as np
import tyro
from tqdm import tqdm
from unitree_deploy.robot_devices.cameras.configs import OpenCVCameraConfig
from unitree_deploy.robot_devices.cameras.utils import make_camera, make_cameras_from_configs
from unitree_deploy.utils.rich_logger import log_success
def usb_camera_default_factory():
    """Default three-camera USB (OpenCV) configuration keyed by camera role."""
    device_by_name = {
        "cam_high": "/dev/video1",
        "cam_left_wrist": "/dev/video3",
        "cam_right_wrist": "/dev/video5",
    }
    # All cameras share the same stream settings; only the device node differs.
    return {
        name: OpenCVCameraConfig(camera_index=device, fps=30, width=640, height=480)
        for name, device in device_by_name.items()
    }
def run_cameras(camera_style: int = 0):
    """
    Runs camera(s) based on the specified style.

    Args:
        camera_style (int):
            0 - Single camera (OpenCV).
            1 - Multiple cameras from config.

    Raises:
        ValueError: If camera_style is not 0 or 1.
    """
    if camera_style == 0:
        # ========== Single camera ==========
        camera_kwargs = {"camera_type": "opencv", "camera_index": "/dev/video5", "mock": False}
        camera = make_camera(**camera_kwargs)
        camera.connect()
        log_success("Connecting camera.")
        while True:
            color_image = camera.read()
            color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
            cv2.imshow("Camera", color_image)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

    elif camera_style == 1:
        # ========== Multi-camera from configs ==========
        cameras = make_cameras_from_configs(usb_camera_default_factory())
        for name in cameras:
            cameras[name].connect()
            log_success(f"Connecting {name} camera.")

        # Camera warm-up
        for _ in tqdm(range(20), desc="Camera warming up"):
            for name in cameras:
                cameras[name].async_read()
            time.sleep(1 / 30)

        while True:
            images = {}
            for name in cameras:
                images[name] = cameras[name].async_read()
            # BUG FIX: async_read() returns numpy arrays (the image-client demo
            # wraps its result with torch.from_numpy), which have no .numpy()
            # method — calling it raised AttributeError. Use the arrays
            # directly, promoting grayscale frames to 3 channels so hstack
            # shapes match.
            image_list = [
                np.stack([img] * 3, axis=-1) if img.ndim == 2 else img for img in images.values()
            ]
            stacked_image = np.hstack(image_list)
            cv2.imshow("Multi-Camera View", stacked_image)
            if (cv2.waitKey(1) & 0xFF) == ord("q"):
                cv2.destroyAllWindows()
                break
    else:
        raise ValueError(f"Unsupported camera_style: {camera_style}")


if __name__ == "__main__":
    tyro.cli(run_cameras)

View File

@@ -0,0 +1,60 @@
import time
import tyro
from unitree_deploy.robot_devices.endeffector.utils import (
Dex1_GripperConfig,
make_endeffector_motors_buses_from_configs,
)
from unitree_deploy.robot_devices.robots_devices_utils import precise_wait
from unitree_deploy.utils.rich_logger import log_success
from unitree_deploy.utils.trajectory_generator import sinusoidal_single_gripper_motion
period = 2.0
motion_period = 2.0
motion_amplitude = 0.99
def gripper_default_factory():
    """Default Dex1 gripper configuration: one bus per side (left and right)."""

    def _side_config(side, motor_name, motor_index):
        # unit_test=True keeps the bus in its test mode.
        return Dex1_GripperConfig(
            unit_test=True,
            motors={motor_name: [motor_index, "z1_gripper-joint"]},
            topic_gripper_state=f"rt/dex1/{side}/state",
            topic_gripper_command=f"rt/dex1/{side}/cmd",
        )

    return {
        "left": _side_config("left", "kLeftGripper", 0),
        "right": _side_config("right", "kRightGripper", 1),
    }
def run_gripper():
    """Drive every configured gripper bus with a sinusoidal open/close motion."""
    control_dt = 1 / 30  # Control cycle duration

    log_success("Running gripper in style 1 (multi-bus from config)")
    endeffectors = make_endeffector_motors_buses_from_configs(gripper_default_factory())

    for name, bus in endeffectors.items():
        bus.connect()
        log_success(f"Connected endeffector '{name}'.")

    while True:
        t_cycle_end = time.monotonic() + control_dt
        # The same sinusoidal target is broadcast to every gripper bus
        target_q = sinusoidal_single_gripper_motion(
            period=motion_period, amplitude=motion_amplitude, current_time=time.perf_counter()
        )
        for bus in endeffectors.values():
            bus.write_endeffector(q_target=target_q)
        precise_wait(t_cycle_end)


if __name__ == "__main__":
    tyro.cli(run_gripper)

View File

@@ -0,0 +1,44 @@
"""
python test/test_replay.py --repo-id unitreerobotics/G1_CameraPackaging_NewDataset --robot_type g1_dex1
python test/test_replay.py --repo-id unitreerobotics/Z1_StackBox_Dataset --robot_type z1_realsense
python test/test_replay.py --repo-id unitreerobotics/Z1_Dual_Dex1_StackBox_Dataset_V2 --robot_type z1_dual_dex1_realsense
"""
import tyro
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from unitree_deploy.real_unitree_env import make_real_env
from unitree_deploy.utils.rerun_visualizer import RerunLogger, flatten_images, visualization_data
from unitree_deploy.utils.rich_logger import log_info
# Replay a specific episode from the LeRobot dataset using the real environment robot_type(e.g., g1_dex1, z1_realsense, z1_dual_dex1_realsense)
def replay_lerobot_data(repo_id: str, robot_type: str, root: str | None = None, episode: int = 145):
    """Replay one recorded episode's actions on the real robot.

    Args:
        repo_id: LeRobot dataset repository id.
        robot_type: Real-env type (e.g. "g1_dex1", "z1_realsense", "z1_dual_dex1_realsense").
        root: Optional local dataset root directory.
        episode: Episode index to replay.
    """
    dataset = LeRobotDataset(repo_id, root=root, episodes=[episode])
    actions = dataset.hf_dataset.select_columns("action")
    # G1 actions carry extra end-effector targets after the 14 arm joints, so
    # only the first 14 values form the arm's initial pose.
    # BUG FIX: the check was `robot_type == "g1"`, which never matched the
    # documented types (e.g. "g1_dex1"), so the slice was skipped for G1.
    init_pose_arm = (
        actions[0]["action"].numpy()[:14] if robot_type.startswith("g1") else actions[0]["action"].numpy()
    )

    rerun_logger = RerunLogger()
    env = make_real_env(robot_type=robot_type, dt=1 / 30, init_pose_arm=init_pose_arm)
    env.connect()
    try:
        # Wait for user input to start the motion loop
        user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
        if user_input.lower() == "s":
            log_info("Replaying episode")
            for idx in range(dataset.num_frames):
                action = actions[idx]["action"].numpy()
                if robot_type == "z1_realsense":
                    # presumably the recorded gripper sign is inverted for z1 — confirm
                    action[-1] = -action[-1]
                step_type, reward, _, observation = env.step(action)
                visualization_data(idx, flatten_images(observation), observation["qpos"], action, rerun_logger)
    except KeyboardInterrupt:
        # Handle Ctrl+C to safely stop the replay
        log_info("\n🛑 Ctrl+C detected. Disconnecting arm...")
    finally:
        # Always release the environment, even if the user never starts the
        # replay (the original leaked the connection in that case).
        env.close()


if __name__ == "__main__":
    tyro.cli(replay_lerobot_data)