#include <chrono> #include <functional> #include <memory> #include "rclcpp/rclcpp.hpp" #include "std_srvs/srv/trigger.hpp" #include "geometry_msgs/msg/pose_stamped.hpp" using namespace std::chrono_literals; class ShelfDetector : public rclcpp::Node { public: ShelfDetector() : Node("shelf_detector") { // 创建服务响应上位机请求 service_ = create_service<std_srvs::srv::Trigger>( "detect_shelf", std::bind(&ShelfDetector::handle_detect_request, this, std::placeholders::_1, std::placeholders::_2)); // 创建货架位姿发布者 pose_publisher_ = create_publisher<geometry_msgs::msg::PoseStamped>("shelf_pose", 10); RCLCPP_INFO(get_logger(), "Shelf Detector ready. Waiting for detection request..."); } private: void handle_detect_request( const std::shared_ptr<std_srvs::srv::Trigger::Request> request, std::shared_ptr<std_srvs::srv::Trigger::Response> response) { (void)request; // 避免未使用参数警告 RCLCPP_INFO(get_logger(), "Received detection request. Starting shelf detection..."); // 模拟检测过程(实际应替换为真实检测逻辑) bool detection_success = false; auto start_time = now(); rclcpp::Rate loop_rate(10); // 10Hz检测频率 // 循环等待检测结果(最多5秒) while (rclcpp::ok() && (now() - start_time) < 5s) { // 模拟检测逻辑(真实场景使用传感器数据) if ((now() - start_time) > 2s) { // 2秒后"检测到"货架 detection_success = true; break; } RCLCPP_INFO_THROTTLE(get_logger(), *get_clock(), 500, "Detecting shelf..."); loop_rate.sleep(); } if (detection_success) { // 发布检测到的货架位姿 auto pose = geometry_msgs::msg::PoseStamped(); pose.header.stamp = now(); pose.header.frame_id = "map"; pose.pose.position.x = 1.5; pose.pose.position.y = 3.2; pose.pose.orientation.w = 1.0; pose_publisher_->publish(pose); response->success = true; response->message = "Shelf detected at x:1.5, y:3.2"; RCLCPP_INFO(get_logger(), "Detection succeeded. Pose published."); } else { response->success = false; response->message = "Detection timeout"; RCLCPP_ERROR(get_logger(), "Detection failed!"); } } rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr service_; rclcpp::Publisher<geometry_msgs::msg::PoseStamped>::SharedPtr pose_publisher_; }; int main(int argc, char **argv) { rclcpp::init(argc, argv); auto node = std::make_shared<ShelfDetector>(); rclcpp::spin(node); rclcpp::shutdown(); return 0; }基于该段代码,扩充将laserscan转换为pointcloud后重新发布,并通过订阅转换后的点云在rviz中查看,增加实时位姿/fusion_pose的订阅,并将最近位姿存储即latest_pose = *msg,增加收到客户端调用时先回复收到请求,然后performShelfDetection计算货架中心位姿并发布,订阅后在rviz中可视化
时间: 2025-08-01 19:42:42 AIGC 浏览: 38
<think>我们正在构建一个ROS2节点,需要实现以下功能:1.将LaserScan数据转换为PointCloud并发布到新话题。2.订阅该点云话题并在RViz中显示(这一步主要是用户操作,但我们需要确保发布点云话题)。3.订阅/fusion_pose话题以获取实时位姿,并保存最新的位姿数据。4.在服务回调函数中,先发送一个响应表示已收到请求,然后执行货架检测算法计算货架中心位姿,最后将结果发布并在RViz中可视化。根据引用[1]和[2]的内容,我们知道在ROS1中可以通过创建一个节点来订阅/scan并转换为PointCloud或PointCloud2。在ROS2中,我们可以使用类似的思路。具体步骤:1.首先,我们需要一个转换器将LaserScan转换为PointCloud。在ROS2中,我们可以使用`laser_geometry`包,但注意ROS2的`laser_geometry`可能没有直接提供转换功能(或者需要从ROS1移植)。不过,我们可以自己实现转换,因为转换关系是明确的:将每个激光点的距离和角度转换为笛卡尔坐标。2.发布转换后的点云到新话题,比如`/laser_pointcloud`。3.订阅`/fusion_pose`话题(假设是`geometry_msgs/msg/PoseStamped`类型),并保存最新的位姿(用一个成员变量存储,并注意线程安全,因为回调是并发的)。4.创建一个服务,当服务被调用时,先立即回复一个响应(表示请求已收到),然后执行货架检测算法(这个算法我们假设需要用到最近的点云和最近的位姿),计算货架中心位姿,最后将结果发布(比如发布一个`geometry_msgs/msg/PoseStamped`)并在RViz中可视化(可以通过发布Marker等方式)。由于ROS2中节点和话题的管理与ROS1有所不同,我们将使用rclpy来编写节点。代码结构:-导入必要的模块(rclpy,sensor_msgs.msg,geometry_msgs.msg,以及自定义的服务消息等)-创建一个类,比如叫做`ShelfDetectionNode`,继承自`rclpy.node.Node`。-在初始化中:a.创建一个订阅者订阅`/scan`话题(类型`sensor_msgs.msg.LaserScan`),设置回调函数`scan_callback`。b.创建一个发布者,发布点云到`/laser_pointcloud`(类型`sensor_msgs.msg.PointCloud`或`PointCloud2`,这里根据需求选择。注意:ROS2中更常用PointCloud2,但原要求是PointCloud。不过,PointCloud在ROS2中已不推荐使用,所以我们可以用PointCloud2。但原问题要求是PointCloud,所以需要确认。根据引用[1]和[2],他们分别转换成了PointCloud和PointCloud2。这里为了通用性,我们使用PointCloud2,因为PointCloud2是ROS2中标准的点云格式)。-如果使用PointCloud2,则需要用到`laser_geometry`包中的`LaserProjection`,但ROS2的`laser_geometry`包可能不可用。因此,我们可以自己写转换函数。c.创建一个订阅者订阅`/fusion_pose`(类型`geometry_msgs.msg.PoseStamped`),回调函数`pose_callback`更新最新的位姿。d.创建一个服务,用于触发货架检测(服务类型需要自定义,比如`example_interfaces/srv/Trigger`,或者自定义一个空的服务,因为只需要触发,不需要参数。但响应部分需要先回复,然后异步处理,所以服务类型应该有一个空请求和一个立即回复的响应,然后节点再异步处理并发布结果。这里我们可以自定义一个服务,例如`Empty.srv`,其中请求为空,响应为一个字符串表示已收到请求)。e.创建发布者,用于发布检测到的货架中心位姿(类型`geometry_msgs.msg.PoseStamped`)和用于RViz可视化的Marker(类型`visualization_msgs.msg.Marker`)等。注意:由于服务回调中需要执行耗时操作(货架检测),为了避免阻塞服务回调(进而阻塞ROS2的executor),我们可以在服务回调中启动一个线程来处理检测任务,或者使用异步方式。但是,ROS2的回调默认是单线程的(除非使用MultiThreadedExecutor),所以我们可以将检测任务放到另一个线程中。另一种方法是:在服务回调中,先发送响应,然后将检测任务放入一个异步任务(比如通过一个Future)或者使用一个定时器来触发后续处理。这里我们采用先发送响应,然后利用节点的executor来调度检测任务。但是,由于检测任务可能耗时,我们最好使用单独的线程处理,避免阻塞其他回调。因此,我们可以在服务回调中:-立即发送响应(表示已收到请求)。-然后,将检测任务放入一个线程池或启动一个新线程来执行检测(注意:访问共享数据如最新位姿和点云时需要加锁)。由于我们只需要存储最新的位姿和最新的点云(或者一定时间内的点云?根据需求,这里我们只保存最新的点云),所以我们可以:-在`scan_callback`中保存最新的点云(同样用一个成员变量存储,并加锁保护)。-在`pose_callback`中保存最新的位姿(加锁保护)。然后,在检测线程中,我们获取这两个数据(注意:获取时加锁,并且尽量缩短加锁时间)进行货架检测。货架检测算法:问题中未给出具体算法,我们假设它已经由其他部分实现,我们只需要调用即可。因此,我们用一个函数`detect_shelf`来表示,该函数输入是点云和当前位姿(可能用于坐标变换?),输出是货架中心的位姿(一个PoseStamped)。最后,发布这个位姿,并发布一个Marker(例如一个箭头或立方体)在RViz中显示货架中心位置。实现细节:1.点云转换:由于ROS2没有现成的`laser_geometry`,我们可以自己实现一个转换函数,将`LaserScan`转换为`PointCloud2`。转换方法如下:-创建一个`PointCloud2`消息。-设置header(frame_id和stamp)与LaserScan相同。-定义点云字段(x,y,z,intensity等,这里我们只需要x,y,z)。-遍历LaserScan中的每个距离值,根据角度计算出每个点的x,y(z设为0,因为是2D激光)。-注意:无效的距离值(小于range_min或大于range_max)跳过。2.由于我们同时需要点云和位姿,且它们可能来自不同的回调,所以我们需要确保在检测时使用的点云和位姿是同一时刻的?但是,这里需求是存储最近的位姿和点云,所以可能不是严格同步的。如果要求同步,可能需要使用消息过滤器(message_filters)进行时间同步,但问题中只要求存储最近的,所以我们就用最新的。3.服务类型:我们可以使用一个空的服务。例如,创建一个`srv/Trigger.srv`,内容为:```---stringmessage```请求为空,响应为字符串。这样,在服务回调中,我们可以立即回复一个响应,然后进行检测。4.线程安全:使用互斥锁(`threading.Lock`)来保护最新的位姿和点云数据。5.发布检测结果:检测完成后,发布货架中心的位姿(到某个话题,比如`/shelf_pose`)和一个Marker(到`/shelf_marker`)用于可视化。代码实现步骤:首先,创建一个ROS2包,并添加依赖:rclpy,sensor_msgs,geometry_msgs,visualization_msgs,以及自定义服务消息所需的依赖(如example_interfaces或自定义srv)。然后,编写节点代码。由于代码较长,下面给出一个框架:</think>基于您的需求,我将分步骤说明如何在ROS2节点中实现所需功能,并提供关键代码实现。以下是完整解决方案:
### 1. LaserScan转PointCloud实现
```python
from sensor_msgs.msg import LaserScan, PointCloud2
from sensor_msgs_py import point_cloud2
import numpy as np
class ShelfDetectionNode(Node):
def __init__(self):
super().__init__('shelf_detection_node')
# 创建LaserScan订阅
self.scan_sub = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10
)
# 创建PointCloud发布器
self.pointcloud_pub = self.create_publisher(
PointCloud2,
'/laser_pointcloud',
10
)
self.latest_scan = None
def scan_callback(self, msg):
# 保存最新扫描数据
self.latest_scan = msg
# 转换为PointCloud2
points = []
angle = msg.angle_min
for r in msg.ranges:
if msg.range_min <= r <= msg.range_max:
# 极坐标转笛卡尔坐标
x = r * np.cos(angle)
y = r * np.sin(angle)
z = 0.0 # 2D激光雷达z=0
points.append([x, y, z])
angle += msg.angle_increment
# 创建PointCloud2消息
header = msg.header
header.frame_id = "laser_frame" # 确保与TF树一致
pc2_msg = point_cloud2.create_cloud_xyz32(header, points)
self.pointcloud_pub.publish(pc2_msg)
```
### 2. 位姿订阅与存储
```python
from geometry_msgs.msg import PoseStamped
class ShelfDetectionNode(Node):
def __init__(self):
# ...其他初始化...
# 位姿订阅
self.pose_sub = self.create_subscription(
PoseStamped,
'/fusion_pose',
self.pose_callback,
10
)
self.latest_pose = None
self.pose_lock = threading.Lock() # 线程安全锁
def pose_callback(self, msg):
with self.pose_lock:
self.latest_pose = msg
```
### 3. 服务回调与货架检测
```python
from example_interfaces.srv import Trigger
from visualization_msgs.msg import Marker
class ShelfDetectionNode(Node):
def __init__(self):
# ...其他初始化...
# 创建服务
self.srv = self.create_service(
Trigger,
'detect_shelf',
self.detect_shelf_callback
)
# 结果发布器
self.shelf_pose_pub = self.create_publisher(PoseStamped, '/shelf_pose', 10)
self.marker_pub = self.create_publisher(Marker, '/shelf_marker', 10)
def detect_shelf_callback(self, request, response):
# 立即响应
response.success = True
response.message = "Detection started"
# 异步执行检测(避免阻塞服务)
threading.Thread(target=self.execute_detection).start()
return response
def execute_detection(self):
# 获取当前数据(线程安全)
with self.pose_lock:
current_pose = self.latest_pose
current_scan = self.latest_scan
# 货架检测算法(伪代码)
shelf_pose = self.detect_shelf(current_scan, current_pose)
# 发布结果
self.shelf_pose_pub.publish(shelf_pose)
self.publish_marker(shelf_pose)
def detect_shelf(self, scan, pose):
# 实际检测逻辑实现
# 示例:基于点云的聚类和几何特征检测
# 返回geometry_msgs.msg.PoseStamped
pass
def publish_marker(self, pose):
marker = Marker()
marker.header = pose.header
marker.type = Marker.CUBE
marker.pose = pose.pose
marker.scale.x = 0.5
marker.scale.y = 0.3
marker.scale.z = 1.0
marker.color.a = 0.8
marker.color.r = 0.0
marker.color.g = 1.0
marker.color.b = 0.0
self.marker_pub.publish(marker)
```
### 4. RViz可视化配置
在RViz中添加以下显示项:
1. **PointCloud2**:订阅`/laser_pointcloud`话题
2. **Pose**:订阅`/shelf_pose`话题
3. **Marker**:订阅`/shelf_marker`话题
4. **TF**:确保`laser_frame`到`map`的变换可用
### 关键实现说明
1. **坐标转换**:
- 激光数据需转换到与位姿相同的坐标系(如`map`)
- 使用`tf2_ros`进行坐标变换:
```python
from tf2_ros import Buffer, TransformListener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
transform = self.tf_buffer.lookup_transform('map', 'laser_frame', rclpy.time.Time())
```
2. **线程安全**:
- 使用`threading.Lock`保护共享数据(位姿)
- 服务回调中异步执行检测避免阻塞
3. **货架检测优化**:
```python
def detect_shelf(self, scan, pose):
# 1. 点云聚类 (DBSCAN示例)
from sklearn.cluster import DBSCAN
clusters = DBSCAN(eps=0.2).fit(points)
# 2. 筛选货架特征(矩形几何特征)
for cluster in clusters:
if is_rectangle(cluster):
# 3. 计算中心位姿(相对地图坐标系)
center = compute_center(cluster)
return transform_to_map(center, pose)
```
### 部署流程
1. 安装依赖:
```bash
sudo apt install ros-${ROS_DISTRO}-sensor-msgs-py
pip install scikit-learn
```
2. 运行节点:
```bash
ros2 run your_package shelf_detection_node
```
3. 触发检测:
```bash
ros2 service call /detect_shelf example_interfaces/srv/Trigger
```
### 注意事项
1. 确保所有传感器数据的坐标系在TF树中正确关联
2. 调整点云转换参数以匹配您的激光雷达特性
3. 根据实际货架特征优化检测算法中的几何判断条件
4. 使用`ros2 topic echo /shelf_pose`验证输出结果
> 此实现参考了ROS中激光数据转换的标准方法[^1][^2],并结合了点云处理技术[^4]和ROS2服务异步处理的最佳实践。检测算法部分需根据实际货架特征进行优化,建议先用仿真环境验证[^2][^5]。
阅读全文
相关推荐












