"""
Parallel coverage calculation.

Primary backend: Ray (shared-memory object store, zero-copy numpy arrays)
Fallback: ProcessPoolExecutor (at most 6 workers, fewer for large datasets)
Last resort: Sequential (single-threaded, no extra dependencies)

Ray advantages over ProcessPoolExecutor:
- ray.put() stores terrain cache ONCE in shared memory
- Workers access numpy arrays via zero-copy (no per-worker pickle/copy)
- Eliminates MemoryError on Detailed preset with large terrain + buildings

ProcessPoolExecutor fallback:
- Used when Ray is unavailable (e.g. PyInstaller builds)
- Capped at 6 workers to prevent MemoryError from data pickling
- Terrain tiles are shared through multiprocessing.shared_memory;
  buildings/OSM data are pickled once into shared memory and unpickled
  once per worker (each worker still holds its own unpickled copy)
- If shared memory is unavailable, only small datasets are pickled per chunk

Usage:
    from app.services.parallel_coverage_service import (
        calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE,
    )
"""

import gc
import os
import sys
import subprocess
import time
import threading
import multiprocessing as mp
from typing import List, Dict, Tuple, Any, Optional, Callable

import numpy as np


# ── Cancellation token ──

class CancellationToken:
    """Thread-safe cancellation token for cooperative cancellation."""

    def __init__(self):
        # threading.Event provides the thread-safe flag.
        self._event = threading.Event()

    def cancel(self):
        """Request cancellation; the flag stays set once raised."""
        self._event.set()

    @property
    def is_cancelled(self) -> bool:
        """True once cancel() has been called."""
        return self._event.is_set()


# ── Active pool tracking (for graceful shutdown) ──

_active_pool = None  # Global ref to current ProcessPoolExecutor
_active_pool_lock = threading.Lock()


def _set_active_pool(pool):
    """Record *pool* as the currently running ProcessPoolExecutor."""
    global _active_pool
    with _active_pool_lock:
        _active_pool = pool


def _clear_active_pool():
    """Forget the currently tracked ProcessPoolExecutor."""
    global _active_pool
    with _active_pool_lock:
        _active_pool = None


# ── Worker process cleanup ──

def _clog(msg: str):
    """Log with [PARALLEL] prefix."""
    print(f"[PARALLEL] {msg}", flush=True)


def _kill_worker_processes() -> int:
    """Kill ALL rfcp-server processes except the current (main) process.

    First shuts down the active ProcessPoolExecutor (if any), then uses
    process NAME matching to kill remaining workers.

    Returns the number of processes for which a kill was issued.
    """
    global _active_pool

    # Step 0: Shut down active ProcessPoolExecutor gracefully
    with _active_pool_lock:
        pool = _active_pool
        _active_pool = None
    if pool is not None:
        try:
            pool.shutdown(wait=False, cancel_futures=True)
            _clog("Active ProcessPoolExecutor shutdown requested")
        except Exception as e:
            _clog(f"Pool shutdown error: {e}")

    my_pid = os.getpid()
    killed_count = 0

    if sys.platform == 'win32':
        try:
            # List all rfcp-server.exe processes in CSV format
            result = subprocess.run(
                ['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe', '/FO', 'CSV', '/NH'],
                capture_output=True, text=True, timeout=5,
            )
            for line in result.stdout.strip().split('\n'):
                if 'rfcp-server.exe' not in line:
                    continue
                parts = line.split(',')
                if len(parts) >= 2:
                    # Second CSV column is the quoted PID.
                    pid_str = parts[1].strip().strip('"')
                    try:
                        pid = int(pid_str)
                        if pid != my_pid:
                            subprocess.run(
                                ['taskkill', '/F', '/PID', str(pid)],
                                capture_output=True, timeout=5,
                            )
                            killed_count += 1
                            _clog(f"Killed worker PID {pid}")
                    except (ValueError, subprocess.TimeoutExpired):
                        pass
        except Exception as e:
            _clog(f"Kill workers error: {e}")
            # Fallback: kill ALL rfcp-server.exe
            try:
                subprocess.run(
                    ['taskkill', '/F', '/IM', 'rfcp-server.exe', '/T'],
                    capture_output=True, timeout=5,
                )
            except Exception:
                pass
    else:
        # Unix: pgrep + kill
        try:
            result = subprocess.run(
                ['pgrep', '-f', 'rfcp-server'],
                capture_output=True, text=True, timeout=5,
            )
            for pid_str in result.stdout.strip().split('\n'):
                if not pid_str:
                    continue
                try:
                    pid = int(pid_str)
                    if pid != my_pid:
                        os.kill(pid, 9)  # SIGKILL
                        killed_count += 1
                        _clog(f"Killed worker PID {pid}")
                except (ValueError, ProcessLookupError, PermissionError):
                    pass
        except Exception as e:
            _clog(f"Kill workers error: {e}")

    return killed_count


# ── Try to import Ray ──

