隔了几天,我们继续。
接下来我们要接入旗帜检测的节点以及启动脚本。
先创建节点文件src/kibot_one_control/kibot_one_control/flag_detector.py:
from typing import cast
import rclpy
from kibot_one_interface.msg import FlagDetection # type: ignore
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import Image # type: ignore
from kibot_one_control.flag_detection_core import detect_red_flag_pixels
上面这些是基本的骨架,下面我们需要加上这个节点的类了:
class FlagDetector(Node):
def __init__(
self,
node_name: str,
*,
context: rclpy.Context | None = None,
cli_args: List[str] | None = None,
namespace: str | None = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: List[Parameter] | None = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False,
enable_logger_service: bool = False,
) -> None:
super().__init__(
node_name="flag_detector",
context=context,
cli_args=cli_args,
namespace=namespace,
use_global_arguments=use_global_arguments,
enable_rosout=enable_rosout,
start_parameter_services=start_parameter_services,
parameter_overrides=parameter_overrides,
allow_undeclared_parameters=allow_undeclared_parameters,
automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides,
enable_logger_service=enable_logger_service,
)
self.declare_parameters(
namespace="",
parameters=[
("image_topic", "/camera/image_raw"),
("detection_topic", "/flag_dectection"),
("min_red", 120),
("red_margin", 45),
("min_pixel_count", 80),
] # type: ignore
)
self.image_sub = self.create_subscription(
msg_type=Image,
topic=cast(str, self.get_parameter(name="image_topic").value),
callback=self._image_callback,
qos_profile=qos_profile_sensor_data
)
self.detection_pub = self.create_publisher(
msg_type=FlagDetection,
topic=cast(str, self.get_parameter(name="detection_topic").value),
qos_profile=10
)
_imgae_callback这个回调函数我们还没有定义,我们在下面定义一下:
def _image_callback(self, msg: Image) -> None:
detection: ColorDetection = detect_red_flag_pixels(
image_data=bytes(msg.data),
width=msg.width,
height=msg.height,
encoding=msg.encoding,
min_red=cast(int, self.get_parameter(name="min_red").value),
red_margin=cast(int, self.get_parameter(name="red_margin").value),
min_pixel_count=cast(int, self.get_parameter(name="min_pixel_count").value)
)
output = FlagDetection()
output.header = msg.header
output.detected = detection.detected
output.center_x = detection.center_x
output.center_y = detection.center_y
output.image_width = msg.width
output.image_height = msg.height
output.pixel_count = detection.pixel_count
output.confidence = detection.confidence
self.detection_pub.publish(msg=output)
最后创建节点入口:
def main() -> None:
rclpy.init()
node = FlagDetector()
try:
rclpy.spin(node=node)
finally:
node.destroy_node()
rclpy.shutdown()
然后把我们的这个节点注册到 src/kibot_one_control/setup.py 中:
entry_points={
'console_scripts': [
'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
'control_console = kibot_one_control.control_console:main',
'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
'mode_control = kibot_one_control.mode_control:main',
'flag_pose_publisher = kibot_one_control.flag_pose_publisher:main',
'follow_controller = kibot_one_control.follow_controller:main',
'flag_detector = kibot_one_control.flag_detector:main',
'frontier_explorer = kibot_one_control.frontier_explorer:main',
],
},
接下来我们创建启动文件 src/kibot_one_control/launch/flag_detection.launch.py:
from pathlib import Path
from ament_index_python.packages import get_package_share_directory # type: ignore
from launch import LaunchDescription # type: ignore
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription # type: ignore
from launch.conditions import IfCondition # type: ignore
from launch.launch_description_sources import PythonLaunchDescriptionSource # type: ignore
from launch.substitutions import LaunchConfiguration # type: ignore
from launch_ros.actions import Node # type: ignore
def generate_launch_description() -> LaunchDescription:
sim_share = Path(get_package_share_directory(package_name="kibot_one_sim"))
sim_launch = sim_share / "launch" / "sim_with_bridge.launch.py"
world_arg = DeclareLaunchArgument(
name="world",
default_value=str(sim_share / "worlds" / "kibot_one.world.sdf"),
description="Gazebo 世界的绝对路径。",
)
start_sim_arg = DeclareLaunchArgument(
name="start_sim",
default_value="true",
description="是否启动 Gazebo 与 ros_gz_bridge。",
)
run_on_start_arg = DeclareLaunchArgument(
name="run_on_start",
default_value="true",
description="是否让 Gazebo 启动后立即运行仿真。",
)
start_sim = IncludeLaunchDescription(
launch_description_source=PythonLaunchDescriptionSource(launch_file_path=str(sim_launch)),
launch_arguments={
"world": LaunchConfiguration(variable_name="world"),
"run_on_start": LaunchConfiguration(variable_name="run_on_start"),
}.items(),
condition=IfCondition(predicate_expression=LaunchConfiguration(variable_name="start_sim")),
)
detector = Node(
package="kibot_one_control",
executable="flag_detector",
name="flag_detector",
output="screen",
parameters=[{"use_sim_time": True}],
)
return LaunchDescription([
world_arg,
start_sim_arg,
run_on_start_arg,
start_sim,
detector,
])
然后也注册到我们的 src/kibot_one_control/setup.py 中:
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', [
'launch/control_console.launch.py',
'launch/flag_detection.launch.py',
'launch/frontier_exploration.launch.py',
'launch/follow_phase1.launch.py',
'launch/follow_phase2.launch.py',
]),
],
接下来我们构建一下看看:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ make build
colcon build
Starting >>> kibot_one_interface
Starting >>> kibot_one_sim
Finished <<< kibot_one_sim [0.39s]
Finished <<< kibot_one_interface [0.62s]
Starting >>> kibot_one_control
Finished <<< kibot_one_control [0.90s]
Summary: 3 packages finished [1.72s]
可以看到构建正常通过。
在开始进行视觉的检测之前,我们先检查一下看看我们的算法是否可以正常通过测试:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ python -m pytest -q src/kibot_one_control/test/test_frontier_core.py src/kibot_one_control/test/test_flag_detection_core.py
...... [100%]
6 passed in 0.01s
全部通过。
然后启动一下我们视觉检测的启动文件看看:
ros2 launch kibot_one_control flag_detection.launch.py start_sim:=true run_on_start:=true
正常启动,没有错误。
看看我们的相机话题:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /camera/image_raw --once
header:
stamp:
sec: 30
nanosec: 164000000
frame_id: camera_link
height: 240
width: 320
encoding: rgb8
is_bigendian: 0
step: 960
data:
- 231
...
正常工作。
旗帜检测话题:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /flag_dectection --once
header:
stamp:
sec: 87
nanosec: 648000000
frame_id: camera_link
detected: true
center_x: 44.15916061401367
center_y: 31.16216278076172
image_width: 320
image_height: 240
pixel_count: 333
confidence: 0.0043359375558793545
---
依旧正常。
然后放一个红色圆柱体在小车面前:

看看检测的结果:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ ros2 topic echo /flag_dectection --once
header:
stamp:
sec: 192
nanosec: 192000000
frame_id: camera_link
detected: true
center_x: 159.5
center_y: 119.5
image_width: 320
image_height: 240
pixel_count: 76800
confidence: 1.0
---
可以看到完全符合我们的预期。
最后我们试着从相机视角看看:
可以看到也是完全符合我们的预期的。
综上,我们完成了我们对于红色旗帜颜色检测接入。