260321

通过我们之前的学习,我们已经可以自行创建对应的包了。

接下来我们自己来创建一个简单的发布者和订阅者吧。

官方教程中的发布者是提供了代码可供下载的,我们可以通过 wget​来下载对应的代码:

jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$wget https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
--2026-03-21 13:37:03--  https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
正在解析主机 raw.githubusercontent.com (raw.githubusercontent.com)... 172.29.0.36
正在连接 raw.githubusercontent.com (raw.githubusercontent.com)|172.29.0.36|:443... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 1526 (1.5K) [text/plain]
正在保存至: ‘publisher_member_function.py’

publisher_member_fu 100%[===================>]   1.49K  --.-KB/s    用时 0s    

2026-03-21 13:37:04 (39.0 MB/s) - 已保存 ‘publisher_member_function.py’ [1526/1526])

这里我们可以看到在 init.py​旁边有了一个新的 publisher_member_function.py​ 文件:

jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$ ls
__init__.py  publisher_member_function.py  py.typed

我们可以用 VSCode 打开其中的内容看看:

# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

from std_msgs.msg import String


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1


def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_publisher = MinimalPublisher()

            rclpy.spin(minimal_publisher)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

这里的 rclpy​, std_msgs​等都是 ROS2 官方维护的 python 客户端依赖。

这里可以看到,我们的这个最小发布者,其中有几项关键的方法调用:

  • ​self.create_publisher(String, 'topic', 10)​

这是直接调用超类的方法来创建一个发布者,该方法的三个参数分别为:msg_type​, topic​, qos_profile​,分别代表消息类型,话题名称以及服务质量。

这里它的消息类型为字符串String​, 话题为 topic​,服务质量的消息队列为 10​。

这里值得注意的是,ROS2 官方提供了一个语法糖,这里 qos_profile​允许之间传入 10,其实等价于下面的代码:

my_qos = QoSProfile(
    history=HistoryPolicy.KEEP_LAST,  # 策略:只保留最新的消息
    depth=10,                         # 深度:队列里最多塞 10 条
    reliability=ReliabilityPolicy.RELIABLE, # 可靠性:保证送达 (系统默认)
    durability=DurabilityPolicy.VOLATILE    # 持久性:阅后即焚 (系统默认)
)

即,直接传入整数的话,该配置项表示队列里最多只塞 10 条消息,而剩下的则为默认:策略为只保留最新的信息,可靠性为保证送达,持久性为阅后即焚。

这样我们就创建了一个发布者。

但是光有发布者还不够,我们还需要关注发布者发布的频率:

        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

这里 timer_period​代表了时间间隔,然后通过 self.create_timer​这一超类的方法来创建了一个计时器。

这里第一个参数是 timer_period_sec​,代表时间间隔,浮点数类型,然后callback​则是到时间后触发的回调函数。

我们这里是在下面自定义了一个新的函数:

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1

这段函数是使用发布者来主动发送一条字符串消息,并累加次数和打印日志。

我们将该函数作为回调函数传入到self.create_timer​后,便会每过 0.5s 就发送一条Hello Wolrd $N​的消息到 topic​这一话题中,$N​是次数累计。

最后就是 main​函数中来实例化这一节点:

def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_publisher = MinimalPublisher()

            rclpy.spin(minimal_publisher)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass

在完成了代码编写后,我们尝试使用 mypy​来运行一下...

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ mypy .
src/py_pubsub/py_pubsub/publisher_member_function.py:19: error: Skipping analyzing "std_msgs.msg": module is installed, but missing library stubs or py.typed marker  [import-untyped]
src/py_pubsub/py_pubsub/publisher_member_function.py:19: note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports
src/py_pubsub/test/test_xmllint.py:15: error: Skipping analyzing "ament_xmllint.main": module is installed, but missing library stubs or py.typed marker  [import-untyped]
src/py_pubsub/test/test_pep257.py:15: error: Skipping analyzing "ament_pep257.main": module is installed, but missing library stubs or py.typed marker  [import-untyped]
src/py_pubsub/test/test_mypy.py:15: error: Cannot find implementation or library stub for module named "ament_mypy.main"  [import-not-found]
src/py_pubsub/test/test_flake8.py:15: error: Skipping analyzing "ament_flake8.main": module is installed, but missing library stubs or py.typed marker  [import-untyped]
src/py_pubsub/test/test_copyright.py:15: error: Skipping analyzing "ament_copyright.main": module is installed, but missing library stubs or py.typed marker  [import-untyped]
Found 6 errors in 6 files (checked 8 source files)

