Files
rfcp/backend/app/services/parallel_coverage_service.py

341 lines
11 KiB
Python

"""
Parallel coverage calculation.

Primary backend: Ray (shared-memory object store, zero-copy numpy arrays)
Fallback: Sequential (single-threaded, no extra dependencies)

Ray advantages over ProcessPoolExecutor:
- ray.put() stores terrain cache ONCE in shared memory
- Workers access numpy arrays via zero-copy (no per-worker pickle/copy)
- Eliminates MemoryError on Detailed preset with large terrain + buildings

Usage:
    from app.services.parallel_coverage_service import (
        calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE,
    )
"""
import os
import sys
import time
import multiprocessing as mp
from typing import List, Dict, Tuple, Any, Optional, Callable
import numpy as np
# ── Optional Ray dependency ──
# Ray is imported opportunistically. When the package is missing the module
# still loads: `ray` is bound to None and RAY_AVAILABLE is False so the rest
# of the file can branch on the flag instead of re-trying the import.
try:
    import ray
except ImportError:
    ray = None  # type: ignore
    RAY_AVAILABLE = False
else:
    RAY_AVAILABLE = True
# ── Worker-level spatial index cache (persists across tasks in same worker) ──
# Both globals are written by _ray_process_chunk_impl: the index is rebuilt
# only when the incoming config's 'cache_key' differs from the key kept here.
# Holds the SpatialIndex built by _ray_process_chunk_impl, or None before the first build.
_worker_spatial_idx: Optional[Any] = None
# The 'cache_key' value the current index was built for (None until first build).
_worker_cache_key: Optional[str] = None
def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config):
    """Compute coverage for one chunk of grid points (body of the Ray task).

    Args:
        chunk: Iterable of ``(lat, lon, elevation)`` tuples to evaluate.
        terrain_cache: Tile cache; installed on the ``terrain_service``
            singleton before any point is processed.
        buildings: Building collection. Used to build the per-worker spatial
            index and also forwarded to every point calculation.
        osm_data: Dict read for the keys ``'streets'``, ``'water_bodies'`` and
            ``'vegetation_areas'``; a missing key is treated as an empty list.
        config: Dict providing ``'site_dict'``, ``'settings_dict'`` and
            ``'site_elevation'``, plus an optional ``'cache_key'`` that
            identifies the building set the spatial index belongs to.

    Returns:
        A list of ``model_dump()`` dicts, one per point whose ``rsrp`` is at
        least ``settings.min_signal``.
    """
    global _worker_spatial_idx, _worker_cache_key

    # Point the module-level terrain singleton at the cache handed to this task.
    # (Per the module design, numpy tiles arrive as views into Ray's object store.)
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # The spatial index is expensive to build, so it is kept in module globals
    # and only rebuilt when the cache key changes.
    requested_key = config.get('cache_key', '')
    if requested_key != _worker_cache_key:
        from app.services.spatial_index import SpatialIndex
        _worker_spatial_idx = SpatialIndex()
        if buildings:
            _worker_spatial_idx.build(buildings)
        _worker_cache_key = requested_key

    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()

    # Per-phase timing accumulator expected by _calculate_point_sync; it is
    # filled in by the callee but not returned from this task.
    timing = dict.fromkeys(
        (
            "los", "buildings", "antenna",
            "dominant_path", "street_canyon",
            "reflection", "vegetation",
        ),
        0.0,
    )

    street_list = osm_data.get('streets', [])
    water_list = osm_data.get('water_bodies', [])
    vegetation_list = osm_data.get('vegetation_areas', [])

    kept = []
    for lat, lon, point_elev in chunk:
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, street_list,
            _worker_spatial_idx, water_list,
            vegetation_list,
            config['site_elevation'], point_elev, timing,
        )
        # Only points at or above the configured signal floor are reported.
        if point.rsrp >= settings.min_signal:
            kept.append(point.model_dump())
    return kept
