260413
我们上次已经确定了我们对于跟随功能的 RoadMap,接下来就来实现一下看看吧。
我们可以先实现一个旗帜,用于给我们的小车进行跟随:
<?xml version="1.0"?>
<sdf version="1.9">
<model name="follow_flag">
<static>true</static>
<link name="flag_base">
<collision name="base_collision">
<pose>0 0 0.05 0 0 0</pose>
<geometry>
<cylinder>
<radius>0.12</radius>
<length>0.10</length>
</cylinder>
</geometry>
</collision>
<visual name="base_visual">
<pose>0 0 0.05 0 0 0</pose>
<geometry>
<cylinder>
<radius>0.12</radius>
<length>0.10</length>
</cylinder>
</geometry>
<material>
<ambient>0.15 0.15 0.15 1</ambient>
<diffuse>0.20 0.20 0.20 1</diffuse>
<specular>0.02 0.02 0.02 1</specular>
</material>
</visual>
<collision name="pole_collision">
<pose>0 0 0.60 0 0 0</pose>
<geometry>
<cylinder>
<radius>0.02</radius>
<length>1.00</length>
</cylinder>
</geometry>
</collision>
<visual name="pole_visual">
<pose>0 0 0.60 0 0 0</pose>
<geometry>
<cylinder>
<radius>0.02</radius>
<length>1.00</length>
</cylinder>
</geometry>
<material>
<ambient>0.70 0.70 0.70 1</ambient>
<diffuse>0.80 0.80 0.80 1</diffuse>
<specular>0.05 0.05 0.05 1</specular>
</material>
</visual>
<visual name="flag_visual">
<pose>0.18 0 1.00 0 0 0</pose>
<geometry>
<box>
<size>0.30 0.02 0.20</size>
</box>
</geometry>
<material>
<ambient>0.80 0.10 0.10 1</ambient>
<diffuse>0.90 0.15 0.15 1</diffuse>
<specular>0.08 0.08 0.08 1</specular>
</material>
</visual>
</link>
</model>
</sdf>
很简单的旗帜,整个长得就像我们下面这样:

然后把我们的这个旗帜加入到我们的世界里:
<include>
<uri>model://follow_flag</uri>
<pose>2.5 1.0 0 0 0 0</pose>
</include>
现在的地图就是长这样了:

然后我们需要试着修改一下我们的 mode 参数的获取,我们之前都是让键盘控制器来手动拉取这个参数的值,我们接下来要写一个跟随控制器的话,也这么写就会发现很别扭。
我们最好是将该模式的信息单独发布为一个话题,然后我们的键盘控制器和跟随控制器直接订阅这个主题就可以了。
我们先创建一个新的 src/kibot_one_interface/msg/ModeState.msg 文件用于话题通信,在其中写入下面的内容:
uint8 STOP = 0
uint8 CRUISE = 1
uint8 MANUAL = 2
uint8 FOLLOW = 3
uint8 current_mode
然后修改我们的 src/kibot_one_control/kibot_one_control/mode_control.py ,先创建模式发布的 QoS:
self.mode_qos = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
)
我们发布这个话题的指定话题也加入到参数里面:
params = [
("mode", Mode.MANUAL),
("mode_pub_timer_period", 0.01667),
('mode_topic', 'mode'),
]
发布者:
self.mode_state_publisher = self.create_publisher(
msg_type=ModeState,
topic=cast(str, self.get_parameter('mode_topic').value),
qos_profile=self.mode_qos,
)
回调:
def _publish_mode_state(self, mode: Mode) -> None:
mode_state = ModeState()
mode_state.current_mode = mode.value
self.mode_state_publisher.publish(mode_state)`
然后就可以指定进行发布了:
def _change_mode(self, request: ModeSrv.Request, response: ModeSrv.Response) -> ModeSrv.Response:
message = ""
try:
current_mode = self._get_current_mode()
new_mode = Mode(value=request.target_mode)
new_mode_param = Parameter(
name="mode",
type_=Parameter.Type.INTEGER,
value=new_mode.value
)
all_new_parameters = [new_mode_param]
self.set_parameters(all_new_parameters)
if new_mode == Mode.CRUISE:
self.linear_velocity = request.linear_velocity
self._publish_mode_state(new_mode)
由于我们已经通过发布话题来控制模式了,因此我们的键盘控制器可以更简单一些:
class KeyboardTeleop(Node):
def __init__(self) -> None:
super().__init__('keyboard_teleop')
self.declare_parameter('cmd_vel_raw_topic', 'cmd_vel_raw')
self.declare_parameter('linear_speed', 0.8)
self.declare_parameter('angular_speed', 1.5)
self.declare_parameter('poll_timeout', 0.1)
self.declare_parameter('mode_topic', 'mode')
cmd_vel_raw_topic = self.get_parameter('cmd_vel_raw_topic').get_parameter_value().string_value
mode_topic = self.get_parameter('mode_topic').get_parameter_value().string_value
self.linear_speed = self.get_parameter(
'linear_speed'
).get_parameter_value().double_value
self.angular_speed = self.get_parameter(
'angular_speed'
).get_parameter_value().double_value
self.poll_timeout = self.get_parameter(
'poll_timeout'
).get_parameter_value().double_value
self.current_mode = MANUAL_MODE_VALUE
self.mode_qos = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
)
self.cmd_vel_raw_publisher = self.create_publisher(msg_type=Twist, topic=cmd_vel_raw_topic, qos_profile=10)
self.mode_sub = self.create_subscription(msg_type=ModeState, topic=mode_topic, callback=self.mode_callback, qos_profile=self.mode_qos)
self._settings = termios.tcgetattr(sys.stdin)
def print_help(self) -> None:
print(
'控制器正在运行.\n'
'使用 WASD 进行移动.\n'
' w: 前进\n'
' s: 后退\n'
' a: 原地左转\n'
' d: 原地右转\n'
'space/x: 停止\n'
'q: 退出\n'
)
def run(self) -> None:
self.print_help()
try:
while rclpy.ok():
key = self.read_key()
if key is None:
rclpy.spin_once(self, timeout_sec=0.0)
continue
if key == 'q':
self.publish_stop()
break
if key in ('x', ' '):
self.publish_stop()
continue
cmd = self.key_to_twist(key)
if cmd is not None and self.current_mode == MANUAL_MODE_VALUE:
self.cmd_vel_raw_publisher.publish(cmd)
finally:
self.restore_terminal()
def mode_callback(self, msg: ModeState) -> None:
self.current_mode = msg.current_mode
经过测试,系统工作符合预期:
由于我们目前不打算上视觉模型,因此使用直接发布旗帜的位姿的方式来达到定位旗帜的目的。
我们可以在 src/kibot_one_control/kibot_one_control/flag_pose_publisher.py 中写入下面的内容:
from typing import Any, cast
import rclpy
from geometry_msgs.msg import PoseStamped # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
class FlagPosePublisher(Node):
def __init__(self) -> None:
super().__init__(node_name='flag_pose_publisher')
params = [
('flag_pose_topic', 'flag_pose'),
('frame_id', 'odom'),
('publish_period', 0.1),
('flag_x', 2.5),
('flag_y', 1.0),
('flag_z', 0.0),
]
self.declare_parameters(namespace='', parameters=cast(Any, params))
topic_name = cast(str, self.get_parameter(name='flag_pose_topic').value)
publish_period = cast(float, self.get_parameter(name='publish_period').value)
self.flag_pose_publisher = self.create_publisher(msg_type=PoseStamped, topic=topic_name, qos_profile=10)
self.flag_pose_msg = PoseStamped()
self.flag_pose_msg.header.frame_id = cast(str, self.get_parameter(name='frame_id').value)
self.flag_pose_msg.pose.position.x = cast(float, self.get_parameter(name='flag_x').value)
self.flag_pose_msg.pose.position.y = cast(float, self.get_parameter(name='flag_y').value)
self.flag_pose_msg.pose.position.z = cast(float, self.get_parameter(name='flag_z').value)
self.flag_pose_msg.pose.orientation.w = 1.0
self.create_timer(timer_period_sec=publish_period, callback=self._publish_flag_pose)
def _publish_flag_pose(self) -> None:
self.flag_pose_msg.header.stamp = self.get_clock().now().to_msg()
self.flag_pose_publisher.publish(msg=self.flag_pose_msg)
def main(args=None) -> None:
flag_pose_publisher = None
try:
rclpy.init(args=args)
flag_pose_publisher = FlagPosePublisher()
rclpy.spin(node=flag_pose_publisher)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
if flag_pose_publisher is not None:
flag_pose_publisher.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()
这里我们定义了一个名为 flag_pose_publisher 的节点,直接发布固定的位姿信息,我们当前第一阶段是这样的目的,后面再改成实时读取的,主要是验证是否可以跟随目标。
然后创建 src/kibot_one_control/kibot_one_control/follow_controller.py文件,写入下面的内容:
import math
from typing import cast
import rclpy
from geometry_msgs.msg import PoseStamped, Twist # type: ignore
from kibot_one_interface.msg import ModeState # type: ignore
from nav_msgs.msg import Odometry # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
FOLLOW_MODE_VALUE = 3
class FollowController(Node):
def __init__(self) -> None:
super().__init__(node_name='follow_controller')
params = [
('mode_topic', 'mode'),
('odom_topic', 'odom'),
('flag_pose_topic', 'flag_pose'),
('cmd_vel_raw_topic', 'cmd_vel_raw'),
('control_period', 0.05),
('stop_distance', 0.60),
('heading_tolerance', 0.20),
('linear_gain', 0.60),
('angular_gain', 1.80),
('max_linear_speed', 0.50),
('max_angular_speed', 1.20),
]
self.declare_parameters(namespace='', parameters=params)
self.robot_x: float | None = None
self.robot_y: float | None = None
self.robot_yaw: float | None = None
self.flag_x: float | None = None
self.flag_y: float | None = None
self.current_mode = 2
self.mode_qos = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
)
self.cmd_vel_raw_publisher = self.create_publisher(
msg_type=Twist,
topic=cast(str, self.get_parameter('cmd_vel_raw_topic').value),
qos_profile=10,
)
self.create_subscription(
msg_type=ModeState,
topic=cast(str, self.get_parameter('mode_topic').value),
callback=self._mode_callback,
qos_profile=self.mode_qos,
)
self.create_subscription(
msg_type=Odometry,
topic=cast(str, self.get_parameter('odom_topic').value),
callback=self._odom_callback,
qos_profile=10,
)
self.create_subscription(
msg_type=PoseStamped,
topic=cast(str, self.get_parameter('flag_pose_topic').value),
callback=self._flag_pose_callback,
qos_profile=10,
)
self.create_timer(
timer_period_sec=cast(float, self.get_parameter('control_period').value),
callback=self._control_loop,
)
def _mode_callback(self, msg: ModeState) -> None:
self.current_mode = msg.current_mode
def _odom_callback(self, msg: Odometry) -> None:
self.robot_x = msg.pose.pose.position.x
self.robot_y = msg.pose.pose.position.y
orientation = msg.pose.pose.orientation
siny_cosp = 2.0 * (orientation.w * orientation.z + orientation.x * orientation.y)
cosy_cosp = 1.0 - 2.0 * (orientation.y * orientation.y + orientation.z * orientation.z)
self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)
def _flag_pose_callback(self, msg: PoseStamped) -> None:
self.flag_x = msg.pose.position.x
self.flag_y = msg.pose.position.y
def _control_loop(self) -> None:
if self.current_mode != FOLLOW_MODE_VALUE:
return
if self.robot_x is None or self.robot_y is None or self.robot_yaw is None:
self.cmd_vel_raw_publisher.publish(Twist())
return
if self.flag_x is None or self.flag_y is None:
self.cmd_vel_raw_publisher.publish(Twist())
return
dx = self.flag_x - self.robot_x
dy = self.flag_y - self.robot_y
distance = math.hypot(dx, dy)
stop_distance = cast(float, self.get_parameter('stop_distance').value)
if distance <= stop_distance:
self.cmd_vel_raw_publisher.publish(Twist())
return
target_yaw = math.atan2(dy, dx)
heading_error = self._normalize_angle(target_yaw - self.robot_yaw)
linear_speed = min(
cast(float, self.get_parameter('max_linear_speed').value),
cast(float, self.get_parameter('linear_gain').value) * max(0.0, distance - stop_distance),
)
angular_speed = max(
-cast(float, self.get_parameter('max_angular_speed').value),
min(
cast(float, self.get_parameter('max_angular_speed').value),
cast(float, self.get_parameter('angular_gain').value) * heading_error,
),
)
command = Twist()
command.angular.z = angular_speed
if abs(heading_error) < cast(float, self.get_parameter('heading_tolerance').value):
command.linear.x = linear_speed
self.cmd_vel_raw_publisher.publish(command)
@staticmethod
def _normalize_angle(angle: float) -> float:
return math.atan2(math.sin(angle), math.cos(angle))
def main(args=None) -> None:
follow_controller = None
try:
rclpy.init(args=args)
follow_controller = FollowController()
rclpy.spin(follow_controller)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
if follow_controller is not None:
follow_controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()
这里我们创建了一个名为 follow_controller 的节点,它会订阅 /mode这个当前模式、机器人自身位置 /odom 和旗帜位置 flag_pose。