RAY_AVAILABLE = False
try:
    import ray
    RAY_AVAILABLE = True
except ImportError:
    ray = None  # type: ignore


# ── Worker-level caches (persist across tasks in same worker process) ──

_worker_spatial_idx = None
_worker_cache_key: Optional[str] = None

# Shared-memory buildings/OSM — unpickled once per worker, cached by key
_worker_shared_buildings = None
_worker_shared_osm_data = None
_worker_shared_data_key: Optional[str] = None

# Open SharedMemory handles keyed by block name. Holding the handle keeps it
# from being garbage-collected (and its buffer released) while numpy views
# built on it are still in use, and avoids re-opening it for every chunk.
_worker_shm_handles: Dict[str, Any] = {}

_worker_chunk_count: int = 0  # per-worker chunk counter


# ── Shared helpers ──

def _new_timing() -> Dict[str, float]:
    """Return a fresh zeroed timing/LOD accumulator for _calculate_point_sync."""
    return {
        "los": 0.0, "buildings": 0.0, "antenna": 0.0,
        "dominant_path": 0.0, "street_canyon": 0.0,
        "reflection": 0.0, "vegetation": 0.0,
        "lod_none": 0, "lod_simplified": 0, "lod_full": 0,
    }


def _build_config(site_dict, settings_dict, site_elevation, building_count, precomputed):
    """Build the per-calculation config dict sent to every worker.

    The cache key (site lat/lon rounded to 4 decimals plus building count) is
    what workers compare to decide whether their cached data can be reused.
    """
    cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{building_count}"
    config = {
        'site_dict': site_dict,
        'settings_dict': settings_dict,
        'site_elevation': site_elevation,
        'cache_key': cache_key,
    }
    if precomputed:
        config['precomputed'] = precomputed
    return config


def _build_chunks(grid, point_elevations, num_workers):
    """Split the grid into chunks of (lat, lon, elevation) tuples.

    Points missing from point_elevations get elevation 0.0. Chunk size is
    len(grid) // max(2, num_workers), clamped to the range 1..400.

    Returns (chunks, chunk_size).
    """
    items = [
        (lat, lon, point_elevations.get((lat, lon), 0.0))
        for lat, lon in grid
    ]
    # Larger chunks amortize IPC overhead
    chunk_size = max(1, min(400, len(items) // max(2, num_workers)))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    return chunks, chunk_size


def _pool_backend_label(use_shm: bool, shm_full: bool) -> str:
    """Return the timing 'backend' label for the ProcessPool path in use."""
    if use_shm and shm_full:
        return "process_pool/shm_full"
    if use_shm:
        return "process_pool/shm_terrain"
    return "process_pool/pickle"


def _log_rss(log_fn, label: str):
    """Best-effort log of this process's VmRSS line from /proc/self/status."""
    try:
        with open('/proc/self/status') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    log_fn(f"{label}: {line.strip()}")
                    break
    except Exception:
        # /proc is not available on every platform; this is diagnostics only.
        pass


def _ensure_worker_spatial_index(buildings, cache_key):
    """Return the worker's spatial index, rebuilding it when cache_key changes."""
    global _worker_spatial_idx, _worker_cache_key
    if _worker_cache_key != cache_key:
        if buildings:
            from app.services.spatial_index import SpatialIndex
            _worker_spatial_idx = SpatialIndex()
            _worker_spatial_idx.build(buildings)
        else:
            _worker_spatial_idx = None
        _worker_cache_key = cache_key
    return _worker_spatial_idx


def _process_chunk_points(chunk, config, buildings, osm_data, spatial_idx):
    """Calculate every point of *chunk* and return the kept results.

    A point is kept (as point.model_dump()) when its rsrp is at least
    settings.min_signal.
    """
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings

    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()

    timing = _new_timing()
    precomputed = config.get('precomputed')
    streets = osm_data.get('streets', [])
    water = osm_data.get('water_bodies', [])
    veg = osm_data.get('vegetation_areas', [])
    site_elev = config['site_elevation']

    results = []
    for lat, lon, point_elev in chunk:
        pre = precomputed.get((lat, lon)) if precomputed else None
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, streets,
            _spatial_idx_passthrough(spatial_idx), water, veg,
            site_elev, point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())
    return results


def _spatial_idx_passthrough(spatial_idx):
    """Return spatial_idx unchanged (single place to hook index access)."""
    return spatial_idx


def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config):
    """Implementation: process a chunk of (lat, lon, elevation) tuples.

    Called inside a Ray remote function. terrain_cache numpy arrays
    come from the Ray object store via zero-copy.
    """
    # Inject terrain cache into the module-level singleton.
    # For numpy arrays, Ray gives us a read-only view into shared memory.
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # Build or reuse spatial index (expensive — ~1s for 350K buildings).
    spatial_idx = _ensure_worker_spatial_index(buildings, config.get('cache_key', ''))

    return _process_chunk_points(chunk, config, buildings, osm_data, spatial_idx)