# ── Ray remote handle for the chunk worker ──
# Wrapped with ray.remote only when Ray imported successfully; otherwise the
# handle stays None and the Ray code path must not be taken.
_ray_process_chunk = (
    ray.remote(_ray_process_chunk_impl) if RAY_AVAILABLE else None
)
# ── Public API ──
def get_cpu_count() -> int:
    """Return the number of CPU cores to use, limited to at most 14.

    Falls back to 4 when the core count cannot be determined (the lookup
    raises or reports a falsy value).
    """
    try:
        detected = mp.cpu_count()
    except Exception:
        return 4
    return min(detected or 4, 14)
def get_parallel_backend() -> str:
    """Name the backend this module can use: ``"ray"`` when Ray was imported, else ``"sequential"``."""
    return "ray" if RAY_AVAILABLE else "sequential"
def _try_init_ray(num_cpus: int) -> bool:
    """Start Ray on first use and report whether it is ready.

    Args:
        num_cpus: CPU count passed to ``ray.init``.

    Returns:
        True when Ray is already initialized or was initialized successfully
        here; False when Ray is not installed or start-up raised.
    """
    if not RAY_AVAILABLE:
        return False

    if not ray.is_initialized():
        try:
            # Ray's temp/session files go under the app data directory
            # (RFCP_DATA_PATH, defaulting to ./data).
            ray_tmp = os.path.join(
                os.environ.get('RFCP_DATA_PATH', './data'), 'ray_tmp'
            )
            os.makedirs(ray_tmp, exist_ok=True)
            ray.init(
                num_cpus=num_cpus,
                include_dashboard=False,
                log_to_driver=True,
                _temp_dir=ray_tmp,
            )
            print(f"[PARALLEL] Ray initialized: {num_cpus} CPUs, "
                  f"object store ~{ray.cluster_resources().get('object_store_memory', 0) / 1e9:.1f}GB",
                  flush=True)
        except Exception as e:
            # Any start-up problem is reported and treated as "Ray unavailable".
            print(f"[PARALLEL] Ray init failed: {e}", flush=True)
            return False

    return True
def calculate_coverage_parallel(
    grid: List[Tuple[float, float]],
    point_elevations: Dict[Tuple[float, float], float],
    site_dict: Dict,
    settings_dict: Dict,
    terrain_cache: Dict[str, np.ndarray],
    buildings: List,
    streets: List,
    water_bodies: List,
    vegetation_areas: List,
    site_elevation: float,
    num_workers: Optional[int] = None,
    log_fn: Optional[Callable[[str], None]] = None,
) -> Tuple[List[Dict], Dict[str, float]]:
    """Compute coverage for every grid point, using Ray when possible.

    The Ray backend is tried first; if Ray is missing, cannot be initialized,
    or raises while running, the whole grid is computed sequentially instead.

    Args:
        grid: ``(lat, lon)`` pairs to evaluate.
        point_elevations: Elevation lookup keyed by ``(lat, lon)``.
        site_dict: Keyword arguments for ``SiteParams``.
        settings_dict: Keyword arguments for ``CoverageSettings``.
        terrain_cache: Terrain tiles handed to the Ray workers (not passed to
            the sequential fallback).
        buildings, streets, water_bodies, vegetation_areas: Geometry inputs
            forwarded to the per-point calculation.
        site_elevation: Elevation of the transmitting site.
        num_workers: Worker count; defaults to ``get_cpu_count()``.
        log_fn: Progress callback; defaults to printing with a
            ``[PARALLEL]`` prefix.

    Returns:
        ``(results, timing)`` as produced by the backend that ran.
    """
    if log_fn is None:
        def log_fn(msg):
            print(f"[PARALLEL] {msg}", flush=True)

    workers = get_cpu_count() if num_workers is None else num_workers
    total_points = len(grid)

    # Preferred path: Ray. Any failure here is logged and we drop through.
    if RAY_AVAILABLE and _try_init_ray(workers):
        try:
            return _calculate_with_ray(
                grid, point_elevations, site_dict, settings_dict,
                terrain_cache, buildings, streets, water_bodies,
                vegetation_areas, site_elevation,
                workers, log_fn,
            )
        except Exception as e:
            log_fn(f"Ray execution failed: {e} — falling back to sequential")

    # Fallback path: single-threaded calculation in the calling thread.
    log_fn(f"Sequential fallback: {total_points} points")
    return _calculate_sequential(
        grid, point_elevations, site_dict, settings_dict,
        buildings, streets, water_bodies, vegetation_areas,
        site_elevation, log_fn,
    )
