260408

我们目前为止都是直接通过 cli 来发布话题,这样会显得很不方便。

因此我们可以新增一个节点,用来监听我们键盘的输入,然后转为 cmd_vel来进行发布,以此来实现我们的控制功能。

我们预期是直接加到满速,而非慢慢加速,按键方案是 WASD + 空格/X 停止 + Q 退出。

以及,由于我们当前的目标限制,我们不怎么考虑组合键的情况,比如 W+A 的情况我们不予考虑。

我们可以创建 src/kibot_one_control/kibot_one_control/keyboard_teleop.py 文件并写入下面的代码来创建这个节点和对应的控制功能:

import select
import sys
import termios
import tty

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

from geometry_msgs.msg import Twist


class KeyboardTeleop(Node):

    def __init__(self) -> None:
        super().__init__('keyboard_teleop')

        self.declare_parameter('cmd_vel_raw_topic', 'cmd_vel_raw')
        self.declare_parameter('linear_speed', 0.8)
        self.declare_parameter('angular_speed', 1.5)
        self.declare_parameter('poll_timeout', 0.1)

        cmd_vel_raw_topic = self.get_parameter(
            'cmd_vel_raw_topic'
        ).get_parameter_value().string_value

        self.linear_speed = self.get_parameter(
            'linear_speed'
        ).get_parameter_value().double_value
        self.angular_speed = self.get_parameter(
            'angular_speed'
        ).get_parameter_value().double_value
        self.poll_timeout = self.get_parameter(
            'poll_timeout'
        ).get_parameter_value().double_value

        self.cmd_vel_raw_publisher = self.create_publisher(Twist, cmd_vel_raw_topic, 10)

        self._settings = termios.tcgetattr(sys.stdin)

    def print_help(self) -> None:
        print(
            '控制器正在运行.\n'
            '使用 WASD 进行移动.\n'
            '  w: 前进\n'
            '  s: 后退\n'
            '  a: 原地左转\n'
            '  d: 原地右转\n'
            'space/x: 停止\n'
            'q: 退出\n'
        )

    def run(self) -> None:
        self.print_help()

        try:
            while rclpy.ok():
                key = self.read_key()
                if key is None:
                    rclpy.spin_once(self, timeout_sec=0.0)
                    continue

                if key == 'q':
                    self.publish_stop()
                    break

                if key in ('x', ' '):
                    self.publish_stop()
                    continue

                cmd = self.key_to_twist(key)
                if cmd is not None:
                    self.cmd_vel_raw_publisher.publish(cmd)
        finally:
            self.restore_terminal()

    def read_key(self) -> str | None:
        tty.setraw(sys.stdin.fileno())
        readable, _, _ = select.select([sys.stdin], [], [], self.poll_timeout)
        if not readable:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
            return None

        key = sys.stdin.read(1)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
        return key.lower()

    def key_to_twist(self, key: str) -> Twist | None:
        msg = Twist()

        if key == 'w':
            msg.linear.x = self.linear_speed
            print("W - 向前")
        elif key == 's':
            msg.linear.x = -self.linear_speed
            print("S - 向后")
        elif key == 'a':
            msg.angular.z = self.angular_speed
            print("A - 原地左转")
        elif key == 'd':
            msg.angular.z = -self.angular_speed
            print("D - 原地右转")
        else:
            return None

        return msg

    def publish_stop(self) -> None:
        self.cmd_vel_raw_publisher.publish(Twist())

    def restore_terminal(self) -> None:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)


def main(args=None) -> None:
    keyboard_teleop: KeyboardTeleop | None = None

    try:
        rclpy.init(args=args)
        keyboard_teleop = KeyboardTeleop()
        keyboard_teleop.run()
    except (KeyboardInterrupt, ExternalShutdownException):
        if keyboard_teleop is not None:
            keyboard_teleop.publish_stop()
    finally:
        if keyboard_teleop is not None:
            keyboard_teleop.restore_terminal()
            keyboard_teleop.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

这里出现了我们没有见过的方法——self._settings = termios.tcgetattr(sys.stdin)。

这里有两个新的方法,分别是termios.tcgetattr(...),这是 python 的标准库,用来读取一个终端设备当前的属性。

然后是sys.stdin了,这个方法是当前进程的标准输入。

所以这个语句的实际意思就是,把“当前这个终端现在的配置”读出来,保存到 self._settings 里。

为什么要这么干呢?

这就要牵扯到我们代码中的tty.setraw(sys.stdin.fileno())这行语句了。

setraw() 会把终端切到一种更底层的输入模式,也就是 raw mode。

这样我们不需要输入回车,在当前终端下按下任何一个键就可以被检测到。

但是这个方法有个副作用——终端的正常行为会被改变。

因此我们需要在退出来的时候把终端恢复回来,不然我们的终端很可能会显得很奇怪。

同时,我们的 while 循环也使用的是 rclpy.ok() 来作为判断条件,这个语句的意思就是当 ROS 仍在运行的时候,就继续这个循环,而不会当 ROS 停止的时候该循环不会退出或对 ROS 造成阻塞。

将我们的这个脚本加入到我们的入口中:

    entry_points={
        'console_scripts': [
            'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
            'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
        ],
    },

然后构建,运行:

ros2 run kibot_one_control keyboard_teleop 

就这样,我们就完成了我们的键盘控制器。

以上。

接下来,我们可以去做一下我们要完成的三个服务了,分别是:

  • stop:停止,持续输出零速度。
  • cruise:固定线速度巡航。
  • manual:手动操作,在切换到该模式前不会接受 cmd_vel_raw 中的任何消息。
  • follow:跟随目标点。

后面我们会创建一个动作叫做 manual_goal, 用于让我们的小车导航到指定的目标点,但是在这之前我们需要先完成我们的服务。