# ── Register the Ray remote function (only if Ray is available) ──

_ray_process_chunk = None
if RAY_AVAILABLE:
    _ray_process_chunk = ray.remote(_ray_process_chunk_impl)


# ── Public API ──

def get_cpu_count() -> int:
    """Get number of usable CPU cores, capped at 6.

    Each worker holds its own copy of buildings + OSM data + spatial index
    (~200-400 MB per worker). Capping at 6 prevents OOM on systems with
    8-16 GB RAM (especially WSL2 with limited memory allocation).
    """
    try:
        return min(mp.cpu_count() or 4, 6)
    except Exception:
        return 4


def get_parallel_backend() -> str:
    """Return which parallel backend is available."""
    if RAY_AVAILABLE:
        return "ray"
    return "process_pool"


def _try_init_ray(num_cpus: int) -> bool:
    """Initialize Ray lazily. Returns True if Ray is ready."""
    if not RAY_AVAILABLE:
        return False
    if ray.is_initialized():
        return True
    try:
        # Ray's temp dir lives under the data path (RFCP_DATA_PATH, default ./data).
        data_path = os.environ.get('RFCP_DATA_PATH', './data')
        ray_tmp = os.path.join(data_path, 'ray_tmp')
        os.makedirs(ray_tmp, exist_ok=True)

        ray.init(
            num_cpus=num_cpus,
            include_dashboard=False,
            log_to_driver=True,
            _temp_dir=ray_tmp,
        )
        _clog(f"Ray initialized: {num_cpus} CPUs, "
              f"object store ~{ray.cluster_resources().get('object_store_memory', 0) / 1e9:.1f}GB")
        return True
    except Exception as e:
        _clog(f"Ray init failed: {e}")
        return False


def calculate_coverage_parallel(
    grid: List[Tuple[float, float]],
    point_elevations: Dict[Tuple[float, float], float],
    site_dict: Dict,
    settings_dict: Dict,
    terrain_cache: Dict[str, np.ndarray],
    buildings: List,
    streets: List,
    water_bodies: List,
    vegetation_areas: List,
    site_elevation: float,
    num_workers: Optional[int] = None,
    log_fn: Optional[Callable[[str], None]] = None,
    cancel_token: Optional[CancellationToken] = None,
    precomputed: Optional[Dict] = None,
    progress_fn: Optional[Callable[[str, float], None]] = None,
) -> Tuple[List[Dict], Dict[str, float]]:
    """Calculate coverage points in parallel.

    Uses Ray if available (shared memory, zero-copy numpy), otherwise
    falls back to ProcessPoolExecutor or sequential single-threaded
    calculation.

    Args:
        grid: (lat, lon) points to calculate.
        point_elevations: elevation per (lat, lon); missing points use 0.0.
        site_dict / settings_dict: keyword arguments for SiteParams /
            CoverageSettings.
        terrain_cache: terrain tiles injected into terrain_service in workers.
        num_workers: worker count; defaults to get_cpu_count().
        log_fn: log callback; defaults to printing with a [PARALLEL] prefix.
        cancel_token: cooperative cancellation — checked between chunks.
        precomputed: dict mapping (lat, lon) -> {distance, path_loss} from
            GPU pre-computation.
        progress_fn: called with (stage label, fraction in 0.40-0.95).

    Returns:
        (results, timing): results is a list of point dicts that met
        settings.min_signal; timing describes the backend that ran.
    """
    if log_fn is None:
        log_fn = _clog

    if num_workers is None:
        num_workers = get_cpu_count()

    total_points = len(grid)

    # Try Ray
    if RAY_AVAILABLE and _try_init_ray(num_workers):
        try:
            return _calculate_with_ray(
                grid, point_elevations, site_dict, settings_dict,
                terrain_cache, buildings, streets, water_bodies,
                vegetation_areas, site_elevation,
                num_workers, log_fn, cancel_token, precomputed,
                progress_fn,
            )
        except Exception as e:
            # The next backend tried is the process pool, not sequential.
            log_fn(f"Ray execution failed: {e} — falling back to ProcessPool")

    # Fallback: ProcessPoolExecutor (shared memory eliminates per-chunk pickle)
    pool_workers = num_workers

    # Scale workers down based on data volume to prevent OOM.
    # Each worker unpickles + holds its own copy of buildings, OSM data, and
    # spatial index. With large datasets the per-worker memory can exceed
    # 300 MB, so reduce workers to keep total under ~2 GB.
    data_items = len(buildings) + len(streets) + len(water_bodies) + len(vegetation_areas)
    if data_items > 20000:
        pool_workers = min(pool_workers, 2)
        log_fn(f"Data volume high ({data_items} items) — capping workers at {pool_workers}")
    elif data_items > 10000:
        pool_workers = min(pool_workers, 3)
        log_fn(f"Data volume moderate ({data_items} items) — capping workers at {pool_workers}")
    elif data_items > 5000:
        pool_workers = min(pool_workers, 4)
        log_fn(f"Data volume elevated ({data_items} items) — capping workers at {pool_workers}")

    log_fn(f"ProcessPool: {pool_workers} workers (cpu_count={num_workers}, data_items={data_items})")

    # A pool is only worth its startup cost for more than 100 points.
    if pool_workers > 1 and total_points > 100:
        try:
            return _calculate_with_process_pool(
                grid, point_elevations, site_dict, settings_dict,
                terrain_cache, buildings, streets, water_bodies,
                vegetation_areas, site_elevation,
                pool_workers, log_fn, cancel_token, precomputed,
                progress_fn,
            )
        except (MemoryError, OSError) as e:
            log_fn(f"ProcessPool OOM/OS error: {e} — falling back to sequential")
        except Exception as e:
            log_fn(f"ProcessPool failed: {e} — falling back to sequential")

    # Last resort: sequential
    log_fn(f"Sequential fallback: {total_points} points")
    return _calculate_sequential(
        grid, point_elevations, site_dict, settings_dict,
        buildings, streets, water_bodies, vegetation_areas,
        site_elevation, log_fn, cancel_token, precomputed,
        progress_fn,
    )


