260413

我们上次已经确定了我们对于跟随功能的 RoadMap,接下来就来实现一下看看吧。

我们可以先实现一个旗帜,用于给我们的小车进行跟随:

<?xml version="1.0"?>
<sdf version="1.9">
  <model name="follow_flag">
    <static>true</static>

    <link name="flag_base">
      <collision name="base_collision">
        <pose>0 0 0.05 0 0 0</pose>
        <geometry>
          <cylinder>
            <radius>0.12</radius>
            <length>0.10</length>
          </cylinder>
        </geometry>
      </collision>
      <visual name="base_visual">
        <pose>0 0 0.05 0 0 0</pose>
        <geometry>
          <cylinder>
            <radius>0.12</radius>
            <length>0.10</length>
          </cylinder>
        </geometry>
        <material>
          <ambient>0.15 0.15 0.15 1</ambient>
          <diffuse>0.20 0.20 0.20 1</diffuse>
          <specular>0.02 0.02 0.02 1</specular>
        </material>
      </visual>

      <collision name="pole_collision">
        <pose>0 0 0.60 0 0 0</pose>
        <geometry>
          <cylinder>
            <radius>0.02</radius>
            <length>1.00</length>
          </cylinder>
        </geometry>
      </collision>
      <visual name="pole_visual">
        <pose>0 0 0.60 0 0 0</pose>
        <geometry>
          <cylinder>
            <radius>0.02</radius>
            <length>1.00</length>
          </cylinder>
        </geometry>
        <material>
          <ambient>0.70 0.70 0.70 1</ambient>
          <diffuse>0.80 0.80 0.80 1</diffuse>
          <specular>0.05 0.05 0.05 1</specular>
        </material>
      </visual>

      <visual name="flag_visual">
        <pose>0.18 0 1.00 0 0 0</pose>
        <geometry>
          <box>
            <size>0.30 0.02 0.20</size>
          </box>
        </geometry>
        <material>
          <ambient>0.80 0.10 0.10 1</ambient>
          <diffuse>0.90 0.15 0.15 1</diffuse>
          <specular>0.08 0.08 0.08 1</specular>
        </material>
      </visual>
    </link>
  </model>
</sdf>

很简单的旗帜,整个长得就像我们下面这样:

然后把我们的这个旗帜加入到我们的世界里:

    <include>
      <uri>model://follow_flag</uri>
      <pose>2.5 1.0 0 0 0 0</pose>
    </include>

现在的地图就是长这样了:

然后我们需要试着修改一下我们的 mode 参数的获取,我们之前都是让键盘控制器来手动拉取这个参数的值,我们接下来要写一个跟随控制器的话,也这么写就会发现很别扭。

我们最好是将该模式的信息单独发布为一个话题,然后我们的键盘控制器和跟随控制器直接订阅这个主题就可以了。

我们先创建一个新的 src/kibot_one_interface/msg/ModeState.msg 文件用于话题通信,在其中写入下面的内容:

uint8 STOP = 0
uint8 CRUISE = 1
uint8 MANUAL = 2
uint8 FOLLOW = 3

uint8 current_mode

然后修改我们的 src/kibot_one_control/kibot_one_control/mode_control.py ,先创建模式发布的 QoS:

        self.mode_qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
        )

我们发布这个话题的指定话题也加入到参数里面:

        params = [
            ("mode", Mode.MANUAL),
            ("mode_pub_timer_period", 0.01667),
            ('mode_topic', 'mode'),
        ]

发布者:

        self.mode_state_publisher = self.create_publisher(
            msg_type=ModeState,
            topic=cast(str, self.get_parameter('mode_topic').value),
            qos_profile=self.mode_qos,
        )

