# (extraction banner removed: "835 lines / 28 KiB / Python" — not part of the module)
"""
|
|
Parallel coverage calculation.
|
|
|
|
Primary backend: Ray (shared-memory object store, zero-copy numpy arrays)
|
|
Fallback: ProcessPoolExecutor (4-6 workers to limit memory)
|
|
Last resort: Sequential (single-threaded, no extra dependencies)
|
|
|
|
Ray advantages over ProcessPoolExecutor:
|
|
- ray.put() stores terrain cache ONCE in shared memory
|
|
- Workers access numpy arrays via zero-copy (no per-worker pickle/copy)
|
|
- Eliminates MemoryError on Detailed preset with large terrain + buildings
|
|
|
|
ProcessPoolExecutor fallback:
|
|
- Used when Ray is unavailable (e.g. PyInstaller builds)
|
|
- Capped at 6 workers to prevent MemoryError from data pickling
|
|
- Each worker gets a full copy of terrain/buildings (no shared memory)
|
|
|
|
Usage:
|
|
from app.services.parallel_coverage_service import (
|
|
calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE,
|
|
)
|
|
"""
|
|
|
|
import gc
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import time
|
|
import threading
|
|
import multiprocessing as mp
|
|
from typing import List, Dict, Tuple, Any, Optional, Callable
|
|
import numpy as np
|
|
|
|
|
|
# ── Cancellation token ──
|
|
|
|
class CancellationToken:
    """Cooperative, thread-safe cancellation flag.

    Producers call ``cancel()`` from any thread; workers poll
    ``is_cancelled`` between units of work and stop voluntarily.
    """

    def __init__(self):
        # threading.Event is already thread-safe — no extra lock required.
        self._flag = threading.Event()

    def cancel(self):
        """Request cancellation; takes effect at the next checkpoint."""
        self._flag.set()

    @property
    def is_cancelled(self) -> bool:
        """True once cancel() has been called."""
        return self._flag.is_set()
|
|
|
|
|
|
# ── Active pool tracking (for graceful shutdown) ──

_active_pool = None  # Global ref to current ProcessPoolExecutor
_active_pool_lock = threading.Lock()  # guards all reads/writes of _active_pool
|
|
|
|
|
|
def _set_active_pool(pool):
    """Publish *pool* as the live executor so shutdown code can reach it."""
    global _active_pool
    with _active_pool_lock:
        _active_pool = pool
|
|
|
|
|
|
def _clear_active_pool():
    """Drop the global executor reference (pool is shut down elsewhere)."""
    global _active_pool
    with _active_pool_lock:
        _active_pool = None
