Lerobot的核心代码在src/lerobot中,这个文件夹下有:
- async_inference
- datasets
- model
- outputs
- __pycache__
- scripts
- transport
- cameras
- envs
- motors
- policies
- rl
- teleoperators
- utils
- configs
- __init__.py
- optim
- processor
- robots
- templates
- __version__.py 抛开一些Python文件,其中主要是几部分构成:
- 核心算法与策略
- policies/: 这是核心目录,存放各种机器人控制策略的具体实现代码(例如 ACT, Diffusion Policy, VQ-Bet 等)。每个子目录通常对应一种特定的策略模型。
- model/: 存放通用的模型组件,目前包含 kinematics.py,主要涉及简单的运动学模型,独立于具体策略。
- optim/: 包含优化器(Optimizers)和学习率调度器(Schedulers)的配置与实现。
- processor/: 负责数据的预处理和后处理流水线。包含归一化(Normalization)、Tokenization、图像处理以及 Gym Action/Observation 的处理逻辑。
- 硬件与机器人抽象
- robots/: 定义具体机器人的配置和类。这里包含了不同机器人(如 Koch, Aloha, So-100 等)的硬件映射和初始化逻辑。
- cameras/: 相机硬件的驱动和抽象层,用于连接和读取不同的摄像头设备(如 OpenCV 相机, Intel RealSense 等)。
- motors/: 电机硬件的驱动和抽象层,用于控制不同的电机(如 Dynamixel, Feetech 等)。
- teleoperators/: 用于遥操作(Teleoperation)的设备接口,支持通过手柄、VR 设备或主手(Leader Arm)来控制机器人并收集数据。
- 数据与环境
- datasets/: 处理数据集的加载、下载、上传和格式化。LeRobot 拥有自己的数据格式,这里的代码负责与 Hugging Face Hub 交互以及数据流的构建。
- envs/: 包含仿真环境的封装,通常遵循 Gymnasium 接口,用于在仿真中训练或评估策略。
- 系统功能与工具
- configs/: 存放项目的默认配置文件(通常是 .yaml 格式),用于 Hydra 配置管理系统,涵盖默认的策略、环境和硬件配置。
- utils/: 通用的工具函数集合,包含日志记录、随机种子设置、路径管理、可视化工具等。
- scripts/: 包含各种实用脚本,用于项目维护、测试或一些特定的辅助任务。
- templates/: 可能包含代码生成或项目结构的模板文件。
- 进阶功能
- transport/: 处理通信相关的逻辑(使用 gRPC),用于跨进程或跨设备的分布式控制与数据传输。
- async_inference/: 支持异步推理的模块,用于在部署时提高模型推理的吞吐量和响应速度。
- rl/: 包含强化学习(Reinforcement Learning)相关的组件,如 Actor(执行者)、Learner(学习者)、Buffer(经验回放缓冲区)等,用于在线学习或微调。 总结来说,src/lerobot 的结构非常模块化:policies 和 datasets 是算法核心,robots、cameras、motors 构成了硬件抽象层,而其他文件夹则提供了必要的辅助功能和系统支持。
推理的接口
首先先在mujoco环境中创建一个Franka机械臂,然后仿照SmolVLA应用创建一个机械臂实例,对于从真机到模拟环境的改动,可以依照下面的Gymnasium的最小实现仿照完成:
# 这就是 Gymnasium 的全部核心 API
env = gym.make("SomeEnv")
obs, info = env.reset() # 重置,拿初始观测
obs, reward, done, truncated, info = env.step(action) # 执行动作
image = env.render() # 获取图像
# 了解空间定义(用于检查维度)
env.observation_space # 观测空间
env.action_space # 动作空间Eval的接口
整体而言,eval文件分为以下几个部分:
lerobot_eval.py
│
├── 导入部分
│ ├── make_policy, make_pre_post_processors ← 模型工厂
│ ├── make_env, make_env_pre_post_processors ← 环境工厂
│ ├── preprocess_observation ← 观测预处理
│ └── write_video ← 视频保存
│
├── rollout() ← ⭐ 核心函数:跑一轮推理
│
├── eval_policy() ← 多 episode 评估循环
│
└── main() / CLI ← 命令行入口
先从最核心的rollout()函数开始,这个函数本质就是先前推理的官方实现,可以见SmolVLA应用。
但是这个官方实现采用并行的方式进行测试,加快了速度:
done = np.array([False] * env.num_envs)
max_steps = env.call("_max_episode_steps")[0]除此之外,大致核心逻辑是相同的:先创建一个虚拟环境,然后
policy.reset()
observation, info = env.reset(seed=seeds)
...
all_observations = []
all_actions = []
all_rewards = []
all_successes = []
all_dones = []
step = 0
# Keep track of which environments are done.
done = np.array([False] * env.num_envs)
max_steps = env.call("_max_episode_steps")[0]
...
while not np.all(done) and step < max_steps:
# Numpy array to tensor and changing dictionary keys to LeRobot policy format.
observation = preprocess_observation(observation)
observation = add_envs_task(env, observation)
observation = preprocessor(observation)
with torch.inference_mode():
action = policy.select_action(observation)
action = postprocessor(action)
# Convert to CPU / numpy. 由于env.step() 只接受 numpy array,不接受 torch tensor
action_numpy: np.ndarray = action.to("cpu").numpy()
assert action_numpy.ndim == 2, "Action dimensions should be (batch, action_dim)"
# Apply the next action.
observation, reward, terminated, truncated, info = env.step(action_numpy)
if render_callback is not None:
render_callback(env)
done = terminated | truncated | done
if step + 1 == max_steps:
done = np.ones_like(done, dtype=bool)
step += 1
ret = {
ACTION: torch.stack(all_actions, dim=1),
"reward": torch.stack(all_rewards, dim=1),
"success": torch.stack(all_successes, dim=1),
"done": torch.stack(all_dones, dim=1),
}
return ret剩余还要处理一些用于显示eval进度和最后输出画面的步骤。这里不详述了。