260408
我们目前为止都是直接通过 cli 来发布话题,这样会显得很不方便。
因此我们可以新增一个节点,用来监听我们键盘的输入,然后转为 cmd_vel来进行发布,以此来实现我们的控制功能。
我们预期是直接加到满速,而非慢慢加速,按键方案是 WASD + 空格/X 停止 + Q 退出。
以及,由于我们当前的目标限制,我们不怎么考虑组合键的情况,比如 W+A 的情况我们不予考虑。
我们可以创建 src/kibot_one_control/kibot_one_control/keyboard_teleop.py 文件并写入下面的代码来创建这个节点和对应的控制功能:
import select
import sys
import termios
import tty
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from geometry_msgs.msg import Twist
class KeyboardTeleop(Node):
def __init__(self) -> None:
super().__init__('keyboard_teleop')
self.declare_parameter('cmd_vel_raw_topic', 'cmd_vel_raw')
self.declare_parameter('linear_speed', 0.8)
self.declare_parameter('angular_speed', 1.5)
self.declare_parameter('poll_timeout', 0.1)
cmd_vel_raw_topic = self.get_parameter(
'cmd_vel_raw_topic'
).get_parameter_value().string_value
self.linear_speed = self.get_parameter(
'linear_speed'
).get_parameter_value().double_value
self.angular_speed = self.get_parameter(
'angular_speed'
).get_parameter_value().double_value
self.poll_timeout = self.get_parameter(
'poll_timeout'
).get_parameter_value().double_value
self.cmd_vel_raw_publisher = self.create_publisher(Twist, cmd_vel_raw_topic, 10)
self._settings = termios.tcgetattr(sys.stdin)
def print_help(self) -> None:
print(
'控制器正在运行.\n'
'使用 WASD 进行移动.\n'
' w: 前进\n'
' s: 后退\n'
' a: 原地左转\n'
' d: 原地右转\n'
'space/x: 停止\n'
'q: 退出\n'
)
def run(self) -> None:
self.print_help()
try:
while rclpy.ok():
key = self.read_key()
if key is None:
rclpy.spin_once(self, timeout_sec=0.0)
continue
if key == 'q':
self.publish_stop()
break
if key in ('x', ' '):
self.publish_stop()
continue
cmd = self.key_to_twist(key)
if cmd is not None:
self.cmd_vel_raw_publisher.publish(cmd)
finally:
self.restore_terminal()
def read_key(self) -> str | None:
tty.setraw(sys.stdin.fileno())
readable, _, _ = select.select([sys.stdin], [], [], self.poll_timeout)
if not readable:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
return None
key = sys.stdin.read(1)
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
return key.lower()
def key_to_twist(self, key: str) -> Twist | None:
msg = Twist()
if key == 'w':
msg.linear.x = self.linear_speed
print("W - 向前")
elif key == 's':
msg.linear.x = -self.linear_speed
print("S - 向后")
elif key == 'a':
msg.angular.z = self.angular_speed
print("A - 原地左转")
elif key == 'd':
msg.angular.z = -self.angular_speed
print("D - 原地右转")
else:
return None
return msg
def publish_stop(self) -> None:
self.cmd_vel_raw_publisher.publish(Twist())
def restore_terminal(self) -> None:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
def main(args=None) -> None:
keyboard_teleop: KeyboardTeleop | None = None
try:
rclpy.init(args=args)
keyboard_teleop = KeyboardTeleop()
keyboard_teleop.run()
except (KeyboardInterrupt, ExternalShutdownException):
if keyboard_teleop is not None:
keyboard_teleop.publish_stop()
finally:
if keyboard_teleop is not None:
keyboard_teleop.restore_terminal()
keyboard_teleop.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
这里出现了我们没有见过的方法——self._settings = termios.tcgetattr(sys.stdin)。
这里有两个新的方法,分别是termios.tcgetattr(...),这是 python 的标准库,用来读取一个终端设备当前的属性。
然后是sys.stdin了,这个方法是当前进程的标准输入。
所以这个语句的实际意思就是,把“当前这个终端现在的配置”读出来,保存到 self._settings 里。
为什么要这么干呢?
这就要牵扯到我们代码中的tty.setraw(sys.stdin.fileno())这行语句了。
setraw() 会把终端切到一种更底层的输入模式,也就是 raw mode。
这样我们不需要输入回车,在当前终端下按下任何一个键就可以被检测到。
但是这个方法有个副作用——终端的正常行为会被改变。
因此我们需要在退出来的时候把终端恢复回来,不然我们的终端很可能会显得很奇怪。
同时,我们的 while 循环也使用的是 rclpy.ok() 来作为判断条件,这个语句的意思就是当 ROS 仍在运行的时候,就继续这个循环,而不会当 ROS 停止的时候该循环不会退出或对 ROS 造成阻塞。
将我们的这个脚本加入到我们的入口中:
entry_points={
'console_scripts': [
'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
],
},
然后构建,运行:
ros2 run kibot_one_control keyboard_teleop
就这样,我们就完成了我们的键盘控制器。
以上。
接下来,我们可以去做一下我们要完成的三个服务了,分别是:
- stop:停止,持续输出零速度。
- cruise:固定线速度巡航。
- manual:手动操作,在切换到该模式前不会接受 cmd_vel_raw 中的任何消息。
- follow:跟随目标点。
后面我们会创建一个动作叫做 manual_goal, 用于让我们的小车导航到指定的目标点,但是在这之前我们需要先完成我们的服务。
前两个都是比较简单的,唯有第三个比较麻烦,虽然现在不需要上一个传感器(这是我们 Target 2 才要考虑的事情),但是依旧要考虑到比较多的东西。
我们就将这个节点命名为 mode_control 吧。
我们可以创建 src/kibot_one_interface/msg/Mode.msg, 在其中写入下面的内容:
uint8 STOP = 0
uint8 CRUISE = 1
uint8 MANUAL = 2
uint8 FOLLOW = 3
然后创建 src/kibot_one_interface/srv/Mode.srv , 在其中写入下面的内容:
uint8 target_mode
---
bool success
string message
然后在我们的 src/kibot_one_interface/CMakeLists.txt 增加下面的内容,放到 ament_package() 的上方:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Mode.msg"
"srv/Mode.srv"
)
然后在 src/kibot_one_interface/package.xml 中 package 中加入下面这一行来声明这个是接口包:
<member_of_group>rosidl_interface_packages</member_of_group>
接下来进行构建,就可以在我们的 python 中使用这些消息和服务的类型了。
我们可以在 src/kibot_one_control/kibot_one_control/mode_control.py 中写入如下的代码:
from typing import Any, cast
from enum import IntEnum
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.parameter import Parameter
from kibot_one_interface.srv import Mode as ModeSrv
class Mode(IntEnum):
STOP = 0
CRUISE = 1
MANUAL = 2
FOLLOW = 3
class ModeControl(Node):
def __init__(self) -> None:
super().__init__(node_name="ModeControl")
params = [
("mode", Mode.MANUAL)
]
self.declare_parameters(namespace="", parameters=cast(Any, params))
self.srv = self.create_service(srv_type=ModeSrv, srv_name="mode_control", callback=self._change_mode)
def _change_mode(self, request: ModeSrv.Request, response: ModeSrv.Response) -> ModeSrv.Response:
message = ""
try:
current_mode = Mode(self.get_parameter("mode"))
new_mode = Mode(value=request.target_mode)
new_mode_param = Parameter(
name="mode",
type_=Parameter.Type.INTEGER,
value=new_mode.value
)
all_new_parameters = [new_mode_param]
self.set_parameters(all_new_parameters)
response.success = True
response.message = f"成功改变运行模式 {current_mode.name} -> {new_mode.name}"
return response
except ValueError:
message += f"不支持的运行模式枚举值: {request.target_mode}\n"
for member in Mode:
message += f"支持的运行模式:{member.value} -> {member.name}\n"
self.get_logger().error(message=message)
response.success = False
response.message = message
return response
except Exception as e:
message = f"改变运行模式时出现错误 {e}"
self.get_logger().error(message=message)
response.success = False
response.message = message
return response
def main(args = None) -> None:
mode_control = None
try:
rclpy.init(args=args)
mode_control = ModeControl()
rclpy.spin(mode_control)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
if mode_control is not None:
mode_control.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
添加入口:
entry_points={
'console_scripts': [
'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
'mode_control = kibot_one_control.mode_control:main',
],
},
构建,然后运行:
ros2 run kibot_one_control mode_control
我们可以查阅我们现在有哪些节点:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 node list
/ModeControl
/cmd_vel_watchdog
/kibot_one_bridge
可以看到已经有我们的 ModeControl 节点了,但是命名不太对。
我们修改运行命名后重新启动,查看节点列表:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 node list
/cmd_vel_watchdog
/kibot_one_bridge
/mode_control
可以看到现在这个名称就符合我们的预期了。
接下来查看我们有哪些服务:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service list | grep "mode_control"
/mode_control
/mode_control/describe_parameters
/mode_control/get_parameter_types
/mode_control/get_parameters
/mode_control/get_type_description
/mode_control/list_parameters
/mode_control/set_parameters
/mode_control/set_parameters_atomically
可以看到我们的这个服务也有了,然后就是我们的这个节点的参数了:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param list | grep "mode"
/mode_control:
mode
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param get /mode_control mode
Integer value is: 2
可以看到默认的值是我们的 2,即 MANUAL 手动。
我们尝试发一条请求来更改它的模式:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{target_mode: 1}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=1)
response:
kibot_one_interface.srv.Mode_Response(success=False, message='不支持的运行模式枚举值: 1\n支持的运行模式:0 -> STOP\n支持的运行模式:1 -> CRUISE\n支持的运行模式:2 -> MANUAL\n支持的运行模式:3 -> FOLLOW\n')
嗯,请求失败了。
可以看到有下面的问题:错误声称为不支持的枚举值。
但是我们可以看到我们的值按理说是正常的。
经过排查,我们发现问题出在我们获取参数值这里:
current_mode = Mode(self.get_parameter("mode"))
这里我们直接获取了参数,但是没有获取它的值。
因此应该改成下面这样的:
current_mode = Mode(self.get_parameter("mode").value)
重启,我们现在可以看到我们成功请求了:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{target_mode: 1}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=1)
response:
kibot_one_interface.srv.Mode_Response(success=True, message='成功改变运行模式 MANUAL -> CRUISE')
然后再次查看这个参数,看看现在这个参数是多少了:
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param get /mode_control mode
Integer value is: 1
可以看到变成了我们的目标值——1。
就这样,我们创建好了我们的状态切换服务。
以上。