会发现它完全识别不了 ROS2 的 rclpy​ 和 std_msgs​等依赖。

这是因为这些代码都是通过执行 colcon build​ 时,通过一个叫 rosidl_generator_py​ 的 C++ 工具,实时读取 .msg​ 文件在内存里动态绑定生成的 C 扩展,代码是动态生成的也就没有源码作为静态存根,mypy​根本无从分析。

因此我们能做的便是选择性的进行忽略...

[[tool.mypy.overrides]]
module = [
    "std_msgs.*",
    "rclpy.*",
    "geometry_msgs.*",
    "sensor_msgs.*",
    "ament_xmllint.*",
    "ament_pep257.*",
    "ament_mypy.*",
    "ament_flake8.*",
    "ament_copyright.*"
]
ignore_missing_imports = true

这会忽略掉 ROS2 中常见的特殊依赖,从而避免 mypy​的报错。

同时,因为这两个依赖的特殊性,我们也需要将它们放入到 package.xml​中,而非作为常规的 python 以来放在 pyproject.toml​中:

<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>

然后在 setup.py​的 entry_points​中添加这个入口:

    entry_points={
        'console_scripts': [
            'talker = py_pubsub.publisher_member_function:main'
        ],
    },

确保我们的 setup.cfg​正确:

[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsub

这会确保在构建后将我们的文件复制到 install/lib​目录下,当然,我们也可以使用 --symlink-install​来仅进行链接而非进行复制。

现在我们就可以通过下面的命令来构建并运行了:

colcon build --symlink-install \
source install/setup.sh \
ros2 run py_pubsub talker

效果如下:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ colcon build --symlink-install
Starting >>> py_pubsub
Finished <<< py_pubsub [0.95s]          

Summary: 1 package finished [1.09s]
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ source install/setup.sh 
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub talker
[INFO] [1774076582.551990025] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1774076583.043444520] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1774076583.543407498] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1774076584.043507654] [minimal_publisher]: Publishing: "Hello World: 3"

当然,只有发布的节点是远远不够的,我们还需要订阅的节点。

我们可以通过下面的链接来 wget​ 下载得到订阅节点的代码:

jese--ki@KiBall:~/Projects/learn/ros2/pub_sub/src/py_pubsub/py_pubsub$ wget https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
--2026-03-21 15:07:20--  https://raw.githubusercontent.com/ros2/examples/kilted/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
正在解析主机 raw.githubusercontent.com (raw.githubusercontent.com)... 172.29.0.36
正在连接 raw.githubusercontent.com (raw.githubusercontent.com)|172.29.0.36|:443... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 1418 (1.4K) [text/plain]
正在保存至: ‘subscriber_member_function.py’

subscriber_member_function.py                    100%[=======================================================================================================>]   1.38K  --.-KB/s    用时 0s    

2026-03-21 15:07:21 (39.3 MB/s) - 已保存 ‘subscriber_member_function.py’ [1418/1418])

订阅者的代码就相对来说简单很多了:

# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_subscriber = MinimalSubscriber()

            rclpy.spin(minimal_subscriber)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

直接通过 self.create_subscription​来创建一个订阅者就可以了,类似的,这四个参数分别代表msg_type​, topic​, callback​和 qos_profile​。

编辑 setup.py​中的配置文件:

    entry_points={
        'console_scripts': [
            'talker = py_pubsub.publisher_member_function:main',
            'listener = py_pubsub.subscriber_member_function:main'
        ],
    },

接下来我们便可以进行构建了。

由于经常输入这样的构建命令显得十分麻烦,我们便可以增加一个 Makefile​,在其中写入这样的代码:

.PHONY: build dev lint test check

build:
	colcon build --symlink-install

这样便可以快捷进行构建了:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ make build 
colcon build --symlink-install
Starting >>> py_pubsub
Finished <<< py_pubsub [0.82s]          

Summary: 1 package finished [0.91s]

现在便可以看到我们有两个可执行文件:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ls install/py_pubsub/lib/py_pubsub/
listener  talker

