260602

六一儿童节当然是给我们这种儿童放假玩的啊。

继续,我们接下来实现自由侧质心、坐标转换和对应的 Key:

def _component_free_centroid(
    component: Sequence[int],
    data: Sequence[int],
    info: GridInfo,
    free_threshold: int,
) -> Tuple[float, float]:
    """Return the centroid of the free cells that border a frontier component.

    Every cell of ``component`` is expanded with ``_neighbor4`` and the
    neighbours accepted by ``_is_free`` are collected (duplicates collapse
    because a set is used). The centroid of those free cells is returned, so
    the resulting point lies on the free side of the frontier rather than on
    the component cells themselves.

    Args:
        component: Flat grid indices of the cells forming one component.
        data: Flat grid values, indexed by the same flat indices.
        info: Grid geometry passed through to the helpers.
        free_threshold: Threshold forwarded to ``_is_free``.

    Returns:
        ``(x, y)`` as computed by ``_component_centroid``. When no neighbour
        is free, the centroid of ``component`` itself is returned instead.
    """
    free_neighbors = {
        neighbor
        for index in component
        for neighbor in _neighbor4(index=index, info=info)
        if _is_free(value=data[neighbor], free_threshold=free_threshold)
    }

    if not free_neighbors:
        # Nothing free touches the component: fall back to its own centroid.
        return _component_centroid(component=component, info=info)
    # Average the free neighbours (not the component cells); sorting makes the
    # floating-point accumulation order independent of set iteration order.
    return _component_centroid(component=sorted(free_neighbors), info=info)


def _index_to_world(index: int, info: GridInfo) -> Tuple[float, float]:
    x = index % info.width
    y = index // info.width
    return(
        info.origin_x + (x + 0.5) * info.resolution,
        info.origin_y + (y + 0.5) * info.resolution
    )

def _component_key(component: Sequence[int], info: GridInfo) -> str:
    """Build a ``"<cell_x>:<cell_y>"`` key from the component's centroid.

    The centroid returned by ``_component_centroid`` is shifted by the grid
    origin and divided by the resolution; ``int()`` then truncates each
    offset to a whole cell number.
    """
    centre = _component_centroid(component=component, info=info)
    origin = (info.origin_x, info.origin_y)
    column, row = (
        int((coordinate - start) / info.resolution)
        for coordinate, start in zip(centre, origin)
    )
    return f"{column}:{row}"

然后实现连通分量(component)的遍历和质心计算:

def _collect_component(
    start: int, frontier_cells: Set[int], visited: Set[int], info: GridInfo
) -> List[int]:
    """Collect the frontier cells reachable from ``start`` by breadth-first search.

    Neighbours come from ``_neighbor8``; only neighbours that are members of
    ``frontier_cells`` and not yet in ``visited`` are followed.

    Args:
        start: Flat index of the cell the search starts from.
        frontier_cells: Set of flat indices that count as frontier cells.
        visited: Set of already-claimed indices. It is mutated in place:
            ``start`` and every collected cell are added to it.
        info: Grid geometry passed through to ``_neighbor8``.

    Returns:
        The collected indices in the order they were dequeued, beginning
        with ``start``.
    """
    # Build a concrete collections.deque instead of calling the ``Deque``
    # annotation alias; the function-scope import keeps this block
    # independent of the file's import section.
    from collections import deque

    component: List[int] = []
    queue: Deque[int] = deque([start])
    visited.add(start)

    while queue:
        index = queue.popleft()
        component.append(index)
        for neighbor in _neighbor8(index=index, info=info):
            if neighbor in frontier_cells and neighbor not in visited:
                # Mark on enqueue so a cell is never queued twice.
                visited.add(neighbor)
                queue.append(neighbor)

    return component


def _component_centroid(
    component: Sequence[int], info: GridInfo
) -> Tuple[float, float]:
    """Return the mean of the ``_index_to_world`` coordinates of all cells.

    Each index in ``component`` is converted with ``_index_to_world`` and the
    x and y values are averaged separately. An empty ``component`` ends in a
    division by zero.
    """
    sum_x = sum_y = 0.0

    for cell in component:
        point_x, point_y = _index_to_world(index=cell, info=info)
        sum_x, sum_y = sum_x + point_x, sum_y + point_y

    cell_count = len(component)
    return sum_x / cell_count, sum_y / cell_count

最后实现候选提取的入口:

