45 lines
2.1 KiB
Python
45 lines
2.1 KiB
Python
"""
|
||
python test/test_replay.py --repo-id unitreerobotics/G1_CameraPackaging_NewDataset --robot_type g1_dex1
|
||
python test/test_replay.py --repo-id unitreerobotics/Z1_StackBox_Dataset --robot_type z1_realsense
|
||
python test/test_replay.py --repo-id unitreerobotics/Z1_Dual_Dex1_StackBox_Dataset_V2 --robot_type z1_dual_dex1_realsense
|
||
"""
|
||
|
||
import tyro
|
||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||
|
||
from unitree_deploy.real_unitree_env import make_real_env
|
||
from unitree_deploy.utils.rerun_visualizer import RerunLogger, flatten_images, visualization_data
|
||
from unitree_deploy.utils.rich_logger import log_info
|
||
|
||
|
||
# Replay a specific episode from the LeRobot dataset using the real environment robot_type:(e.g., g1_dex1, z1_realsense, z1_dual_dex1_realsense)
|
||
def replay_lerobot_data(repo_id: str, robot_type: str, root: str | None = None, episode: int = 145):
|
||
dataset = LeRobotDataset(repo_id, root=root, episodes=[episode])
|
||
actions = dataset.hf_dataset.select_columns("action")
|
||
init_pose_arm = actions[0]["action"].numpy()[:14] if robot_type == "g1" else actions[0]["action"].numpy()
|
||
rerun_logger = RerunLogger()
|
||
|
||
env = make_real_env(robot_type=robot_type, dt=1 / 30, init_pose_arm=init_pose_arm)
|
||
env.connect()
|
||
|
||
try:
|
||
# Wait for user input to start the motion loop
|
||
user_input = input("Please enter the start signal (enter 's' to start the subsequent program): \n")
|
||
if user_input.lower() == "s":
|
||
log_info("Replaying episode")
|
||
for idx in range(dataset.num_frames):
|
||
action = actions[idx]["action"].numpy()
|
||
if robot_type == "z1_realsense":
|
||
action[-1] = -action[-1]
|
||
step_type, reward, _, observation = env.step(action)
|
||
visualization_data(idx, flatten_images(observation), observation["qpos"], action, rerun_logger)
|
||
env.close()
|
||
except KeyboardInterrupt:
|
||
# Handle Ctrl+C to safely disconnect
|
||
log_info("\n🛑 Ctrl+C detected. Disconnecting arm...")
|
||
env.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
tyro.cli(replay_lerobot_data)
|