我们先启动监听节点:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub listener 

可以看到没有任何输出。

接着我们启动发布节点:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub talker 
[INFO] [1774078523.781142096] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1774078524.269543087] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1774078524.769698778] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1774078525.269694938] [minimal_publisher]: Publishing: "Hello World: 3"

现在便可以看到订阅节点正常的接收了所有消息:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_pubsub listener 
[INFO] [1774078523.781082734] [minimal_subscriber]: I heard: "Hello World: 0"
[INFO] [1774078524.269728636] [minimal_subscriber]: I heard: "Hello World: 1"
[INFO] [1774078524.769766094] [minimal_subscriber]: I heard: "Hello World: 2"
[INFO] [1774078525.269789336] [minimal_subscriber]: I heard: "Hello World: 3"

以上。

在完成了对话题的理解后,我们接下来就可以进入到对服务的学习了。

有了上面的前车之鉴,我们就可以直接在创建包的时候就直接指定要用到的依赖了:

ros2 pkg create --build-type ament_python --license Apache-2.0 py_srvcli --dependencies rclpy example_interfaces

这样我们创建的包中的 package.xml​就会自动包含 rclpy​和 example_interfaces​这两个依赖。

其中,example_interfaces​这个依赖会为我们增加我们所需的 .srv​文件:

int64 a
int64 b
---
int64 sum

上面是请求格式,下面是返回格式,我们可以看到都是int64​。

接下来我们创建一个新的文件 src/py_srvcli/py_srvcli/service_member_function.py​,并写入以下代码:

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))

        return response


def main():
    try:
        with rclpy.init():
            minimal_service = MinimalService()

            rclpy.spin(minimal_service)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

可以看到,这里注册一个服务的方式依旧非常简单:self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)​

通过超类的 self.create_service​,然后传入服务的数据类型(srv_type​)、服务名称(srv_name​)和服务的回调函数(callback​)便可以直接创建一个对应的服务了。

这里我们设置的服务数据类型为 AddTwoInts​,服务名称为 add_two_ints​,服务的回调函数则为我们创建的加法函数:

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))

        return response

就这样,我们创建了一个服务。

将这个服务添加到我们的入口里:

    entry_points={
        'console_scripts': [
            'service = py_srvcli.service_member_function:main',
        ],
    },

为了保证打包的顺利,我们可以运行下面的命令来确保安装了所需的依赖:

rosdep install -i --from-path src --rosdistro kilted -y

然后进行打包,如果我们想只打包这一个包的话,可以使用这样的命令:

colcon build --symlink-install --packages-select py_srvcli

然后 source install/setup.sh​后,就可以通过下面的命令进行运行了:

ros2 run py_srvcli service

打开 rqt​,就可以看到我们的加法服务了:

我们接下来输入两个数字并使其相加:

这里我们输入 10 和 100,可以看到最终的结果是 110:

同时控制台上也打印了对应的日志:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli service
[INFO] [1774081387.091999421] [minimal_service]: Incoming request
a: 10 b: 100

除了编写服务的服务端,我们还可以编写该服务的客户端。

我们新增这样的一个文件 src/py_srvcli/py_srvcli/client_member_function.py​:

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

import random

class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddTwoInts.Request()

        self.rng = random.Random()

    def send_request(self):
        self.req.a = self.rng.randint(1, 1000)
        self.req.b = self.rng.randint(-1000, -1)
        return self.cli.call_async(self.req)


def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_client = MinimalClientAsync()
            future = minimal_client.send_request()
            rclpy.spin_until_future_complete(minimal_client, future)
            response = future.result()
            assert response is not None
            minimal_client.get_logger().info(
                'Result of add_two_ints: for %d + %d = %d' %
                (minimal_client.req.a, minimal_client.req.b, response.sum))
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

这里我们和 ROS2 官方教程不同的点在于,我们引入了 random​库来随机生成整数并作为参数进行传入。

这里客户端因为用到了 spin_until_future_complete​这一方法,因此只会执行一次然后就会退出。

将这个客户端添加到入口,然后构建、运行:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client 
[INFO] [1774082341.253676577] [minimal_client_async]: Result of add_two_ints: for 260 + -48 = 212

可以看到我们正常的得到了我们的响应。