|
|
|
|
|
|
# ── Worker process cleanup ──
|
|
|
|
def _clog(msg: str):
|
|
"""Log with [PARALLEL] prefix."""
|
|
print(f"[PARALLEL] {msg}", flush=True)
|
|
|
|
|
|
def _kill_worker_processes() -> int:
    """Kill ALL rfcp-server processes except the current (main) process.

    First shuts down the active ProcessPoolExecutor (if any), then uses
    process NAME matching to kill remaining workers.
    Returns the number of processes killed.

    Side effects: clears the global _active_pool reference and forcibly
    terminates matching OS processes.
    """
    global _active_pool

    # Step 0: Shut down active ProcessPoolExecutor gracefully.
    # Take the reference and clear it under the lock, but do the (possibly
    # slow) shutdown call outside the lock.
    with _active_pool_lock:
        pool = _active_pool
        _active_pool = None
    if pool is not None:
        try:
            pool.shutdown(wait=False, cancel_futures=True)
            _clog("Active ProcessPoolExecutor shutdown requested")
        except Exception as e:
            _clog(f"Pool shutdown error: {e}")

    my_pid = os.getpid()
    killed_count = 0

    if sys.platform == 'win32':
        try:
            # List all rfcp-server.exe processes in CSV format
            # (/NH = no header, so every line is a data row).
            result = subprocess.run(
                ['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe', '/FO', 'CSV', '/NH'],
                capture_output=True, text=True, timeout=5,
            )
            for line in result.stdout.strip().split('\n'):
                if 'rfcp-server.exe' not in line:
                    continue
                # CSV row shape: "image","pid",... — PID is the second field.
                parts = line.split(',')
                if len(parts) >= 2:
                    pid_str = parts[1].strip().strip('"')
                    try:
                        pid = int(pid_str)
                        if pid != my_pid:
                            subprocess.run(
                                ['taskkill', '/F', '/PID', str(pid)],
                                capture_output=True, timeout=5,
                            )
                            killed_count += 1
                            _clog(f"Killed worker PID {pid}")
                    except (ValueError, subprocess.TimeoutExpired):
                        # Unparseable PID or a hung taskkill — skip this one.
                        pass
        except Exception as e:
            _clog(f"Kill workers error: {e}")
            # Fallback: kill ALL rfcp-server.exe by image name (including
            # child trees via /T). NOTE(review): this can also kill the
            # current process if it is named rfcp-server.exe — confirm this
            # path is only reached during shutdown.
            try:
                subprocess.run(
                    ['taskkill', '/F', '/IM', 'rfcp-server.exe', '/T'],
                    capture_output=True, timeout=5,
                )
            except Exception:
                pass
    else:
        # Unix: pgrep + kill.
        # NOTE(review): pgrep -f matches the full command line, so any
        # unrelated process with 'rfcp-server' in its arguments would match
        # too — verify the name is unique enough in deployment.
        try:
            result = subprocess.run(
                ['pgrep', '-f', 'rfcp-server'],
                capture_output=True, text=True, timeout=5,
            )
            for pid_str in result.stdout.strip().split('\n'):
                if not pid_str:
                    continue
                try:
                    pid = int(pid_str)
                    if pid != my_pid:
                        os.kill(pid, 9)  # SIGKILL
                        killed_count += 1
                        _clog(f"Killed worker PID {pid}")
                except (ValueError, ProcessLookupError, PermissionError):
                    # Bad PID, already-dead process, or not ours to kill.
                    pass
        except Exception as e:
            _clog(f"Kill workers error: {e}")

    return killed_count
|
|
|
|
|
|
# ── Try to import Ray ──
|
|
|
|
RAY_AVAILABLE = False
|
|
try:
|
|
import ray
|
|
RAY_AVAILABLE = True
|
|
except ImportError:
|
|
ray = None # type: ignore
|
|
|
|
|
|
# ── Worker-level spatial index cache (persists across tasks in same worker) ──

# Cached SpatialIndex instance; rebuilt only when the cache key changes.
_worker_spatial_idx = None
# Identifies which site/building set _worker_spatial_idx was built for.
_worker_cache_key: Optional[str] = None
|
|
|
|
|
|
def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config):
    """Implementation: process a chunk of (lat, lon, elevation) tuples.

    Called inside a Ray remote function. terrain_cache numpy arrays come
    from the Ray object store via zero-copy.

    Args:
        chunk: list of (lat, lon, point_elevation) tuples to evaluate.
        terrain_cache: tile_name -> elevation array (read-only Ray views).
        buildings: building list for the spatial index and point calculation.
        osm_data: dict with 'streets', 'water_bodies', 'vegetation_areas'.
        config: dict with 'site_dict', 'settings_dict', 'site_elevation',
            'cache_key', and optionally 'precomputed'.

    Returns:
        List of point dicts (model_dump) whose rsrp clears settings.min_signal.
    """
    global _worker_spatial_idx, _worker_cache_key

    # Inject terrain cache into the module-level singleton.
    # For numpy arrays, Ray gives us a read-only view into shared memory.
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # Build or reuse spatial index (expensive — ~1s for 350K buildings).
    # The index persists in this worker process across tasks and is rebuilt
    # only when the cache key (site position + building count) changes.
    cache_key = config.get('cache_key', '')
    if _worker_cache_key != cache_key:
        if buildings:
            from app.services.spatial_index import SpatialIndex
            _worker_spatial_idx = SpatialIndex()
            _worker_spatial_idx.build(buildings)
        else:
            _worker_spatial_idx = None
        _worker_cache_key = cache_key

    # Process points
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings

    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()

    # Per-stage timing accumulator passed through to the point calculator.
    timing = {
        "los": 0.0, "buildings": 0.0, "antenna": 0.0,
        "dominant_path": 0.0, "street_canyon": 0.0,
        "reflection": 0.0, "vegetation": 0.0,
    }

    # Optional GPU pre-computation: (lat, lon) -> {distance, path_loss}.
    precomputed = config.get('precomputed')

    results = []
    for lat, lon, point_elev in chunk:
        pre = precomputed.get((lat, lon)) if precomputed else None
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, osm_data.get('streets', []),
            _worker_spatial_idx, osm_data.get('water_bodies', []),
            osm_data.get('vegetation_areas', []),
            config['site_elevation'], point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )
        # Keep only points above the configured signal floor.
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())

    return results
