""" Parallel coverage calculation. Primary backend: Ray (shared-memory object store, zero-copy numpy arrays) Fallback: ProcessPoolExecutor (4-6 workers to limit memory) Last resort: Sequential (single-threaded, no extra dependencies) Ray advantages over ProcessPoolExecutor: - ray.put() stores terrain cache ONCE in shared memory - Workers access numpy arrays via zero-copy (no per-worker pickle/copy) - Eliminates MemoryError on Detailed preset with large terrain + buildings ProcessPoolExecutor fallback: - Used when Ray is unavailable (e.g. PyInstaller builds) - Capped at 6 workers to prevent MemoryError from data pickling - Each worker gets a full copy of terrain/buildings (no shared memory) Usage: from app.services.parallel_coverage_service import ( calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE, ) """ import os import sys import time import multiprocessing as mp from typing import List, Dict, Tuple, Any, Optional, Callable import numpy as np # ── Try to import Ray ── RAY_AVAILABLE = False try: import ray RAY_AVAILABLE = True except ImportError: ray = None # type: ignore # ── Worker-level spatial index cache (persists across tasks in same worker) ── _worker_spatial_idx = None _worker_cache_key: Optional[str] = None def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config): """Implementation: process a chunk of (lat, lon, elevation) tuples. Called inside a Ray remote function. terrain_cache numpy arrays come from the Ray object store via zero-copy. """ global _worker_spatial_idx, _worker_cache_key # Inject terrain cache into the module-level singleton. # For numpy arrays, Ray gives us a read-only view into shared memory. from app.services.terrain_service import terrain_service terrain_service._tile_cache = terrain_cache # Build or reuse spatial index (expensive — ~1s for 350K buildings). cache_key = config.get('cache_key', '') if _worker_cache_key != cache_key: from app.services.spatial_index import SpatialIndex _worker_spatial_idx = SpatialIndex() if buildings: _worker_spatial_idx.build(buildings) _worker_cache_key = cache_key # Process points from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings site = SiteParams(**config['site_dict']) settings = CoverageSettings(**config['settings_dict']) svc = CoverageService() timing = { "los": 0.0, "buildings": 0.0, "antenna": 0.0, "dominant_path": 0.0, "street_canyon": 0.0, "reflection": 0.0, "vegetation": 0.0, } results = [] for lat, lon, point_elev in chunk: point = svc._calculate_point_sync( site, lat, lon, settings, buildings, osm_data.get('streets', []), _worker_spatial_idx, osm_data.get('water_bodies', []), osm_data.get('vegetation_areas', []), config['site_elevation'], point_elev, timing, ) if point.rsrp >= settings.min_signal: results.append(point.model_dump()) return results # ── Register the Ray remote function (only if Ray is available) ── _ray_process_chunk = None if RAY_AVAILABLE: _ray_process_chunk = ray.remote(_ray_process_chunk_impl) # ── Public API ── def get_cpu_count() -> int: """Get number of usable CPU cores, capped at 14.""" try: return min(mp.cpu_count() or 4, 14) except Exception: return 4 def get_parallel_backend() -> str: """Return which parallel backend is available.""" if RAY_AVAILABLE: return "ray" return "process_pool" def _try_init_ray(num_cpus: int) -> bool: """Initialize Ray lazily. Returns True if Ray is ready.""" if not RAY_AVAILABLE: return False if ray.is_initialized(): return True try: data_path = os.environ.get('RFCP_DATA_PATH', './data') ray_tmp = os.path.join(data_path, 'ray_tmp') os.makedirs(ray_tmp, exist_ok=True) ray.init( num_cpus=num_cpus, include_dashboard=False, log_to_driver=True, _temp_dir=ray_tmp, ) print(f"[PARALLEL] Ray initialized: {num_cpus} CPUs, " f"object store ~{ray.cluster_resources().get('object_store_memory', 0) / 1e9:.1f}GB", flush=True) return True except Exception as e: print(f"[PARALLEL] Ray init failed: {e}", flush=True) return False def calculate_coverage_parallel( grid: List[Tuple[float, float]], point_elevations: Dict[Tuple[float, float], float], site_dict: Dict, settings_dict: Dict, terrain_cache: Dict[str, np.ndarray], buildings: List, streets: List, water_bodies: List, vegetation_areas: List, site_elevation: float, num_workers: Optional[int] = None, log_fn: Optional[Callable[[str], None]] = None, ) -> Tuple[List[Dict], Dict[str, float]]: """Calculate coverage points in parallel. Uses Ray if available (shared memory, zero-copy numpy), otherwise falls back to sequential single-threaded calculation. Same signature as before — drop-in replacement. """ if log_fn is None: log_fn = lambda msg: print(f"[PARALLEL] {msg}", flush=True) if num_workers is None: num_workers = get_cpu_count() total_points = len(grid) # Try Ray if RAY_AVAILABLE and _try_init_ray(num_workers): try: return _calculate_with_ray( grid, point_elevations, site_dict, settings_dict, terrain_cache, buildings, streets, water_bodies, vegetation_areas, site_elevation, num_workers, log_fn, ) except Exception as e: log_fn(f"Ray execution failed: {e} — falling back to sequential") # Fallback: ProcessPoolExecutor with reduced workers to avoid MemoryError pool_workers = min(num_workers, 6) if pool_workers > 1 and total_points > 100: try: return _calculate_with_process_pool( grid, point_elevations, site_dict, settings_dict, terrain_cache, buildings, streets, water_bodies, vegetation_areas, site_elevation, pool_workers, log_fn, ) except Exception as e: log_fn(f"ProcessPool failed: {e} — falling back to sequential") # Last resort: sequential log_fn(f"Sequential fallback: {total_points} points") return _calculate_sequential( grid, point_elevations, site_dict, settings_dict, buildings, streets, water_bodies, vegetation_areas, site_elevation, log_fn, ) # ── Ray backend ── def _calculate_with_ray( grid, point_elevations, site_dict, settings_dict, terrain_cache, buildings, streets, water_bodies, vegetation_areas, site_elevation, num_workers, log_fn, ): """Execute using Ray shared-memory object store.""" total_points = len(grid) log_fn(f"Ray mode: {total_points} points, {num_workers} workers") # ── Put large data into Ray object store ── # Numpy arrays (terrain tiles) get zero-copy shared memory. # Python objects (buildings) get serialized once, stored in plasma. t_put = time.time() terrain_ref = ray.put(terrain_cache) buildings_ref = ray.put(buildings) osm_ref = ray.put({ 'streets': streets, 'water_bodies': water_bodies, 'vegetation_areas': vegetation_areas, }) cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}" config_ref = ray.put({ 'site_dict': site_dict, 'settings_dict': settings_dict, 'site_elevation': site_elevation, 'cache_key': cache_key, }) put_time = time.time() - t_put log_fn(f"ray.put() done in {put_time:.1f}s") # ── Prepare and submit chunks ── items = [ (lat, lon, point_elevations.get((lat, lon), 0.0)) for lat, lon in grid ] # ~4 chunks per worker for granular progress chunk_size = max(1, len(items) // (num_workers * 4)) chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points") t_calc = time.time() pending = [ _ray_process_chunk.remote(chunk, terrain_ref, buildings_ref, osm_ref, config_ref) for chunk in chunks ] # ── Collect results with progress via ray.wait() ── all_results: List[Dict] = [] total_chunks = len(pending) remaining = list(pending) completed_chunks = 0 while remaining: # Wait for at least 1 result, batch up to ~10% for progress logging batch = max(1, min(len(remaining), total_chunks // 10 or 1)) done, remaining = ray.wait(remaining, num_returns=batch, timeout=600) for ref in done: try: chunk_results = ray.get(ref) all_results.extend(chunk_results) except Exception as e: log_fn(f"Chunk error: {e}") completed_chunks += len(done) pct = completed_chunks * 100 // total_chunks elapsed = time.time() - t_calc pts = len(all_results) rate = pts / elapsed if elapsed > 0 else 0 eta = (total_points - pts) / rate if rate > 0 else 0 log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — " f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s") calc_time = time.time() - t_calc log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results " f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)") timing = { "parallel_total": calc_time, "ray_put": put_time, "workers": num_workers, "backend": "ray", } return all_results, timing # ── ProcessPoolExecutor fallback ── def _pool_worker_process_chunk(args): """Worker function for ProcessPoolExecutor. Processes a chunk of points.""" chunk, terrain_cache, buildings, osm_data, config = args from app.services.terrain_service import terrain_service terrain_service._tile_cache = terrain_cache from app.services.spatial_index import SpatialIndex spatial_idx = SpatialIndex() if buildings: spatial_idx.build(buildings) from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings site = SiteParams(**config['site_dict']) settings = CoverageSettings(**config['settings_dict']) svc = CoverageService() timing = { "los": 0.0, "buildings": 0.0, "antenna": 0.0, "dominant_path": 0.0, "street_canyon": 0.0, "reflection": 0.0, "vegetation": 0.0, } results = [] for lat, lon, point_elev in chunk: point = svc._calculate_point_sync( site, lat, lon, settings, buildings, osm_data.get('streets', []), spatial_idx, osm_data.get('water_bodies', []), osm_data.get('vegetation_areas', []), config['site_elevation'], point_elev, timing, ) if point.rsrp >= settings.min_signal: results.append(point.model_dump()) return results def _calculate_with_process_pool( grid, point_elevations, site_dict, settings_dict, terrain_cache, buildings, streets, water_bodies, vegetation_areas, site_elevation, num_workers, log_fn, ): """Execute using ProcessPoolExecutor with reduced workers to limit memory.""" from concurrent.futures import ProcessPoolExecutor, as_completed total_points = len(grid) log_fn(f"ProcessPool mode: {total_points} points, {num_workers} workers") items = [ (lat, lon, point_elevations.get((lat, lon), 0.0)) for lat, lon in grid ] # Larger chunks than Ray — fewer workers means bigger chunks chunk_size = max(1, len(items) // (num_workers * 2)) chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points") config = { 'site_dict': site_dict, 'settings_dict': settings_dict, 'site_elevation': site_elevation, } osm_data = { 'streets': streets, 'water_bodies': water_bodies, 'vegetation_areas': vegetation_areas, } t_calc = time.time() all_results: List[Dict] = [] with ProcessPoolExecutor(max_workers=num_workers) as executor: futures = { executor.submit( _pool_worker_process_chunk, (chunk, terrain_cache, buildings, osm_data, config), ): i for i, chunk in enumerate(chunks) } completed_chunks = 0 for future in as_completed(futures): try: chunk_results = future.result() all_results.extend(chunk_results) except Exception as e: log_fn(f"Chunk error: {e}") completed_chunks += 1 pct = completed_chunks * 100 // len(chunks) elapsed = time.time() - t_calc pts = len(all_results) rate = pts / elapsed if elapsed > 0 else 0 eta = (total_points - pts) / rate if rate > 0 else 0 log_fn(f"Progress: {completed_chunks}/{len(chunks)} chunks ({pct}%) — " f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s") calc_time = time.time() - t_calc log_fn(f"ProcessPool done: {calc_time:.1f}s, {len(all_results)} results " f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)") timing = { "parallel_total": calc_time, "workers": num_workers, "backend": "process_pool", } return all_results, timing # ── Sequential fallback ── def _calculate_sequential( grid, point_elevations, site_dict, settings_dict, buildings, streets, water_bodies, vegetation_areas, site_elevation, log_fn, ): """Sequential fallback — no extra dependencies, runs in calling thread.""" from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings from app.services.spatial_index import SpatialIndex site = SiteParams(**site_dict) settings = CoverageSettings(**settings_dict) svc = CoverageService() spatial_idx = SpatialIndex() if buildings: spatial_idx.build(buildings) total = len(grid) log_interval = max(1, total // 20) timing = { "los": 0.0, "buildings": 0.0, "antenna": 0.0, "dominant_path": 0.0, "street_canyon": 0.0, "reflection": 0.0, "vegetation": 0.0, } t0 = time.time() results = [] for i, (lat, lon) in enumerate(grid): if i % log_interval == 0: log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)") point_elev = point_elevations.get((lat, lon), 0.0) point = svc._calculate_point_sync( site, lat, lon, settings, buildings, streets, spatial_idx, water_bodies, vegetation_areas, site_elevation, point_elev, timing, ) if point.rsrp >= settings.min_signal: results.append(point.model_dump()) calc_time = time.time() - t0 log_fn(f"Sequential done: {calc_time:.1f}s, {len(results)} results " f"({calc_time / max(1, total) * 1000:.1f}ms/point)") timing["sequential_total"] = calc_time timing["backend"] = "sequential" return results, timing