Skip to content

4. SDK接口调用说明

4.1 发布的话题

4.1.1 话题列表

话题名称消息类型频率描述
/ruiyan_hand/left/set_anglessensor_msgs/JointState100Hz左手关节状态(21个)
/ruiyan_hand/right/set_anglessensor_msgs/JointState100Hz右手关节状态(21个)

4.1.2 JointState 消息格式

yaml
# sensor_msgs/JointState
header:
  stamp:              # 时间戳
    sec: 1705310450
    nanosec: 123456789
  frame_id: "left_hand"  # 或 "right_hand"

name:                 # 关节名称列表(21个)
  - "left_joint_0"    # 拇指指尖
  - "left_joint_1"    # 拇指关节2
  - "left_joint_2"    # 拇指关节3
  - "left_joint_3"    # 拇指关节4
  - "left_joint_4"    # 拇指关节5
  - "left_joint_5"    # 食指关节1
  # ... (共21个关节)
  - "left_joint_20"   # 小指关节4

position:             # 关节位置(ADC 值:0-4095)
  - 2048.0
  - 1856.0
  - 2304.0
  # ... (21个位置值)

velocity: []          # 速度(未使用,为空)
effort: []            # 力矩(未使用,为空)

4.1.3 关节索引映射

手指关节数量索引范围关节名称
拇指50-4left_joint_0 ~ left_joint_4
食指45-8left_joint_5 ~ left_joint_8
中指49-12left_joint_9 ~ left_joint_12
无名指413-16left_joint_13 ~ left_joint_16
小指417-20left_joint_17 ~ left_joint_20

4.2 订阅主手数据的完整流程

4.2.1 Python 订阅器示例

python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState


class PSIGloveSubscriber(Node):
    """PSI 手套数据订阅器节点"""
    
    def __init__(self):
        super().__init__('psi_glove_subscriber')
        
        # 创建订阅器
        self.subscription = self.create_subscription(
            JointState,
            '/ruiyan_hand/left/set_angles',  # 话题名称
            self.joint_state_callback,
            10  # QoS 队列深度
        )
        
        self.frame_count = 0
        self.get_logger().info('PSI 手套订阅器已启动')
    
    def joint_state_callback(self, msg: JointState):
        """
        关节状态回调函数
        
        Args:
            msg: JointState 消息
        """
        self.frame_count += 1
        
        # 每 10 帧打印一次
        if self.frame_count % 10 == 0:
            self.get_logger().info(f'\n--- 帧 #{self.frame_count} ---')
            self.get_logger().info(f'时间戳: {msg.header.stamp.sec}.{msg.header.stamp.nanosec}')
            self.get_logger().info(f'坐标系: {msg.header.frame_id}')
            self.get_logger().info(f'关节数量: {len(msg.name)}')
            
            # 打印拇指数据(前 5 个关节)
            self.get_logger().info('拇指关节位置:')
            for i in range(min(5, len(msg.name))):
                self.get_logger().info(f'  {msg.name[i]}: {msg.position[i]:.3f}')


def main(args=None):
    """主函数"""
    # 初始化 ROS2
    rclpy.init(args=args)
    
    # 创建订阅器节点
    subscriber = PSIGloveSubscriber()
    
    try:
        # 保持节点运行
        rclpy.spin(subscriber)
    except KeyboardInterrupt:
        subscriber.get_logger().info('用户中断')
    finally:
        # 清理
        subscriber.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

4.2.2 C++ 订阅器示例

cpp
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/joint_state.hpp>

class PSIGloveSubscriber : public rclcpp::Node
{
public:
  PSIGloveSubscriber() : Node("psi_glove_subscriber"), frame_count_(0)
  {
    // 创建订阅器
    subscription_ = this->create_subscription<sensor_msgs::msg::JointState>(
      "/ruiyan_hand/left/set_angles",
      10,
      std::bind(&PSIGloveSubscriber::joint_state_callback, this, std::placeholders::_1)
    );
    
    RCLCPP_INFO(this->get_logger(), "PSI 手套订阅器已启动");
  }

private:
  void joint_state_callback(const sensor_msgs::msg::JointState::SharedPtr msg)
  {
    frame_count_++;
    
    // 每 10 帧打印一次
    if (frame_count_ % 10 == 0) {
      RCLCPP_INFO(this->get_logger(), "\n--- 帧 #%d ---", frame_count_);
      RCLCPP_INFO(this->get_logger(), "坐标系: %s", msg->header.frame_id.c_str());
      RCLCPP_INFO(this->get_logger(), "关节数量: %zu", msg->name.size());
      
      // 打印拇指数据(前 5 个关节)
      RCLCPP_INFO(this->get_logger(), "拇指关节位置:");
      for (size_t i = 0; i < std::min(size_t(5), msg->name.size()); ++i) {
        RCLCPP_INFO(this->get_logger(), "  %s: %.3f", 
                    msg->name[i].c_str(), msg->position[i]);
      }
    }
  }
  