|
|
|
|
|
|
# ── Register the Ray remote function (only if Ray is available) ──

# Remote handle wrapping _ray_process_chunk_impl. Stays None when Ray is
# absent, so callers must gate on RAY_AVAILABLE before calling .remote().
_ray_process_chunk = None
if RAY_AVAILABLE:
    _ray_process_chunk = ray.remote(_ray_process_chunk_impl)
|
|
|
|
|
|
# ── Public API ──
|
|
|
|
|
|
def get_cpu_count() -> int:
    """Return the usable CPU core count, clamped to at most 14.

    Falls back to 4 if the core count cannot be determined.
    """
    try:
        cores = mp.cpu_count() or 4
    except Exception:
        return 4
    return min(cores, 14)
|
|
|
|
|
|
def get_parallel_backend() -> str:
    """Name the best available parallel backend: 'ray' or 'process_pool'."""
    return "ray" if RAY_AVAILABLE else "process_pool"
|
|
|
|
|
|
def _try_init_ray(num_cpus: int) -> bool:
    """Lazily initialize the Ray runtime.

    Idempotent: returns True immediately if Ray is already up. Returns
    False when Ray is not installed or initialization fails.
    """
    if not RAY_AVAILABLE:
        return False
    if ray.is_initialized():
        return True

    try:
        # Keep Ray's scratch files under our data directory rather than /tmp.
        data_path = os.environ.get('RFCP_DATA_PATH', './data')
        ray_tmp = os.path.join(data_path, 'ray_tmp')
        os.makedirs(ray_tmp, exist_ok=True)

        ray.init(
            num_cpus=num_cpus,
            include_dashboard=False,
            log_to_driver=True,
            _temp_dir=ray_tmp,
        )
        store_gb = ray.cluster_resources().get('object_store_memory', 0) / 1e9
        print(
            f"[PARALLEL] Ray initialized: {num_cpus} CPUs, "
            f"object store ~{store_gb:.1f}GB",
            flush=True,
        )
        return True
    except Exception as e:
        print(f"[PARALLEL] Ray init failed: {e}", flush=True)
        return False