前两个都是比较简单的,唯有第三个比较麻烦,虽然现在不需要上一个传感器(这是我们 Target 2 才要考虑的事情),但是依旧要考虑到比较多的东西。

我们就将这个节点命名为 mode_control 吧。

我们可以创建 src/kibot_one_interface/msg/Mode.msg, 在其中写入下面的内容:

uint8 STOP = 0
uint8 CRUISE = 1
uint8 MANUAL = 2
uint8 FOLLOW = 3

然后创建 src/kibot_one_interface/srv/Mode.srv , 在其中写入下面的内容:

uint8 target_mode
---
bool success
string message

然后在我们的 src/kibot_one_interface/CMakeLists.txt 增加下面的内容,放到 ament_package() 的上方:

find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Mode.msg"
  "srv/Mode.srv"
)

然后在 src/kibot_one_interface/package.xml 中 package 中加入下面这一行来声明这个是接口包:

<member_of_group>rosidl_interface_packages</member_of_group>

接下来进行构建,就可以在我们的 python 中使用这些消息和服务的类型了。

我们可以在 src/kibot_one_control/kibot_one_control/mode_control.py 中写入如下的代码:

from typing import Any, cast
from enum import IntEnum

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.parameter import Parameter

from kibot_one_interface.srv import Mode as ModeSrv

class Mode(IntEnum):
    STOP = 0
    CRUISE = 1
    MANUAL = 2
    FOLLOW = 3

class ModeControl(Node):
    def __init__(self) -> None:
        super().__init__(node_name="ModeControl")

        params = [
            ("mode", Mode.MANUAL)
        ]
        self.declare_parameters(namespace="", parameters=cast(Any, params))
      
        self.srv = self.create_service(srv_type=ModeSrv, srv_name="mode_control", callback=self._change_mode)

    def _change_mode(self, request: ModeSrv.Request, response: ModeSrv.Response) -> ModeSrv.Response:
        message = ""
      
        try:
            current_mode = Mode(self.get_parameter("mode"))
            new_mode = Mode(value=request.target_mode)
            new_mode_param = Parameter(
                name="mode",
                type_=Parameter.Type.INTEGER,
                value=new_mode.value
            )
            all_new_parameters = [new_mode_param]
            self.set_parameters(all_new_parameters)
          
            response.success = True
            response.message = f"成功改变运行模式 {current_mode.name} -> {new_mode.name}"

            return response

        except ValueError:
            message += f"不支持的运行模式枚举值: {request.target_mode}\n"

            for member in Mode:
                message += f"支持的运行模式:{member.value} -> {member.name}\n"

            self.get_logger().error(message=message)

            response.success = False
            response.message = message

            return response
      
        except Exception as e:
            message = f"改变运行模式时出现错误 {e}"

            self.get_logger().error(message=message)

            response.success = False
            response.message = message

            return response
      
def main(args = None) -> None:
    mode_control = None
    try:
        rclpy.init(args=args)
        mode_control = ModeControl()
        rclpy.spin(mode_control)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    finally:
        if mode_control is not None:
            mode_control.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

添加入口:

    entry_points={
        'console_scripts': [
            'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
            'keyboard_teleop = kibot_one_control.keyboard_teleop:main',
            'mode_control = kibot_one_control.mode_control:main',
        ],
    },

构建,然后运行:

ros2 run kibot_one_control mode_control 

我们可以查阅我们现在有哪些节点:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 node list
/ModeControl
/cmd_vel_watchdog
/kibot_one_bridge

可以看到已经有我们的 ModeControl 节点了,但是命名不太对。

我们修改运行命名后重新启动,查看节点列表:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 node list
/cmd_vel_watchdog
/kibot_one_bridge
/mode_control

可以看到现在这个名称就符合我们的预期了。

接下来查看我们有哪些服务:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service list | grep "mode_control"
/mode_control
/mode_control/describe_parameters
/mode_control/get_parameter_types
/mode_control/get_parameters
/mode_control/get_type_description
/mode_control/list_parameters
/mode_control/set_parameters
/mode_control/set_parameters_atomically

可以看到我们的这个服务也有了,然后就是我们的这个节点的参数了:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param list | grep "mode"
/mode_control:
  mode
(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param get /mode_control mode
Integer value is: 2

可以看到默认的值是我们的 2,即 MANUAL 手动。

我们尝试发一条请求来更改它的模式:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{target_mode: 1}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=1)

response:
kibot_one_interface.srv.Mode_Response(success=False, message='不支持的运行模式枚举值: 1\n支持的运行模式:0 -> STOP\n支持的运行模式:1 -> CRUISE\n支持的运行模式:2 -> MANUAL\n支持的运行模式:3 -> FOLLOW\n')

嗯,请求失败了。

可以看到有下面的问题:错误声称为不支持的枚举值。

但是我们可以看到我们的值按理说是正常的。

经过排查,我们发现问题出在我们获取参数值这里:

            current_mode = Mode(self.get_parameter("mode"))

这里我们直接获取了参数,但是没有获取它的值。

因此应该改成下面这样的:

            current_mode = Mode(self.get_parameter("mode").value)

重启,我们现在可以看到我们成功请求了:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 service call /mode_control kibot_one_interface/srv/Mode "{target_mode: 1}"
waiting for service to become available...
requester: making request: kibot_one_interface.srv.Mode_Request(target_mode=1)

response:
kibot_one_interface.srv.Mode_Response(success=True, message='成功改变运行模式 MANUAL -> CRUISE')

然后再次查看这个参数,看看现在这个参数是多少了:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotOne$ ros2 param get /mode_control mode
Integer value is: 1

可以看到变成了我们的目标值——1。

就这样,我们创建好了我们的状态切换服务。

以上。