  rclcpp::Subscription<sensor_msgs::msg::JointState>::SharedPtr subscription_;
  int frame_count_;
};

int main(int argc, char** argv)
{
  rclcpp::init(argc, argv);
  auto node = std::make_shared<PSIGloveSubscriber>();
  rclcpp::spin(node);
  rclcpp::shutdown();
  return 0;
}

4.3 数据格式和单位

数据单位和范围

  • 数据类型: double (在 ROS2 消息中)
  • 数值范围: 0.0 - 4095.0 (12位 ADC)
  • 物理含义: 关节传感器的电压值(经过模数转换)
  • 数值对应:
    • 0.0: 传感器最小值(手指完全伸直)
    • 2048.0: 传感器中间值
    • 4095.0: 传感器最大值(手指完全弯曲)

数据处理示例(Python)

python
def normalize_joint(value: float) -> float:
    """将 ADC 值归一化到 [0.0, 1.0]"""
    return value / 4095.0


def adc_to_angle(value: float, min_adc: float, max_adc: float, 
                 min_angle: float, max_angle: float) -> float:
    """将 ADC 值映射到角度"""
    normalized = (value - min_adc) / (max_adc - min_adc)
    return min_angle + normalized * (max_angle - min_angle)


def joint_state_callback(self, msg: JointState):
    """回调函数中处理数据"""
    # 归一化拇指第一关节
    normalized = normalize_joint(msg.position[0])
    self.get_logger().info(f"归一化值: {normalized:.3f}")
    
    # 映射到角度(假设校准范围)
    angle = adc_to_angle(msg.position[0], 500.0, 3500.0, 0.0, 90.0)
    self.get_logger().info(f"关节角度: {angle:.2f}°")

4.4 高级用法示例

4.4.1 同时订阅双手数据

python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState


class DualHandSubscriber(Node):
    """同时订阅双手数据的节点"""
    
    def __init__(self):
        super().__init__('dual_hand_subscriber')
        
        # 订阅左手
        self.left_sub = self.create_subscription(
            JointState,
            '/ruiyan_hand/left/set_angles',
            self.left_callback,
            10
        )
        
        # 订阅右手
        self.right_sub = self.create_subscription(
            JointState,
            '/ruiyan_hand/right/set_angles',
            self.right_callback,
            10
        )
        
        self.left_data = None
        self.right_data = None
        
        self.get_logger().info('双手订阅器已启动')
    
    def left_callback(self, msg: JointState):
        self.left_data = msg
        self.process_data()
    
    def right_callback(self, msg: JointState):
        self.right_data = msg
        self.process_data()
    
    def process_data(self):
        """当双手数据都可用时处理"""
        if self.left_data and self.right_data:
            left_thumb = self.left_data.position[0]
            right_thumb = self.right_data.position[0]
            
            self.get_logger().info(
                f"左手拇指: {left_thumb:.1f}, 右手拇指: {right_thumb:.1f}"
            )


def main():
    rclpy.init()
    node = DualHandSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

4.4.2 数据记录到 CSV

python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import csv
from datetime import datetime


class DataRecorder(Node):
    """记录手套数据到 CSV 文件"""
    
    def __init__(self):
        super().__init__('data_recorder')
        
        # 创建 CSV 文件
        filename = f"glove_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        self.csv_file = open(filename, 'w', newline='')
        self.csv_writer = csv.writer(self.csv_file)
        
        # 写入表头
        header = ['timestamp'] + [f'joint_{i}' for i in range(21)]
        self.csv_writer.writerow(header)
        
        # 订阅数据
        self.subscription = self.create_subscription(
            JointState,
            '/ruiyan_hand/left/set_angles',
            self.callback,
            10
        )
        
        self.frame_count = 0
        self.get_logger().info(f'开始记录数据到 {filename}')
    
    def callback(self, msg: JointState):
        """记录数据"""
        self.frame_count += 1
        
        # 写入一行数据
        timestamp = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
        row = [timestamp] + list(msg.position)
        self.csv_writer.writerow(row)
        
        # 每 100 帧打印一次
        if self.frame_count % 100 == 0:
            self.get_logger().info(f'已记录 {self.frame_count} 帧数据')
    
    def __del__(self):
        """析构函数,关闭文件"""
        if hasattr(self, 'csv_file'):
            self.csv_file.close()
            self.get_logger().info('数据文件已关闭')


def main():
    rclpy.init()
    node = DataRecorder()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('用户中断,保存数据...')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()