260603
上次我们已经实现了 Frontier 的核心算法,接下来我们需要接入 ROS2 的探索节点了。
我们先创建这个文件 src/kibot_one_control/kibot_one_control/frontier_explorer.py,并写入所需的依赖:
from __future__ import annotations
from typing import Any, cast
import rclpy
from action_msgs.msg import GoalStatus # type: ignore
from geometry_msgs.msg import PoseStamped # type: ignore
from nav2_msgs.action import NavigateToPose # type: ignore
from nav_msgs.msg import OccupancyGrid # type: ignore
from rclpy.action import ActionClient # type: ignore
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from tf2_ros import Buffer, TransformException, TransformListener # type: ignore
from kibot_one_control.frontier_core import GridInfo
from kibot_one_control.frontier_core import filter_cooldown_candidates
from kibot_one_control.frontier_core import find_frontier_candidates
然后创建 FrontierExplorer 并声明所需的状态:
class FrontierExplorer(Node):
def __init__(self, node_name: str, *, context: rclpy.Context | None = None, cli_args: List[str] | None = None, namespace: str | None = None, use_global_arguments: bool = True, enable_rosout: bool = True, start_parameter_services: bool = True, parameter_overrides: List[Parameter] | None = None, allow_undeclared_parameters: bool = False, automatically_declare_parameters_from_overrides: bool = False, enable_logger_service: bool = False) -> None:
super().__init__(node_name=node_name, context=context, cli_args=cli_args, namespace=namespace, use_global_arguments=use_global_arguments, enable_rosout=enable_rosout, start_parameter_services=start_parameter_services, parameter_overrides=parameter_overrides, allow_undeclared_parameters=allow_undeclared_parameters, automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides, enable_logger_service=enable_logger_service)
params = [
("map_topic", "/map"),
("map_frame", "map"),
("base_frame", "base_link"),
("navigate_action", "/navigate_to_pose"),
("exploration_period", 1.0),
("goal_timeout", 35.0),
("frontier_cooldown", 20.0),
("min_frontier_size", 3),
("min_goal_distance", 0.35),
("max_goal_distance", 3.0),
("free_threshold", 20)
]
self.declare_parameters(namespace="", parameters=cast(Any, params))
self.lastest_map: OccupancyGrid | None = None
self.current_goal_key: str | None = None
self.current_goal_sent_seconds: float | None = None
self.goal_handle: Any | None = None
self.cooled_frontiers: dict[str, float] = {}
然后接入 TF,动作客户端,Map 订阅和计时器:
class FrontierExplorer(Node):
def __init__(self, node_name: str, *, context: rclpy.Context | None = None, cli_args: List[str] | None = None, namespace: str | None = None, use_global_arguments: bool = True, enable_rosout: bool = True, start_parameter_services: bool = True, parameter_overrides: List[Parameter] | None = None, allow_undeclared_parameters: bool = False, automatically_declare_parameters_from_overrides: bool = False, enable_logger_service: bool = False) -> None:
super().__init__(node_name=node_name, context=context, cli_args=cli_args, namespace=namespace, use_global_arguments=use_global_arguments, enable_rosout=enable_rosout, start_parameter_services=start_parameter_services, parameter_overrides=parameter_overrides, allow_undeclared_parameters=allow_undeclared_parameters, automatically_declare_parameters_from_overrides=automatically_declare_parameters_from_overrides, enable_logger_service=enable_logger_service)
params = [
("map_topic", "/map"),
("map_frame", "map"),
("base_frame", "base_link"),
("navigate_action", "/navigate_to_pose"),
("exploration_period", 1.0),
("goal_timeout", 35.0),
("frontier_cooldown", 20.0),
("min_frontier_size", 3),
("min_goal_distance", 0.35),
("max_goal_distance", 3.0),
("free_threshold", 20)
]
self.declare_parameters(namespace="", parameters=cast(Any, params))
self.lastest_map: OccupancyGrid | None = None
self.current_goal_key: str | None = None
self.current_goal_sent_seconds: float | None = None
self.goal_handle: Any | None = None
self.cooled_frontiers: dict[str, float] = {}
self.tf_buffer: Buffer = Buffer()
self.tf_listener: TransformListener = TransformListener(buffer=self.tf_buffer, node=self)
self.action_client: ActionClient = ActionClient(
node=self,
action_type=NavigateToPose,
action_name=cast(str, self.get_parameter(name="navigate_action").value)
)
map_qos: QoSProfile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
)
self.create_subscription(
msg_type=OccupancyGrid,
topic=cast(str, self.get_parameter(name="map_topic").value),
callback=self._map_callback,
qos_profile=map_qos
)
self.create_timer(
timer_period_sec=cast(float, self.get_parameter(name="exploration_period").value),
callback=self._exploration_tick,
)
def _map_callback(self, msg: OccupancyGrid) -> None:
self.latest_map = msg
def _exploration_tick(self) -> None:
pass
_exploration_tick 这个函数我们后续再进行完善。
我们先实现一下查询 TF 并选择候选的函数:
def _lookup_robot_pose(self) -> Tuple[float, float] | None:
map_frame = cast(str, self.get_parameter(name="map_frame").value)
base_frame = cast(str, self.get_parameter(name="base_frame").value)
try:
transform = self.tf_buffer.lookup_transform(target_frame=map_frame, source_frame=base_frame, time=rclpy.time.Time())
except TransformException as exc:
self.get_logger().debug(f"等待 {map_frame} -> {base_frame} 时出现错误: {exc}")
return None
return(
transform.transform.translation.x,
transform.transform.translation.y
)
def _now_seconds(self) -> float:
return self.get_clock().now().nanoseconds / 1e9
def _select_candidate(self, robot_x: float, robot_y: float):
assert self.lastest_map is not None
grid = self.lastest_map.info
info = GridInfo(
width=grid.width,
height=grid.height,
resolution=grid.resolution,
origin_x=grid.origin.position.x,
origin_y=grid.origin.position.y
)
candidates = find_frontier_candidates(
data=self.lastest_map.data,
info=info,
robot_x=robot_x,
robot_y=robot_y,
min_frontier_size=cast(int, self.get_parameter(name="min_frontier_size").value),
min_goal_distance=cast(float, self.get_parameter(name="min_goal_distance").value),
max_goal_distance=cast(float, self.get_parameter(name="max_goal_distance").value),
free_threshold=cast(int, self.get_parameter(name="free_threshold").value)
)
available = filter_cooldown_candidates(candidates=candidates, cooled_frontiers=self.cooled_frontiers, now_seconds=self._now_seconds())
return available[0] if available else None
实现超时、冷却和清理:
def _cancel_timed_out_goal(self) -> None:
if self.goal_handle is None or self.current_goal_sent_seconds is None:
return
timeout = cast(float, self.get_parameter(name="goal_timeout").value)
if self._now_seconds() - self.current_goal_sent_seconds <= timeout:
return
self.get_logger().warning(message=f"边界目标 {self.current_goal_key} 超时,取消中...")
self.goal_handle.cancel_goal_async()
self._cool_current_frontier()
self._clear_current_goal()
def _cool_current_frontier(self) -> None:
if self.current_goal_key is None:
return
cooldown = cast(float, self.get_parameter(name="frontier_cooldown").value)
self.cooled_frontiers[self.current_goal_key] = self._now_seconds() + cooldown
def _clear_current_goal(self) -> None:
self.current_goal_key = None
self.current_goal_sent_seconds = None
self.goal_handle = None
然后处理动作的响应以及结果:
def _goal_response_callback(self, future: Future) -> None:
goal_handle = future.result()
if not goal_handle or not goal_handle.accepted:
self.get_logger().warning(message=f"发送边界目标 {self.current_goal_key} 被拒绝")
self._cool_current_frontier()
self._clear_current_goal()
return
self.goal_handle = goal_handle
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._goal_response_callback)
def _goal_result_callback(self, future: Future) -> None:
result = future.result()
if result is None:
return
status = result.status
if status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info(message=f"边界目标 {self.current_goal_key} 完成")
else:
self.get_logger().warning(message=f"边界目标 {self.current_goal_key} 已完成,状态: {status}")
self._cool_current_frontier()
self._clear_current_goal()
最后实现保存地图并探索 tick:
def _map_callback(self, msg: OccupancyGrid) -> None:
self.latest_map = msg
def _exploration_tick(self) -> None:
if self.current_goal_key is not None:
self._cancel_timed_out_goal()
return
if self.latest_map is None:
self.get_logger().debug(message="正在选择边界前等待 /map 的消息...")
return
if not self.action_client.wait_for_server(timeout_sec=0.1):
self.get_logger().debug(message="正在等待 NavigateToPose 动作提供者...")
return
robot_pose = self._lookup_robot_pose()
if robot_pose is None:
return
candidate = self._select_candidate(robot_x=robot_pose[0], robot_y=robot_pose[1])
if candidate is None:
self.get_logger().info(message="没有找到可用的边界候选。")
return
goal = NavigateToPose.Goal()
goal.pose.header.frame_id = cast(str, self.get_parameter(name="map_frame").value)
goal.pose.header.stamp = self.get_clock().now().to_msg()
goal.pose.pose.position.x = candidate.map_x
goal.pose.pose.position.y = candidate.map_y
goal.pose.pose.orientation.w = 1.0
self.current_goal_key = candidate.key
self.current_goal_sent_seconds = self._now_seconds()
self.get_logger().info(
message="在 (%.2f, %.2f) 发送边界 %s 中... ;size=%d, distance=%.2f, score=%.2f"
% (candidate.map_x, candidate.map_y,candidate.key, candidate.size, candidate.distance, candidate.score)
)
future = self.action_client.send_goal_async(goal=goal)
future.add_done_callback(callback=self._goal_response_callback)
最后增加 main 入口:
def main(args=None) -> None:
explorer = None
try:
rclpy.init(args=args)
explorer = FrontierExplorer()
rclpy.spin(explorer)
except (KeyboardInterrupt, ExternalShutdownException):
pass
finally:
if explorer is not None:
explorer.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == "__main__":
main()