4. SDK接口调用说明
4.1 发布的话题
4.1.1 话题列表
| 话题名称 | 消息类型 | 频率 | 描述 |
|---|---|---|---|
/ruiyan_hand/left/set_angles | sensor_msgs/JointState | 100Hz | 左手关节状态(21个) |
/ruiyan_hand/right/set_angles | sensor_msgs/JointState | 100Hz | 右手关节状态(21个) |
4.1.2 JointState 消息格式
yaml
# sensor_msgs/JointState
header:
stamp: # 时间戳
sec: 1705310450
nanosec: 123456789
frame_id: "left_hand" # 或 "right_hand"
name: # 关节名称列表(21个)
- "left_joint_0" # 拇指指尖
- "left_joint_1" # 拇指关节2
- "left_joint_2" # 拇指关节3
- "left_joint_3" # 拇指关节4
- "left_joint_4" # 拇指关节5
- "left_joint_5" # 食指关节1
# ... (共21个关节)
- "left_joint_20" # 小指关节4
position: # 关节位置(ADC 值:0-4095)
- 2048.0
- 1856.0
- 2304.0
# ... (21个位置值)
velocity: [] # 速度(未使用,为空)
effort: [] # 力矩(未使用,为空)4.1.3 关节索引映射
| 手指 | 关节数量 | 索引范围 | 关节名称 |
|---|---|---|---|
| 拇指 | 5 | 0-4 | left_joint_0 ~ left_joint_4 |
| 食指 | 4 | 5-8 | left_joint_5 ~ left_joint_8 |
| 中指 | 4 | 9-12 | left_joint_9 ~ left_joint_12 |
| 无名指 | 4 | 13-16 | left_joint_13 ~ left_joint_16 |
| 小指 | 4 | 17-20 | left_joint_17 ~ left_joint_20 |
4.2 订阅主手数据的完整流程
4.2.1 Python 订阅器示例
python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
class PSIGloveSubscriber(Node):
"""PSI 手套数据订阅器节点"""
def __init__(self):
super().__init__('psi_glove_subscriber')
# 创建订阅器
self.subscription = self.create_subscription(
JointState,
'/ruiyan_hand/left/set_angles', # 话题名称
self.joint_state_callback,
10 # QoS 队列深度
)
self.frame_count = 0
self.get_logger().info('PSI 手套订阅器已启动')
def joint_state_callback(self, msg: JointState):
"""
关节状态回调函数
Args:
msg: JointState 消息
"""
self.frame_count += 1
# 每 10 帧打印一次
if self.frame_count % 10 == 0:
self.get_logger().info(f'\n--- 帧 #{self.frame_count} ---')
self.get_logger().info(f'时间戳: {msg.header.stamp.sec}.{msg.header.stamp.nanosec}')
self.get_logger().info(f'坐标系: {msg.header.frame_id}')
self.get_logger().info(f'关节数量: {len(msg.name)}')
# 打印拇指数据(前 5 个关节)
self.get_logger().info('拇指关节位置:')
for i in range(min(5, len(msg.name))):
self.get_logger().info(f' {msg.name[i]}: {msg.position[i]:.3f}')
def main(args=None):
"""主函数"""
# 初始化 ROS2
rclpy.init(args=args)
# 创建订阅器节点
subscriber = PSIGloveSubscriber()
try:
# 保持节点运行
rclpy.spin(subscriber)
except KeyboardInterrupt:
subscriber.get_logger().info('用户中断')
finally:
# 清理
subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()4.2.2 C++ 订阅器示例
cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/joint_state.hpp>
class PSIGloveSubscriber : public rclcpp::Node
{
public:
PSIGloveSubscriber() : Node("psi_glove_subscriber"), frame_count_(0)
{
// 创建订阅器
subscription_ = this->create_subscription<sensor_msgs::msg::JointState>(
"/ruiyan_hand/left/set_angles",
10,
std::bind(&PSIGloveSubscriber::joint_state_callback, this, std::placeholders::_1)
);
RCLCPP_INFO(this->get_logger(), "PSI 手套订阅器已启动");
}
private:
void joint_state_callback(const sensor_msgs::msg::JointState::SharedPtr msg)
{
frame_count_++;
// 每 10 帧打印一次
if (frame_count_ % 10 == 0) {
RCLCPP_INFO(this->get_logger(), "\n--- 帧 #%d ---", frame_count_);
RCLCPP_INFO(this->get_logger(), "坐标系: %s", msg->header.frame_id.c_str());
RCLCPP_INFO(this->get_logger(), "关节数量: %zu", msg->name.size());
// 打印拇指数据(前 5 个关节)
RCLCPP_INFO(this->get_logger(), "拇指关节位置:");
for (size_t i = 0; i < std::min(size_t(5), msg->name.size()); ++i) {
RCLCPP_INFO(this->get_logger(), " %s: %.3f",
msg->name[i].c_str(), msg->position[i]);
}
}
}
rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr subscription_;
int frame_count_;
};
int main(int argc, char** argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<PSIGloveSubscriber>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}4.3 数据格式和单位
数据单位和范围
- 数据类型:
double(在 ROS2 消息中) - 数值范围:
0.0 - 4095.0(12位 ADC) - 物理含义: 关节传感器的电压值(经过模数转换)
- 数值对应:
0.0: 传感器最小值(手指完全伸直)2048.0: 传感器中间值4095.0: 传感器最大值(手指完全弯曲)
数据处理示例(Python)
python
def normalize_joint(value: float) -> float:
"""将 ADC 值归一化到 [0.0, 1.0]"""
return value / 4095.0
def adc_to_angle(value: float, min_adc: float, max_adc: float,
min_angle: float, max_angle: float) -> float:
"""将 ADC 值映射到角度"""
normalized = (value - min_adc) / (max_adc - min_adc)
return min_angle + normalized * (max_angle - min_angle)
def joint_state_callback(self, msg: JointState):
"""回调函数中处理数据"""
# 归一化拇指第一关节
normalized = normalize_joint(msg.position[0])
self.get_logger().info(f"归一化值: {normalized:.3f}")
# 映射到角度(假设校准范围)
angle = adc_to_angle(msg.position[0], 500.0, 3500.0, 0.0, 90.0)
self.get_logger().info(f"关节角度: {angle:.2f}°")4.4 高级用法示例
4.4.1 同时订阅双手数据
python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
class DualHandSubscriber(Node):
"""同时订阅双手数据的节点"""
def __init__(self):
super().__init__('dual_hand_subscriber')
# 订阅左手
self.left_sub = self.create_subscription(
JointState,
'/ruiyan_hand/left/set_angles',
self.left_callback,
10
)
# 订阅右手
self.right_sub = self.create_subscription(
JointState,
'/ruiyan_hand/right/set_angles',
self.right_callback,
10
)
self.left_data = None
self.right_data = None
self.get_logger().info('双手订阅器已启动')
def left_callback(self, msg: JointState):
self.left_data = msg
self.process_data()
def right_callback(self, msg: JointState):
self.right_data = msg
self.process_data()
def process_data(self):
"""当双手数据都可用时处理"""
if self.left_data and self.right_data:
left_thumb = self.left_data.position[0]
right_thumb = self.right_data.position[0]
self.get_logger().info(
f"左手拇指: {left_thumb:.1f}, 右手拇指: {right_thumb:.1f}"
)
def main():
rclpy.init()
node = DualHandSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()4.4.2 数据记录到 CSV
python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import csv
from datetime import datetime
class DataRecorder(Node):
"""记录手套数据到 CSV 文件"""
def __init__(self):
super().__init__('data_recorder')
# 创建 CSV 文件
filename = f"glove_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
self.csv_file = open(filename, 'w', newline='')
self.csv_writer = csv.writer(self.csv_file)
# 写入表头
header = ['timestamp'] + [f'joint_{i}' for i in range(21)]
self.csv_writer.writerow(header)
# 订阅数据
self.subscription = self.create_subscription(
JointState,
'/ruiyan_hand/left/set_angles',
self.callback,
10
)
self.frame_count = 0
self.get_logger().info(f'开始记录数据到 {filename}')
def callback(self, msg: JointState):
"""记录数据"""
self.frame_count += 1
# 写入一行数据
timestamp = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
row = [timestamp] + list(msg.position)
self.csv_writer.writerow(row)
# 每 100 帧打印一次
if self.frame_count % 100 == 0:
self.get_logger().info(f'已记录 {self.frame_count} 帧数据')
def __del__(self):
"""析构函数,关闭文件"""
if hasattr(self, 'csv_file'):
self.csv_file.close()
self.get_logger().info('数据文件已关闭')
def main():
rclpy.init()
node = DataRecorder()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('用户中断,保存数据...')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()