260406

接下来就可以来写我们 ROS2 的控制部分了。

在开始写正式的控制节点,我们要解决一个问题——DiffDrive 自身的指令记住导致的在收到新的指令前会一直保持相同速度进行移动的问题。

在我们真实的机器人中,通常都会在没有收到新的指令后就快速停下,而非一直保持活动。

我们可以通过自己写一个小型的控制节点,记录最后一次收到 /cmd_vel 的时间,超过 0.5s 没有收到新的命令后,就主动发布一个 0 速度到 /cmd_vel 中。

简单的来说就是一种 watchdog。

我们可以在 src/kibot_one_control/kibot_one_control/cmd_vel_watchdog.py 中写入下面的内容来实现这样的一个 watchdog:

from datetime import datetime, timezone
from typing import List

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

from geometry_msgs.msg import Twist

TZ = timezone.utc

class CMDVelWatchDog(Node):

    def __init__(self) -> None:
        super().__init__(node_name='cmd_vel_watchdog')
        self.last_topic_msg_timestamp = datetime.now(tz=TZ)
      
        self.cmd_vel_subscription = self.create_subscription(
            msg_type=Twist,
            topic='cmd_vel',
            callback=self.cmd_vel_listener_callback,
            qos_profile=10
        )
        self.cmd_vel_publisher = self.create_publisher(
            msg_type=Twist,
            topic='cmd_vel',
            qos_profile=10
        )

        watch_time_period = 0.1 # seconds
        self.watch_timer = self.create_timer(
            timer_period_sec=watch_time_period,
            callback=self.watch_timer_callback   
        )

    def cmd_vel_listener_callback(self, msg: Twist) -> None:
        self.last_topic_msg_timestamp = datetime.now(tz=TZ)

    def watch_timer_callback(self) -> None:
        time_diff = (datetime.now(tz=TZ) - self.last_topic_msg_timestamp).total_seconds()
        if time_diff > 0.5:
            msg = Twist()
            msg.linear.x = 0
            msg.linear.y = 0
            msg.linear.z = 0

            msg.angular.x = 0
            msg.angular.y = 0
            msg.angular.z = 0

            self.cmd_vel_publisher.publish(msg=msg)

def main(args = None) -> None:
    try:
        with rclpy.init(args=args): # type: ignore
            cmd_vel_watchdog = CMDVelWatchDog()

            rclpy.spin(cmd_vel_watchdog)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass

if __name__ == "__main__":
    main()

嗯...不过在经过 review 后,我们发现可能在后续使用use_sim_time 的时候,由于我们使用的是datetime,可能会引发混乱,这不是一个很好的习惯。

因此我们最好还是使用 ROSClock /Time /Duration,更“ROS 正统”。

以及,如果我们发布了这个消息的话,此时我们的小车应当停下了(不考虑惯性之类的),因此此时是停下的,也不应当继续发布消息。

除此之外,我们这种一个话题循环给自己发送消息的方式可能会造成一些混乱,也是一个坏习惯。

因此,我们最好是将原始指令和处理后的安全指令的话题进行分开:

  • cmd_vel_raw:上游节点想要机器人进行运动的原始输入。
  • cmd_watchdog:watchdog 所发布的输入。
  • cmd_vel:经过处理,确认安全后的实际输入。

我们的这些对于监控的时间间隔之类的目前也是常量,我们后续想修改的话就必须修改文件并重新 build,在我们实际调试的时候可能会比较麻烦,因此最好是改成参数。

接下来我们可以修改一下我们的 init 和两个回调函数来达到我们的目的。

修改获取最后一条话题消息的时间戳:

self.last_topic_msg_timestamp = self.get_clock().now()

创建时间间隔的参数:

        parmas = [
            ("stop_time_period", 0.5),
            ("watch_time_period", 0.1),
        ]
        self.declare_parameters(namespace="", parameters=cast(Any, parmas))

