260324

昨天我们新建了自己的 msg文件,并进行了编译和自行调用。

接下来我们就试着去自行创建对应的服务并调用吧。

上面我们已经进行了构建了,因此我们在这下面并不需要进行重复的构建。

直接修改服务端和客户端的代码就可以了:

from tutorial_interfaces.srv import AddThreeInts # 引入我们自定义的服务数据类型

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node, MsgT


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback) # 修改服务数据类型和服务名称

    def add_three_ints_callback(self, request: AddThreeInts.Request, response: AddThreeInts.Response): # 修改函数签名
        response.sum = request.a + request.b + request.c # 修改计算方法
        self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c)) # 修改打印的内容

        return response


def main():
    try:
        with rclpy.init():
            minimal_service = MinimalService()

            rclpy.spin(minimal_service)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()
from tutorial_interfaces.srv import AddThreeInts # 引入我们自定义的服务数据类型

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy import Future

import random

class MinimalClientAsync(Node):

    def __init__(self) -> None:
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddThreeInts, 'add_three_ints') # 修改目标服务的数据类型和名称
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')

        self.rng = random.Random()
        time_perioid = 1
        self.timer = self.create_timer(timer_period_sec=time_perioid, callback=self.send_request)


    def send_request(self) -> None:
        req = AddThreeInts.Request() # 修改请求的数据类型
        req.a = self.rng.randint(1, 1000)
        req.b = self.rng.randint(-1000, -1)
        req.c = self.rng.randint(-500, 500) # 新增 c 参数
        future = self.cli.call_async(req)

        future.add_done_callback(
            lambda future_msg: self.get_result_callback(future=future_msg, a=req.a, b=req.b, c=req.c) # 传入 c
        )

    def get_result_callback(self, future: Future, a: int, b: int, c: int) -> None:# 修改函数签名,新增 c 参数
        try:
            response = future.result()
            assert type(response) is AddThreeInts.Response # 修改断言的对象类型
            self.get_logger().info(f'Result: {a} + {b} + {c}= {response.sum}') # 修改打印的数据类型
        except Exception as e:
            self.get_logger().error(f'Service call failed: {e}')

def main(args=None):
    try:
        with rclpy.init(args=args):
            minimal_client = MinimalClientAsync()
            rclpy.spin(minimal_client)

    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

package.xml中添加下面的行:

<exec_depend>tutorial_interfaces</exec_depend>

然后进行构建、运行:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli service 
[INFO] [1774340823.923939023] [minimal_service]: Incoming request
a: 124 b: -151 c: 469
[INFO] [1774340824.915489769] [minimal_service]: Incoming request
a: 109 b: -30 c: 197
[INFO] [1774340825.915422369] [minimal_service]: Incoming request
a: 998 b: -607 c: -286

我们可以看到响应符合我们的预期:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run py_srvcli client 
[INFO] [1774340823.932370309] [minimal_client_async]: Result: 124 + -151 + 469= 442
[INFO] [1774340824.915951765] [minimal_client_async]: Result: 109 + -30 + 197= 276
[INFO] [1774340825.915881961] [minimal_client_async]: Result: 998 + -607 + -286= 105
[INFO] [1774340826.915971606] [minimal_client_async]: Result: 740 + -534 + 293= 499

以上。

我们目前已经将消息和服务如何创建接口的部分学习了,接下来就可以来看看如何在类中使用参数了。

我们创建一个新包吧:

ros2 pkg create --build-type ament_python --license Apache-2.0 python_parameters --dependencies rclpy --description "类使用参数示例"

然后创建一个 src/python_parameters/python_parameters/python_parameters_node.py的新文件,并写入如下的内容:

import random
import string

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.parameter import Parameter

class MinimalParam(Node):
    def __init__(self):
        super().__init__('minimal_param_node')

        self.declare_parameter('my_parameter', 'world')

        self.timer = self.create_timer(1, self.timer_callback)
        self.rng = random.Random()

    def timer_callback(self):
        my_param = self.get_parameter('my_parameter').get_parameter_value().string_value

        self.get_logger().info('Hello %s!' % my_param)
      
        ran_str = ''.join(random.choices(string.ascii_letters + string.digits, k=5))
        my_new_param = Parameter(
            'my_parameter',
            Parameter.Type.STRING,
            ran_str
        )
        all_new_parameters = [my_new_param]
        self.set_parameters(all_new_parameters)

def main():
    try:
        with rclpy.init():
            node = MinimalParam()
            rclpy.spin(node)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass

if __name__ == '__main__':
    main()

这样我们就获得了一个有我们自定义参数的节点。

我们这里看一下代码:

        self.declare_parameter('my_parameter', 'world')

首先是这里,我们通过调用超类的 declare_parameter 方法创建了一个名为 my_parameter,默认值为 world的参数。

这里我们可以发现,我们没有显式传入字符串类型,因为 ROS2 官方对于 declare_parameter 方法是提供了隐式推导的功能了的,相当于是语法糖,我们就不需要显式传入类型定义了

同时,ROS2 的底层也不允许我们自定义参数类型,因此我们想直接传图片或者音频的二进制数据是不可以的。

这个其实也很合理,我们在做 Web 开发的时候其实也不会在 .env 里直接放图片...而是放图片的路径之类的轻量配置。

然后我们设置了一个计时器,每秒触发一次,效果是打印当前参数的内容,并生成一个长度为五的随机数字符串,并将该字符串设置为新参数值。

self.get_parameter('my_parameter').get_parameter_value().string_value

这里是获取我们的 my_parameter 这个参数对象,并通过 get_parameter_value 来获取到具体值。

        my_new_param = Parameter(
            'my_parameter',
            Parameter.Type.STRING,
            ran_str
        )