回调:

    def _publish_mode_state(self, mode: Mode) -> None:
        mode_state = ModeState()
        mode_state.current_mode = mode.value
        self.mode_state_publisher.publish(mode_state)`

然后就可以指定进行发布了:

    def _change_mode(self, request: ModeSrv.Request, response: ModeSrv.Response) -> ModeSrv.Response:
        message = ""
      
        try:
            current_mode = self._get_current_mode()
            new_mode = Mode(value=request.target_mode)
            new_mode_param = Parameter(
                name="mode",
                type_=Parameter.Type.INTEGER,
                value=new_mode.value
            )
            all_new_parameters = [new_mode_param]
            self.set_parameters(all_new_parameters)
            if new_mode == Mode.CRUISE:
                self.linear_velocity = request.linear_velocity
            self._publish_mode_state(new_mode)

由于我们已经通过发布话题来控制模式了,因此我们的键盘控制器可以更简单一些:

class KeyboardTeleop(Node):

    def __init__(self) -> None:
        super().__init__('keyboard_teleop')

        self.declare_parameter('cmd_vel_raw_topic', 'cmd_vel_raw')
        self.declare_parameter('linear_speed', 0.8)
        self.declare_parameter('angular_speed', 1.5)
        self.declare_parameter('poll_timeout', 0.1)
        self.declare_parameter('mode_topic', 'mode')

        cmd_vel_raw_topic = self.get_parameter('cmd_vel_raw_topic').get_parameter_value().string_value
        mode_topic = self.get_parameter('mode_topic').get_parameter_value().string_value

        self.linear_speed = self.get_parameter(
            'linear_speed'
        ).get_parameter_value().double_value
        self.angular_speed = self.get_parameter(
            'angular_speed'
        ).get_parameter_value().double_value
        self.poll_timeout = self.get_parameter(
            'poll_timeout'
        ).get_parameter_value().double_value
        self.current_mode = MANUAL_MODE_VALUE
        self.mode_qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
        )

        self.cmd_vel_raw_publisher = self.create_publisher(msg_type=Twist, topic=cmd_vel_raw_topic, qos_profile=10)
        self.mode_sub = self.create_subscription(msg_type=ModeState, topic=mode_topic, callback=self.mode_callback, qos_profile=self.mode_qos)

        self._settings = termios.tcgetattr(sys.stdin)

    def print_help(self) -> None:
        print(
            '控制器正在运行.\n'
            '使用 WASD 进行移动.\n'
            '  w: 前进\n'
            '  s: 后退\n'
            '  a: 原地左转\n'
            '  d: 原地右转\n'
            'space/x: 停止\n'
            'q: 退出\n'
        )

    def run(self) -> None:
        self.print_help()
        try:
            while rclpy.ok():
                key = self.read_key()
                if key is None:
                    rclpy.spin_once(self, timeout_sec=0.0)
                    continue

                if key == 'q':
                    self.publish_stop()
                    break

                if key in ('x', ' '):
                    self.publish_stop()
                    continue

                cmd = self.key_to_twist(key)

                if cmd is not None and self.current_mode == MANUAL_MODE_VALUE:
                    self.cmd_vel_raw_publisher.publish(cmd)
        finally:
            self.restore_terminal()

    def mode_callback(self, msg: ModeState) -> None:
        self.current_mode = msg.current_mode

经过测试,系统工作符合预期:

由于我们目前不打算上视觉模型,因此使用直接发布旗帜的位姿的方式来达到定位旗帜的目的。

我们可以在 src/kibot_one_control/kibot_one_control/flag_pose_publisher.py 中写入下面的内容:

from typing import Any, cast

import rclpy
from geometry_msgs.msg import PoseStamped  # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node


class FlagPosePublisher(Node):
    def __init__(self) -> None:
        super().__init__(node_name='flag_pose_publisher')

        params = [
            ('flag_pose_topic', 'flag_pose'),
            ('frame_id', 'odom'),
            ('publish_period', 0.1),
            ('flag_x', 2.5),
            ('flag_y', 1.0),
            ('flag_z', 0.0),
        ]
        self.declare_parameters(namespace='', parameters=cast(Any, params))

        topic_name = cast(str, self.get_parameter(name='flag_pose_topic').value)
        publish_period = cast(float, self.get_parameter(name='publish_period').value)

        self.flag_pose_publisher = self.create_publisher(msg_type=PoseStamped, topic=topic_name, qos_profile=10)
        self.flag_pose_msg = PoseStamped()
        self.flag_pose_msg.header.frame_id = cast(str, self.get_parameter(name='frame_id').value)
        self.flag_pose_msg.pose.position.x = cast(float, self.get_parameter(name='flag_x').value)
        self.flag_pose_msg.pose.position.y = cast(float, self.get_parameter(name='flag_y').value)
        self.flag_pose_msg.pose.position.z = cast(float, self.get_parameter(name='flag_z').value)
        self.flag_pose_msg.pose.orientation.w = 1.0

        self.create_timer(timer_period_sec=publish_period, callback=self._publish_flag_pose)

    def _publish_flag_pose(self) -> None:
        self.flag_pose_msg.header.stamp = self.get_clock().now().to_msg()
        self.flag_pose_publisher.publish(msg=self.flag_pose_msg)


def main(args=None) -> None:
    flag_pose_publisher = None
    try:
        rclpy.init(args=args)
        flag_pose_publisher = FlagPosePublisher()
        rclpy.spin(node=flag_pose_publisher)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    finally:
        if flag_pose_publisher is not None:
            flag_pose_publisher.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()

这里我们定义了一个名为 flag_pose_publisher 的节点,直接发布固定的位姿信息,我们当前第一阶段是这样的目的,后面再改成实时读取的,主要是验证是否可以跟随目标

然后创建 src/kibot_one_control/kibot_one_control/follow_controller.py文件,写入下面的内容:

import math
from typing import cast

import rclpy
from geometry_msgs.msg import PoseStamped, Twist  # type: ignore
from kibot_one_interface.msg import ModeState  # type: ignore
from nav_msgs.msg import Odometry  # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy


FOLLOW_MODE_VALUE = 3


class FollowController(Node):
    def __init__(self) -> None:
        super().__init__(node_name='follow_controller')

        params = [
            ('mode_topic', 'mode'),
            ('odom_topic', 'odom'),
            ('flag_pose_topic', 'flag_pose'),
            ('cmd_vel_raw_topic', 'cmd_vel_raw'),
            ('control_period', 0.05),
            ('stop_distance', 0.60),
            ('heading_tolerance', 0.20),
            ('linear_gain', 0.60),
            ('angular_gain', 1.80),
            ('max_linear_speed', 0.50),
            ('max_angular_speed', 1.20),
        ]

        self.declare_parameters(namespace='', parameters=params)


        self.robot_x: float | None = None
        self.robot_y: float | None = None
        self.robot_yaw: float | None = None
        self.flag_x: float | None = None
        self.flag_y: float | None = None
        self.current_mode = 2
        self.mode_qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
        )

        self.cmd_vel_raw_publisher = self.create_publisher(
            msg_type=Twist,
            topic=cast(str, self.get_parameter('cmd_vel_raw_topic').value),
            qos_profile=10,
        )
        self.create_subscription(
            msg_type=ModeState,
            topic=cast(str, self.get_parameter('mode_topic').value),
            callback=self._mode_callback,
            qos_profile=self.mode_qos,
        )
        self.create_subscription(
            msg_type=Odometry,
            topic=cast(str, self.get_parameter('odom_topic').value),
            callback=self._odom_callback,
            qos_profile=10,
        )
        self.create_subscription(
            msg_type=PoseStamped,
            topic=cast(str, self.get_parameter('flag_pose_topic').value),
            callback=self._flag_pose_callback,
            qos_profile=10,
        )
        self.create_timer(
            timer_period_sec=cast(float, self.get_parameter('control_period').value),
            callback=self._control_loop,
        )

    def _mode_callback(self, msg: ModeState) -> None:
        self.current_mode = msg.current_mode

    def _odom_callback(self, msg: Odometry) -> None:
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y

        orientation = msg.pose.pose.orientation
        siny_cosp = 2.0 * (orientation.w * orientation.z + orientation.x * orientation.y)
        cosy_cosp = 1.0 - 2.0 * (orientation.y * orientation.y + orientation.z * orientation.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)

    def _flag_pose_callback(self, msg: PoseStamped) -> None:
        self.flag_x = msg.pose.position.x
        self.flag_y = msg.pose.position.y

    def _control_loop(self) -> None:
        if self.current_mode != FOLLOW_MODE_VALUE:
            return

        if self.robot_x is None or self.robot_y is None or self.robot_yaw is None:
            self.cmd_vel_raw_publisher.publish(Twist())
            return

        if self.flag_x is None or self.flag_y is None:
            self.cmd_vel_raw_publisher.publish(Twist())
            return

        dx = self.flag_x - self.robot_x
        dy = self.flag_y - self.robot_y
        distance = math.hypot(dx, dy)
        stop_distance = cast(float, self.get_parameter('stop_distance').value)
        if distance <= stop_distance:
            self.cmd_vel_raw_publisher.publish(Twist())
            return

        target_yaw = math.atan2(dy, dx)
        heading_error = self._normalize_angle(target_yaw - self.robot_yaw)

        linear_speed = min(
            cast(float, self.get_parameter('max_linear_speed').value),
            cast(float, self.get_parameter('linear_gain').value) * max(0.0, distance - stop_distance),
        )
        angular_speed = max(
            -cast(float, self.get_parameter('max_angular_speed').value),
            min(
                cast(float, self.get_parameter('max_angular_speed').value),
                cast(float, self.get_parameter('angular_gain').value) * heading_error,
            ),
        )

        command = Twist()
        command.angular.z = angular_speed
        if abs(heading_error) < cast(float, self.get_parameter('heading_tolerance').value):
            command.linear.x = linear_speed

        self.cmd_vel_raw_publisher.publish(command)

    @staticmethod
    def _normalize_angle(angle: float) -> float:
        return math.atan2(math.sin(angle), math.cos(angle))


def main(args=None) -> None:
    follow_controller = None
    try:
        rclpy.init(args=args)
        follow_controller = FollowController()
        rclpy.spin(follow_controller)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    finally:
        if follow_controller is not None:
            follow_controller.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()

这里我们创建了一个名为 follow_controller 的节点,它会订阅 /mode这个当前模式、机器人自身位置 /odom 和旗帜位置 flag_pose