|
|
|
|
|
|
def calculate_coverage_parallel(
    grid: List[Tuple[float, float]],
    point_elevations: Dict[Tuple[float, float], float],
    site_dict: Dict,
    settings_dict: Dict,
    terrain_cache: Dict[str, np.ndarray],
    buildings: List,
    streets: List,
    water_bodies: List,
    vegetation_areas: List,
    site_elevation: float,
    num_workers: Optional[int] = None,
    log_fn: Optional[Callable[[str], None]] = None,
    cancel_token: Optional[CancellationToken] = None,
    precomputed: Optional[Dict] = None,
    progress_fn: Optional[Callable[[str, float], None]] = None,
) -> Tuple[List[Dict], Dict[str, float]]:
    """Calculate coverage points in parallel.

    Backend selection, first available wins:
      1. Ray — shared-memory object store, zero-copy numpy arrays.
      2. ProcessPoolExecutor — capped at 6 workers to limit pickle memory;
         skipped for tiny grids (<= 100 points) where process startup
         would dominate.
      3. Sequential — single-threaded, runs in the calling thread.

    Args:
        grid: (lat, lon) points to evaluate.
        point_elevations: (lat, lon) -> terrain elevation for each point.
        site_dict / settings_dict: serialized SiteParams / CoverageSettings.
        terrain_cache: tile_name -> elevation array.
        buildings / streets / water_bodies / vegetation_areas: OSM features.
        site_elevation: terrain elevation at the site.
        num_workers: worker count; defaults to get_cpu_count().
        log_fn: line logger; defaults to stdout with a [PARALLEL] prefix.
        cancel_token: cooperative cancellation — checked between chunks.
        precomputed: (lat, lon) -> {distance, path_loss} from GPU pre-computation.
        progress_fn: optional (stage_label, fraction) progress callback.

    Returns:
        (results, timing): coverage point dicts above the signal floor, and
        a timing/metadata dict identifying the backend that ran.
    """
    if log_fn is None:
        def log_fn(msg: str) -> None:
            print(f"[PARALLEL] {msg}", flush=True)

    if num_workers is None:
        num_workers = get_cpu_count()

    total_points = len(grid)

    # Backend 1: Ray (shared memory, zero-copy numpy)
    if RAY_AVAILABLE and _try_init_ray(num_workers):
        try:
            return _calculate_with_ray(
                grid, point_elevations, site_dict, settings_dict,
                terrain_cache, buildings, streets, water_bodies,
                vegetation_areas, site_elevation,
                num_workers, log_fn, cancel_token, precomputed,
                progress_fn,
            )
        except Exception as e:
            # The next backend tried is the ProcessPool, not sequential —
            # the previous message claiming "sequential" was misleading.
            log_fn(f"Ray execution failed: {e} — falling back to ProcessPool/sequential")

    # Backend 2: ProcessPoolExecutor with reduced workers to avoid MemoryError
    pool_workers = min(num_workers, 6)
    if pool_workers > 1 and total_points > 100:
        try:
            return _calculate_with_process_pool(
                grid, point_elevations, site_dict, settings_dict,
                terrain_cache, buildings, streets, water_bodies,
                vegetation_areas, site_elevation,
                pool_workers, log_fn, cancel_token, precomputed,
                progress_fn,
            )
        except Exception as e:
            log_fn(f"ProcessPool failed: {e} — falling back to sequential")

    # Backend 3: sequential last resort
    log_fn(f"Sequential fallback: {total_points} points")
    return _calculate_sequential(
        grid, point_elevations, site_dict, settings_dict,
        buildings, streets, water_bodies, vegetation_areas,
        site_elevation, log_fn, cancel_token, precomputed,
        progress_fn,
    )
