隔了几天,我们继续。

接下来我们要接入旗帜检测的节点以及启动脚本。

先创建节点文件src/kibot_one_control/kibot_one_control/flag_detector.py:

from typing import cast

import rclpy
from kibot_one_interface.msg import FlagDetection # type: ignore
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import Image # type: ignore

from kibot_one_control.flag_detection_core import detect_red_flag_pixels

上面这些是基本的骨架,下面我们需要加上这个节点的类了:

class FlagDetector(Node):
    def __init__(
        self,
        node_name: str,
        *,
        context: rclpy.Context | None = None,
        cli_args: List[str] | None = None,
        namespace: str | None = None,
        use_global_arguments: bool = True,
        enable_rosout: bool = True,
        start_parameter_services: bool = True,
        parameter_overrides: List[Parameter] | None = None,
        allow_undeclared_parameters: bool = False,
        automatically_declare_parameters_from_overrides: bool = False,
        enable_logger_service: bool = False,
    ) -> None:
        super().__init__(
            node_name="flag_detector",
            context=context,
            cli_args=cli_args,
            namespace=namespace,
            use_global_arguments=use_global_arguments,
            enable_rosout=enable_rosout,
            start_parameter_services=start_parameter_services,
            parameter_overrides=parameter_overrides,
            allow_undeclared_parameters=allow_undeclared_parameters,
            automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
            enable_logger_service=enable_logger_service,
        )

        self.declare_parameters(
            namespace="",
            parameters=[
                ("image_topic", "/camera/image_raw"),
                ("detection_topic", "/flag_dectection"),
                ("min_red", 120),
                ("red_margin", 45),
                ("min_pixel_count", 80),
            ] # type: ignore
        )
        self.image_sub = self.create_subscription(
            msg_type=Image,
            topic=cast(str, self.get_parameter(name="image_topic").value),
            callback=self._image_callback,
            qos_profile=qos_profile_sensor_data
        )
        self.detection_pub = self.create_publisher(
            msg_type=FlagDetection,
            topic=cast(str, self.get_parameter(name="detection_topic").value),
            qos_profile=10
        )

_imgae_callback这个回调函数我们还没有定义,我们在下面定义一下:

    def _image_callback(self, msg: Image) -> None:
        detection: ColorDetection = detect_red_flag_pixels(
            image_data=bytes(msg.data),
            width=msg.width,
            height=msg.height,
            encoding=msg.encoding,
            min_red=cast(int, self.get_parameter(name="min_red").value),
            red_margin=cast(int, self.get_parameter(name="red_margin").value),
            min_pixel_count=cast(int, self.get_parameter(name="min_pixel_count").value)
        )
        output = FlagDetection()
        output.header = msg.header
        output.detected = detection.detected
        output.center_x = detection.center_x
        output.center_y = detection.center_y
        output.image_width = msg.width
        output.image_height = msg.height
        output.pixel_count = detection.pixel_count
        output.confidence = detection.confidence
      
        self.detection_pub.publish(msg=output)

最后创建节点入口:

