260321
通过我们之前的学习,我们已经可以自行创建对应的包了。
接下来我们自己来创建一个简单的发布者和订阅者吧。
官方教程中的发布者是提供了代码可供下载的,我们可以通过 wget来下载对应的代码:
jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$wget https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
--2026-03-21 13:37:03-- https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
正在解析主机 raw.githubusercontent.com (raw.githubusercontent.com)... 172.29.0.36
正在连接 raw.githubusercontent.com (raw.githubusercontent.com)|172.29.0.36|:443... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 1526 (1.5K) [text/plain]
正在保存至: ‘publisher_member_function.py’
publisher_member_fu 100%[===================>] 1.49K --.-KB/s 用时 0s
2026-03-21 13:37:04 (39.0 MB/s) - 已保存 ‘publisher_member_function.py’ [1526/1526])
这里我们可以看到在 init.py旁边有了一个新的 publisher_member_function.py 文件:
jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$ ls
__init__.py publisher_member_function.py py.typed
我们可以用 VSCode 打开其中的内容看看:
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
def main(args=None):
try:
with rclpy.init(args=args):
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
except (KeyboardInterrupt, ExternalShutdownException):
pass
if __name__ == '__main__':
main()
这里的 rclpy, std_msgs等都是 ROS2 官方维护的 python 客户端依赖。
这里可以看到,我们的这个最小发布者,其中有几项关键的方法调用:
self.create_publisher(String, 'topic', 10)
这是直接调用超类的方法来创建一个发布者,该方法的三个参数分别为:msg_type, topic, qos_profile,分别代表消息类型,话题名称以及服务质量。
这里它的消息类型为字符串String, 话题为 topic,服务质量的消息队列为 10。
这里值得注意的是,ROS2 官方提供了一个语法糖,这里 qos_profile允许之间传入 10,其实等价于下面的代码:
my_qos = QoSProfile(
history=HistoryPolicy.KEEP_LAST, # 策略:只保留最新的消息
depth=10, # 深度:队列里最多塞 10 条
reliability=ReliabilityPolicy.RELIABLE, # 可靠性:保证送达 (系统默认)
durability=DurabilityPolicy.VOLATILE # 持久性:阅后即焚 (系统默认)
)
即,直接传入整数的话,该配置项表示队列里最多只塞 10 条消息,而剩下的则为默认:策略为只保留最新的信息,可靠性为保证送达,持久性为阅后即焚。
这样我们就创建了一个发布者。
但是光有发布者还不够,我们还需要关注发布者发布的频率:
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
这里 timer_period代表了时间间隔,然后通过 self.create_timer这一超类的方法来创建了一个计时器。
这里第一个参数是 timer_period_sec,代表时间间隔,浮点数类型,然后callback则是到时间后触发的回调函数。
我们这里是在下面自定义了一个新的函数:
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
这段函数是使用发布者来主动发送一条字符串消息,并累加次数和打印日志。
我们将该函数作为回调函数传入到self.create_timer后,便会每过 0.5s 就发送一条Hello Wolrd $N的消息到 topic这一话题中,$N是次数累计。
最后就是 main函数中来实例化这一节点:
def main(args=None):
try:
with rclpy.init(args=args):
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
except (KeyboardInterrupt, ExternalShutdownException):
pass
在完成了代码编写后,我们尝试使用 mypy来运行一下...
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ mypy .
src/py_pubsub/py_pubsub/publisher_member_function.py:19: error: Skipping analyzing "std_msgs.msg": module is installed, but missing library stubs or py.typed marker [import-untyped]
src/py_pubsub/py_pubsub/publisher_member_function.py:19: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports
src/py_pubsub/test/test_xmllint.py:15: error: Skipping analyzing "ament_xmllint.main": module is installed, but missing library stubs or py.typed marker [import-untyped]
src/py_pubsub/test/test_pep257.py:15: error: Skipping analyzing "ament_pep257.main": module is installed, but missing library stubs or py.typed marker [import-untyped]
src/py_pubsub/test/test_mypy.py:15: error: Cannot find implementation or library stub for module named "ament_mypy.main" [import-not-found]
src/py_pubsub/test/test_flake8.py:15: error: Skipping analyzing "ament_flake8.main": module is installed, but missing library stubs or py.typed marker [import-untyped]
src/py_pubsub/test/test_copyright.py:15: error: Skipping analyzing "ament_copyright.main": module is installed, but missing library stubs or py.typed marker [import-untyped]
Found 6 errors in 6 files (checked 8 source files)
会发现它完全识别不了 ROS2 的 rclpy 和 std_msgs等依赖。
这是因为这些代码都是通过执行 colcon build 时,通过一个叫 rosidl_generator_py 的 C++ 工具,实时读取 .msg 文件在内存里动态绑定生成的 C 扩展,代码是动态生成的也就没有源码作为静态存根,mypy根本无从分析。
因此我们能做的便是选择性的进行忽略...
[[tool.mypy.overrides]]
module = [
"std_msgs.*",
"rclpy.*",
"geometry_msgs.*",
"sensor_msgs.*",
"ament_xmllint.*",
"ament_pep257.*",
"ament_mypy.*",
"ament_flake8.*",
"ament_copyright.*"
]
ignore_missing_imports = true
这会忽略掉 ROS2 中常见的特殊依赖,从而避免 mypy的报错。
同时,因为这两个依赖的特殊性,我们也需要将它们放入到 package.xml中,而非作为常规的 python 以来放在 pyproject.toml中:
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
然后在 setup.py的 entry_points中添加这个入口:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main'
],
},
确保我们的 setup.cfg正确:
[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsub
这会确保在构建后将我们的文件复制到 install/lib目录下,当然,我们也可以使用 --symlink-install来仅进行链接而非进行复制。
现在我们就可以通过下面的命令来构建并运行了:
colcon build --symlink-install \
source install/setup.sh \
ros2 run py_pubsub talker
效果如下:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ colcon build --symlink-install
Starting >>> py_pubsub
Finished <<< py_pubsub [0.95s]
Summary: 1 package finished [1.09s]
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ source install/setup.sh
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub talker
[INFO] [1774076582.551990025] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1774076583.043444520] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1774076583.543407498] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1774076584.043507654] [minimal_publisher]: Publishing: "Hello World: 3"
当然,只有发布的节点是远远不够的,我们还需要订阅的节点。
我们可以通过下面的链接来 wget 下载得到订阅节点的代码:
jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$ wget https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
--2026-03-21 15:07:20-- https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
正在解析主机 raw.githubusercontent.com (raw.githubusercontent.com)... 172.29.0.36
正在连接 raw.githubusercontent.com (raw.githubusercontent.com)|172.29.0.36|:443... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 1418 (1.4K) [text/plain]
正在保存至: ‘subscriber_member_function.py’
subscriber_member_function.py 100%[=======================================================================================================>] 1.38K --.-KB/s 用时 0s
2026-03-21 15:07:21 (39.3 MB/s) - 已保存 ‘subscriber_member_function.py’ [1418/1418])
订阅者的代码就相对来说简单很多了:
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
try:
with rclpy.init(args=args):
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
except (KeyboardInterrupt, ExternalShutdownException):
pass
if __name__ == '__main__':
main()
直接通过 self.create_subscription来创建一个订阅者就可以了,类似的,这四个参数分别代表msg_type, topic, callback和 qos_profile。
编辑 setup.py中的配置文件:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
'listener = py_pubsub.subscriber_member_function:main'
],
},
接下来我们便可以进行构建了。
由于经常输入这样的构建命令显得十分麻烦,我们便可以增加一个 Makefile,在其中写入这样的代码:
.PHONY: build dev lint test check
build:
colcon build --symlink-install
这样便可以快捷进行构建了:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ make build
colcon build --symlink-install
Starting >>> py_pubsub
Finished <<< py_pubsub [0.82s]
Summary: 1 package finished [0.91s]
现在便可以看到我们有两个可执行文件:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ls install/py_pubsub/lib/py_pubsub/
listener talker
我们先启动监听节点:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub listener
可以看到没有任何输出。
接着我们启动发布节点:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub talker
[INFO] [1774078523.781142096] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1774078524.269543087] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1774078524.769698778] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1774078525.269694938] [minimal_publisher]: Publishing: "Hello World: 3"
现在便可以看到订阅节点正常的接收了所有消息:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub listener
[INFO] [1774078523.781082734] [minimal_subscriber]: I heard: "Hello World: 0"
[INFO] [1774078524.269728636] [minimal_subscriber]: I heard: "Hello World: 1"
[INFO] [1774078524.769766094] [minimal_subscriber]: I heard: "Hello World: 2"
[INFO] [1774078525.269789336] [minimal_subscriber]: I heard: "Hello World: 3"
以上。
在完成了对话题的理解后,我们接下来就可以进入到对服务的学习了。
有了上面的前车之鉴,我们就可以直接在创建包的时候就直接指定要用到的依赖了:
ros2 pkg create --build-type ament_python --license Apache-2.0 py_srvcli --dependencies rclpy example_interfaces
这样我们创建的包中的 package.xml就会自动包含 rclpy和 example_interfaces这两个依赖。
其中,example_interfaces这个依赖会为我们增加我们所需的 .srv文件:
int64 a
int64 b
---
int64 sum
上面是请求格式,下面是返回格式,我们可以看到都是int64。
接下来我们创建一个新的文件 src/py_srvcli/py_srvcli/service_member_function.py,并写入以下代码:
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
def main():
try:
with rclpy.init():
minimal_service = MinimalService()
rclpy.spin(minimal_service)
except (KeyboardInterrupt, ExternalShutdownException):
pass
if __name__ == '__main__':
main()
可以看到,这里注册一个服务的方式依旧非常简单:self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
通过超类的 self.create_service,然后传入服务的数据类型(srv_type)、服务名称(srv_name)和服务的回调函数(callback)便可以直接创建一个对应的服务了。
这里我们设置的服务数据类型为 AddTwoInts,服务名称为 add_two_ints,服务的回调函数则为我们创建的加法函数:
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
就这样,我们创建了一个服务。
将这个服务添加到我们的入口里:
entry_points={
'console_scripts': [
'service = py_srvcli.service_member_function:main',
],
},
为了保证打包的顺利,我们可以运行下面的命令来确保安装了所需的依赖:
rosdep install -i --from-path src --rosdistro kilted -y
然后进行打包,如果我们想只打包这一个包的话,可以使用这样的命令:
colcon build --symlink-install --packages-select py_srvcli
然后 source install/setup.sh后,就可以通过下面的命令进行运行了:
ros2 run py_srvcli service
打开 rqt,就可以看到我们的加法服务了:
我们接下来输入两个数字并使其相加:
这里我们输入 10 和 100,可以看到最终的结果是 110:
同时控制台上也打印了对应的日志:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli service
[INFO] [1774081387.091999421] [minimal_service]: Incoming request
a: 10 b: 100
除了编写服务的服务端,我们还可以编写该服务的客户端。
我们新增这样的一个文件 src/py_srvcli/py_srvcli/client_member_function.py:
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
import random
class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
self.rng = random.Random()
def send_request(self):
self.req.a = self.rng.randint(1, 1000)
self.req.b = self.rng.randint(-1000, -1)
return self.cli.call_async(self.req)
def main(args=None):
try:
with rclpy.init(args=args):
minimal_client = MinimalClientAsync()
future = minimal_client.send_request()
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
assert response is not None
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(minimal_client.req.a, minimal_client.req.b, response.sum))
except (KeyboardInterrupt, ExternalShutdownException):
pass
if __name__ == '__main__':
main()
这里我们和 ROS2 官方教程不同的点在于,我们引入了 random库来随机生成整数并作为参数进行传入。
这里客户端因为用到了 spin_until_future_complete这一方法,因此只会执行一次然后就会退出。
将这个客户端添加到入口,然后构建、运行:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client
[INFO] [1774082341.253676577] [minimal_client_async]: Result of add_two_ints: for 260 + -48 = 212
可以看到我们正常的得到了我们的响应。
当然,我们也可以发多次,比如我们可以修改 main函数中的代码,通过在实例化节点后增加一个 for循环就可以多次响应了:
def main(args=None):
try:
with rclpy.init(args=args):
minimal_client = MinimalClientAsync()
for i in range(5):
future = minimal_client.send_request()
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
assert response is not None
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(minimal_client.req.a, minimal_client.req.b, response.sum))
except (KeyboardInterrupt, ExternalShutdownException):
pass
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client
[INFO] [1774082868.394516464] [minimal_client_async]: Result of add_two_ints: for 493 + -322 = 171
[INFO] [1774082868.395555659] [minimal_client_async]: Result of add_two_ints: for 524 + -886 = -362
[INFO] [1774082868.396186695] [minimal_client_async]: Result of add_two_ints: for 47 + -339 = -292
[INFO] [1774082868.396819726] [minimal_client_async]: Result of add_two_ints: for 353 + -124 = 229
[INFO] [1774082868.397435404] [minimal_client_async]: Result of add_two_ints: for 515 + -145 = 370
我们也可以参考我们话题发布者,通过增加一个 timer来进行定时且循环的调用服务。
但是由于我们的 call_async是一个异步的方法,直接按照同步的方式去获取响应会直接出现响应为 None的报错。
因此,我们需要用到 add_done_callback这个 Future类的方法来去增加这样的一个回调函数进行解包响应:
def get_result_callback(self, future: Future, a: int, b: int) -> None:
try:
response = future.result()
assert response is not None
self.get_logger().info(f'Result: {a} + {b} = {response.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
这是我们新的 send_request函数,同时将 req 的作用域从全局改到了函数内部,避免由于异步并发造成的内存污染:
def send_request(self) -> None:
req = AddTwoInts.Request()
req.a = self.rng.randint(1, 1000)
req.b = self.rng.randint(-1000, -1)
future = self.cli.call_async(req)
future.add_done_callback(
lambda future_msg: self.get_result_callback(future=future_msg, a=req.a, b=req.b)
)
整个代码就长这样:
from example_interfaces.srv import AddTwoInts, AddTwoInts_Response
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy import Future
import random
class MinimalClientAsync(Node):
def __init__(self) -> None:
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.rng = random.Random()
time_perioid = 1
self.timer = self.create_timer(timer_period_sec=time_perioid, callback=self.send_request)
def send_request(self) -> None:
req = AddTwoInts.Request()
req.a = self.rng.randint(1, 1000)
req.b = self.rng.randint(-1000, -1)
future = self.cli.call_async(req)
future.add_done_callback(
lambda future_msg: self.get_result_callback(future=future_msg, a=req.a, b=req.b)
)
def get_result_callback(self, future: Future, a: int, b: int) -> None:
try:
response = future.result()
assert response is not None
self.get_logger().info(f'Result: {a} + {b} = {response.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
def main(args=None):
try:
with rclpy.init(args=args):
minimal_client = MinimalClientAsync()
rclpy.spin(minimal_client)
except (KeyboardInterrupt, ExternalShutdownException):
pass
if __name__ == '__main__':
main()
我们现在运行看看效果:
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client
[INFO] [1774084191.517621670] [minimal_client_async]: Result: 86 + -781 = -695
[INFO] [1774084192.507794472] [minimal_client_async]: Result: 493 + -385 = 108
[INFO] [1774084193.507892261] [minimal_client_async]: Result: 956 + -944 = 12
[INFO] [1774084194.507889853] [minimal_client_async]: Result: 314 + -480 = -166
[INFO] [1774084195.507741740] [minimal_client_async]: Result: 660 + -852 = -192
[INFO] [1774084196.507707792] [minimal_client_async]: Result: 806 + -405 = 401
[INFO] [1774084197.507797126] [minimal_client_async]: Result: 306 + -597 = -291
[INFO] [1774084198.507713285] [minimal_client_async]: Result: 931 + -442 = 489
[INFO] [1774084199.507710615] [minimal_client_async]: Result: 484 + -641 = -157
[INFO] [1774084200.507786244] [minimal_client_async]: Result: 240 + -824 = -584
以上。