260421
我们新增一个我们扫描话题的订阅者:
self.create_subscription(
msg_type=LaserScan,
topic=cast(str, self.get_parameter('scan_topic').value),
callback=self._scan_callback,
qos_profile=qos_profile_sensor_data,
)
这个订阅者用于订阅我们激光雷达的扫描数据,我们可以这样写这个回调:
def _scan_callback(self, msg: LaserScan) -> None:
self.latest_scan = msg
self.last_scan_received_time = self.get_clock().now()
这里我们新增了两个变量,分别是 self.latest_scan 和 self.last_scan_received_time,因为我们激光雷达和我们的控制循环实际上是会分开的,因此才需要这两个变量。
我们可以在 __init__ 中加入这两个变量:
self.latest_scan: LaserScan | None = None
self.last_scan_received_time = None
这两个变量的实际功能是用于缓存最近一帧的激光数据以及收到的时间。
由于我们的决策循环是:选择方向 -> 发布速度 -> 选择方向 -> 发布速度。
因此我们需要根据我们实际的激光雷达的数据来先调整我们的方向:
def _select_heading(self, goal_heading_error: float, goal_distance: float) -> tuple[float, float, float, bool]:
safe_radius = cast(float, self.get_parameter('robot_body_radius').value) + cast(
float, self.get_parameter('obstacle_padding').value
)
scan_range_limit = self._scan_range_limit()
goal_clearance = self._direction_clearance(goal_heading_error, safe_radius, scan_range_limit)
direct_path_free = goal_clearance >= min(goal_distance, scan_range_limit)
base_heading = goal_heading_error if direct_path_free else 0.0
best_heading = goal_heading_error
best_clearance = goal_clearance
best_score = -math.inf
sector_count = max(3, cast(int, self.get_parameter('sector_count').value))
sector_fov = cast(float, self.get_parameter('sector_fov').value)
blocked_distance = safe_radius * cast(float, self.get_parameter('blocked_distance_scale').value)
clearance_weight = cast(float, self.get_parameter('clearance_weight').value)
goal_weight = cast(float, self.get_parameter('goal_weight').value)
heading_weight = cast(float, self.get_parameter('heading_weight').value)
reverse_heading_bias = cast(float, self.get_parameter('reverse_heading_bias').value)
best_should_reverse = False
for index in range(sector_count):
ratio = index / (sector_count - 1)
sector_heading = self._normalize_angle(base_heading - sector_fov / 2.0 + ratio * sector_fov)
sector_clearance = self._direction_clearance(sector_heading, safe_radius, scan_range_limit)
clearance_ratio = sector_clearance / scan_range_limit
goal_align = (math.cos(self._normalize_angle(sector_heading - goal_heading_error)) + 1.0) / 2.0
heading_align = reverse_heading_bias + (1.0 - reverse_heading_bias) * ((math.cos(sector_heading) + 1.0) / 2.0)
should_reverse = abs(sector_heading) > (math.pi / 2.0)
if sector_clearance < blocked_distance:
score = -9999.0 + clearance_ratio
else:
score = (
clearance_ratio * clearance_weight
+ goal_align * goal_weight
+ heading_align * heading_weight
)
if score > best_score:
best_score = score
best_heading = sector_heading
best_clearance = sector_clearance
best_should_reverse = should_reverse
return best_heading, best_clearance, scan_range_limit, best_should_reverse
我们解释一下这部分代码。
首先是 safe_radius这个变量,通过robot_body_radius这一小车的半径加上 obstacle_padding 这一冗余范围来形成了实际的安全范围。
然后是 _scan_range_limit 这一辅助函数,其代码如下:
def _scan_range_limit(self) -> float:
if self.latest_scan is None:
return max(0.1, cast(float, self.get_parameter('avoidance_range').value))
return max(
0.1,
min(
cast(float, self.get_parameter('avoidance_range').value),
self.latest_scan.range_max,
),
)
该限制是为了局部规划的范围。
我们不一定要看激光的最长量程的数据,关注我们的局部范围即可。
继续往下,就是检查朝目标直走是否可行:
goal_clearance = self._direction_clearance(goal_heading_error, safe_radius, scan_range_limit)
direct_path_free = goal_clearance >= min(goal_distance, scan_range_limit)
这里的 _direction_clearance 辅助函数代码如下:
def _direction_clearance(self, heading: float, safe_radius: float, scan_range_limit: float) -> float:
if self.latest_scan is None:
return 0.0
direction_x = math.cos(heading)
direction_y = math.sin(heading)
lidar_offset_x = cast(float, self.get_parameter('lidar_offset_x').value)
lidar_offset_y = cast(float, self.get_parameter('lidar_offset_y').value)
robot_body_radius = cast(float, self.get_parameter('robot_body_radius').value)
min_clearance = scan_range_limit
for index, measured_range in enumerate(self.latest_scan.ranges):
if not math.isfinite(measured_range) or measured_range < self.latest_scan.range_min:
continue
# No-return beams and obstacles beyond the local planning horizon should stay "clear".
if measured_range >= self.latest_scan.range_max or measured_range > scan_range_limit:
continue
beam_angle = self.latest_scan.angle_min + index * self.latest_scan.angle_increment
point_x = lidar_offset_x + measured_range * math.cos(beam_angle)
point_y = lidar_offset_y + measured_range * math.sin(beam_angle)
# Ignore returns from the robot's own body footprint.
if math.hypot(point_x, point_y) <= robot_body_radius + 0.02:
continue
projection = point_x * direction_x + point_y * direction_y
if projection <= 0.0 or projection > scan_range_limit:
continue
perpendicular = abs(point_x * direction_y - point_y * direction_x)
if perpendicular >= safe_radius:
continue
offset = math.sqrt(max(0.0, safe_radius * safe_radius - perpendicular * perpendicular))
entry_distance = projection - offset
if entry_distance <= 0.0:
return 0.0
if entry_distance < min_clearance:
min_clearance = entry_distance
return min_clearance
该函数我们传入 3 个参数,分别是 heading(弧度), safe_radius (安全半径)和 scan_range_limit(我们设定的最大感知范围)。
我们首先将我们的弧度转为方向向量:
direction_x = math.cos(heading)
direction_y = math.sin(heading)
然后遍历我们收到的每条雷达射线,计算障碍物点的坐标。
beam_angle = self.latest_scan.angle_min + index * self.latest_scan.angle_increment
point_x = lidar_offset_x + measured_range * math.cos(beam_angle)
point_y = lidar_offset_y + measured_range * math.sin(beam_angle)
然后计算障碍物在该方向上的投影和横向距离:
projection = point_x * direction_x + point_y * direction_y
if projection <= 0.0 or projection > scan_range_limit:
continue
perpendicular = abs(point_x * direction_y - point_y * direction_x)
if perpendicular >= safe_radius:
continue
这个投影+横向距离就构成了一个矩形,就像一个管道一样。
如果障碍物在"安全管"内(垂直距离 <safe_radius),计算进入障碍物安全区域的距离,这个距离就是最小可用的通行距离。
就类似于这样的:

这部分就是 _direction_clearance 这个函数的功能了。