def main() -> None:
    rclpy.init()
    node = FlagDetector()
    try:
        rclpy.spin(node=node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

然后把我们的这个节点注册到 src/kibot_one_control/setup.py 中:

    entry_points={
        'console_scripts': [
            'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
            'control_console = kibot_one_control.control_console:main',
            'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
            'mode_control = kibot_one_control.mode_control:main',
            'flag_pose_publisher = kibot_one_control.flag_pose_publisher:main',
            'follow_controller = kibot_one_control.follow_controller:main',
            'flag_detector = kibot_one_control.flag_detector:main',
            'frontier_explorer = kibot_one_control.frontier_explorer:main',
        ],
    },

接下来我们创建启动文件 src/kibot_one_control/launch/flag_detection.launch.py:

from pathlib import Path

from ament_index_python.packages import get_package_share_directory  # type: ignore
from launch import LaunchDescription  # type: ignore
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription  # type: ignore
from launch.conditions import IfCondition  # type: ignore
from launch.launch_description_sources import PythonLaunchDescriptionSource  # type: ignore
from launch.substitutions import LaunchConfiguration  # type: ignore
from launch_ros.actions import Node  # type: ignore

def generate_launch_description() -> LaunchDescription:
    sim_share = Path(get_package_share_directory(package_name="kibot_one_sim"))
    sim_launch = sim_share / "launch" / "sim_with_bridge.launch.py"

    world_arg = DeclareLaunchArgument(
        name="world",
        default_value=str(sim_share / "worlds" / "kibot_one.world.sdf"),
        description="Gazebo 世界的绝对路径。",
    )
    start_sim_arg = DeclareLaunchArgument(
        name="start_sim",
        default_value="true",
        description="是否启动 Gazebo 与 ros_gz_bridge。",
    )
    run_on_start_arg = DeclareLaunchArgument(
        name="run_on_start",
        default_value="true",
        description="是否让 Gazebo 启动后立即运行仿真。",
    )
    start_sim = IncludeLaunchDescription(
        launch_description_source=PythonLaunchDescriptionSource(launch_file_path=str(sim_launch)),
        launch_arguments={
            "world": LaunchConfiguration(variable_name="world"),
            "run_on_start": LaunchConfiguration(variable_name="run_on_start"),
        }.items(),
        condition=IfCondition(predicate_expression=LaunchConfiguration(variable_name="start_sim")),
    )

    detector = Node(
        package="kibot_one_control",
        executable="flag_detector",
        name="flag_detector",
        output="screen",
        parameters=[{"use_sim_time": True}],
    )

    return LaunchDescription([
        world_arg,
        start_sim_arg,
        run_on_start_arg,
        start_sim,
        detector,
    ])

然后也注册到我们的 src/kibot_one_control/setup.py 中:

    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/launch', [
            'launch/control_console.launch.py',
            'launch/flag_detection.launch.py',
            'launch/frontier_exploration.launch.py',
            'launch/follow_phase1.launch.py',
            'launch/follow_phase2.launch.py',
        ]),
    ],

接下来我们构建一下看看:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ make build 
colcon build
Starting >>> kibot_one_interface
Starting >>> kibot_one_sim
Finished <<< kibot_one_sim [0.39s]                                                                   
Finished <<< kibot_one_interface [0.62s]                   
Starting >>> kibot_one_control
Finished <<< kibot_one_control [0.90s]        

Summary: 3 packages finished [1.72s]

可以看到构建正常通过。

在开始进行视觉的检测之前,我们先检查一下看看我们的算法是否可以正常通过测试:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ python -m pytest -q src/kibot_one_control/test/test_frontier_core.py src/kibot_one_control/test/test_flag_detection_core.py 
......                                                                                                                                                                                [100%]
6 passed in 0.01s

全部通过。

然后启动一下我们视觉检测的启动文件看看:

ros2 launch kibot_one_control flag_detection.launch.py start_sim:=true run_on_start:=true

正常启动,没有错误。

看看我们的相机话题:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /camera/image_raw --once
header:
  stamp:
    sec: 30
    nanosec: 164000000
  frame_id: camera_link
height: 240
width: 320
encoding: rgb8
is_bigendian: 0
step: 960
data:
- 231
...

正常工作。

旗帜检测话题:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /flag_dectection --once
header:
  stamp:
    sec: 87
    nanosec: 648000000
  frame_id: camera_link
detected: true
center_x: 44.15916061401367
center_y: 31.16216278076172
image_width: 320
image_height: 240
pixel_count: 333
confidence: 0.0043359375558793545
---

依旧正常。

然后放一个红色圆柱体在小车面前:

看看检测的结果:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /flag_dectection --once
header:
  stamp:
    sec: 192
    nanosec: 192000000
  frame_id: camera_link
detected: true
center_x: 159.5
center_y: 119.5
image_width: 320
image_height: 240
pixel_count: 76800
confidence: 1.0
---

可以看到完全符合我们的预期。

最后我们试着从相机视角看看:

可以看到也是完全符合我们的预期的。

综上,我们完成了我们对于红色旗帜颜色检测接入。