|
|
|
|
|
|
# ── Ray backend ──
|
|
|
|
|
|
def _calculate_with_ray(
    grid, point_elevations, site_dict, settings_dict,
    terrain_cache, buildings, streets, water_bodies,
    vegetation_areas, site_elevation,
    num_workers, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Execute using Ray shared-memory object store.

    Large inputs (terrain, buildings, OSM data, config) are placed in the
    object store once via ray.put(); every worker reads the same copy.
    The grid is split into ~4 chunks per worker for granular progress,
    and results are drained incrementally with ray.wait().

    Returns:
        (all_results, timing) — point dicts above the signal floor, and a
        timing dict with 'parallel_total', 'ray_put', 'workers', 'backend'.
    """
    total_points = len(grid)
    log_fn(f"Ray mode: {total_points} points, {num_workers} workers")

    # ── Put large data into Ray object store (once, shared by all workers) ──
    t_put = time.time()

    terrain_ref = ray.put(terrain_cache)
    buildings_ref = ray.put(buildings)
    osm_ref = ray.put({
        'streets': streets,
        'water_bodies': water_bodies,
        'vegetation_areas': vegetation_areas,
    })

    # Key for the per-worker spatial-index cache: site position + building count.
    cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
    config = {
        'site_dict': site_dict,
        'settings_dict': settings_dict,
        'site_elevation': site_elevation,
        'cache_key': cache_key,
    }
    if precomputed:
        config['precomputed'] = precomputed
    config_ref = ray.put(config)

    put_time = time.time() - t_put
    log_fn(f"ray.put() done in {put_time:.1f}s")

    # ── Prepare and submit chunks ──
    # Points missing from point_elevations default to elevation 0.0.
    items = [
        (lat, lon, point_elevations.get((lat, lon), 0.0))
        for lat, lon in grid
    ]

    # ~4 chunks per worker for granular progress
    chunk_size = max(1, len(items) // (num_workers * 4))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")

    t_calc = time.time()
    pending = [
        _ray_process_chunk.remote(chunk, terrain_ref, buildings_ref, osm_ref, config_ref)
        for chunk in chunks
    ]

    # ── Collect results with progress via ray.wait() ──
    all_results: List[Dict] = []
    total_chunks = len(pending)
    remaining = list(pending)
    completed_chunks = 0

    while remaining:
        # Check cancellation before waiting; force-cancel whatever is left.
        if cancel_token and cancel_token.is_cancelled:
            log_fn(f"Cancelled — aborting {len(remaining)} remaining Ray chunks")
            for ref in remaining:
                try:
                    ray.cancel(ref, force=True)
                except Exception:
                    pass
            break

        # Wait for at least 1 result, batch up to ~10% for progress logging.
        # With timeout=30, 'done' may come back smaller than 'batch' (even
        # empty); the loop simply iterates again in that case.
        batch = max(1, min(len(remaining), total_chunks // 10 or 1))
        done, remaining = ray.wait(remaining, num_returns=batch, timeout=30)

        for ref in done:
            try:
                chunk_results = ray.get(ref)
                all_results.extend(chunk_results)
            except Exception as e:
                # A failed chunk is logged and skipped; its points are lost.
                log_fn(f"Chunk error: {e}")

        completed_chunks += len(done)
        pct = completed_chunks * 100 // total_chunks
        elapsed = time.time() - t_calc
        pts = len(all_results)
        rate = pts / elapsed if elapsed > 0 else 0
        # ETA assumes all remaining grid points yield results at the same rate.
        eta = (total_points - pts) / rate if rate > 0 else 0
        log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — "
               f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")
        if progress_fn:
            # Map chunk progress to 40%-95% range
            progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / total_chunks))

    calc_time = time.time() - t_calc
    log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results "
           f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")

    # Force garbage collection after Ray computation
    gc.collect()

    timing = {
        "parallel_total": calc_time,
        "ray_put": put_time,
        "workers": num_workers,
        "backend": "ray",
    }
    return all_results, timing
|
|
|
|
|
|
# ── ProcessPoolExecutor fallback ──
|
|
|
|
|
|
def _pool_worker_process_chunk(args):
    """Worker function for ProcessPoolExecutor. Processes a chunk of points.

    args is one pickled tuple: (chunk, terrain_cache, buildings, osm_data,
    config), where chunk is a list of (lat, lon, point_elevation) tuples.
    Unlike the Ray/shm workers, the full terrain cache arrives via pickle.

    Returns:
        List of point dicts (model_dump) whose rsrp clears settings.min_signal.
    """
    chunk, terrain_cache, buildings, osm_data, config = args

    # Inject the pickled terrain tiles into the worker's terrain singleton.
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # Build a fresh spatial index for this task (no cross-task cache here,
    # unlike the Ray/shm workers which reuse one keyed on config['cache_key']).
    spatial_idx = None
    if buildings:
        from app.services.spatial_index import SpatialIndex
        spatial_idx = SpatialIndex()
        spatial_idx.build(buildings)

    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings

    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()

    # Per-stage timing accumulator passed through to the point calculator.
    timing = {
        "los": 0.0, "buildings": 0.0, "antenna": 0.0,
        "dominant_path": 0.0, "street_canyon": 0.0,
        "reflection": 0.0, "vegetation": 0.0,
    }

    # Optional GPU pre-computation: (lat, lon) -> {distance, path_loss}.
    precomputed = config.get('precomputed')

    results = []
    for lat, lon, point_elev in chunk:
        pre = precomputed.get((lat, lon)) if precomputed else None
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, osm_data.get('streets', []),
            spatial_idx, osm_data.get('water_bodies', []),
            osm_data.get('vegetation_areas', []),
            config['site_elevation'], point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )
        # Keep only points above the configured signal floor.
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())

    return results
|
|
|
|
|
|
def _store_terrain_in_shm(terrain_cache: Dict[str, np.ndarray], log_fn) -> Tuple[list, Dict[str, dict]]:
|
|
"""Store terrain tile arrays in shared memory. Returns (shm_blocks, tile_refs).
|
|
|
|
tile_refs is a dict mapping tile_name -> {shm_name, shape, dtype_str}
|
|
that workers use to reconstruct numpy arrays from shared memory.
|
|
"""
|
|
import multiprocessing.shared_memory as shm_mod
|
|
|
|
blocks = []
|
|
refs = {}
|
|
|
|
for tile_name, arr in terrain_cache.items():
|
|
try:
|
|
block = shm_mod.SharedMemory(create=True, size=arr.nbytes)
|
|
blocks.append(block)
|
|
# Copy tile data to shared memory
|
|
shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=block.buf)
|
|
shm_arr[:] = arr[:]
|
|
refs[tile_name] = {
|
|
'shm_name': block.name,
|
|
'shape': arr.shape,
|
|
'dtype': str(arr.dtype),
|
|
}
|
|
except Exception as e:
|
|
log_fn(f"Failed to store tile {tile_name} in shm: {e}")
|
|
# Fallback: worker will have to use pickled copy
|
|
pass
|
|
|
|
return blocks, refs
|
|
|
|
|
|
def _pool_worker_shm_chunk(args):
    """Worker function that reads terrain from shared memory instead of pickle.

    args is one tuple: (chunk, terrain_shm_refs, buildings, osm_data, config).
    terrain_shm_refs maps tile_name -> {shm_name, shape, dtype} descriptors
    produced by _store_terrain_in_shm; each is rebuilt here as a zero-copy
    numpy view onto the parent's shared-memory block.

    Returns:
        List of point dicts (model_dump) whose rsrp clears settings.min_signal.
    """
    import multiprocessing.shared_memory as shm_mod

    chunk, terrain_shm_refs, buildings, osm_data, config = args

    # Reconstruct terrain cache from shared memory (zero-copy numpy views)
    terrain_cache = {}
    for tile_name, ref in terrain_shm_refs.items():
        try:
            block = shm_mod.SharedMemory(name=ref['shm_name'])
            # NOTE(review): `block` is never kept or closed; the ndarray's
            # buffer keeps the mapping usable, but resource-tracker warnings
            # at worker exit are possible — verify on the target platform.
            terrain_cache[tile_name] = np.ndarray(
                ref['shape'], dtype=ref['dtype'], buffer=block.buf,
            )
        except Exception:
            # A tile that fails to attach is silently skipped; presumably
            # terrain_service tolerates missing tiles — confirm.
            pass

    # Inject terrain cache into the worker's terrain singleton.
    from app.services.terrain_service import terrain_service
    terrain_service._tile_cache = terrain_cache

    # Build or reuse the spatial index; it persists in this worker process
    # across tasks and is rebuilt only when the cache key changes.
    global _worker_spatial_idx, _worker_cache_key
    cache_key = config.get('cache_key', '')
    if _worker_cache_key != cache_key:
        if buildings:
            from app.services.spatial_index import SpatialIndex
            _worker_spatial_idx = SpatialIndex()
            _worker_spatial_idx.build(buildings)
        else:
            _worker_spatial_idx = None
        _worker_cache_key = cache_key

    # Process points
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    site = SiteParams(**config['site_dict'])
    settings = CoverageSettings(**config['settings_dict'])
    svc = CoverageService()

    # Per-stage timing accumulator passed through to the point calculator.
    timing = {
        "los": 0.0, "buildings": 0.0, "antenna": 0.0,
        "dominant_path": 0.0, "street_canyon": 0.0,
        "reflection": 0.0, "vegetation": 0.0,
    }

    # Optional GPU pre-computation: (lat, lon) -> {distance, path_loss}.
    precomputed = config.get('precomputed')

    results = []
    for lat, lon, point_elev in chunk:
        pre = precomputed.get((lat, lon)) if precomputed else None
        point = svc._calculate_point_sync(
            site, lat, lon, settings,
            buildings, osm_data.get('streets', []),
            _worker_spatial_idx, osm_data.get('water_bodies', []),
            osm_data.get('vegetation_areas', []),
            config['site_elevation'], point_elev, timing,
            precomputed_distance=pre.get('distance') if pre else None,
            precomputed_path_loss=pre.get('path_loss') if pre else None,
        )
        # Keep only points above the configured signal floor.
        if point.rsrp >= settings.min_signal:
            results.append(point.model_dump())

    return results
|
|
|
|
|
|
def _calculate_with_process_pool(
    grid, point_elevations, site_dict, settings_dict,
    terrain_cache, buildings, streets, water_bodies,
    vegetation_areas, site_elevation,
    num_workers, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Execute using ProcessPoolExecutor.

    Uses shared memory for terrain tiles (zero-copy numpy views) to reduce
    memory usage compared to pickling full terrain arrays per worker.
    Buildings and OSM data are still pickled per task; the worker count is
    reduced for large building sets to cap peak memory.

    Returns:
        (all_results, timing) — point dicts above the signal floor, and a
        timing dict whose 'backend' records whether shm or pickle was used.
    """
    from concurrent.futures import ProcessPoolExecutor, as_completed

    total_points = len(grid)

    # Estimate pickle size for building data and cap workers accordingly
    building_count = len(buildings)
    if building_count > 10000:
        num_workers = min(num_workers, 3)
        log_fn(f"Large building set ({building_count}) — reducing workers to {num_workers}")
    elif building_count > 5000:
        num_workers = min(num_workers, 4)

    log_fn(f"ProcessPool mode: {total_points} points, {num_workers} workers, "
           f"{building_count} buildings")

    # Store terrain tiles in shared memory; on any failure fall back to
    # pickling the full terrain cache per task.
    shm_blocks = []
    terrain_shm_refs = {}
    try:
        shm_blocks, terrain_shm_refs = _store_terrain_in_shm(terrain_cache, log_fn)
        if terrain_shm_refs:
            tile_mb = sum(
                np.prod(r['shape']) * np.dtype(r['dtype']).itemsize
                for r in terrain_shm_refs.values()
            ) / (1024 * 1024)
            log_fn(f"Stored {len(terrain_shm_refs)} terrain tiles in shared memory ({tile_mb:.0f} MB)")
            use_shm = True
        else:
            use_shm = False
    except Exception as e:
        log_fn(f"Shared memory setup failed ({e}), using pickle fallback")
        use_shm = False

    # Points missing from point_elevations default to elevation 0.0.
    items = [
        (lat, lon, point_elevations.get((lat, lon), 0.0))
        for lat, lon in grid
    ]

    # ~2 chunks per worker (coarser than Ray's 4 — each chunk repays its
    # pickling overhead with more work).
    chunk_size = max(1, len(items) // (num_workers * 2))
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")

    # Key for the per-worker spatial-index cache: site position + building count.
    cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
    config = {
        'site_dict': site_dict,
        'settings_dict': settings_dict,
        'site_elevation': site_elevation,
        'cache_key': cache_key,
    }
    if precomputed:
        config['precomputed'] = precomputed
    osm_data = {
        'streets': streets,
        'water_bodies': water_bodies,
        'vegetation_areas': vegetation_areas,
    }

    t_calc = time.time()
    all_results: List[Dict] = []
    pool = None

    try:
        # 'spawn' gives workers a clean interpreter state on every platform.
        ctx = mp.get_context('spawn')
        pool = ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx)
        # Register globally so _kill_worker_processes can shut it down.
        _set_active_pool(pool)

        if use_shm:
            # Shared memory path: pass shm refs instead of terrain data
            worker_fn = _pool_worker_shm_chunk
            futures = {
                pool.submit(
                    worker_fn,
                    (chunk, terrain_shm_refs, buildings, osm_data, config),
                ): i
                for i, chunk in enumerate(chunks)
            }
        else:
            # Pickle fallback path
            futures = {
                pool.submit(
                    _pool_worker_process_chunk,
                    (chunk, terrain_cache, buildings, osm_data, config),
                ): i
                for i, chunk in enumerate(chunks)
            }

        completed_chunks = 0
        for future in as_completed(futures):
            if cancel_token and cancel_token.is_cancelled:
                log_fn(f"Cancelled — cancelling {len(futures) - completed_chunks - 1} pending futures")
                # cancel() only stops futures not yet started; running ones
                # are reaped by _kill_worker_processes in the finally block.
                for f in futures:
                    f.cancel()
                break

            try:
                chunk_results = future.result()
                all_results.extend(chunk_results)
            except Exception as e:
                # A failed chunk is logged and skipped; its points are lost.
                log_fn(f"Chunk error: {e}")

            completed_chunks += 1
            pct = completed_chunks * 100 // len(chunks)
            elapsed = time.time() - t_calc
            pts = len(all_results)
            rate = pts / elapsed if elapsed > 0 else 0
            eta = (total_points - pts) / rate if rate > 0 else 0
            log_fn(f"Progress: {completed_chunks}/{len(chunks)} chunks ({pct}%) — "
                   f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")
            if progress_fn:
                # Map chunk progress to the 40%-95% range of the overall job.
                progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / len(chunks)))

    except Exception as e:
        # NOTE(review): pool-level errors are swallowed and partial results
        # returned — presumably deliberate best-effort; the caller's
        # sequential fallback only triggers if this function raises.
        log_fn(f"ProcessPool error: {e}")

    finally:
        _clear_active_pool()
        if pool:
            pool.shutdown(wait=False, cancel_futures=True)
        # Give workers a moment to exit before force-killing stragglers.
        time.sleep(0.5)
        killed = _kill_worker_processes()
        if killed > 0:
            log_fn(f"Force killed {killed} orphaned workers")
        # Cleanup shared memory blocks (close + unlink frees the OS segment)
        for block in shm_blocks:
            try:
                block.close()
                block.unlink()
            except Exception:
                pass
        # Force garbage collection to release memory from workers
        gc.collect()

    calc_time = time.time() - t_calc
    log_fn(f"ProcessPool done: {calc_time:.1f}s, {len(all_results)} results "
           f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")

    timing = {
        "parallel_total": calc_time,
        "workers": num_workers,
        "backend": "process_pool" + ("/shm" if use_shm else "/pickle"),
    }
    return all_results, timing
|
|
|
|
|
|
# ── Sequential fallback ──
|
|
|
|
|
|
def _calculate_sequential(
    grid, point_elevations, site_dict, settings_dict,
    buildings, streets, water_bodies, vegetation_areas,
    site_elevation, log_fn, cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Sequential fallback — no extra dependencies, runs in calling thread.

    Iterates the grid one point at a time, honoring the cancellation token
    between points and reporting progress roughly every 5%.
    """
    from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
    from app.services.spatial_index import SpatialIndex

    site = SiteParams(**site_dict)
    settings = CoverageSettings(**settings_dict)
    service = CoverageService()

    # Index buildings once up front; stays None when there are none.
    index = SpatialIndex() if buildings else None
    if index is not None:
        index.build(buildings)

    total = len(grid)
    report_every = max(1, total // 20)  # log roughly every 5%
    # Per-stage timing accumulator passed through to the point calculator.
    timing = {
        "los": 0.0, "buildings": 0.0, "antenna": 0.0,
        "dominant_path": 0.0, "street_canyon": 0.0,
        "reflection": 0.0, "vegetation": 0.0,
    }

    started = time.time()
    kept = []
    for i, (lat, lon) in enumerate(grid):
        # Honor cooperative cancellation between points.
        if cancel_token and cancel_token.is_cancelled:
            log_fn(f"Sequential cancelled at {i}/{total}")
            break

        if i % report_every == 0:
            log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)")
            if progress_fn:
                # Map progress into the 40%-95% range of the overall job.
                progress_fn("Calculating coverage", 0.40 + 0.55 * (i / total))

        elevation = point_elevations.get((lat, lon), 0.0)
        # GPU pre-computation hints, when available for this point.
        hints = precomputed.get((lat, lon)) if precomputed else None

        point = service._calculate_point_sync(
            site, lat, lon, settings,
            buildings, streets, index,
            water_bodies, vegetation_areas,
            site_elevation, elevation, timing,
            precomputed_distance=hints.get('distance') if hints else None,
            precomputed_path_loss=hints.get('path_loss') if hints else None,
        )
        if point.rsrp >= settings.min_signal:
            kept.append(point.model_dump())

    calc_time = time.time() - started
    log_fn(f"Sequential done: {calc_time:.1f}s, {len(kept)} results "
           f"({calc_time / max(1, total) * 1000:.1f}ms/point)")

    # Release large intermediates before returning.
    gc.collect()

    timing["sequential_total"] = calc_time
    timing["backend"] = "sequential"
    return kept, timing
|