# ── Ray backend ──

def _calculate_with_ray(
    grid, point_elevations, site_dict, settings_dict,
    terrain_cache, buildings, streets, water_bodies, vegetation_areas,
    site_elevation, num_workers, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Execute using Ray shared-memory object store.

    Returns (results, timing). On cancellation the remaining chunks are
    cancelled and the results collected so far are returned.
    """
    total_points = len(grid)
    log_fn(f"Ray mode: {total_points} points, {num_workers} workers")

    # ── Put large data into Ray object store ──
    t_put = time.time()
    terrain_ref = ray.put(terrain_cache)
    buildings_ref = ray.put(buildings)
    osm_ref = ray.put({
        'streets': streets,
        'water_bodies': water_bodies,
        'vegetation_areas': vegetation_areas,
    })
    config = _build_config(
        site_dict, settings_dict, site_elevation, len(buildings), precomputed,
    )
    config_ref = ray.put(config)
    put_time = time.time() - t_put
    log_fn(f"ray.put() done in {put_time:.1f}s")

    # ── Prepare and submit chunks ──
    chunks, chunk_size = _build_chunks(grid, point_elevations, num_workers)
    log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")

    t_calc = time.time()
    # Map each task ref to its chunk length so progress can count processed
    # points (results alone undercount: filtered-out points never appear).
    chunk_points = {}
    for chunk in chunks:
        ref = _ray_process_chunk.remote(
            chunk, terrain_ref, buildings_ref, osm_ref, config_ref,
        )
        chunk_points[ref] = len(chunk)

    # ── Collect results with progress via ray.wait() ──
    all_results: List[Dict] = []
    total_chunks = len(chunk_points)
    remaining = list(chunk_points)
    completed_chunks = 0
    processed_points = 0

    while remaining:
        # Check cancellation before waiting
        if cancel_token and cancel_token.is_cancelled:
            log_fn(f"Cancelled — aborting {len(remaining)} remaining Ray chunks")
            for ref in remaining:
                try:
                    ray.cancel(ref, force=True)
                except Exception:
                    pass
            break

        # Wait for at least 1 result, batch up to ~10% for progress logging
        batch = max(1, min(len(remaining), total_chunks // 10 or 1))
        done, remaining = ray.wait(remaining, num_returns=batch, timeout=30)

        for ref in done:
            try:
                all_results.extend(ray.get(ref))
            except Exception as e:
                log_fn(f"Chunk error: {e}")
            processed_points += chunk_points[ref]

        completed_chunks += len(done)
        pct = completed_chunks * 100 // total_chunks
        elapsed = time.time() - t_calc
        rate = processed_points / elapsed if elapsed > 0 else 0
        eta = (total_points - processed_points) / rate if rate > 0 else 0
        log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — "
               f"{processed_points} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")

        if progress_fn:
            # Map chunk progress to 40%-95% range
            progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / total_chunks))

    calc_time = time.time() - t_calc
    log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results "
           f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")

    # Force garbage collection after Ray computation
    gc.collect()

    timing = {
        "parallel_total": calc_time,
        "ray_put": put_time,
        "workers": num_workers,
        "backend": "ray",
    }
    return all_results, timing


# ── ProcessPoolExecutor fallback ──

def _pool_worker_process_chunk(args):
    """Worker function for ProcessPoolExecutor. Processes a chunk of points.

    Everything (terrain, buildings, OSM data) arrives pickled in *args*; the
    spatial index is rebuilt for every chunk.
    """
    chunk, terrain_cache, buildings, osm_data, config = args

    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    spatial_idx = None
    if buildings:
        from app.services.spatial_index import SpatialIndex
        spatial_idx = SpatialIndex()
        spatial_idx.build(buildings)

    return _process_chunk_points(chunk, config, buildings, osm_data, spatial_idx)


def _store_terrain_in_shm(terrain_cache: Dict[str, np.ndarray], log_fn) -> Tuple[list, Dict[str, dict]]:
    """Store terrain tile arrays in shared memory.

    Returns (shm_blocks, tile_refs). tile_refs is a dict mapping
    tile_name -> {shm_name, shape, dtype} that workers use to reconstruct
    numpy arrays from shared memory.

    A tile that cannot be stored is logged and left out of tile_refs, so
    workers will not see it. Every created block is returned in shm_blocks
    (even for a tile whose copy failed) so the caller can release it.
    """
    import multiprocessing.shared_memory as shm_mod

    blocks = []
    refs = {}
    for tile_name, arr in terrain_cache.items():
        try:
            block = shm_mod.SharedMemory(create=True, size=arr.nbytes)
            blocks.append(block)
            # Copy tile data to shared memory
            shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=block.buf)
            shm_arr[:] = arr[:]
            refs[tile_name] = {
                'shm_name': block.name,
                'shape': arr.shape,
                'dtype': str(arr.dtype),
            }
        except Exception as e:
            log_fn(f"Failed to store tile {tile_name} in shm: {e}")
    return blocks, refs


def _store_pickle_in_shm(data, label: str, log_fn) -> Tuple[Optional[Any], Optional[dict]]:
    """Pickle arbitrary data into a SharedMemory block.

    Returns (shm_block, ref_dict) where ref_dict = {shm_name, size}.
    On failure returns (None, None) and caller should fall back to pickle;
    a block that was already created is closed and unlinked first.
    """
    import multiprocessing.shared_memory as shm_mod
    import pickle

    block = None
    try:
        blob = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
        size = len(blob)
        block = shm_mod.SharedMemory(create=True, size=size)
        block.buf[:size] = blob
        mb = size / (1024 * 1024)
        log_fn(f"{label} in shared memory: {mb:.1f} MB")
        return block, {'shm_name': block.name, 'size': size}
    except Exception as e:
        log_fn(f"Failed to store {label} in shm: {e}")
        if block is not None:
            # Do not leak a block nobody will hold a reference to.
            try:
                block.close()
                block.unlink()
            except Exception:
                pass
        return None, None


def _load_pickle_from_shm(ref: dict):
    """Unpickle the object stored by _store_pickle_in_shm for *ref*.

    The payload is written by the parent process of this calculation, so it
    is treated as trusted input for pickle.loads.
    """
    import multiprocessing.shared_memory as shm_mod
    import pickle

    blk = shm_mod.SharedMemory(name=ref['shm_name'])
    try:
        # Copy out only the pickled bytes (the block may be larger).
        payload = bytes(blk.buf[:ref['size']])
    finally:
        blk.close()
    return pickle.loads(payload)


def _load_shared_or_default(ref, default, label: str):
    """Load pickled shared data for *ref*; log and return *default* on failure."""
    if not ref:
        return default
    try:
        return _load_pickle_from_shm(ref)
    except Exception as e:
        print(
            f"[WORKER {os.getpid()}] Failed to load {label} from shared memory: {e}",
            flush=True,
        )
        return default


def _attach_terrain_from_shm(terrain_shm_refs: Dict[str, dict]) -> Dict[str, np.ndarray]:
    """Rebuild the terrain cache as numpy views over shared-memory blocks.

    Handles are cached in _worker_shm_handles so each block is opened once
    per worker and stays referenced while the views are in use. Tiles that
    cannot be attached are logged and left out of the returned cache.
    """
    import multiprocessing.shared_memory as shm_mod

    terrain_cache = {}
    for tile_name, ref in terrain_shm_refs.items():
        shm_name = ref['shm_name']
        try:
            block = _worker_shm_handles.get(shm_name)
            if block is None:
                block = shm_mod.SharedMemory(name=shm_name)
                _worker_shm_handles[shm_name] = block
            terrain_cache[tile_name] = np.ndarray(
                ref['shape'], dtype=ref['dtype'], buffer=block.buf,
            )
        except Exception as e:
            print(
                f"[WORKER {os.getpid()}] Failed to attach terrain tile {tile_name}: {e}",
                flush=True,
            )
    return terrain_cache


def _pool_worker_shm_chunk(args):
    """Worker function that reads terrain from shared memory instead of pickle.

    Buildings and OSM data still arrive pickled in *args*.
    """
    chunk, terrain_shm_refs, buildings, osm_data, config = args

    # Reconstruct terrain cache from shared memory (zero-copy numpy views)
    terrain_cache = _attach_terrain_from_shm(terrain_shm_refs)

    # Inject terrain cache
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # Build or reuse the spatial index
    spatial_idx = _ensure_worker_spatial_index(buildings, config.get('cache_key', ''))

    return _process_chunk_points(chunk, config, buildings, osm_data, spatial_idx)


def _pool_worker_shm_shared(args):
    """Worker: terrain + buildings + OSM all via shared memory.

    Per-chunk args are tiny (~8 KB): just point coords, shm refs, and config.
    Buildings and OSM data are unpickled from shared memory ONCE per worker
    and cached in module globals for subsequent chunks.
    """
    global _worker_chunk_count
    global _worker_shared_buildings, _worker_shared_osm_data, _worker_shared_data_key
    global _worker_spatial_idx, _worker_cache_key

    _worker_chunk_count += 1
    pid = os.getpid()
    t_worker_start = time.perf_counter()

    chunk, terrain_shm_refs, shared_data_refs, config = args

    # ── Reconstruct terrain from shared memory ──
    t0 = time.perf_counter()
    terrain_cache = _attach_terrain_from_shm(terrain_shm_refs)

    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache
    t_terrain_shm = time.perf_counter() - t0

    # ── Read buildings + OSM from shared memory (cached per worker) ──
    data_key = config.get('cache_key', '')
    cached = (_worker_shared_data_key == data_key)

    t_unpickle_bld = 0.0
    t_unpickle_osm = 0.0
    t_spatial = 0.0

    if not cached:
        # First chunk for this calculation — unpickle from shm
        t0 = time.perf_counter()
        _worker_shared_buildings = _load_shared_or_default(
            shared_data_refs.get('buildings'), [], "buildings",
        )
        t_unpickle_bld = time.perf_counter() - t0

        t0 = time.perf_counter()
        _worker_shared_osm_data = _load_shared_or_default(
            shared_data_refs.get('osm_data'), {}, "OSM data",
        )
        t_unpickle_osm = time.perf_counter() - t0

        _worker_shared_data_key = data_key

        # Rebuild spatial index for new data
        t0 = time.perf_counter()
        if _worker_shared_buildings:
            from app.services.spatial_index import SpatialIndex
            _worker_spatial_idx = SpatialIndex()
            _worker_spatial_idx.build(_worker_shared_buildings)
        else:
            _worker_spatial_idx = None
        _worker_cache_key = data_key
        t_spatial = time.perf_counter() - t0

        print(
            f"[WORKER {pid}] Init: terrain_shm={t_terrain_shm*1000:.1f}ms "
            f"unpickle_bld={t_unpickle_bld*1000:.1f}ms "
            f"unpickle_osm={t_unpickle_osm*1000:.1f}ms "
            f"spatial={t_spatial*1000:.1f}ms "
            f"buildings={len(_worker_shared_buildings or [])} "
            f"tiles={len(terrain_cache)}",
            flush=True,
        )

    print(
        f"[WORKER {pid}] Processing chunk {_worker_chunk_count}, "
        f"cached={cached}, points={len(chunk)}",
        flush=True,
    )

    buildings = _worker_shared_buildings or []
    osm_data = _worker_shared_osm_data or {}

    # ── Imports + object creation (timed) ──
    t0 = time.perf_counter()
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    t_import = time.perf_counter() - t0

    t0 = time.perf_counter()
    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()
    t_pydantic = time.perf_counter() - t0

    timing = _new_timing()

    precomputed = config.get('precomputed')
    streets = osm_data.get('streets', [])
    water = osm_data.get('water_bodies', [])
    veg = osm_data.get('vegetation_areas', [])
    site_elev = config['site_elevation']

    t_init_done = time.perf_counter()
    init_ms = (t_init_done - t_worker_start) * 1000

    # ── Process points with per-point profiling (first 3 only) ──
    results = []
    t_loop_start = time.perf_counter()
    t_model_dump_total = 0.0
    n_dumped = 0

    for i, (lat, lon, point_elev) in enumerate(chunk):
        pre = precomputed.get((lat, lon)) if precomputed else None

        # Snapshot timing dict before call (for first 3 points)
        if i < 3:
            timing_before = dict(timing)
            t_pt = time.perf_counter()

        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, streets,
            _worker_spatial_idx, water, veg,
            site_elev, point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )

        if i < 3:
            t_pt_done = time.perf_counter()
            pt_ms = (t_pt_done - t_pt) * 1000
            # Per-stage contribution of this one point, in ms
            deltas = {k: (timing[k] - timing_before.get(k, 0)) * 1000 for k in timing}
            parts = " ".join(f"{k}={v:.2f}" for k, v in deltas.items() if v > 0.001)
            print(
                f"[WORKER {pid}] Point {i}: {pt_ms:.2f}ms "
                f"rsrp={point.rsrp:.1f} dist={point.distance:.0f}m "
                f"breakdown=[{parts}]",
                flush=True,
            )

        if point.rsrp >= settings.min_signal:
            t_md = time.perf_counter()
            results.append(point.model_dump())
            t_model_dump_total += time.perf_counter() - t_md
            n_dumped += 1

    t_loop_done = time.perf_counter()
    loop_ms = (t_loop_done - t_loop_start) * 1000
    total_ms = (t_loop_done - t_worker_start) * 1000
    avg_pt = loop_ms / len(chunk) if chunk else 0
    avg_dump = (t_model_dump_total * 1000 / n_dumped) if n_dumped else 0

    print(
        f"[WORKER {pid}] Chunk done: total={total_ms:.0f}ms "
        f"init={init_ms:.0f}ms loop={loop_ms:.0f}ms "
        f"avg_pt={avg_pt:.2f}ms model_dump={avg_dump:.2f}ms×{n_dumped} "
        f"import={t_import*1000:.1f}ms pydantic={t_pydantic*1000:.1f}ms "
        f"terrain_shm={t_terrain_shm*1000:.1f}ms "
        f"results={len(results)}/{len(chunk)}",
        flush=True,
    )

    return results


def _calculate_with_process_pool(
    grid, point_elevations, site_dict, settings_dict,
    terrain_cache, buildings, streets, water_bodies, vegetation_areas,
    site_elevation, num_workers, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Execute using ProcessPoolExecutor.

    Uses shared memory for terrain tiles (zero-copy numpy views), buildings,
    and OSM data (pickle-once, read-many) to eliminate per-chunk
    serialization overhead.

    Returns (results, timing).

    Raises:
        MemoryError: shared memory could not hold the data and the dataset is
            too large (> 2000 items) to pickle per chunk; the caller falls
            back to sequential.
    """
    from concurrent.futures import ProcessPoolExecutor, as_completed

    total_points = len(grid)
    building_count = len(buildings)
    data_items = building_count + len(streets) + len(water_bodies) + len(vegetation_areas)
    log_fn(f"ProcessPool mode: {total_points} points, {num_workers} workers, "
           f"{building_count} buildings, {data_items} total OSM items")

    # Log memory at start
    _log_rss(log_fn, "Memory before calculation")

    # Store terrain tiles in shared memory
    shm_blocks = []
    terrain_shm_refs = {}
    try:
        shm_blocks, terrain_shm_refs = _store_terrain_in_shm(terrain_cache, log_fn)
        if terrain_shm_refs:
            tile_mb = sum(
                np.prod(r['shape']) * np.dtype(r['dtype']).itemsize
                for r in terrain_shm_refs.values()
            ) / (1024 * 1024)
            log_fn(f"Stored {len(terrain_shm_refs)} terrain tiles in shared memory ({tile_mb:.0f} MB)")
            use_shm = True
        else:
            use_shm = False
    except Exception as e:
        log_fn(f"Shared memory setup failed ({e}), using pickle fallback")
        use_shm = False

    osm_data = {
        'streets': streets,
        'water_bodies': water_bodies,
        'vegetation_areas': vegetation_areas,
    }

    # Store buildings + OSM data in shared memory (pickle once, read many)
    shared_data_refs = {}
    if use_shm:
        bld_block, bld_ref = _store_pickle_in_shm(buildings, "Buildings", log_fn)
        if bld_block:
            shm_blocks.append(bld_block)
            shared_data_refs['buildings'] = bld_ref

        osm_block, osm_ref = _store_pickle_in_shm(osm_data, "OSM data", log_fn)
        if osm_block:
            shm_blocks.append(osm_block)
            shared_data_refs['osm_data'] = osm_ref

    # The full shared-memory path needs BOTH payloads: a worker substitutes
    # empty data for a missing ref, which would silently drop buildings or
    # OSM features from the calculation.
    shm_full = bool(
        use_shm
        and 'buildings' in shared_data_refs
        and 'osm_data' in shared_data_refs
    )
    # Decide the label now; shared_data_refs is released during cleanup.
    backend_label = _pool_backend_label(use_shm, shm_full)

    chunks, chunk_size = _build_chunks(grid, point_elevations, num_workers)
    total_chunks = len(chunks)
    log_fn(f"Submitting {total_chunks} chunks of ~{chunk_size} points")

    config = _build_config(
        site_dict, settings_dict, site_elevation, len(buildings), precomputed,
    )

    t_calc = time.time()
    all_results: List[Dict] = []

    pool = None
    try:
        # NOTE(review): any non-MemoryError exception raised in this block is
        # logged and swallowed below, so the caller's OSError fallback is never
        # reached from here and partial results are returned instead.
        ctx = mp.get_context('spawn')
        pool = ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx)
        _set_active_pool(pool)

        if shm_full:
            # Full shared memory path: terrain + buildings + OSM all via shm
            worker_fn = _pool_worker_shm_shared
            task_args = [
                (chunk, terrain_shm_refs, shared_data_refs, config)
                for chunk in chunks
            ]
        elif use_shm and data_items <= 2000:
            # Terrain-only shm — buildings/OSM pickled per chunk.
            # Only safe for small datasets; large datasets would OOM from
            # pickle copies (num_chunks × pickle_size).
            log_fn(f"Terrain-only shm (small data: {data_items} items)")
            worker_fn = _pool_worker_shm_chunk
            task_args = [
                (chunk, terrain_shm_refs, buildings, osm_data, config)
                for chunk in chunks
            ]
        elif data_items <= 2000:
            # Full pickle fallback — only safe for small datasets
            log_fn(f"Full pickle path (small data: {data_items} items)")
            worker_fn = _pool_worker_process_chunk
            task_args = [
                (chunk, terrain_cache, buildings, osm_data, config)
                for chunk in chunks
            ]
        else:
            # Large dataset + shared memory failed → per-chunk pickle would OOM.
            # Bail out; caller will fall back to sequential.
            log_fn(f"Shared memory failed for large dataset ({data_items} items) "
                   f"— skipping ProcessPool to avoid OOM")
            raise MemoryError(
                f"Cannot safely pickle {data_items} OSM items per chunk"
            )

        # future -> chunk index, so progress can count processed points
        futures = {
            pool.submit(worker_fn, task): i
            for i, task in enumerate(task_args)
        }

        completed_chunks = 0
        processed_points = 0
        for future in as_completed(futures):
            if cancel_token and cancel_token.is_cancelled:
                log_fn(f"Cancelled — cancelling {len(futures) - completed_chunks - 1} pending futures")
                for f in futures:
                    f.cancel()
                break

            try:
                all_results.extend(future.result())
            except Exception as e:
                log_fn(f"Chunk error: {e}")

            completed_chunks += 1
            processed_points += len(chunks[futures[future]])
            pct = completed_chunks * 100 // total_chunks
            elapsed = time.time() - t_calc
            # Rate/ETA use processed points; results omit filtered-out points.
            rate = processed_points / elapsed if elapsed > 0 else 0
            eta = (total_points - processed_points) / rate if rate > 0 else 0
            log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — "
                   f"{processed_points} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")

            if progress_fn:
                progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / total_chunks))

    except MemoryError:
        raise  # Propagate to caller for sequential fallback
    except Exception as e:
        log_fn(f"ProcessPool error: {e}")
    finally:
        _clear_active_pool()
        if pool:
            pool.shutdown(wait=False, cancel_futures=True)
            # Give workers a moment to exit before force-killing stragglers.
            time.sleep(0.5)
            killed = _kill_worker_processes()
            if killed > 0:
                log_fn(f"Force killed {killed} orphaned workers")

        # Cleanup shared memory blocks
        for block in shm_blocks:
            try:
                block.close()
                block.unlink()
            except Exception:
                pass

        # Release large local references before GC
        chunks = None  # noqa: F841
        task_args = None  # noqa: F841
        osm_data = None  # noqa: F841
        shared_data_refs = None  # noqa: F841

        # Force garbage collection to release memory from workers
        gc.collect()

        # Log memory after cleanup
        _log_rss(log_fn, "Memory after cleanup")

    calc_time = time.time() - t_calc
    log_fn(f"ProcessPool done: {calc_time:.1f}s, {len(all_results)} results "
           f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")

    timing = {
        "parallel_total": calc_time,
        "workers": num_workers,
        "backend": backend_label,
    }
    return all_results, timing


# ── Sequential fallback ──

def _calculate_sequential(
    grid, point_elevations, site_dict, settings_dict,
    buildings, streets, water_bodies, vegetation_areas,
    site_elevation, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Sequential fallback — no extra dependencies, runs in calling thread.

    Returns (results, timing); timing holds the per-stage accumulators plus
    "sequential_total" and "backend". Stops early (returning the results so
    far) when cancel_token is cancelled.
    """
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    from app.services.spatial_index import SpatialIndex

    site = SiteParams(**site_dict)
    settings = CoverageSettings(**settings_dict)
    svc = CoverageService()

    spatial_idx = None
    if buildings:
        spatial_idx = SpatialIndex()
        spatial_idx.build(buildings)

    total = len(grid)
    # Log/report progress roughly every 5% of the grid.
    log_interval = max(1, total // 20)
    timing = _new_timing()

    t0 = time.time()
    results = []
    for i, (lat, lon) in enumerate(grid):
        # Check cancellation
        if cancel_token and cancel_token.is_cancelled:
            log_fn(f"Sequential cancelled at {i}/{total}")
            break

        if i % log_interval == 0:
            log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)")
            if progress_fn:
                progress_fn("Calculating coverage", 0.40 + 0.55 * (i / total))

        point_elev = point_elevations.get((lat, lon), 0.0)

        # Use precomputed values if available
        pre = precomputed.get((lat, lon)) if precomputed else None

        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, streets, spatial_idx,
            water_bodies, vegetation_areas,
            site_elevation, point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())

    calc_time = time.time() - t0
    log_fn(f"Sequential done: {calc_time:.1f}s, {len(results)} results "
           f"({calc_time / max(1, total) * 1000:.1f}ms/point)")

    # Force garbage collection after sequential computation
    gc.collect()

    timing["sequential_total"] = calc_time
    timing["backend"] = "sequential"
    return results, timing