""" Tile-based processing for large radius coverage calculations. When radius > 10km, the coverage circle is split into 5km sub-tiles. Each tile is processed independently — OSM data and terrain are loaded per-tile and freed between tiles, keeping peak RAM usage bounded. Usage: from app.services.tile_processor import ( generate_tile_grid, partition_grid_to_tiles, TILE_THRESHOLD_M, get_adaptive_worker_count, ) if radius_m > TILE_THRESHOLD_M: tiles = generate_tile_grid(center_lat, center_lon, radius_m) tile_grids = partition_grid_to_tiles(grid, tiles) """ import math from dataclasses import dataclass from typing import List, Tuple, Dict # Use tiled processing for radius above this threshold TILE_THRESHOLD_M = 10000 # 10 km # Default tile size — 5km balances overhead vs memory usage DEFAULT_TILE_SIZE_M = 5000 # 5 km @dataclass class Tile: """A rectangular sub-tile of the coverage area.""" bbox: Tuple[float, float, float, float] # (min_lat, min_lon, max_lat, max_lon) index: Tuple[int, int] # (row, col) in tile grid def generate_tile_grid( center_lat: float, center_lon: float, radius_m: float, tile_size_m: float = DEFAULT_TILE_SIZE_M, ) -> List[Tile]: """Generate grid of tiles covering the coverage circle. Only includes tiles that actually intersect the coverage circle. Tiles are ordered row-by-row from SW to NE. """ cos_lat = math.cos(math.radians(center_lat)) # Full coverage area in degrees lat_delta = radius_m / 111000 lon_delta = radius_m / (111000 * cos_lat) # Number of tiles along each axis n_tiles = max(1, math.ceil(radius_m * 2 / tile_size_m)) # Tile size in degrees tile_lat = (2 * lat_delta) / n_tiles tile_lon = (2 * lon_delta) / n_tiles base_lat = center_lat - lat_delta base_lon = center_lon - lon_delta tiles = [] for row in range(n_tiles): for col in range(n_tiles): min_lat = base_lat + row * tile_lat max_lat = base_lat + (row + 1) * tile_lat min_lon = base_lon + col * tile_lon max_lon = base_lon + (col + 1) * tile_lon bbox = (min_lat, min_lon, max_lat, max_lon) if _tile_intersects_circle(bbox, center_lat, center_lon, radius_m, cos_lat): tiles.append(Tile(bbox=bbox, index=(row, col))) return tiles def _tile_intersects_circle( bbox: Tuple[float, float, float, float], center_lat: float, center_lon: float, radius_m: float, cos_lat: float, ) -> bool: """Check if tile bbox intersects the coverage circle. Uses fast equirectangular approximation — tiles are small (5km) so full haversine is unnecessary for intersection testing. """ min_lat, min_lon, max_lat, max_lon = bbox # Closest point on bbox to circle center closest_lat = max(min_lat, min(center_lat, max_lat)) closest_lon = max(min_lon, min(center_lon, max_lon)) # Approximate distance in meters (equirectangular) dlat = (closest_lat - center_lat) * 111000 dlon = (closest_lon - center_lon) * 111000 * cos_lat dist_sq = dlat * dlat + dlon * dlon return dist_sq <= radius_m * radius_m def get_adaptive_worker_count(radius_m: float, base_workers: int) -> int: """Scale down workers for large calculations to prevent combined memory explosion. Large radius = more buildings per tile = more memory per worker. Reducing workers keeps total worker memory bounded. """ if radius_m > 30000: return min(base_workers, 2) elif radius_m > 20000: return min(base_workers, 3) elif radius_m > 10000: return min(base_workers, 4) return base_workers def partition_grid_to_tiles( grid: List[Tuple[float, float]], tiles: List[Tile], ) -> Dict[Tuple[int, int], List[Tuple[float, float]]]: """Partition grid points into tiles by bbox containment. Returns dict mapping tile index -> list of (lat, lon) points. Points on tile boundaries are assigned to the first matching tile. """ tile_grids: Dict[Tuple[int, int], List[Tuple[float, float]]] = { t.index: [] for t in tiles } for lat, lon in grid: for tile in tiles: min_lat, min_lon, max_lat, max_lon = tile.bbox if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon: tile_grids[tile.index].append((lat, lon)) break return tile_grids