From 221000d5b35758c48ef99f385c69e09f79149bf1 Mon Sep 17 00:00:00 2001 From: mytec Date: Sat, 31 Jan 2026 21:09:10 +0200 Subject: [PATCH] @mytec: refactor to ray ready for testing --- .claude/settings.local.json | 4 +- backend/app/api/routes/system.py | 19 +- backend/app/services/coverage_service.py | 5 +- .../app/services/parallel_coverage_service.py | 388 +++++++++++------- backend/requirements.txt | 1 + installer/rfcp-server.spec | 15 + 6 files changed, 278 insertions(+), 154 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index e26f8ad..f1c7dec 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -20,7 +20,9 @@ "Bash(mv:*)", "Read(*)", "Write(*)", - "Bash(python3:*)" + "Bash(python3:*)", + "Bash(source:*)", + "Bash(/mnt/d/root/rfcp/venv/bin/python3:*)" ] } } diff --git a/backend/app/api/routes/system.py b/backend/app/api/routes/system.py index c42d71b..b97f9a2 100644 --- a/backend/app/api/routes/system.py +++ b/backend/app/api/routes/system.py @@ -6,9 +6,22 @@ router = APIRouter() @router.get("/info") async def get_system_info(): - """Return system info: CPU cores, GPU availability, parallel support.""" + """Return system info: CPU cores, GPU availability, parallel backend.""" cpu_cores = mp.cpu_count() or 1 + # Check Ray + ray_available = False + ray_initialized = False + try: + from app.services.parallel_coverage_service import RAY_AVAILABLE + ray_available = RAY_AVAILABLE + if ray_available: + import ray + ray_initialized = ray.is_initialized() + except Exception: + pass + + # Check GPU gpu_info = None try: import cupy as cp @@ -24,7 +37,9 @@ async def get_system_info(): return { "cpu_cores": cpu_cores, "parallel_workers": min(cpu_cores, 14), - "parallel_enabled": True, + "parallel_backend": "ray" if ray_available else "sequential", + "ray_available": ray_available, + "ray_initialized": ray_initialized, "gpu": gpu_info, "gpu_enabled": gpu_info is not None, } diff --git a/backend/app/services/coverage_service.py b/backend/app/services/coverage_service.py index 1e9bd61..18b57c2 100644 --- a/backend/app/services/coverage_service.py +++ b/backend/app/services/coverage_service.py @@ -54,7 +54,7 @@ from app.services.weather_service import weather_service from app.services.indoor_service import indoor_service from app.services.atmospheric_service import atmospheric_service from app.services.parallel_coverage_service import ( - calculate_coverage_parallel, get_cpu_count + calculate_coverage_parallel, get_cpu_count, get_parallel_backend, ) @@ -360,8 +360,9 @@ class CoverageService: num_workers = get_cpu_count() if use_parallel: + backend = get_parallel_backend() _clog(f"━━━ PHASE 3: Calculating {len(grid)} points " - f"(PARALLEL, {num_workers} workers) ━━━") + f"(PARALLEL/{backend}, {num_workers} workers) ━━━") try: loop = asyncio.get_event_loop() diff --git a/backend/app/services/parallel_coverage_service.py b/backend/app/services/parallel_coverage_service.py index ed1f879..b6758af 100644 --- a/backend/app/services/parallel_coverage_service.py +++ b/backend/app/services/parallel_coverage_service.py @@ -1,93 +1,71 @@ """ -Parallel coverage calculation using ProcessPoolExecutor. +Parallel coverage calculation. -Workers receive pre-loaded terrain cache, buildings, and OSM data -via a shared pickle file. Each worker initializes module-level -service singletons with the cached data, then processes point chunks. +Primary backend: Ray (shared-memory object store, zero-copy numpy arrays) +Fallback: Sequential (single-threaded, no extra dependencies) + +Ray advantages over ProcessPoolExecutor: + - ray.put() stores terrain cache ONCE in shared memory + - Workers access numpy arrays via zero-copy (no per-worker pickle/copy) + - Eliminates MemoryError on Detailed preset with large terrain + buildings Usage: - from app.services.parallel_coverage_service import calculate_coverage_parallel + from app.services.parallel_coverage_service import ( + calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE, + ) """ import os import sys import time -import pickle -import tempfile import multiprocessing as mp -from concurrent.futures import ProcessPoolExecutor from typing import List, Dict, Tuple, Any, Optional, Callable import numpy as np -# ── Module-level worker state (set once per process by _init_worker) ── +# ── Try to import Ray ── -_worker_data: Dict[str, Any] = {} -_worker_initialized = False +RAY_AVAILABLE = False +try: + import ray + RAY_AVAILABLE = True +except ImportError: + ray = None # type: ignore -def _init_worker(shared_data_path: str): - """Initialize a worker process with shared data from temp file. +# ── Worker-level spatial index cache (persists across tasks in same worker) ── - Injects terrain cache into the module-level terrain_service singleton - so that all other services (LOS, dominant path, etc.) automatically - see the cached tiles. +_worker_spatial_idx = None +_worker_cache_key: Optional[str] = None + + +def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config): + """Implementation: process a chunk of (lat, lon, elevation) tuples. + + Called inside a Ray remote function. terrain_cache numpy arrays come + from the Ray object store via zero-copy. """ - global _worker_data, _worker_initialized + global _worker_spatial_idx, _worker_cache_key - if _worker_initialized: - return - - t0 = time.time() - pid = os.getpid() - - # Load shared data - with open(shared_data_path, 'rb') as f: - data = pickle.load(f) - - # Inject terrain cache into the global singleton — - # this automatically fixes los_service, dominant_path_service, etc. - # because they hold references to the same terrain_service object. + # Inject terrain cache into the module-level singleton. + # For numpy arrays, Ray gives us a read-only view into shared memory. from app.services.terrain_service import terrain_service - terrain_service._tile_cache = data['terrain_cache'] + terrain_service._tile_cache = terrain_cache - # Build spatial index from buildings - from app.services.spatial_index import SpatialIndex - spatial_idx = SpatialIndex() - if data['buildings']: - spatial_idx.build(data['buildings']) + # Build or reuse spatial index (expensive — ~1s for 350K buildings). + cache_key = config.get('cache_key', '') + if _worker_cache_key != cache_key: + from app.services.spatial_index import SpatialIndex + _worker_spatial_idx = SpatialIndex() + if buildings: + _worker_spatial_idx.build(buildings) + _worker_cache_key = cache_key - _worker_data = { - 'buildings': data['buildings'], - 'streets': data['streets'], - 'water_bodies': data['water_bodies'], - 'vegetation_areas': data['vegetation_areas'], - 'spatial_idx': spatial_idx, - 'site_dict': data['site_dict'], - 'settings_dict': data['settings_dict'], - 'site_elevation': data['site_elevation'], - } - - _worker_initialized = True - dt = time.time() - t0 - print(f"[WORKER {pid}] Initialized in {dt:.1f}s — " - f"{len(data['terrain_cache'])} tiles, " - f"{len(data['buildings'])} buildings, " - f"{len(data.get('vegetation_areas', []))} vegetation", - flush=True) - - -def _process_chunk(chunk: List[Tuple[float, float, float]]) -> List[Dict]: - """Process a chunk of (lat, lon, point_elevation) tuples. - - Returns list of CoveragePoint dicts for points above min_signal. - """ + # Process points from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings - data = _worker_data - site = SiteParams(**data['site_dict']) - settings = CoverageSettings(**data['settings_dict']) - + site = SiteParams(**config['site_dict']) + settings = CoverageSettings(**config['settings_dict']) svc = CoverageService() timing = { @@ -100,10 +78,10 @@ def _process_chunk(chunk: List[Tuple[float, float, float]]) -> List[Dict]: for lat, lon, point_elev in chunk: point = svc._calculate_point_sync( site, lat, lon, settings, - data['buildings'], data['streets'], - data['spatial_idx'], data['water_bodies'], - data['vegetation_areas'], - data['site_elevation'], point_elev, timing, + buildings, osm_data.get('streets', []), + _worker_spatial_idx, osm_data.get('water_bodies', []), + osm_data.get('vegetation_areas', []), + config['site_elevation'], point_elev, timing, ) if point.rsrp >= settings.min_signal: results.append(point.model_dump()) @@ -111,6 +89,13 @@ def _process_chunk(chunk: List[Tuple[float, float, float]]) -> List[Dict]: return results +# ── Register the Ray remote function (only if Ray is available) ── + +_ray_process_chunk = None +if RAY_AVAILABLE: + _ray_process_chunk = ray.remote(_ray_process_chunk_impl) + + # ── Public API ── @@ -122,6 +107,42 @@ def get_cpu_count() -> int: return 4 +def get_parallel_backend() -> str: + """Return which parallel backend is available.""" + if RAY_AVAILABLE: + return "ray" + return "sequential" + + +def _try_init_ray(num_cpus: int) -> bool: + """Initialize Ray lazily. Returns True if Ray is ready.""" + if not RAY_AVAILABLE: + return False + + if ray.is_initialized(): + return True + + try: + data_path = os.environ.get('RFCP_DATA_PATH', './data') + ray_tmp = os.path.join(data_path, 'ray_tmp') + os.makedirs(ray_tmp, exist_ok=True) + + ray.init( + num_cpus=num_cpus, + include_dashboard=False, + log_to_driver=True, + _temp_dir=ray_tmp, + ) + print(f"[PARALLEL] Ray initialized: {num_cpus} CPUs, " + f"object store ~{ray.cluster_resources().get('object_store_memory', 0) / 1e9:.1f}GB", + flush=True) + return True + + except Exception as e: + print(f"[PARALLEL] Ray init failed: {e}", flush=True) + return False + + def calculate_coverage_parallel( grid: List[Tuple[float, float]], point_elevations: Dict[Tuple[float, float], float], @@ -136,21 +157,12 @@ def calculate_coverage_parallel( num_workers: Optional[int] = None, log_fn: Optional[Callable[[str], None]] = None, ) -> Tuple[List[Dict], Dict[str, float]]: - """Calculate coverage points in parallel using ProcessPoolExecutor. + """Calculate coverage points in parallel. - Args: - grid: List of (lat, lon) tuples. - point_elevations: Pre-computed {(lat, lon): elevation} dict. - site_dict: SiteParams as a dict (for pickling). - settings_dict: CoverageSettings as a dict (for pickling). - terrain_cache: {tile_name: np.ndarray} — pre-loaded SRTM tiles. - buildings, streets, water_bodies, vegetation_areas: OSM data. - site_elevation: Elevation at site location (meters). - num_workers: Override worker count (default: auto-detect). - log_fn: Logging function (receives string messages). + Uses Ray if available (shared memory, zero-copy numpy), otherwise + falls back to sequential single-threaded calculation. - Returns: - (results, timing) where results is list of CoveragePoint dicts. + Same signature as before — drop-in replacement. """ if log_fn is None: log_fn = lambda msg: print(f"[PARALLEL] {msg}", flush=True) @@ -159,92 +171,170 @@ def calculate_coverage_parallel( num_workers = get_cpu_count() total_points = len(grid) - log_fn(f"Parallel mode: {total_points} points, {num_workers} workers") - # Prepare items with pre-computed elevations + # Try Ray + if RAY_AVAILABLE and _try_init_ray(num_workers): + try: + return _calculate_with_ray( + grid, point_elevations, site_dict, settings_dict, + terrain_cache, buildings, streets, water_bodies, + vegetation_areas, site_elevation, + num_workers, log_fn, + ) + except Exception as e: + log_fn(f"Ray execution failed: {e} — falling back to sequential") + + # Fallback: sequential + log_fn(f"Sequential fallback: {total_points} points") + return _calculate_sequential( + grid, point_elevations, site_dict, settings_dict, + buildings, streets, water_bodies, vegetation_areas, + site_elevation, log_fn, + ) + + +# ── Ray backend ── + + +def _calculate_with_ray( + grid, point_elevations, site_dict, settings_dict, + terrain_cache, buildings, streets, water_bodies, + vegetation_areas, site_elevation, + num_workers, log_fn, +): + """Execute using Ray shared-memory object store.""" + total_points = len(grid) + log_fn(f"Ray mode: {total_points} points, {num_workers} workers") + + # ── Put large data into Ray object store ── + # Numpy arrays (terrain tiles) get zero-copy shared memory. + # Python objects (buildings) get serialized once, stored in plasma. + t_put = time.time() + + terrain_ref = ray.put(terrain_cache) + buildings_ref = ray.put(buildings) + osm_ref = ray.put({ + 'streets': streets, + 'water_bodies': water_bodies, + 'vegetation_areas': vegetation_areas, + }) + + cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}" + config_ref = ray.put({ + 'site_dict': site_dict, + 'settings_dict': settings_dict, + 'site_elevation': site_elevation, + 'cache_key': cache_key, + }) + + put_time = time.time() - t_put + log_fn(f"ray.put() done in {put_time:.1f}s") + + # ── Prepare and submit chunks ── items = [ (lat, lon, point_elevations.get((lat, lon), 0.0)) for lat, lon in grid ] - # Split into chunks — ~4 chunks per worker for granular progress - chunks_per_worker = 4 - chunk_size = max(1, len(items) // (num_workers * chunks_per_worker)) + # ~4 chunks per worker for granular progress + chunk_size = max(1, len(items) // (num_workers * 4)) chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] - log_fn(f"Split into {len(chunks)} chunks of ~{chunk_size} points") + log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points") - # ── Serialize shared data to temp file (once, not per-worker) ── - t_serial = time.time() - shared_data = { - 'terrain_cache': terrain_cache, - 'buildings': buildings, - 'streets': streets, - 'water_bodies': water_bodies, - 'vegetation_areas': vegetation_areas, - 'site_dict': site_dict, - 'settings_dict': settings_dict, - 'site_elevation': site_elevation, - } - - tmpfile = tempfile.NamedTemporaryFile(delete=False, suffix='.pkl') - try: - pickle.dump(shared_data, tmpfile, protocol=pickle.HIGHEST_PROTOCOL) - finally: - tmpfile.close() - - shared_data_path = tmpfile.name - file_size_mb = os.path.getsize(shared_data_path) / (1024 * 1024) - serial_time = time.time() - t_serial - log_fn(f"Serialized shared data: {file_size_mb:.1f}MB in {serial_time:.1f}s") - - # Free main-process memory for the duplicate - del shared_data - - # ── Run in process pool ── t_calc = time.time() + pending = [ + _ray_process_chunk.remote(chunk, terrain_ref, buildings_ref, osm_ref, config_ref) + for chunk in chunks + ] + + # ── Collect results with progress via ray.wait() ── all_results: List[Dict] = [] - completed_points = 0 + total_chunks = len(pending) + remaining = list(pending) + completed_chunks = 0 - try: - with ProcessPoolExecutor( - max_workers=num_workers, - initializer=_init_worker, - initargs=(shared_data_path,), - ) as executor: - futures = [executor.submit(_process_chunk, chunk) for chunk in chunks] + while remaining: + # Wait for at least 1 result, batch up to ~10% for progress logging + batch = max(1, min(len(remaining), total_chunks // 10 or 1)) + done, remaining = ray.wait(remaining, num_returns=batch, timeout=600) - for i, future in enumerate(futures): - try: - chunk_results = future.result(timeout=600) # 10 min max per chunk - all_results.extend(chunk_results) - except Exception as e: - log_fn(f"Chunk {i} failed: {e}") + for ref in done: + try: + chunk_results = ray.get(ref) + all_results.extend(chunk_results) + except Exception as e: + log_fn(f"Chunk error: {e}") - completed_points += len(chunks[i]) - pct = min(100, completed_points * 100 // total_points) - elapsed = time.time() - t_calc - rate = completed_points / elapsed if elapsed > 0 else 0 - - # Log every ~10% or on last chunk - if (i + 1) % max(1, len(chunks) // 10) == 0 or i == len(chunks) - 1: - eta = (total_points - completed_points) / rate if rate > 0 else 0 - log_fn(f"Progress: {completed_points}/{total_points} ({pct}%) — " - f"{rate:.0f} pts/s, ETA {eta:.0f}s") - - finally: - # Clean up temp file - try: - os.unlink(shared_data_path) - except Exception: - pass + completed_chunks += len(done) + pct = completed_chunks * 100 // total_chunks + elapsed = time.time() - t_calc + pts = len(all_results) + rate = pts / elapsed if elapsed > 0 else 0 + eta = (total_points - pts) / rate if rate > 0 else 0 + log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — " + f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s") calc_time = time.time() - t_calc - log_fn(f"Parallel done: {calc_time:.1f}s, {len(all_results)} results " + log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results " f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)") timing = { "parallel_total": calc_time, - "serialize": serial_time, + "ray_put": put_time, "workers": num_workers, + "backend": "ray", } return all_results, timing + + +# ── Sequential fallback ── + + +def _calculate_sequential( + grid, point_elevations, site_dict, settings_dict, + buildings, streets, water_bodies, vegetation_areas, + site_elevation, log_fn, +): + """Sequential fallback — no extra dependencies, runs in calling thread.""" + from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings + from app.services.spatial_index import SpatialIndex + + site = SiteParams(**site_dict) + settings = CoverageSettings(**settings_dict) + svc = CoverageService() + + spatial_idx = SpatialIndex() + if buildings: + spatial_idx.build(buildings) + + total = len(grid) + log_interval = max(1, total // 20) + timing = { + "los": 0.0, "buildings": 0.0, "antenna": 0.0, + "dominant_path": 0.0, "street_canyon": 0.0, + "reflection": 0.0, "vegetation": 0.0, + } + + t0 = time.time() + results = [] + for i, (lat, lon) in enumerate(grid): + if i % log_interval == 0: + log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)") + + point_elev = point_elevations.get((lat, lon), 0.0) + point = svc._calculate_point_sync( + site, lat, lon, settings, + buildings, streets, spatial_idx, + water_bodies, vegetation_areas, + site_elevation, point_elev, timing, + ) + if point.rsrp >= settings.min_signal: + results.append(point.model_dump()) + + calc_time = time.time() - t0 + log_fn(f"Sequential done: {calc_time:.1f}s, {len(results)} results " + f"({calc_time / max(1, total) * 1000:.1f}ms/point)") + + timing["sequential_total"] = calc_time + timing["backend"] = "sequential" + return results, timing diff --git a/backend/requirements.txt b/backend/requirements.txt index ec6db3d..23e8577 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -11,3 +11,4 @@ requests==2.31.0 httpx==0.27.0 aiosqlite>=0.19.0 sqlalchemy>=2.0.0 +ray[default]>=2.9.0 diff --git a/installer/rfcp-server.spec b/installer/rfcp-server.spec index d381b84..5020b63 100644 --- a/installer/rfcp-server.spec +++ b/installer/rfcp-server.spec @@ -83,6 +83,21 @@ a = Analysis( # Encoding 'email.mime', 'email.mime.multipart', + # Ray (parallel processing) — graceful fallback if missing + 'ray', + 'ray._private', + 'ray._private.worker', + 'ray._private.node', + 'ray._private.services', + 'ray._private.utils', + 'ray._raylet', + 'ray.runtime_context', + 'ray.util', + # Multiprocessing (fallback) + 'multiprocessing', + 'multiprocessing.pool', + 'multiprocessing.queues', + 'concurrent.futures', ], hookspath=[], hooksconfig={},