当然,我们也可以发多次,比如我们可以修改 main​函数中的代码,通过在实例化节点后增加一个 for​循环就可以多次响应了:

def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_client = MinimalClientAsync()
            for i in range(5):
                future = minimal_client.send_request()
                rclpy.spin_until_future_complete(minimal_client, future)
                response = future.result()
                assert response is not None
                minimal_client.get_logger().info(
                    'Result of add_two_ints: for %d + %d = %d' %
                    (minimal_client.req.a, minimal_client.req.b, response.sum))
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client 
[INFO] [1774082868.394516464] [minimal_client_async]: Result of add_two_ints: for 493 + -322 = 171
[INFO] [1774082868.395555659] [minimal_client_async]: Result of add_two_ints: for 524 + -886 = -362
[INFO] [1774082868.396186695] [minimal_client_async]: Result of add_two_ints: for 47 + -339 = -292
[INFO] [1774082868.396819726] [minimal_client_async]: Result of add_two_ints: for 353 + -124 = 229
[INFO] [1774082868.397435404] [minimal_client_async]: Result of add_two_ints: for 515 + -145 = 370

我们也可以参考我们话题发布者,通过增加一个 timer​来进行定时且循环的调用服务。

但是由于我们的 call_async​是一个异步的方法,直接按照同步的方式去获取响应会直接出现响应为 None​的报错。

因此,我们需要用到 add_done_callback​这个 Future​类的方法来去增加这样的一个回调函数进行解包响应:

    def get_result_callback(self, future: Future, a: int, b: int) -> None:
        try:
            response = future.result()
            assert response is not None
            self.get_logger().info(f'Result: {a} + {b} = {response.sum}')
        except Exception as e:
            self.get_logger().error(f'Service call failed: {e}')

这是我们新的 send_request​函数,同时将 req​ 的作用域从全局改到了函数内部,避免由于异步并发造成的内存污染:

    def send_request(self) -> None:
        req = AddTwoInts.Request()
        req.a = self.rng.randint(1, 1000)
        req.b = self.rng.randint(-1000, -1)
        future = self.cli.call_async(req)

        future.add_done_callback(
            lambda future_msg: self.get_result_callback(future=future_msg, a=req.a, b=req.b)
        )

整个代码就长这样:

from example_interfaces.srv import AddTwoInts, AddTwoInts_Response

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy import Future

import random

class MinimalClientAsync(Node):

    def __init__(self) -> None:
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')

        self.rng = random.Random()
        time_perioid = 1
        self.timer = self.create_timer(timer_period_sec=time_perioid, callback=self.send_request)


    def send_request(self) -> None:
        req = AddTwoInts.Request()
        req.a = self.rng.randint(1, 1000)
        req.b = self.rng.randint(-1000, -1)
        future = self.cli.call_async(req)

        future.add_done_callback(
            lambda future_msg: self.get_result_callback(future=future_msg, a=req.a, b=req.b)
        )

    def get_result_callback(self, future: Future, a: int, b: int) -> None:
        try:
            response = future.result()
            assert response is not None
            self.get_logger().info(f'Result: {a} + {b} = {response.sum}')
        except Exception as e:
            self.get_logger().error(f'Service call failed: {e}')

def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_client = MinimalClientAsync()
            rclpy.spin(minimal_client)

    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

我们现在运行看看效果:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client 
[INFO] [1774084191.517621670] [minimal_client_async]: Result: 86 + -781 = -695
[INFO] [1774084192.507794472] [minimal_client_async]: Result: 493 + -385 = 108
[INFO] [1774084193.507892261] [minimal_client_async]: Result: 956 + -944 = 12
[INFO] [1774084194.507889853] [minimal_client_async]: Result: 314 + -480 = -166
[INFO] [1774084195.507741740] [minimal_client_async]: Result: 660 + -852 = -192
[INFO] [1774084196.507707792] [minimal_client_async]: Result: 806 + -405 = 401
[INFO] [1774084197.507797126] [minimal_client_async]: Result: 306 + -597 = -291
[INFO] [1774084198.507713285] [minimal_client_async]: Result: 931 + -442 = 489
[INFO] [1774084199.507710615] [minimal_client_async]: Result: 484 + -641 = -157
[INFO] [1774084200.507786244] [minimal_client_async]: Result: 240 + -824 = -584

以上。