260424
经过我的考虑,我决定还是将我们的小车做得更完善一些,需要包括下面的内容:
- 一个核心的控制台的文件和对应的 launch文件,可以直接在这里切换控制模式,且在选择巡航模式的时候允许输入线速度和角速度,以及在手操模式下允许直接就在这个 TUI 进行手操,然后在手操模式的实际手操中退出后应当回到 TUI,该 TUI 的使用方式为方向键(WASD/方向键,且在手操模式下也同样的操作逻辑)+回车+q。
- 我们需要一个 Github Page 来展示我们的成果,包括手操、自主避障导航的 HTML 演示,以及视频演示。
我们先完成第一个:
import select
import sys
import termios
import tty
from enum import IntEnum
from typing import cast
import rclpy
from geometry_msgs.msg import Twist # type: ignore
from kibot_one_interface.msg import ModeState # type: ignore
from kibot_one_interface.srv import Mode as ModeSrv # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
class Mode(IntEnum):
STOP = 0
CRUISE = 1
MANUAL = 2
FOLLOW = 3
KEY_UP = 'up'
KEY_DOWN = 'down'
KEY_LEFT = 'left'
KEY_RIGHT = 'right'
KEY_ENTER = 'enter'
KEY_SPACE = 'space'
KEY_Q = 'q'
class ControlConsole(Node):
def __init__(self) -> None:
super().__init__(node_name='control_console')
self.declare_parameter('cmd_vel_raw_topic', 'cmd_vel_raw')
self.declare_parameter('mode_topic', 'mode')
self.declare_parameter('mode_service', 'mode_control')
self.declare_parameter('linear_speed', 0.8)
self.declare_parameter('angular_speed', 1.5)
self.declare_parameter('poll_timeout', 0.1)
self.declare_parameter('service_timeout', 3.0)
self.linear_speed = self.get_parameter('linear_speed').get_parameter_value().double_value
self.angular_speed = self.get_parameter('angular_speed').get_parameter_value().double_value
self.poll_timeout = self.get_parameter('poll_timeout').get_parameter_value().double_value
self.service_timeout = self.get_parameter('service_timeout').get_parameter_value().double_value
self.current_mode: Mode | None = None
self.status_message = '准备就绪'
self.menu_items = [
(Mode.STOP, 'STOP 停止'),
(Mode.CRUISE, 'CRUISE 巡航'),
(Mode.MANUAL, 'MANUAL 手操'),
(Mode.FOLLOW, 'FOLLOW 跟随'),
]
self.selected_index = 0
mode_qos = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
)
self.mode_client = self.create_client(
srv_type=ModeSrv,
srv_name=self.get_parameter('mode_service').get_parameter_value().string_value,
)
self.cmd_vel_raw_publisher = self.create_publisher(
msg_type=Twist,
topic=self.get_parameter('cmd_vel_raw_topic').get_parameter_value().string_value,
qos_profile=10,
)
self.create_subscription(
msg_type=ModeState,
topic=self.get_parameter('mode_topic').get_parameter_value().string_value,
callback=self._mode_callback,
qos_profile=mode_qos,
)
if not sys.stdin.isatty():
raise RuntimeError('control_console 需要交互式终端,请使用 ros2 run 或 control_console.launch.py 启动。')
self._settings = termios.tcgetattr(sys.stdin)
def run(self) -> None:
try:
if not self._wait_for_mode_service():
return
while rclpy.ok():
self._draw_menu()
key = self._read_key()
rclpy.spin_once(self, timeout_sec=0.0)
if key is None:
continue
if key == KEY_Q:
self.publish_stop()
break
if key in ('w', KEY_UP):
self.selected_index = (self.selected_index - 1) % len(self.menu_items)
continue
if key in ('s', KEY_DOWN):
self.selected_index = (self.selected_index + 1) % len(self.menu_items)
continue
if key == KEY_ENTER:
self._activate_selected_mode()
finally:
self.publish_stop()
self._restore_terminal()
def _mode_callback(self, msg: ModeState) -> None:
try:
self.current_mode = Mode(msg.current_mode)
except ValueError:
self.current_mode = None
def _wait_for_mode_service(self) -> bool:
self._clear_screen()
print('KiBotOne 控制台')
print('等待 /mode_control 服务...')
if self.mode_client.wait_for_service(timeout_sec=self.service_timeout):
return True
self.status_message = '未找到 mode_control 服务,请先启动 bringup。'
self._draw_menu()
return False
def _activate_selected_mode(self) -> None:
mode = self.menu_items[self.selected_index][0]
if mode == Mode.CRUISE:
self._start_cruise_mode()
return
if mode == Mode.MANUAL:
if self._request_mode(Mode.MANUAL):
self._run_manual_mode()
return
self._request_mode(mode)
def _start_cruise_mode(self) -> None:
self._restore_terminal()
self._clear_screen()
print('CRUISE 巡航模式')
print('输入速度后按回车;直接回车使用 0.0。')
try:
linear_x = self._read_float('线速度 linear.x (m/s): ', default=0.0)
angular_z = self._read_float('角速度 angular.z (rad/s): ', default=0.0)
except (KeyboardInterrupt, EOFError):
self.status_message = '已取消巡航设置。'
return
cruise_velocity = Twist()
cruise_velocity.linear.x = linear_x
cruise_velocity.angular.z = angular_z
self._request_mode(Mode.CRUISE, cruise_velocity)
def _run_manual_mode(self) -> None:
self.publish_stop()
self.status_message = '已进入 MANUAL 手操;按 q 返回菜单。'
while rclpy.ok():
self._draw_manual_screen()
key = self._read_key()
rclpy.spin_once(self, timeout_sec=0.0)
if key is None:
continue
if key == KEY_Q:
self.publish_stop()
self.status_message = '已退出手操,回到控制台菜单。'
return
if key in (KEY_ENTER, KEY_SPACE, 'x'):
self.publish_stop()
self.status_message = '已发送停止命令。'
continue
command = self._manual_key_to_twist(key)
if command is not None:
self.cmd_vel_raw_publisher.publish(command)
def _manual_key_to_twist(self, key: str) -> Twist | None:
command = Twist()
if key in ('w', KEY_UP):
command.linear.x = self.linear_speed
self.status_message = '手操:前进'
elif key in ('s', KEY_DOWN):
command.linear.x = -self.linear_speed
self.status_message = '手操:后退'
elif key in ('a', KEY_LEFT):
command.angular.z = self.angular_speed
self.status_message = '手操:原地左转'
elif key in ('d', KEY_RIGHT):
command.angular.z = -self.angular_speed
self.status_message = '手操:原地右转'
else:
return None
return command
def _request_mode(self, mode: Mode, velocity: Twist | None = None) -> bool:
request = ModeSrv.Request()
request.target_mode = mode.value
request.linear_velocity = velocity if velocity is not None else Twist()
future = self.mode_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=self.service_timeout)
if not future.done():
self.status_message = f'切换到 {mode.name} 超时。'
return False
response = future.result()
if response is None:
self.status_message = f'切换到 {mode.name} 失败:服务无响应。'
return False
if cast(bool, response.success):
self.status_message = cast(str, response.message)
return True
self.status_message = cast(str, response.message)
return False
def publish_stop(self) -> None:
self.cmd_vel_raw_publisher.publish(Twist())
def _draw_menu(self) -> None:
self._clear_screen()
print('KiBotOne 控制台')
print()
print(f'当前模式: {self._current_mode_label()}')
print(f'状态: {self.status_message}')
print()
print('W/S 或 ↑/↓ 选择模式,Enter 确认,q 退出')
print()
for index, (_, label) in enumerate(self.menu_items):
prefix = '>' if index == self.selected_index else ' '
print(f'{prefix} {label}')
def _draw_manual_screen(self) -> None:
self._clear_screen()
print('KiBotOne 控制台 / MANUAL 手操')
print()
print(f'当前模式: {self._current_mode_label()}')
print(f'状态: {self.status_message}')
print()
print('W/↑ 前进 S/↓ 后退')
print('A/← 左转 D/→ 右转')
print('Enter/Space 停止,q 返回菜单')
def _current_mode_label(self) -> str:
if self.current_mode is None:
return '未知'
return f'{self.current_mode.value} {self.current_mode.name}'
def _read_key(self) -> str | None:
tty.setraw(sys.stdin.fileno())
try:
readable, _, _ = select.select([sys.stdin], [], [], self.poll_timeout)
if not readable:
return None
key = sys.stdin.read(1)
if key == '\x1b':
return self._read_escape_key()
if key in ('\r', '\n'):
return KEY_ENTER
if key == ' ':
return KEY_SPACE
return key.lower()
finally:
self._restore_terminal()
def _read_escape_key(self) -> str | None:
readable, _, _ = select.select([sys.stdin], [], [], 0.01)
if not readable:
return None
sequence = sys.stdin.read(2)
match sequence:
case '[A':
return KEY_UP
case '[B':
return KEY_DOWN
case '[C':
return KEY_RIGHT
case '[D':
return KEY_LEFT
case _:
return None
def _read_float(self, prompt: str, default: float) -> float:
while True:
value = input(prompt).strip()
if value == '':
return default
try:
return float(value)
except ValueError:
print('请输入有效数字。')
def _restore_terminal(self) -> None:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._settings)
@staticmethod
def _clear_screen() -> None:
print('\033[2J\033[H', end='')
def main(args=None) -> None:
control_console: ControlConsole | None = None
try:
rclpy.init(args=args)
control_console = ControlConsole()
control_console.run()
except RuntimeError as error:
print(error)
except (KeyboardInterrupt, ExternalShutdownException):
if control_console is not None:
control_console.publish_stop()
finally:
if control_console is not None:
control_console._restore_terminal()
control_console.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()
通过 select, termios, tty 等依赖,我们实现了这样的一个 TUI。
手操:
巡航:
跟随:
就这样,我们完成了我们的 TUI。
然后我们还需要这个 Github Page,我们创建一个 docs/index.html 好了。
最后的效果如下:

地址:https://jeseki.github.io/KiBotOne/
目前这个项目就做的差不多了。
以上。