def find_frontier_candidates(
    data: Sequence[int],
    info: GridInfo,
    robot_x: float,
    robot_y: float,
    *,
    min_frontier_size: int = 3,
    min_goal_distance: float = 0.35,
    max_goal_distance: float = 3.0,
    free_threshold: int = 20,
) -> List[FrontierCandidate]:
    """Extract frontier goal candidates from a flat grid, best score first.

    Steps:
      1. ``_validate_grid`` checks ``data`` against ``info``.
      2. A cell is a frontier cell when ``_is_unknown`` accepts its value and
         at least one of its ``_neighbor4`` cells is accepted by ``_is_free``.
      3. Frontier cells are grouped with ``_collect_component``; groups with
         fewer than ``min_frontier_size`` cells are dropped.
      4. The goal point of a group is what ``_component_free_centroid``
         returns; groups whose goal is closer than ``min_goal_distance`` or
         farther than ``max_goal_distance`` from the robot are dropped.
      5. Each remaining group is scored as ``size - distance``.

    Args:
        data: Flat grid values.
        info: Grid geometry.
        robot_x: Robot x position, in the same frame as the goal points.
        robot_y: Robot y position, in the same frame as the goal points.
        min_frontier_size: Minimum number of cells in a group.
        min_goal_distance: Lower bound on the robot-to-goal distance.
        max_goal_distance: Upper bound on the robot-to-goal distance.
        free_threshold: Threshold forwarded to ``_is_free``.

    Returns:
        The accepted candidates sorted by descending ``score``.
    """
    _validate_grid(data=data, info=info)

    frontier_cells = {
        index
        for index, value in enumerate(data)
        if _is_unknown(value=value)
        and any(
            _is_free(value=data[neighbor], free_threshold=free_threshold)
            for neighbor in _neighbor4(index=index, info=info)
        )
    }

    candidates: List[FrontierCandidate] = []
    visited: Set[int] = set()
    # Iterate in sorted order so component discovery is deterministic.
    for start in sorted(frontier_cells):
        if start in visited:
            continue

        component = _collect_component(
            start=start, frontier_cells=frontier_cells, visited=visited, info=info
        )
        size = len(component)
        if size < min_frontier_size:
            continue

        goal_x, goal_y = _component_free_centroid(
            component=component, data=data, info=info, free_threshold=free_threshold
        )
        distance = math.hypot(goal_x - robot_x, goal_y - robot_y)
        if distance < min_goal_distance or distance > max_goal_distance:
            continue

        # Larger frontiers score higher, more distant goals score lower.
        score = float(size) - distance
        candidates.append(
            FrontierCandidate(
                key=_component_key(component=component, info=info),
                map_x=goal_x,
                map_y=goal_y,
                size=size,
                distance=distance,
                score=score,
            )
        )

    return sorted(candidates, key=lambda candidate: candidate.score, reverse=True)

接着为核心算法编写测试文件 src/kibot_one_control/test/test_frontier_core.py:

from kibot_one_control.frontier_core import GridInfo
from kibot_one_control.frontier_core import filter_cooldown_candidates
from kibot_one_control.frontier_core import find_frontier_candidates


def test_find_frontier_candidates_groups_unknown_cells_next_to_free_space() -> None:
    """Two stacked -1 cells beside a 2x2 block of 0 cells give one candidate."""
    rows = [
        [100, 100, 100, 100, 100],
        [100, 0, 0, -1, 100],
        [100, 0, 0, -1, 100],
        [100, 100, 100, 100, 100],
        [100, 100, 100, 100, 100],
    ]
    occupancy = [cell for row in rows for cell in row]
    grid = GridInfo(width=5, height=5, resolution=1.0, origin_x=0.0, origin_y=0.0)

    found = find_frontier_candidates(
        occupancy,
        grid,
        robot_x=1.5,
        robot_y=1.5,
        min_frontier_size=2,
        max_goal_distance=5.0,
    )

    assert len(found) == 1
    only = found[0]
    # The key is derived from the component cells, the goal from the free side.
    assert only.key == "3:2"
    assert only.map_x == 2.5
    assert only.map_y == 2.0
    assert only.size == 2


def test_find_frontier_candidates_filters_small_and_far_frontiers() -> None:
    """A single -1 cell near the robot and a distant pair are both rejected."""
    rows = [
        [0, -1, 100, 100, 100],
        [100, 100, 100, 100, 100],
        [100, 100, 100, 100, 100],
        [100, 100, 100, 0, -1],
        [100, 100, 100, 0, -1],
    ]
    occupancy = [cell for row in rows for cell in row]
    grid = GridInfo(width=5, height=5, resolution=1.0, origin_x=0.0, origin_y=0.0)

    found = find_frontier_candidates(
        occupancy,
        grid,
        robot_x=0.5,
        robot_y=0.5,
        min_frontier_size=2,
        max_goal_distance=2.0,
    )

    assert found == []