这里是手动创建了一个新的参数对象,名称为 my_parameter,然后手动创建为数组后,通过 set_parameters覆盖掉所有同名的参数。

后续就是正常的节点初始化了。

添加入口:

    entry_points={
        'console_scripts': [
            'minimal_param_node = python_parameters.python_parameters_node:main'
        ],
    },

我们进行构建并运行该节点:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 run python_parameters minimal_param_node 
[INFO] [1774344934.327909438] [minimal_param_node]: Hello world!
[INFO] [1774344935.320099303] [minimal_param_node]: Hello oAtma!
[INFO] [1774344936.320076827] [minimal_param_node]: Hello L7OGJ!
[INFO] [1774344937.320064441] [minimal_param_node]: Hello UuDrS!
[INFO] [1774344938.320098100] [minimal_param_node]: Hello zkJdu!
[INFO] [1774344939.320157879] [minimal_param_node]: Hello M2eM4!

可以看到第一次打印的是 Hello world,后续的 world则被替换为了其他的随机字符串。

通过 ros2 param list,我们也可以看到我们自己创建的这个 my_parameter参数:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 param list
/minimal_param_node:
  my_parameter
  start_type_description_service
  use_sim_time

接下来我们就可以通过 set命令来将我们这个节点的这个参数设置为我们指定的值,比如我们这里设置为 Jese

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 param set /minimal_param_node my_parameter Jese
Set parameter successful
...
[INFO] [1774345056.065165718] [minimal_param_node]: Hello hSHsG!
[INFO] [1774345057.065171195] [minimal_param_node]: Hello Jese!
[INFO] [1774345058.065177293] [minimal_param_node]: Hello B1JP7!
...

除了直接在代码中设置默认值以外,我们也可以通过启动文件来在启动的时候设置默认值。

我们需要先创建对应的启动文件 src/python_parameters/launch/python_parameters_launch.py ,写入如下内容:

from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
    return LaunchDescription([
        Node(
            package='python_parameters',
            executable='minimal_param_node',
            name='custom_minimal_param_node',
            output='screen',
            emulate_tty=True,
            parameters=[
                {'my_parameter': 'Beta'}
            ]
        )
    ])

然后编辑我们的 setup.py,在 data_files字段那里将该目录加进去:

import os
from glob import glob
from setuptools import find_packages, setup

package_name = 'python_parameters'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name, 'launch'), glob('launch/*')), # 在这里插入该目录
    ],
    package_data={'': ['py.typed']},
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='JeseKi',
    maintainer_email='2094901072@qq.com',
    description='类使用参数示例',
    license='Apache-2.0',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'minimal_param_node = python_parameters.python_parameters_node:main'
        ],
    },
)

接下来就可以进行构建并运行看看了:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 launch python_parameters python_parameters_launch.py
[INFO] [launch]: All log files can be found below /home/jese--ki/.ros/log/2026-03-24-17-57-56-933487-KiBall-162474
[INFO] [launch]: Default logging verbosity is set to INFO
[INFO] [minimal_param_node-1]: process started with pid [162477]
[minimal_param_node-1] [INFO] [1774346278.229177702] [custom_minimal_param_node]: Hello Beta!
[minimal_param_node-1] [INFO] [1774346279.221235028] [custom_minimal_param_node]: Hello qjIuS!
[minimal_param_node-1] [INFO] [1774346280.221146589] [custom_minimal_param_node]: Hello yZ4KE!

可以看到我们参数的初始值就被改为了 Beta

以上。

在主线教程里,我们还可以了解到一个新的工具,叫做 doctor

嗯,字面意义上的医生,用来诊断我们当前 ROS2 整体设置是否正常:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 doctor 
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: generate_parameter_library_py has been updated to a new version. local: 0.7.0 < latest: 0.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: parameter_traits has been updated to a new version. local: 0.7.0 < latest: 0.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: rqt_reconfigure has been updated to a new version. local: 1.7.0 < latest: 1.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: rqt_joint_trajectory_controller has been updated to a new version. local: 5.13.0 < latest: 5.13.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: generate_parameter_library has been updated to a new version. local: 0.7.0 < latest: 0.7.1

All 5 checks passed

可以看到全部通过,不过有新版本就是了,这些 UserWarning都是版本更新的提示。

除了检查这些设置以外,也可以用来检查我们当前运行中的 ROS2 应用是否正常运行。

我们分别启动教程时候用到的 turtlesim的节点和控制节点,不进行操作的话,可以看到多出来了两个警告:

(.venv) jese--ki@KiBall:~/Projects/learn/ros2/pub_sub$ ros2 doctor 
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: generate_parameter_library_py has been updated to a new version. local: 0.7.0 < latest: 0.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: parameter_traits has been updated to a new version. local: 0.7.0 < latest: 0.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: rqt_reconfigure has been updated to a new version. local: 1.7.0 < latest: 1.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: rqt_joint_trajectory_controller has been updated to a new version. local: 5.13.0 < latest: 5.13.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/package.py: 122: UserWarning: generate_parameter_library has been updated to a new version. local: 0.7.0 < latest: 0.7.1
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/topic.py: 42: UserWarning: Publisher without subscriber detected on /turtle1/color_sensor.
/opt/ros/kilted/lib/python3.12/site-packages/ros2doctor/api/topic.py: 42: UserWarning: Publisher without subscriber detected on /turtle1/pose.

All 5 checks passed

这里的最下面两个警告代表检测到了有发布者正在朝没有订阅的话题发布消息。

如果我们进行订阅的话,这个警告便会消失。

若要获取到完整的报告,我们可以运行 **ros2 doctor --report**命令。