# ── Ray backend ──
def _calculate_with_ray(
    grid, point_elevations, site_dict, settings_dict,
    terrain_cache, buildings, streets, water_bodies,
    vegetation_areas, site_elevation,
    num_workers, log_fn,
):
    """Run the coverage calculation as Ray tasks and gather the results.

    The large inputs are placed in the Ray object store once and passed to
    every task by reference. The grid is split into roughly four chunks per
    worker, each chunk is submitted as one ``_ray_process_chunk`` task, and
    results are collected with ``ray.wait`` so progress can be logged.

    Args:
        grid: ``(lat, lon)`` pairs to evaluate.
        point_elevations: Elevation lookup keyed by ``(lat, lon)``; points
            missing from it are treated as elevation 0.0.
        site_dict, settings_dict: Keyword dicts forwarded to the workers.
        terrain_cache: Terrain tiles shared with the workers.
        buildings, streets, water_bodies, vegetation_areas: Geometry inputs
            shared with the workers.
        site_elevation: Elevation of the transmitting site.
        num_workers: Used to size the chunks and reported in the timing dict.
        log_fn: Callable taking one message string.

    Returns:
        ``(all_results, timing)`` where ``all_results`` is the concatenation
        of every successful chunk's result list and ``timing`` holds
        ``parallel_total``, ``ray_put``, ``workers`` and ``backend``.

    Notes:
        A chunk whose task raises is logged and skipped (best effort), so the
        result may be partial; a summary line reports how many chunks failed.
    """
    total_points = len(grid)
    log_fn(f"Ray mode: {total_points} points, {num_workers} workers")

    # ── Put large data into Ray object store ──
    # Numpy arrays (terrain tiles) get zero-copy shared memory.
    # Python objects (buildings) get serialized once, stored in plasma.
    t_put = time.time()
    terrain_ref = ray.put(terrain_cache)
    buildings_ref = ray.put(buildings)
    osm_ref = ray.put({
        'streets': streets,
        'water_bodies': water_bodies,
        'vegetation_areas': vegetation_areas,
    })
    # Key the workers use to decide whether their cached spatial index is
    # still valid for this request.
    cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
    config_ref = ray.put({
        'site_dict': site_dict,
        'settings_dict': settings_dict,
        'site_elevation': site_elevation,
        'cache_key': cache_key,
    })
    put_time = time.time() - t_put
    log_fn(f"ray.put() done in {put_time:.1f}s")

    # ── Prepare and submit chunks ──
    items = [
        (lat, lon, point_elevations.get((lat, lon), 0.0))
        for lat, lon in grid
    ]
    # ~4 chunks per worker for granular progress. The inner max() keeps a
    # zero worker count from causing a ZeroDivisionError.
    chunk_size = max(1, len(items) // (max(1, num_workers) * 4))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")

    t_calc = time.time()
    # Remember how many points each task covers: progress and ETA must be
    # based on points *processed*, not on how many survived the signal filter.
    points_in_task = {}
    pending = []
    for chunk in chunks:
        task_ref = _ray_process_chunk.remote(
            chunk, terrain_ref, buildings_ref, osm_ref, config_ref
        )
        points_in_task[task_ref] = len(chunk)
        pending.append(task_ref)

    # ── Collect results with progress via ray.wait() ──
    all_results: List[Dict] = []
    total_chunks = len(pending)
    remaining = list(pending)
    completed_chunks = 0
    processed_points = 0
    failed_chunks = 0
    while remaining:
        # Wait for at least 1 result, batch up to ~10% for progress logging
        batch = max(1, min(len(remaining), total_chunks // 10 or 1))
        done, remaining = ray.wait(remaining, num_returns=batch, timeout=600)
        if not done:
            # Timeout expired with nothing finished; keep waiting but say so
            # instead of printing an unchanged progress line.
            log_fn(f"No chunk finished within 600s — still waiting on "
                   f"{len(remaining)} chunks")
            continue
        for ref in done:
            processed_points += points_in_task.get(ref, 0)
            try:
                all_results.extend(ray.get(ref))
            except Exception as e:
                failed_chunks += 1
                log_fn(f"Chunk error: {e}")
        completed_chunks += len(done)
        pct = completed_chunks * 100 // total_chunks
        elapsed = time.time() - t_calc
        rate = processed_points / elapsed if elapsed > 0 else 0
        eta = (total_points - processed_points) / rate if rate > 0 else 0
        log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — "
               f"{processed_points} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")

    calc_time = time.time() - t_calc
    if failed_chunks:
        log_fn(f"Warning: {failed_chunks}/{total_chunks} chunks failed — "
               f"results are partial")
    log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results "
           f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")
    timing = {
        "parallel_total": calc_time,
        "ray_put": put_time,
        "workers": num_workers,
        "backend": "ray",
    }
    return all_results, timing
# ── Sequential fallback ──
def _calculate_sequential(
    grid, point_elevations, site_dict, settings_dict,
    buildings, streets, water_bodies, vegetation_areas,
    site_elevation, log_fn,
):
    """Compute coverage for the whole grid in the calling thread.

    Args:
        grid: ``(lat, lon)`` pairs to evaluate, processed in order.
        point_elevations: Elevation lookup keyed by ``(lat, lon)``; points
            missing from it are treated as elevation 0.0.
        site_dict: Keyword arguments for ``SiteParams``.
        settings_dict: Keyword arguments for ``CoverageSettings``.
        buildings, streets, water_bodies, vegetation_areas: Geometry inputs
            forwarded to the per-point calculation.
        site_elevation: Elevation of the transmitting site.
        log_fn: Callable taking one message string; called about every 5% of
            the grid and once at the end.

    Returns:
        ``(results, timing)``: the ``model_dump()`` dicts of the points whose
        ``rsrp`` is at least ``settings.min_signal``, and the per-phase timing
        dict extended with ``sequential_total`` and ``backend``.
    """
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    from app.services.spatial_index import SpatialIndex

    # Per-phase accumulator filled in by _calculate_point_sync.
    timing = dict.fromkeys(
        (
            "los", "buildings", "antenna",
            "dominant_path", "street_canyon",
            "reflection", "vegetation",
        ),
        0.0,
    )

    site = SiteParams(**site_dict)
    settings = CoverageSettings(**settings_dict)
    svc = CoverageService()

    # An empty building list leaves the index unbuilt.
    spatial_idx = SpatialIndex()
    if buildings:
        spatial_idx.build(buildings)

    total = len(grid)
    log_interval = max(1, total // 20)

    results = []
    t0 = time.time()
    for i, coord in enumerate(grid):
        lat, lon = coord
        if i % log_interval == 0:
            log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)")
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, streets, spatial_idx,
            water_bodies, vegetation_areas,
            site_elevation, point_elevations.get((lat, lon), 0.0), timing,
        )
        # Only points at or above the configured signal floor are reported.
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())
    calc_time = time.time() - t0

    log_fn(f"Sequential done: {calc_time:.1f}s, {len(results)} results "
           f"({calc_time / max(1, total) * 1000:.1f}ms/point)")
    timing.update(sequential_total=calc_time, backend="sequential")
    return results, timing