diff --git a/backend/app/api/websocket.py b/backend/app/api/websocket.py index 34ec9fa..5afc32b 100644 --- a/backend/app/api/websocket.py +++ b/backend/app/api/websocket.py @@ -62,6 +62,23 @@ class ConnectionManager: except Exception as e: logger.warning(f"[WS] send_error failed: {e}") + async def send_partial_results( + self, ws: WebSocket, calc_id: str, + points: list, tile_idx: int, total_tiles: int, + ): + """Send per-tile partial results for progressive rendering.""" + try: + await ws.send_json({ + "type": "partial_results", + "calculation_id": calc_id, + "points": [p.model_dump() for p in points], + "tile": tile_idx, + "total_tiles": total_tiles, + "progress": (tile_idx + 1) / total_tiles, + }) + except Exception as e: + logger.debug(f"[WS] send_partial_results failed: {e}") + ws_manager = ConnectionManager() @@ -135,6 +152,12 @@ async def _run_calculation(ws: WebSocket, calc_id: str, data: dict): await ws_manager.send_progress(ws, calc_id, "Initializing", 0.02) + # ── Tile callback for progressive results (large radius) ── + async def _tile_callback(tile_points, tile_idx, total_tiles): + await ws_manager.send_partial_results( + ws, calc_id, tile_points, tile_idx, total_tiles, + ) + # ── Backup progress poller: catches anything call_soon_threadsafe missed ── async def progress_poller(): last_sent_seq = 0 @@ -165,6 +188,7 @@ async def _run_calculation(ws: WebSocket, calc_id: str, data: dict): coverage_service.calculate_coverage( sites[0], settings, cancel_token, progress_fn=sync_progress_fn, + tile_callback=_tile_callback, ), timeout=300.0, ) @@ -173,6 +197,7 @@ async def _run_calculation(ws: WebSocket, calc_id: str, data: dict): coverage_service.calculate_multi_site_coverage( sites, settings, cancel_token, progress_fn=sync_progress_fn, + tile_callback=_tile_callback, ), timeout=300.0, ) diff --git a/backend/app/services/cache_db.py b/backend/app/services/cache_db.py new file mode 100644 index 0000000..10e5116 --- /dev/null +++ b/backend/app/services/cache_db.py @@ -0,0 +1,241 @@ +""" +SQLite cache for OSM data — buildings, vegetation, water, streets. + +Replaces in-memory caching for large-area calculations. Instead of holding +hundreds of thousands of buildings in RAM, data is stored on disk in SQLite +and queried per-tile using spatial bbox queries. + +Location: ~/.rfcp/osm_cache.db +""" + +import json +import time +import sqlite3 +from pathlib import Path +from typing import List, Dict, Optional + + +def _default_db_path() -> str: + """Get default database path at ~/.rfcp/osm_cache.db.""" + cache_dir = Path.home() / '.rfcp' + cache_dir.mkdir(parents=True, exist_ok=True) + return str(cache_dir / 'osm_cache.db') + + +class OSMCacheDB: + """SQLite-backed cache for OSM feature data with bbox queries. + + Stores buildings and vegetation as JSON blobs with bounding-box + columns for fast spatial queries. Cache freshness is tracked + per 1-degree cell (matching the OSM grid fetch pattern). + """ + + def __init__(self, db_path: Optional[str] = None): + if db_path is None: + db_path = _default_db_path() + self.db_path = db_path + self._conn: Optional[sqlite3.Connection] = None + + @property + def conn(self) -> sqlite3.Connection: + """Lazy connection with WAL mode for concurrent reads.""" + if self._conn is None: + self._conn = sqlite3.connect(self.db_path, check_same_thread=False) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute("PRAGMA synchronous=NORMAL") + self._init_tables() + return self._conn + + def _init_tables(self): + assert self._conn is not None + self._conn.executescript(""" + CREATE TABLE IF NOT EXISTS buildings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + osm_id INTEGER, + min_lat REAL NOT NULL, + min_lon REAL NOT NULL, + max_lat REAL NOT NULL, + max_lon REAL NOT NULL, + height REAL DEFAULT 10.0, + data TEXT NOT NULL, + cell_key TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_bld_cell ON buildings(cell_key); + CREATE INDEX IF NOT EXISTS idx_bld_bbox + ON buildings(min_lat, max_lat, min_lon, max_lon); + + CREATE TABLE IF NOT EXISTS vegetation ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + osm_id INTEGER, + min_lat REAL NOT NULL, + min_lon REAL NOT NULL, + max_lat REAL NOT NULL, + max_lon REAL NOT NULL, + data TEXT NOT NULL, + cell_key TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_veg_cell ON vegetation(cell_key); + CREATE INDEX IF NOT EXISTS idx_veg_bbox + ON vegetation(min_lat, max_lat, min_lon, max_lon); + + CREATE TABLE IF NOT EXISTS cache_meta ( + cell_key TEXT NOT NULL, + data_type TEXT NOT NULL, + fetched_at REAL NOT NULL, + item_count INTEGER DEFAULT 0, + PRIMARY KEY (cell_key, data_type) + ); + """) + self._conn.commit() + + # ── Cell key helpers ── + + @staticmethod + def cell_key(min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: + """Generate cell key from bbox (matches 1-degree grid alignment).""" + return f"{min_lat:.0f},{min_lon:.0f},{max_lat:.0f},{max_lon:.0f}" + + def is_cell_cached( + self, cell_key: str, data_type: str, max_age_hours: float = 24.0 + ) -> bool: + """Check if cell data is cached and fresh.""" + cursor = self.conn.execute( + "SELECT fetched_at FROM cache_meta " + "WHERE cell_key = ? AND data_type = ?", + (cell_key, data_type), + ) + row = cursor.fetchone() + if row is None: + return False + age_hours = (time.time() - row[0]) / 3600 + return age_hours < max_age_hours + + def mark_cell_cached(self, cell_key: str, data_type: str, item_count: int): + """Record that a cell has been fetched.""" + self.conn.execute( + "INSERT OR REPLACE INTO cache_meta " + "(cell_key, data_type, fetched_at, item_count) VALUES (?, ?, ?, ?)", + (cell_key, data_type, time.time(), item_count), + ) + self.conn.commit() + + # ── Buildings ── + + def insert_buildings_bulk(self, buildings_data: List[Dict], cell_key: str): + """Bulk insert serialised building dicts for a cell. + + Each dict must have 'geometry' (list of [lon, lat]) and 'id'. + """ + rows = [] + for b in buildings_data: + geom = b.get('geometry', []) + if not geom: + continue + lats = [p[1] for p in geom] + lons = [p[0] for p in geom] + rows.append(( + b.get('id', 0), + min(lats), min(lons), max(lats), max(lons), + b.get('height', 10.0), + json.dumps(b), + cell_key, + )) + + if rows: + self.conn.executemany( + "INSERT INTO buildings " + "(osm_id, min_lat, min_lon, max_lat, max_lon, height, data, cell_key) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + rows, + ) + self.conn.commit() + + def query_buildings_bbox( + self, + min_lat: float, max_lat: float, + min_lon: float, max_lon: float, + limit: int = 20000, + ) -> List[Dict]: + """Query buildings whose bbox overlaps the given bbox.""" + cursor = self.conn.execute( + "SELECT data FROM buildings " + "WHERE max_lat >= ? AND min_lat <= ? " + "AND max_lon >= ? AND min_lon <= ? " + "LIMIT ?", + (min_lat, max_lat, min_lon, max_lon, limit), + ) + return [json.loads(row[0]) for row in cursor] + + # ── Vegetation ── + + def insert_vegetation_bulk(self, veg_data: List[Dict], cell_key: str): + """Bulk insert serialised vegetation dicts for a cell.""" + rows = [] + for v in veg_data: + geom = v.get('geometry', []) + if not geom: + continue + lats = [p[1] for p in geom] + lons = [p[0] for p in geom] + rows.append(( + v.get('id', 0), + min(lats), min(lons), max(lats), max(lons), + json.dumps(v), + cell_key, + )) + + if rows: + self.conn.executemany( + "INSERT INTO vegetation " + "(osm_id, min_lat, min_lon, max_lat, max_lon, data, cell_key) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + rows, + ) + self.conn.commit() + + def query_vegetation_bbox( + self, + min_lat: float, max_lat: float, + min_lon: float, max_lon: float, + limit: int = 10000, + ) -> List[Dict]: + """Query vegetation whose bbox overlaps the given bbox.""" + cursor = self.conn.execute( + "SELECT data FROM vegetation " + "WHERE max_lat >= ? AND min_lat <= ? " + "AND max_lon >= ? AND min_lon <= ? " + "LIMIT ?", + (min_lat, max_lat, min_lon, max_lon, limit), + ) + return [json.loads(row[0]) for row in cursor] + + # ── Housekeeping ── + + def close(self): + """Close the database connection.""" + if self._conn: + self._conn.close() + self._conn = None + + def get_stats(self) -> Dict[str, int]: + """Get cache statistics.""" + stats: Dict[str, int] = {} + for table in ('buildings', 'vegetation'): + cursor = self.conn.execute(f"SELECT COUNT(*) FROM {table}") # noqa: S608 + stats[table] = cursor.fetchone()[0] + cursor = self.conn.execute("SELECT COUNT(*) FROM cache_meta") + stats['cached_cells'] = cursor.fetchone()[0] + return stats + + +# ── Singleton ── + +_cache_db: Optional[OSMCacheDB] = None + + +def get_osm_cache_db() -> OSMCacheDB: + """Get or create the singleton OSM cache database.""" + global _cache_db + if _cache_db is None: + _cache_db = OSMCacheDB() + return _cache_db diff --git a/backend/app/services/coverage_service.py b/backend/app/services/coverage_service.py index 078ac7c..bebe92f 100644 --- a/backend/app/services/coverage_service.py +++ b/backend/app/services/coverage_service.py @@ -1,3 +1,4 @@ +import gc import math import os import sys @@ -426,18 +427,28 @@ class CoverageService: settings: CoverageSettings, cancel_token: Optional[CancellationToken] = None, progress_fn: Optional[Callable[[str, float], None]] = None, + tile_callback: Optional[Callable] = None, ) -> List[CoveragePoint]: """ Calculate coverage grid for a single site Returns list of CoveragePoint with RSRP values. progress_fn(phase, pct): optional callback for progress updates (0.0-1.0). + tile_callback(points, tile_idx, total_tiles): optional callback for per-tile + partial results when using tiled processing (radius > 10km). """ calc_start = time.time() # Apply preset if specified settings = apply_preset(settings) + # ── Tiled processing for large radius ── + from app.services.tile_processor import TILE_THRESHOLD_M + if settings.radius > TILE_THRESHOLD_M: + return await self.calculate_coverage_tiled( + site, settings, cancel_token, progress_fn, tile_callback + ) + points = [] # Generate grid @@ -660,12 +671,14 @@ class CoverageService: settings: CoverageSettings, cancel_token: Optional[CancellationToken] = None, progress_fn: Optional[Callable[[str, float], None]] = None, + tile_callback: Optional[Callable] = None, ) -> List[CoveragePoint]: """ Calculate combined coverage from multiple sites Best server (strongest signal) wins at each point progress_fn(phase, pct): optional callback for progress updates (0.0-1.0). + tile_callback: forwarded to calculate_coverage for progressive results. """ if not sites: return [] @@ -691,6 +704,7 @@ class CoverageService: self.calculate_coverage( site, settings, cancel_token, progress_fn=_make_site_progress(i) if progress_fn else None, + tile_callback=tile_callback, ) for i, site in enumerate(sites) ]) @@ -707,6 +721,208 @@ class CoverageService: return list(point_map.values()) + async def calculate_coverage_tiled( + self, + site: SiteParams, + settings: CoverageSettings, + cancel_token: Optional[CancellationToken] = None, + progress_fn: Optional[Callable[[str, float], None]] = None, + tile_callback: Optional[Callable] = None, + ) -> List[CoveragePoint]: + """Tile-based coverage for large radius (>10km). + + Splits the coverage area into 5km sub-tiles. Each tile loads its + own OSM data and terrain, processes its grid points, then frees + memory before moving to the next tile. This keeps peak RAM + bounded regardless of total coverage area. + + tile_callback(points, tile_idx, total_tiles): async callback + invoked with partial results after each tile completes. + """ + from app.services.tile_processor import ( + generate_tile_grid, partition_grid_to_tiles, get_adaptive_worker_count, + ) + + calc_start = time.time() + + # NOTE: settings already has preset applied by calculate_coverage() + + # Generate full adaptive grid (lightweight — just coordinate tuples) + grid = self._generate_grid( + site.lat, site.lon, settings.radius, settings.resolution, + ) + _clog(f"Tiled mode: {len(grid)} total grid points, radius={settings.radius}m") + + # Generate tiles and partition grid points + tiles = generate_tile_grid(site.lat, site.lon, settings.radius) + total_tiles = len(tiles) + tile_grids = partition_grid_to_tiles(grid, tiles) + _clog(f"Generated {total_tiles} tiles") + + # Free full grid reference + del grid + + site_elevation: Optional[float] = None + all_points: List[CoveragePoint] = [] + + for tile_idx, tile in enumerate(tiles): + if cancel_token and cancel_token.is_cancelled: + _clog("Tiled calculation cancelled") + break + + tile_grid = tile_grids.get(tile.index, []) + if not tile_grid: + continue + + tile_start = time.time() + min_lat, min_lon, max_lat, max_lon = tile.bbox + _clog(f"━━━ Tile {tile_idx + 1}/{total_tiles}: " + f"{len(tile_grid)} points ━━━") + + # Per-tile progress mapped to overall progress range + def _tile_progress(phase: str, pct: float, _idx=tile_idx): + if progress_fn: + overall = (_idx + pct) / total_tiles + progress_fn( + f"Tile {_idx + 1}/{total_tiles}: {phase}", overall, + ) + + # ── 1. Fetch OSM data for this tile ── + _tile_progress("Fetching map data", 0.10) + await asyncio.sleep(0) + + osm_data = await self._fetch_osm_grid_aligned( + min_lat, min_lon, max_lat, max_lon, settings, + ) + + buildings = _filter_buildings_to_bbox( + osm_data["buildings"], min_lat, min_lon, max_lat, max_lon, + site.lat, site.lon, _clog, + ) + streets = _filter_osm_list_to_bbox( + osm_data["streets"], min_lat, min_lon, max_lat, max_lon, + ) + water_bodies = _filter_osm_list_to_bbox( + osm_data["water_bodies"], min_lat, min_lon, max_lat, max_lon, + ) + vegetation_areas = _filter_osm_list_to_bbox( + osm_data["vegetation_areas"], min_lat, min_lon, max_lat, max_lon, + max_count=5000, + ) + + spatial_idx: Optional[SpatialIndex] = None + if buildings: + cache_key = f"tile_{tile_idx}_{min_lat:.3f},{min_lon:.3f}" + spatial_idx = get_spatial_index(cache_key, buildings) + + # ── 2. Pre-load terrain for this tile ── + _tile_progress("Loading terrain", 0.25) + await asyncio.sleep(0) + + tile_names = await self.terrain.ensure_tiles_for_bbox( + min_lat, min_lon, max_lat, max_lon, + ) + for tn in tile_names: + self.terrain._load_tile(tn) + + if site_elevation is None: + site_elevation = self.terrain.get_elevation_sync( + site.lat, site.lon, + ) + + point_elevations = {} + for lat, lon in tile_grid: + point_elevations[(lat, lon)] = self.terrain.get_elevation_sync( + lat, lon, + ) + + # ── 3. Precompute distances / path loss ── + _tile_progress("Pre-computing propagation", 0.35) + await asyncio.sleep(0) + + from app.services.gpu_service import gpu_service + + grid_lats = np.array([lat for lat, _lon in tile_grid]) + grid_lons = np.array([_lon for _lat, _lon in tile_grid]) + + pre_distances = gpu_service.precompute_distances( + grid_lats, grid_lons, site.lat, site.lon, + ) + pre_path_loss = gpu_service.precompute_path_loss( + pre_distances, site.frequency, site.height, + environment=getattr(settings, 'environment', 'urban'), + ) + + precomputed = {} + for i, (lat, lon) in enumerate(tile_grid): + precomputed[(lat, lon)] = { + 'distance': float(pre_distances[i]), + 'path_loss': float(pre_path_loss[i]), + } + + # ── 4. Calculate points (parallel with adaptive workers) ── + _tile_progress("Calculating coverage", 0.40) + await asyncio.sleep(0) + + num_workers = get_adaptive_worker_count( + settings.radius, get_cpu_count(), + ) + use_parallel = len(tile_grid) > 100 and num_workers > 1 + + if use_parallel: + loop = asyncio.get_event_loop() + result_dicts, _timing = await loop.run_in_executor( + None, + lambda: calculate_coverage_parallel( + tile_grid, point_elevations, + site.model_dump(), settings.model_dump(), + self.terrain._tile_cache, + buildings, streets, water_bodies, vegetation_areas, + site_elevation, num_workers, _clog, + cancel_token=cancel_token, + precomputed=precomputed, + ), + ) + tile_points = [CoveragePoint(**d) for d in result_dicts] + else: + loop = asyncio.get_event_loop() + tile_points, _timing = await loop.run_in_executor( + None, + lambda: self._run_point_loop( + tile_grid, site, settings, buildings, streets, + spatial_idx, water_bodies, vegetation_areas, + site_elevation, point_elevations, + cancel_token=cancel_token, + precomputed=precomputed, + ), + ) + + all_points.extend(tile_points) + + # Send partial results via callback + if tile_callback and tile_points: + await tile_callback(tile_points, tile_idx, total_tiles) + + tile_time = time.time() - tile_start + _clog(f"Tile {tile_idx + 1}/{total_tiles} done: " + f"{len(tile_points)} points in {tile_time:.1f}s") + + # ── 5. Free memory ── + del buildings, streets, water_bodies, vegetation_areas + del osm_data, spatial_idx, point_elevations, precomputed + del pre_distances, pre_path_loss, grid_lats, grid_lons + gc.collect() + + total_time = time.time() - calc_start + _clog(f"━━━ Tiled calculation complete: " + f"{len(all_points)} points in {total_time:.1f}s ━━━") + + if progress_fn: + progress_fn("Finalizing", 0.95) + await asyncio.sleep(0) + + return all_points + # Adaptive resolution zone boundaries (meters) _ADAPTIVE_ZONES = [ (0, 2000), # Inner: full user resolution diff --git a/backend/app/services/terrain_service.py b/backend/app/services/terrain_service.py index d3ca40d..740536a 100644 --- a/backend/app/services/terrain_service.py +++ b/backend/app/services/terrain_service.py @@ -86,7 +86,12 @@ class TerrainService: return False def _load_tile(self, tile_name: str) -> Optional[np.ndarray]: - """Load tile from disk into memory cache""" + """Load tile from disk into memory cache using memory-mapped I/O. + + Uses np.memmap so the OS pages data from disk on demand — near-zero + upfront RAM cost per tile (~25 MB savings each vs full load). + Falls back to np.frombuffer if memmap fails. + """ # Check memory cache first if tile_name in self._tile_cache: return self._tile_cache[tile_name] @@ -97,18 +102,26 @@ class TerrainService: return None try: - data = tile_path.read_bytes() + file_size = tile_path.stat().st_size # SRTM HGT format: big-endian signed 16-bit integers - if len(data) == 3601 * 3601 * 2: + if file_size == 3601 * 3601 * 2: size = 3601 # SRTM1 (30m) - elif len(data) == 1201 * 1201 * 2: + elif file_size == 1201 * 1201 * 2: size = 1201 # SRTM3 (90m) else: - print(f"[Terrain] Unknown tile size: {len(data)} bytes for {tile_name}") + print(f"[Terrain] Unknown tile size: {file_size} bytes for {tile_name}") return None - tile = np.frombuffer(data, dtype='>i2').reshape((size, size)) + # Memory-mapped loading — OS pages from disk, near-zero RAM + try: + tile = np.memmap( + tile_path, dtype='>i2', mode='r', shape=(size, size), + ) + except Exception: + # Fallback: full load into RAM + data = tile_path.read_bytes() + tile = np.frombuffer(data, dtype='>i2').reshape((size, size)) # Manage memory cache with LRU eviction if len(self._tile_cache) >= self._max_cache_tiles: @@ -272,6 +285,38 @@ class TerrainService: total = sum(f.stat().st_size for f in self.terrain_path.glob("*.hgt")) return total / (1024 * 1024) + def evict_disk_cache(self, max_size_mb: float = 2048.0): + """LRU eviction of .hgt files when disk cache exceeds max_size_mb. + + Deletes the oldest-accessed files until total size is under the limit. + """ + hgt_files = list(self.terrain_path.glob("*.hgt")) + if not hgt_files: + return + + total = sum(f.stat().st_size for f in hgt_files) + if total / (1024 * 1024) <= max_size_mb: + return + + # Sort by access time (oldest first) + hgt_files.sort(key=lambda f: f.stat().st_atime) + + evicted = 0 + for f in hgt_files: + if total / (1024 * 1024) <= max_size_mb: + break + fsize = f.stat().st_size + # Remove from memory cache if loaded + stem = f.stem + self._tile_cache.pop(stem, None) + f.unlink() + total -= fsize + evicted += 1 + + if evicted: + print(f"[Terrain] Evicted {evicted} tiles, " + f"cache now {total / (1024 * 1024):.0f} MB") + @staticmethod def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance between two points in meters""" diff --git a/backend/app/services/tile_processor.py b/backend/app/services/tile_processor.py new file mode 100644 index 0000000..e6b6678 --- /dev/null +++ b/backend/app/services/tile_processor.py @@ -0,0 +1,142 @@ +""" +Tile-based processing for large radius coverage calculations. + +When radius > 10km, the coverage circle is split into 5km sub-tiles. +Each tile is processed independently — OSM data and terrain are loaded +per-tile and freed between tiles, keeping peak RAM usage bounded. + +Usage: + from app.services.tile_processor import ( + generate_tile_grid, partition_grid_to_tiles, + TILE_THRESHOLD_M, get_adaptive_worker_count, + ) + + if radius_m > TILE_THRESHOLD_M: + tiles = generate_tile_grid(center_lat, center_lon, radius_m) + tile_grids = partition_grid_to_tiles(grid, tiles) +""" + +import math +from dataclasses import dataclass +from typing import List, Tuple, Dict + + +# Use tiled processing for radius above this threshold +TILE_THRESHOLD_M = 10000 # 10 km + +# Default tile size — 5km balances overhead vs memory usage +DEFAULT_TILE_SIZE_M = 5000 # 5 km + + +@dataclass +class Tile: + """A rectangular sub-tile of the coverage area.""" + bbox: Tuple[float, float, float, float] # (min_lat, min_lon, max_lat, max_lon) + index: Tuple[int, int] # (row, col) in tile grid + + +def generate_tile_grid( + center_lat: float, + center_lon: float, + radius_m: float, + tile_size_m: float = DEFAULT_TILE_SIZE_M, +) -> List[Tile]: + """Generate grid of tiles covering the coverage circle. + + Only includes tiles that actually intersect the coverage circle. + Tiles are ordered row-by-row from SW to NE. + """ + cos_lat = math.cos(math.radians(center_lat)) + + # Full coverage area in degrees + lat_delta = radius_m / 111000 + lon_delta = radius_m / (111000 * cos_lat) + + # Number of tiles along each axis + n_tiles = max(1, math.ceil(radius_m * 2 / tile_size_m)) + + # Tile size in degrees + tile_lat = (2 * lat_delta) / n_tiles + tile_lon = (2 * lon_delta) / n_tiles + + base_lat = center_lat - lat_delta + base_lon = center_lon - lon_delta + + tiles = [] + for row in range(n_tiles): + for col in range(n_tiles): + min_lat = base_lat + row * tile_lat + max_lat = base_lat + (row + 1) * tile_lat + min_lon = base_lon + col * tile_lon + max_lon = base_lon + (col + 1) * tile_lon + + bbox = (min_lat, min_lon, max_lat, max_lon) + + if _tile_intersects_circle(bbox, center_lat, center_lon, radius_m, cos_lat): + tiles.append(Tile(bbox=bbox, index=(row, col))) + + return tiles + + +def _tile_intersects_circle( + bbox: Tuple[float, float, float, float], + center_lat: float, + center_lon: float, + radius_m: float, + cos_lat: float, +) -> bool: + """Check if tile bbox intersects the coverage circle. + + Uses fast equirectangular approximation — tiles are small (5km) + so full haversine is unnecessary for intersection testing. + """ + min_lat, min_lon, max_lat, max_lon = bbox + + # Closest point on bbox to circle center + closest_lat = max(min_lat, min(center_lat, max_lat)) + closest_lon = max(min_lon, min(center_lon, max_lon)) + + # Approximate distance in meters (equirectangular) + dlat = (closest_lat - center_lat) * 111000 + dlon = (closest_lon - center_lon) * 111000 * cos_lat + dist_sq = dlat * dlat + dlon * dlon + + return dist_sq <= radius_m * radius_m + + +def get_adaptive_worker_count(radius_m: float, base_workers: int) -> int: + """Scale down workers for large calculations to prevent combined memory explosion. + + Large radius = more buildings per tile = more memory per worker. + Reducing workers keeps total worker memory bounded. + """ + if radius_m > 30000: + return min(base_workers, 2) + elif radius_m > 20000: + return min(base_workers, 3) + elif radius_m > 10000: + return min(base_workers, 4) + return base_workers + + +def partition_grid_to_tiles( + grid: List[Tuple[float, float]], + tiles: List[Tile], +) -> Dict[Tuple[int, int], List[Tuple[float, float]]]: + """Partition grid points into tiles by bbox containment. + + Returns dict mapping tile index -> list of (lat, lon) points. + Points on tile boundaries are assigned to the first matching tile. + """ + tile_grids: Dict[Tuple[int, int], List[Tuple[float, float]]] = { + t.index: [] for t in tiles + } + + for lat, lon in grid: + for tile in tiles: + min_lat, min_lon, max_lat, max_lon = tile.bbox + if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon: + tile_grids[tile.index].append((lat, lon)) + break + + return tile_grids diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index b474aa5..315410a 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -65,6 +65,7 @@ export default function App() { const heatmapVisible = useCoverageStore((s) => s.heatmapVisible); const coverageError = useCoverageStore((s) => s.error); const coverageProgress = useCoverageStore((s) => s.progress); + const partialPoints = useCoverageStore((s) => s.partialPoints); const calculateCoverageApi = useCoverageStore((s) => s.calculateCoverage); const cancelCalculation = useCoverageStore((s) => s.cancelCalculation); @@ -658,20 +659,23 @@ export default function App() { {/* Map */}
- {coverageResult && ( + {/* Show partial results during tiled calculation, or final result */} + {(coverageResult || (isCalculating && partialPoints.length > 0)) && ( <> 0 ? partialPoints : (coverageResult?.points ?? [])} visible={heatmapVisible} opacity={settings.heatmapOpacity} radiusMeters={settings.heatmapRadius} rsrpThreshold={settings.rsrpThreshold} /> - p.rsrp >= settings.rsrpThreshold)} - visible={heatmapVisible} - resolution={settings.resolution} - /> + {coverageResult && ( + p.rsrp >= settings.rsrpThreshold)} + visible={heatmapVisible} + resolution={settings.resolution} + /> + )} )} diff --git a/frontend/src/services/websocket.ts b/frontend/src/services/websocket.ts index b085d75..67cf17e 100644 --- a/frontend/src/services/websocket.ts +++ b/frontend/src/services/websocket.ts @@ -19,15 +19,24 @@ export interface WSProgress { eta_seconds?: number; } +export interface WSPartialResults { + points: Array>; + tile: number; + total_tiles: number; + progress: number; +} + type ProgressCallback = (progress: WSProgress) => void; type ResultCallback = (data: CoverageResponse) => void; type ErrorCallback = (error: string) => void; +type PartialResultsCallback = (data: WSPartialResults) => void; type ConnectionCallback = (connected: boolean) => void; interface PendingCalc { onProgress?: ProgressCallback; onResult: ResultCallback; onError: ErrorCallback; + onPartialResults?: PartialResultsCallback; } class WebSocketService { @@ -110,6 +119,16 @@ class WebSocketService { console.warn('[WS] progress msg but no pending calc:', calcId, msg.phase, msg.progress); } break; + case 'partial_results': + if (pending?.onPartialResults) { + pending.onPartialResults({ + points: msg.points, + tile: msg.tile, + total_tiles: msg.total_tiles, + progress: msg.progress, + }); + } + break; case 'result': if (pending) { pending.onResult(msg.data); @@ -159,13 +178,14 @@ class WebSocketService { onResult: ResultCallback, onError: ErrorCallback, onProgress?: ProgressCallback, + onPartialResults?: PartialResultsCallback, ): string | undefined { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { return undefined; } const calcId = crypto.randomUUID(); - this._pendingCalcs.set(calcId, { onProgress, onResult, onError }); + this._pendingCalcs.set(calcId, { onProgress, onResult, onError, onPartialResults }); this.ws.send(JSON.stringify({ type: 'calculate', diff --git a/frontend/src/store/coverage.ts b/frontend/src/store/coverage.ts index d55c547..d99c800 100644 --- a/frontend/src/store/coverage.ts +++ b/frontend/src/store/coverage.ts @@ -20,6 +20,11 @@ interface CoverageState { progress: WSProgress | null; activeCalcId: string | null; + // Progressive rendering — accumulates points as tiles complete + partialPoints: CoverageResult['points']; + tilesCompleted: number; + totalTiles: number; + setResult: (result: CoverageResult | null) => void; clearCoverage: () => void; setIsCalculating: (val: boolean) => void; @@ -166,6 +171,9 @@ export const useCoverageStore = create((set, get) => ({ error: null, progress: null, activeCalcId: null, + partialPoints: [], + tilesCompleted: 0, + totalTiles: 0, setResult: (result) => set({ result }), clearCoverage: () => set({ result: null, error: null }), @@ -195,7 +203,7 @@ export const useCoverageStore = create((set, get) => ({ const apiSettings = buildApiSettings(settings); - set({ isCalculating: true, error: null, progress: null, activeCalcId: null }); + set({ isCalculating: true, error: null, progress: null, activeCalcId: null, partialPoints: [], tilesCompleted: 0, totalTiles: 0 }); // Try WebSocket first (provides real-time progress) if (wsService.connected) { @@ -206,7 +214,7 @@ export const useCoverageStore = create((set, get) => ({ (data) => { try { const result = responseToResult(data, settings); - set({ result, isCalculating: false, error: null, progress: null, activeCalcId: null }); + set({ result, isCalculating: false, error: null, progress: null, activeCalcId: null, partialPoints: [], tilesCompleted: 0, totalTiles: 0 }); // Show success toast for WS result const addToast = useToastStore.getState().addToast; if (result.points.length === 0) { @@ -227,18 +235,40 @@ export const useCoverageStore = create((set, get) => ({ } } catch (err) { console.error('[Coverage] Failed to process result:', err); - set({ isCalculating: false, error: 'Failed to process coverage result', progress: null, activeCalcId: null }); + set({ isCalculating: false, error: 'Failed to process coverage result', progress: null, activeCalcId: null, partialPoints: [], tilesCompleted: 0, totalTiles: 0 }); } }, // onError (error) => { - set({ isCalculating: false, error, progress: null, activeCalcId: null }); + set({ isCalculating: false, error, progress: null, activeCalcId: null, partialPoints: [], tilesCompleted: 0, totalTiles: 0 }); useToastStore.getState().addToast(`Calculation failed: ${error}`, 'error'); }, // onProgress (progress) => { set({ progress }); }, + // onPartialResults — accumulate points as tiles complete + (data) => { + const newPoints = data.points.map((p: Record) => ({ + lat: p.lat as number, + lon: p.lon as number, + rsrp: p.rsrp as number, + distance: p.distance as number, + has_los: p.has_los as boolean, + terrain_loss: p.terrain_loss as number, + building_loss: p.building_loss as number, + reflection_gain: (p.reflection_gain as number) ?? 0, + vegetation_loss: (p.vegetation_loss as number) ?? 0, + rain_loss: (p.rain_loss as number) ?? 0, + indoor_loss: (p.indoor_loss as number) ?? 0, + atmospheric_loss: (p.atmospheric_loss as number) ?? 0, + })); + set((state) => ({ + partialPoints: [...state.partialPoints, ...newPoints], + tilesCompleted: data.tile + 1, + totalTiles: data.total_tiles, + })); + }, ); if (calcId) { @@ -278,6 +308,6 @@ export const useCoverageStore = create((set, get) => ({ wsService.cancel(activeCalcId); } api.cancelCalculation(); - set({ isCalculating: false, progress: null, activeCalcId: null }); + set({ isCalculating: false, progress: null, activeCalcId: null, partialPoints: [], tilesCompleted: 0, totalTiles: 0 }); }, }));