修改订阅者:

        self.cmd_vel_subscription = self.create_subscription(
            msg_type=Twist,
            topic='cmd_vel_raw',
            callback=self.cmd_vel_listener_callback,
            qos_profile=10
        )

修改 watch_timer:

        self.watch_timer = self.create_timer(
            timer_period_sec=cast(
                float, self.get_parameter(name="watch_time_period").get_parameter_value().double_value
            ),
            callback=self.watch_timer_callback
        )

修改订阅回调:

    def cmd_vel_raw_listener_callback(self, msg: Twist) -> None:
        self.last_topic_msg_timestamp = self.get_clock().now()
        self.cmd_vel_publisher.publish(msg)
        if msg.linear.x != 0 or \
            msg.linear.y != 0 or \
            msg.linear.z != 0 or \
            msg.angular.x != 0 or \
            msg.angular.y != 0 or \
            msg.angular.z != 0:
            self.last_command_was_zero = False
        else:
            self.last_command_was_zero = True

修改发布回调:

    def watch_timer_callback(self) -> None:
        future = self.last_topic_msg_timestamp + Duration(
            seconds=cast(
                float, self.get_parameter(name="stop_time_period").get_parameter_value().double_value
            )
        )
        if not self.last_command_was_zero and self.get_clock().now() > future:
            msg = Twist()

            self.cmd_vel_publisher.publish(msg=msg)
          
            self.last_command_was_zero = True

现在的完整代码就长这样了:

from typing import Any, cast

import rclpy
from rclpy.duration import Duration
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node

from geometry_msgs.msg import Twist

class CMDVelWatchDog(Node):

    def __init__(self) -> None:
        super().__init__(node_name='cmd_vel_watchdog')
      
        self.last_topic_msg_timestamp = self.get_clock().now()
        self.last_command_was_zero = True

        params = [
            ("stop_time_period", 0.5), # seconds
            ("watch_time_period", 0.1), # seconds
        ]
        self.declare_parameters(namespace="", parameters=cast(Any, params))

        self.cmd_vel_raw_subscription = self.create_subscription(
            msg_type=Twist,
            topic='cmd_vel_raw',
            callback=self.cmd_vel_raw_listener_callback,
            qos_profile=10
        )
        self.cmd_vel_publisher = self.create_publisher(
            msg_type=Twist,
            topic='cmd_vel',
            qos_profile=10
        )

        self.watch_timer = self.create_timer(
            timer_period_sec=cast(
                float, self.get_parameter(name="watch_time_period").get_parameter_value().double_value
            ),
            callback=self.watch_timer_callback
        )

    def cmd_vel_raw_listener_callback(self, msg: Twist) -> None:
        self.last_topic_msg_timestamp = self.get_clock().now()
        self.cmd_vel_publisher.publish(msg)
        if msg.linear.x != 0 or \
            msg.linear.y != 0 or \
            msg.linear.z != 0 or \
            msg.angular.x != 0 or \
            msg.angular.y != 0 or \
            msg.angular.z != 0:
            self.last_command_was_zero = False
        else:
            self.last_command_was_zero = True

    def watch_timer_callback(self) -> None:
        future = self.last_topic_msg_timestamp + Duration(
            seconds=cast(
                float, self.get_parameter(name="stop_time_period").get_parameter_value().double_value
            )
        )
        if not self.last_command_was_zero and self.get_clock().now() > future:
            msg = Twist()

            self.cmd_vel_publisher.publish(msg=msg)
          
            self.last_command_was_zero = True

def main(args = None) -> None:
    try:
        with rclpy.init(args=args): # type: ignore
            cmd_vel_watchdog = CMDVelWatchDog()

            rclpy.spin(cmd_vel_watchdog)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass

if __name__ == "__main__":
    main()

孩子们,这是我手搓的代码。

然后加入到我们的入口中:

    entry_points={
        'console_scripts': [
            'cmd_vel_watchdog = kibot_one_control.cmd_vel_watchdog:main',
        ],
    },

就这样,我们就完成了 watchdog。

以上。