Files
rfcp/backend/app/services/spatial_index.py

148 lines
5.2 KiB
Python

"""
Grid-based spatial index for fast building and geometry lookups.
Uses a simple uniform grid instead of an R-tree (no external dependency),
giving O(1) amortised cell lookups instead of O(n) linear scans.
"""
import math
from collections import defaultdict
from typing import List, Tuple, Optional, Dict

from app.services.buildings_service import Building
class SpatialIndex:
    """Grid-based spatial index for fast building lookups.

    Every building is registered in each grid cell that its bounding box
    overlaps. Queries translate a point, line or bounding box into a set of
    cells and return the de-duplicated buildings registered there. Results
    are candidates only: a returned building shares a cell with the query,
    it does not necessarily intersect the query geometry itself.
    """

    def __init__(self, cell_size: float = 0.001):
        """
        Args:
            cell_size: Grid cell size in degrees (~111m at equator)

        Raises:
            ValueError: If ``cell_size`` is not a positive number.
        """
        # A zero or negative size would otherwise surface later as a
        # ZeroDivisionError or as silently empty cell ranges.
        if not cell_size > 0:
            raise ValueError(f"cell_size must be positive, got {cell_size!r}")
        self.cell_size = cell_size
        self._grid: Dict[Tuple[int, int], List[Building]] = defaultdict(list)
        self._buildings: List[Building] = []
        self._buildings_by_id: Dict[int, Building] = {}

    def _cell_key(self, lat: float, lon: float) -> Tuple[int, int]:
        """Convert lat/lon to the (lat_cell, lon_cell) key of its grid cell."""
        # floor() rather than int(): int() truncates toward zero, which would
        # fold the cells on both sides of 0 degrees into one double-width cell.
        return (
            math.floor(lat / self.cell_size),
            math.floor(lon / self.cell_size),
        )

    def _buildings_in_cells(self, cells: List[Tuple[int, int]]) -> List[Building]:
        """Return the unique buildings registered in any of ``cells``.

        Buildings are de-duplicated by ``id`` and returned in the order they
        are first encountered while walking ``cells``.
        """
        seen_ids: Dict[int, None] = {}  # dict used as an insertion-ordered set
        for key in cells:
            # .get() avoids creating empty entries in the defaultdict.
            for building in self._grid.get(key, ()):
                seen_ids[building.id] = None
        by_id = self._buildings_by_id
        return [by_id[bid] for bid in seen_ids if bid in by_id]

    def build(self, buildings: List[Building]):
        """Build spatial index from buildings list.

        Any previously indexed content is discarded. Each building's
        ``geometry`` is read as a sequence of points with longitude at
        index 0 and latitude at index 1. Buildings whose geometry is empty
        cannot be placed on the grid and are left out of it (they remain in
        the id lookup table).

        Args:
            buildings: Buildings to index; each must expose ``id`` and
                ``geometry``.
        """
        self._grid.clear()
        self._buildings = buildings
        self._buildings_by_id = {b.id: b for b in buildings}
        for building in buildings:
            # Get bounding box of building
            lons = [p[0] for p in building.geometry]
            if not lons:
                # No points -> no bounding box; min()/max() would raise.
                continue
            lats = [p[1] for p in building.geometry]
            min_clat, min_clon = self._cell_key(min(lats), min(lons))
            max_clat, max_clon = self._cell_key(max(lats), max(lons))
            # Insert into all overlapping grid cells
            for clat in range(min_clat, max_clat + 1):
                for clon in range(min_clon, max_clon + 1):
                    self._grid[(clat, clon)].append(building)

    def query_point(self, lat: float, lon: float, buffer_cells: int = 1) -> List[Building]:
        """Find buildings near a point.

        Args:
            lat: Latitude of the query point.
            lon: Longitude of the query point.
            buffer_cells: Number of cells to include on every side of the
                cell containing the point (1 -> a 3x3 block of cells).

        Returns:
            Unique buildings registered in the inspected cells. If the grid
            is empty, the full building list given to ``build`` is returned
            instead (the same list object, not a copy).
        """
        if not self._grid:
            return self._buildings  # Fallback to linear scan
        clat, clon = self._cell_key(lat, lon)
        span = range(-buffer_cells, buffer_cells + 1)
        cells = [(clat + dy, clon + dx) for dy in span for dx in span]
        return self._buildings_in_cells(cells)

    def query_line(
        self,
        lat1: float, lon1: float,
        lat2: float, lon2: float,
        buffer_cells: int = 1
    ) -> List[Building]:
        """Find buildings along a line by walking the cells it passes through.

        Samples points along the line so that consecutive samples are at most
        ``cell_size`` apart on the longer axis, and collects a buffer of cells
        around each sample. With ``buffer_cells=0`` a cell that a diagonal
        line only clips between two samples can be missed.

        Args:
            lat1, lon1: Start of the line.
            lat2, lon2: End of the line.
            buffer_cells: Number of cells to include on every side of each
                sampled cell.

        Returns:
            Unique buildings registered in the visited cells. If the grid is
            empty, the full building list given to ``build`` is returned
            instead (the same list object, not a copy).
        """
        if not self._grid:
            return self._buildings
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        length = max(abs(dlat), abs(dlon))
        num_steps = max(1, int(length / self.cell_size) + 1)
        span = range(-buffer_cells, buffer_cells + 1)
        # dict used as an insertion-ordered set of cell keys
        visited: Dict[Tuple[int, int], None] = {}
        for step in range(num_steps + 1):
            t = step / num_steps
            clat, clon = self._cell_key(lat1 + t * dlat, lon1 + t * dlon)
            for dy in span:
                for dx in span:
                    visited[(clat + dy, clon + dx)] = None
        return self._buildings_in_cells(list(visited))

    def query_bbox(
        self,
        min_lat: float, min_lon: float,
        max_lat: float, max_lon: float
    ) -> List[Building]:
        """Find all buildings in bounding box.

        Args:
            min_lat, min_lon: Lower corner of the box.
            max_lat, max_lon: Upper corner of the box.

        Returns:
            Unique buildings registered in any cell the box overlaps. If the
            grid is empty, the full building list given to ``build`` is
            returned instead (the same list object, not a copy).
        """
        if not self._grid:
            return self._buildings
        min_clat, min_clon = self._cell_key(min_lat, min_lon)
        max_clat, max_clon = self._cell_key(max_lat, max_lon)
        cells = [
            (clat, clon)
            for clat in range(min_clat, max_clat + 1)
            for clon in range(min_clon, max_clon + 1)
        ]
        return self._buildings_in_cells(cells)
# Global cache of spatial indices, keyed by the caller-supplied cache key.
# Annotated with typing.Dict like the rest of this module: a module-level
# annotation is evaluated at import time, and subscripting the builtin
# `dict` there fails on Python versions older than 3.9.
_spatial_indices: Dict[str, SpatialIndex] = {}
def get_spatial_index(cache_key: str, buildings: List[Building]) -> SpatialIndex:
    """Return the spatial index cached under ``cache_key``, building it on a miss.

    On a cache hit ``buildings`` is not consulted; the previously built
    index is returned unchanged. On a miss a new index is built from
    ``buildings`` and stored, after which the earliest-inserted entry is
    dropped if the cache holds more than 20 indices.
    """
    cached = _spatial_indices.get(cache_key)
    if cached is not None:
        return cached

    fresh = SpatialIndex()
    fresh.build(buildings)
    _spatial_indices[cache_key] = fresh

    # Keep the cache bounded: dicts iterate in insertion order, so the first
    # key is the one that has been stored the longest.
    if len(_spatial_indices) > 20:
        first_key = next(iter(_spatial_indices))
        del _spatial_indices[first_key]
    return fresh