DDS編程示例
我們嘗試在代碼中配置DDS,以之前Hello World話題通信為例。
運行效果
啟動兩個終端,分別運行發(fā)布者和訂閱者節(jié)點:
$ ros2 run learning_qos qos_helloworld_pub
$ ros2 run learning_qos qos_helloworld_sub
可以看到兩個終端中的通信效果如下,和之前貌似并沒有太大區(qū)別。
看效果確實差不多,不過底層通信機(jī)理上可是有所不同的。
發(fā)布者代碼解析
我們看下在代碼中,如果加入QoS的配置。
learning_qos/qos_helloworld_pub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居(www.guyuehome.com)@說明: ROS2 QoS示例-發(fā)布“Hello World”話題"""import rclpy # ROS2 Python接口庫from rclpy.node import Node # ROS2 節(jié)點類from std_msgs.msg import String # 字符串消息類型from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類"""創(chuàng)建一個發(fā)布者節(jié)點"""class PublisherNode(Node): def __init__(self, name): super().__init__(name) # ROS2節(jié)點父類初始化 qos_profile = QoSProfile( # 創(chuàng)建一個QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.pub = self.create_publisher(String, "chatter", qos_profile) # 創(chuàng)建發(fā)布者對象(消息類型、話題名、QoS原則) self.timer = self.create_timer(0.5, self.timer_callback) # 創(chuàng)建一個定時器(單位為秒的周期,定時執(zhí)行的回調(diào)函數(shù)) def timer_callback(self): # 創(chuàng)建定時器周期執(zhí)行的回調(diào)函數(shù) msg = String() # 創(chuàng)建一個String類型的消息對象 msg.data = 'Hello World' # 填充消息對象中的消息數(shù)據(jù) self.pub.publish(msg) # 發(fā)布話題消息 self.get_logger().info('Publishing: "%s"' % msg.data)# 輸出日志信息,提示已經(jīng)完成話題發(fā)布def main(args=None): # ROS2節(jié)點主入口main函數(shù) rclpy.init(args=args) # ROS2 Python接口初始化 node = PublisherNode("qos_helloworld_pub") # 創(chuàng)建ROS2節(jié)點對象并進(jìn)行初始化 rclpy.spin(node) # 循環(huán)等待ROS2退出 node.destroy_node() # 銷毀節(jié)點對象 rclpy.shutdown() # 關(guān)閉ROS2 Python接口
完成代碼的編寫后需要設(shè)置功能包的編譯選項,讓系統(tǒng)知道Python程序的入口,打開功能包的setup.py文件,加入如下入口點的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos.qos_helloworld_pub:main',},
訂閱者代碼解析
訂閱者中的QoS配置和發(fā)布者類似。
learning_qos/qos_helloworld_sub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居@說明: ROS2 QoS示例-訂閱“Hello World”話題消息"""import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節(jié)點類
from std_msgs.msg import String # ROS2標(biāo)準(zhǔn)定義的String消息from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類"""創(chuàng)建一個訂閱者節(jié)點"""class SubscriberNode(Node): def __init__(self, name): super().__init__(name) # ROS2節(jié)點父類初始化 qos_profile = QoSProfile( # 創(chuàng)建一個QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.sub = self.create_subscription( String, "chatter", self.listener_callback, qos_profile) # 創(chuàng)建訂閱者對象(消息類型、話題名、訂閱者回調(diào)函數(shù)、QoS原則)
def listener_callback(self, msg): # 創(chuàng)建回調(diào)函數(shù),執(zhí)行收到話題消息后對數(shù)據(jù)的處理 self.get_logger().info('I heard: "%s"' % msg.data) # 輸出日志信息,提示訂閱收到的話題消息def main(args=None): # ROS2節(jié)點主入口main函數(shù) rclpy.init(args=args) # ROS2 Python接口初始化 node = SubscriberNode("qos_helloworld_sub") # 創(chuàng)建ROS2節(jié)點對象并進(jìn)行初始化 rclpy.spin(node) # 循環(huán)等待ROS2退出
node.destroy_node() # 銷毀節(jié)點對象 rclpy.shutdown() # 關(guān)閉ROS2 Python接口
完成代碼的編寫后需要設(shè)置功能包的編譯選項,讓系統(tǒng)知道Python程序的入口,打開功能包的setup.py文件,加入如下入口點的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos
-
機(jī)器人
+關(guān)注
關(guān)注
211文章
28390瀏覽量
206947 -
通信
+關(guān)注
關(guān)注
18文章
6029瀏覽量
135954 -
DDS
+關(guān)注
關(guān)注
21文章
633瀏覽量
152635 -
代碼
+關(guān)注
關(guān)注
30文章
4780瀏覽量
68535
發(fā)布評論請先 登錄
相關(guān)推薦
評論