def test_filter_cooldown_candidates_removes_recent_failures() -> None:
    """A key mapped to 20.0 is filtered out at now=10.0 and kept at now=21.0."""
    rows = [
        [100, 100, 100, 100, 100],
        [100, 0, 0, -1, 100],
        [100, 0, 0, -1, 100],
        [100, 100, 100, 100, 100],
        [100, 100, 100, 100, 100],
    ]
    occupancy = [cell for row in rows for cell in row]
    grid = GridInfo(width=5, height=5, resolution=1.0, origin_x=0.0, origin_y=0.0)
    found = find_frontier_candidates(
        occupancy,
        grid,
        1.5,
        1.5,
        min_frontier_size=2,
        max_goal_distance=5.0,
    )

    first_key = found[0].key
    assert filter_cooldown_candidates(found, {first_key: 20.0}, now_seconds=10.0) == []
    assert filter_cooldown_candidates(found, {first_key: 20.0}, now_seconds=21.0) == found

运行一下测试看看:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ source .vscode/project-terminal-init.sh
PYTHONPATH=src/kibot_one_control python3 -m pytest -q src/kibot_one_control/test/test_frontier_core.py

========================================================================================== ERRORS ===========================================================================================
_____________________________________________________________ ERROR collecting src/kibot_one_control/test/test_frontier_core.py _____________________________________________________________
ImportError while importing test module '/home/jese--ki/Projects/dev/KiBots/KiBotTwo/src/kibot_one_control/test/test_frontier_core.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/usr/lib/python3.12/importlib/__init__.py:90: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
src/kibot_one_control/test/test_frontier_core.py:1: in <module>
    from kibot_one_control.frontier_core import GridInfo
src/kibot_one_control/kibot_one_control/frontier_core.py:8: in <module>
    from typeguard import value
E   ModuleNotFoundError: No module named 'typeguard'
================================================================================== short test summary info ==================================================================================
ERROR src/kibot_one_control/test/test_frontier_core.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
1 error in 0.07s

嗯……原因似乎是:frontier_core.py 里多了一行 `from typeguard import value`——我的变量 value 被当成了要从依赖 typeguard 中导入的名字,而当前环境里并没有安装 typeguard。

重新运行一下:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ PYTHONPATH=src/kibot_one_control python3 -m pytest -q src/kibot_one_control/test/test_frontier_core.py
F..                                                                                                                                                                                   [100%]
========================================================================================= FAILURES ==========================================================================================
___________________________________________________________ test_find_frontier_candidates_groups_unknown_cells_next_to_free_space ___________________________________________________________

    def test_find_frontier_candidates_groups_unknown_cells_next_to_free_space() -> None:
        grid = GridInfo(width=5, height=5, resolution=1.0, origin_x=0.0, origin_y=0.0)
        data = [
            100, 100, 100, 100, 100,
            100, 0, 0, -1, 100,
            100, 0, 0, -1, 100,
            100, 100, 100, 100, 100,
            100, 100, 100, 100, 100,
        ]
  
        candidates = find_frontier_candidates(
            data,
            grid,
            robot_x=1.5,
            robot_y=1.5,
            min_frontier_size=2,
            max_goal_distance=5.0,
        )
  
        assert len(candidates) == 1
        assert candidates[0].key == "3:2"
>       assert candidates[0].map_x == 2.5
E       AssertionError: assert 3.5 == 2.5
E        +  where 3.5 = FrontierCandidate(key='3:2', map_x=3.5, map_y=2.0, size=2, distance=2.0615528128088303, score=-0.06155281280883029).map_x

src/kibot_one_control/test/test_frontier_core.py:27: AssertionError
================================================================================== short test summary info ==================================================================================
FAILED src/kibot_one_control/test/test_frontier_core.py::test_find_frontier_candidates_groups_unknown_cells_next_to_free_space - AssertionError: assert 3.5 == 2.5
1 failed, 2 passed in 0.03s

还是有错误,不过这次看起来像是逻辑上的错误。

貌似是我 _component_free_centroid 这个函数写错了,最后一行应该是 return _component_centroid(component=sorted(free_neighbors), info=info) 而非 return _component_centroid(component=sorted(component), info=info)。

我们继续测试一下看看:

(.venv) jese--ki@KiBall:~/Projects/dev/KiBots/KiBotTwo$ PYTHONPATH=src/kibot_one_control python3 -m pytest -q src/kibot_one_control/test/test_frontier_core.py
...                                                                                                                                                                                   [100%]
3 passed in 0.01s

OK 的,全部通过了。

目前来说,我们就完成了 frontier 的核心算法。