260408
光有状态切换的服务还不够——这些状态还要有对应的话题来发布对应的话题。
以及,考虑到我们的服务中有巡航模式,这个需要一定的线速度才行。
因此我们也需要修改一下我们的服务文件,添加 linear_velocity 这个参数:
uint8 target_mode
geometry_msgs/Twist linear_velocity
---
bool success
string message
然后在我们的 src/kibot_one_interface/CMakeLists.txt 中加入 geometry_msgs 作为依赖项:
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Mode.msg"
"srv/Mode.srv"
DEPENDENCIES geometry_msgs
)
src/kibot_one_interface/package.xml 中加入这个依赖:
<depend>geometry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
然后我们可以写上我们的发布者和 timer 了:
self.linear_velocity = Twist()
params = [
("mode", Mode.MANUAL),
("mode_pub_timer_period", 0.01667)
]
self.declare_parameters(namespace="", parameters=cast(Any, params))
self.mode_vel_publisher = self.create_publisher(
msg_type=Twist,
topic="cmd_vel_raw",
qos_profile=10
)
self.pub_timer = self.create_timer(
timer_period_sec=cast(
float, self.get_parameter("mode_pub_timer_period").value
),
callback=self._pub_timer_callback
)
这里我们也新增了一个参数为 mode_pub_timer_period ,以及新增了一个变量名为 self.linear_velocity,用来控制线速度。
我们的 _change_mode 的函数那里也需要在设置参数后修改这个变量:
all_new_parameters = [new_mode_param]
self.set_parameters(all_new_parameters)
if new_mode == Mode.CRUISE:
self.linear_velocity = request.linear_velocity
下面这个就是我们目前的这个 _pub_timer_callback 回调了。
def _pub_timer_callback(self) -> None:
current_mode = self._get_current_mode()
match current_mode:
case Mode.STOP:
self.linear_velocity = Twist()
case Mode.CRUISE:
pass
case Mode.MANUAL:
self.linear_velocity = Twist()
case Mode.FOLLOW:
pass
if current_mode not in [Mode.MANUAL, Mode.FOLLOW]:
self.mode_vel_publisher.publish(self.linear_velocity)
除了修改这个以外,我们还需要修改一下我们的键盘控制器,让它仅在模式控制器节点的模式为手动的情况下才会发布消息到 cmd_vel_raw。
首先是增加这个参数客户端来获取我们 mode_control 的参数:
self.mode_control_param_client = AsyncParameterClient(node=self, remote_node_name=MODE_CONTROL_NAME)
self.mode_control_ok = self.mode_control_param_client.wait_for_services(timeout_sec=3.0)
if not self.mode_control_ok:
self.get_logger().error(f"{MODE_CONTROL_NAME} 节点参数不可用")
然后修改我们的 run 方法:
def run(self) -> None:
self.print_help()
current_mode = 2
try:
while rclpy.ok():
key = self.read_key()
if key is None:
rclpy.spin_once(self, timeout_sec=0.0)
continue
if key == 'q':
self.publish_stop()
break
if key in ('x', ' '):
self.publish_stop()
continue
cmd = self.key_to_twist(key)
if self.mode_control_ok:
future = self.mode_control_param_client.get_parameters(names=["mode"])
rclpy.spin_until_future_complete(node=self, future=future, timeout_sec=3.0)
if not future.done() or future.result() is None:
self.get_logger().error(f"获取 {MODE_CONTROL_NAME} 参数失败")
else:
response = cast(GetParameters_Response, future.result())
if not response.values:
self.get_logger().error(f"{MODE_CONTROL_NAME} 未返回 mode 参数")
else:
current_mode = response.values[0].integer_value # type: ignore
if cmd is not None and current_mode == 2:
self.cmd_vel_raw_publisher.publish(cmd)
finally:
self.restore_terminal()
现在就只有在手动模式下才会发布这个消息了。
接下来把我们的这个模式控制器也加入到我们的 src/kibot_one_sim/launch/kibot_one.launch.py 启动文件中:
mode_arg = DeclareLaunchArgument(
name="mode",
default_value="2",
description="小车的运行模式"
)
mode_pub_timer_period_arg = DeclareLaunchArgument(
name="mode_pub_timer_period",
default_value="0.01667",
description="模式控制器发布 STOP, CRUISE 模式下速度的时间间隔,单位秒"
)
# ...
start_mode_control = Node(
package="kibot_one_control",
executable="mode_control",
name="mode_control",
output="screen",
parameters=[{
"mode": LaunchConfiguration("mode"),
"mode_pub_timer_period": LaunchConfiguration("mode_pub_timer_period")
}]
)
return LaunchDescription([
world_arg,
stop_time_period_arg,
watch_time_period_arg,
mode_arg,
mode_pub_timer_period_arg,
start_sim,
start_cmd_vel_watchdog,
start_mode_control,
])
构建启动后,看看我们的节点列表:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 node list
/cmd_vel_watchdog
/kibot_one_bridge
/mode_control
可以看到我们的模式控制器已经加入到了这个里面。
我们测试一下改到巡航:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{
target_mode: 1,
linear_velocity: {
linear: {x: 0.8, y: 0.0, z: 0.0},
angular: {x: 0.0, y: 0.0, z: 0.8}
}
}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=1, linear_velocity=geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.8, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.8)))
response:
kibot_one_interface.srv.Mode_Response(success=True, message='成功改变运行模式 STOP -> CRUISE')
效果:
改到停止:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{
target_mode: 0,
linear_velocity: {
linear: {x: 0.0, y: 0.0, z: 0.0},
angular: {x: 0.0, y: 0.0, z: 0.0}
}
}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=0, linear_velocity=geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0)))
response:
kibot_one_interface.srv.Mode_Response(success=True, message='成功改变运行模式 CRUISE -> STOP')
效果:
改到手动:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{
target_mode: 2,
linear_velocity: {
linear: {x: 0.0, y: 0.0, z: 0.0},
angular: {x: 0.0, y: 0.0, z: 0.0}
}
}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=2, linear_velocity=geometry_msgs.msg.Twist(linear=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0)))
response:
kibot_one_interface.srv.Mode_Response(success=True, message='成功改变运行模式 STOP -> MANUAL')
效果:
就这样,我们完成了我们的停止、巡航和手动操作等状态。
以上。