rfcp/backend/app/services/parallel_coverage_service.py
"""
Parallel coverage calculation.
Primary backend: Ray (shared-memory object store, zero-copy numpy arrays)
Fallback: ProcessPoolExecutor (capped at 6 workers to limit memory)
Last resort: Sequential (single-threaded, no extra dependencies)
Ray advantages over ProcessPoolExecutor:
- ray.put() stores terrain cache ONCE in shared memory
- Workers access numpy arrays via zero-copy (no per-worker pickle/copy)
- Eliminates MemoryError on Detailed preset with large terrain + buildings
ProcessPoolExecutor fallback:
- Used when Ray is unavailable (e.g. PyInstaller builds)
- Capped at 6 workers to prevent MemoryError from data pickling
- Each worker gets a full copy of terrain/buildings (no shared memory)
Usage:
from app.services.parallel_coverage_service import (
calculate_coverage_parallel, get_cpu_count, RAY_AVAILABLE,
)
"""
import gc
import os
import sys
import subprocess
import time
import threading
import multiprocessing as mp
from typing import List, Dict, Tuple, Any, Optional, Callable
import numpy as np
# ── Cancellation token ──
class CancellationToken:
"""Thread-safe cancellation token for cooperative cancellation."""
def __init__(self):
self._event = threading.Event()
def cancel(self):
self._event.set()
@property
def is_cancelled(self) -> bool:
return self._event.is_set()
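# Cooperative-cancellation sketch (illustrative; the thread wrapper and the
# "cancel endpoint" below are hypothetical, not part of this module):
#
#   token = CancellationToken()
#   thread = threading.Thread(
#       target=calculate_coverage_parallel,
#       kwargs=dict(calc_kwargs, cancel_token=token),  # calc_kwargs: hypothetical
#   )
#   thread.start()
#   # ...later, e.g. from a "cancel job" API handler:
#   token.cancel()   # backends poll token.is_cancelled between chunks
#   thread.join()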
# ── Active pool tracking (for graceful shutdown) ──
_active_pool = None # Global ref to current ProcessPoolExecutor
_active_pool_lock = threading.Lock()
def _set_active_pool(pool):
global _active_pool
with _active_pool_lock:
_active_pool = pool
def _clear_active_pool():
global _active_pool
with _active_pool_lock:
_active_pool = None
# ── Worker process cleanup ──
def _clog(msg: str):
"""Log with [PARALLEL] prefix."""
print(f"[PARALLEL] {msg}", flush=True)
def _kill_worker_processes() -> int:
"""Kill ALL rfcp-server processes except the current (main) process.
First shuts down the active ProcessPoolExecutor (if any), then uses
process NAME matching to kill remaining workers.
Returns the number of processes killed.
"""
global _active_pool
# Step 0: Shut down active ProcessPoolExecutor gracefully
with _active_pool_lock:
pool = _active_pool
_active_pool = None
if pool is not None:
try:
pool.shutdown(wait=False, cancel_futures=True)
_clog("Active ProcessPoolExecutor shutdown requested")
except Exception as e:
_clog(f"Pool shutdown error: {e}")
my_pid = os.getpid()
killed_count = 0
if sys.platform == 'win32':
try:
# List all rfcp-server.exe processes in CSV format
result = subprocess.run(
['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe', '/FO', 'CSV', '/NH'],
capture_output=True, text=True, timeout=5,
)
for line in result.stdout.strip().split('\n'):
if 'rfcp-server.exe' not in line:
continue
parts = line.split(',')
if len(parts) >= 2:
pid_str = parts[1].strip().strip('"')
try:
pid = int(pid_str)
if pid != my_pid:
subprocess.run(
['taskkill', '/F', '/PID', str(pid)],
capture_output=True, timeout=5,
)
killed_count += 1
_clog(f"Killed worker PID {pid}")
except (ValueError, subprocess.TimeoutExpired):
pass
except Exception as e:
_clog(f"Kill workers error: {e}")
# Fallback: kill ALL rfcp-server.exe
try:
subprocess.run(
['taskkill', '/F', '/IM', 'rfcp-server.exe', '/T'],
capture_output=True, timeout=5,
)
except Exception:
pass
else:
# Unix: pgrep + kill
try:
result = subprocess.run(
['pgrep', '-f', 'rfcp-server'],
capture_output=True, text=True, timeout=5,
)
for pid_str in result.stdout.strip().split('\n'):
if not pid_str:
continue
try:
pid = int(pid_str)
if pid != my_pid:
os.kill(pid, 9) # SIGKILL
killed_count += 1
_clog(f"Killed worker PID {pid}")
except (ValueError, ProcessLookupError, PermissionError):
pass
except Exception as e:
_clog(f"Kill workers error: {e}")
return killed_count
# ── Try to import Ray ──
RAY_AVAILABLE = False
try:
import ray
RAY_AVAILABLE = True
except ImportError:
ray = None # type: ignore
# ── Worker-level caches (persist across tasks in same worker process) ──
_worker_spatial_idx = None
_worker_cache_key: Optional[str] = None
# Shared-memory buildings/OSM — unpickled once per worker, cached by key
_worker_shared_buildings = None
_worker_shared_osm_data = None
_worker_shared_data_key: Optional[str] = None
def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config):
"""Implementation: process a chunk of (lat, lon, elevation) tuples.
Called inside a Ray remote function. terrain_cache numpy arrays come
from the Ray object store via zero-copy.
"""
global _worker_spatial_idx, _worker_cache_key
# Inject terrain cache into the module-level singleton.
# For numpy arrays, Ray gives us a read-only view into shared memory.
from app.services.terrain_service import terrain_service
terrain_service._tile_cache = terrain_cache
# Build or reuse spatial index (expensive — ~1s for 350K buildings).
cache_key = config.get('cache_key', '')
if _worker_cache_key != cache_key:
if buildings:
from app.services.spatial_index import SpatialIndex
_worker_spatial_idx = SpatialIndex()
_worker_spatial_idx.build(buildings)
else:
_worker_spatial_idx = None
_worker_cache_key = cache_key
# Process points
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
site = SiteParams(**config['site_dict'])
settings = CoverageSettings(**config['settings_dict'])
svc = CoverageService()
timing = {
"los": 0.0, "buildings": 0.0, "antenna": 0.0,
"dominant_path": 0.0, "street_canyon": 0.0,
"reflection": 0.0, "vegetation": 0.0,
"lod_none": 0, "lod_simplified": 0, "lod_full": 0,
}
precomputed = config.get('precomputed')
results = []
for lat, lon, point_elev in chunk:
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, osm_data.get('streets', []),
_worker_spatial_idx, osm_data.get('water_bodies', []),
osm_data.get('vegetation_areas', []),
config['site_elevation'], point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
precomputed_has_los=pre.get('has_los') if pre else None,
precomputed_terrain_loss=pre.get('terrain_loss') if pre else None,
precomputed_antenna_loss=pre.get('antenna_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
return results
# ── Register the Ray remote function (only if Ray is available) ──
_ray_process_chunk = None
if RAY_AVAILABLE:
_ray_process_chunk = ray.remote(_ray_process_chunk_impl)
# ── Public API ──
def get_cpu_count() -> int:
"""Get number of usable CPU cores, capped at 6.
Each worker holds its own copy of buildings + OSM data + spatial index
(~200-400 MB per worker). Capping at 6 prevents OOM on systems with
8-16 GB RAM (especially WSL2 with limited memory allocation).
"""
try:
return min(mp.cpu_count() or 4, 6)
except Exception:
return 4
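# Rough sizing behind the cap of 6 (order-of-magnitude arithmetic, not a measurement):
#   6 workers x ~300 MB (buildings + OSM + spatial index per worker) ~= 1.8 GB,
# which fits alongside the main process on an 8 GB machine, whereas an uncapped
# 16-worker pool with the same per-worker footprint would need roughly 5 GB.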
def get_parallel_backend() -> str:
"""Return which parallel backend is available."""
if RAY_AVAILABLE:
return "ray"
return "process_pool"
def _try_init_ray(num_cpus: int) -> bool:
"""Initialize Ray lazily. Returns True if Ray is ready."""
if not RAY_AVAILABLE:
return False
if ray.is_initialized():
return True
try:
data_path = os.environ.get('RFCP_DATA_PATH', './data')
ray_tmp = os.path.join(data_path, 'ray_tmp')
os.makedirs(ray_tmp, exist_ok=True)
ray.init(
num_cpus=num_cpus,
include_dashboard=False,
log_to_driver=True,
_temp_dir=ray_tmp,
)
print(f"[PARALLEL] Ray initialized: {num_cpus} CPUs, "
f"object store ~{ray.cluster_resources().get('object_store_memory', 0) / 1e9:.1f}GB",
flush=True)
return True
except Exception as e:
print(f"[PARALLEL] Ray init failed: {e}", flush=True)
return False
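# The zero-copy pattern this module relies on, shown in isolation (a sketch that
# assumes Ray is installed; the tile name and probe function are illustrative):
#
#   import numpy as np, ray
#   ray.init(num_cpus=2, include_dashboard=False)
#
#   tiles = {"N50E014": np.zeros((3601, 3601), dtype=np.float32)}
#   tiles_ref = ray.put(tiles)           # stored once in the shared object store
#
#   @ray.remote
#   def probe(cache):                    # workers receive read-only numpy views
#       return cache["N50E014"].shape
#
#   print(ray.get(probe.remote(tiles_ref)))   # (3601, 3601) with no per-worker copy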
def calculate_coverage_parallel(
grid: List[Tuple[float, float]],
point_elevations: Dict[Tuple[float, float], float],
site_dict: Dict,
settings_dict: Dict,
terrain_cache: Dict[str, np.ndarray],
buildings: List,
streets: List,
water_bodies: List,
vegetation_areas: List,
site_elevation: float,
num_workers: Optional[int] = None,
log_fn: Optional[Callable[[str], None]] = None,
cancel_token: Optional[CancellationToken] = None,
precomputed: Optional[Dict] = None,
progress_fn: Optional[Callable[[str, float], None]] = None,
) -> Tuple[List[Dict], Dict[str, float]]:
"""Calculate coverage points in parallel.
Uses Ray if available (shared memory, zero-copy numpy), otherwise
falls back to ProcessPoolExecutor or sequential single-threaded calculation.
cancel_token: cooperative cancellation — checked between chunks.
precomputed: dict mapping (lat, lon) -> {distance, path_loss, has_los, terrain_loss, antenna_loss} from GPU pre-computation (individual keys optional).
"""
if log_fn is None:
log_fn = lambda msg: print(f"[PARALLEL] {msg}", flush=True)
if num_workers is None:
num_workers = get_cpu_count()
total_points = len(grid)
# Try Ray
if RAY_AVAILABLE and _try_init_ray(num_workers):
try:
return _calculate_with_ray(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn, cancel_token, precomputed,
progress_fn,
)
except Exception as e:
log_fn(f"Ray execution failed: {e} — falling back to sequential")
# Fallback: ProcessPoolExecutor (shared memory eliminates per-chunk pickle)
pool_workers = num_workers
# Scale workers down based on data volume to prevent OOM.
# Each worker unpickles + holds its own copy of buildings, OSM data, and
# spatial index. With large datasets the per-worker memory can exceed
# 300 MB, so reduce workers to keep total under ~2 GB.
data_items = len(buildings) + len(streets) + len(water_bodies) + len(vegetation_areas)
if data_items > 20000:
pool_workers = min(pool_workers, 2)
log_fn(f"Data volume high ({data_items} items) — capping workers at {pool_workers}")
elif data_items > 10000:
pool_workers = min(pool_workers, 3)
log_fn(f"Data volume moderate ({data_items} items) — capping workers at {pool_workers}")
elif data_items > 5000:
pool_workers = min(pool_workers, 4)
log_fn(f"Data volume elevated ({data_items} items) — capping workers at {pool_workers}")
log_fn(f"ProcessPool: {pool_workers} workers (cpu_count={num_workers}, data_items={data_items})")
if pool_workers > 1 and total_points > 100:
try:
return _calculate_with_process_pool(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
pool_workers, log_fn, cancel_token, precomputed,
progress_fn,
)
except (MemoryError, OSError) as e:
log_fn(f"ProcessPool OOM/OS error: {e} — falling back to sequential")
except Exception as e:
log_fn(f"ProcessPool failed: {e} — falling back to sequential")
# Last resort: sequential
log_fn(f"Sequential fallback: {total_points} points")
return _calculate_sequential(
grid, point_elevations, site_dict, settings_dict,
buildings, streets, water_bodies, vegetation_areas,
site_elevation, log_fn, cancel_token, precomputed,
progress_fn,
)
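# Call-shape sketch for the public entry point (the field names inside site_dict
# and settings_dict are illustrative; the real schemas are SiteParams and
# CoverageSettings in coverage_service):
#
#   grid = [(50.0800, 14.4200), (50.0801, 14.4200)]
#   elevations = {pt: 200.0 for pt in grid}
#   results, timing = calculate_coverage_parallel(
#       grid=grid,
#       point_elevations=elevations,
#       site_dict={"lat": 50.0805, "lon": 14.4195, ...},   # SiteParams fields
#       settings_dict={"min_signal": -110, ...},           # CoverageSettings fields
#       terrain_cache=terrain_tiles,   # dict[str, np.ndarray] from terrain_service
#       buildings=buildings, streets=streets,
#       water_bodies=[], vegetation_areas=[],
#       site_elevation=195.0,
#   )
#   print(timing["backend"], len(results))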
# ── Ray backend ──
def _calculate_with_ray(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn, cancel_token=None, precomputed=None,
progress_fn=None,
):
"""Execute using Ray shared-memory object store."""
total_points = len(grid)
log_fn(f"Ray mode: {total_points} points, {num_workers} workers")
# ── Put large data into Ray object store ──
t_put = time.time()
terrain_ref = ray.put(terrain_cache)
buildings_ref = ray.put(buildings)
osm_ref = ray.put({
'streets': streets,
'water_bodies': water_bodies,
'vegetation_areas': vegetation_areas,
})
cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
config = {
'site_dict': site_dict,
'settings_dict': settings_dict,
'site_elevation': site_elevation,
'cache_key': cache_key,
}
if precomputed:
config['precomputed'] = precomputed
config_ref = ray.put(config)
put_time = time.time() - t_put
log_fn(f"ray.put() done in {put_time:.1f}s")
# ── Prepare and submit chunks ──
items = [
(lat, lon, point_elevations.get((lat, lon), 0.0))
for lat, lon in grid
]
# Larger chunks to amortize IPC overhead (was num_workers*4)
chunk_size = max(1, min(400, len(items) // max(2, num_workers)))
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")
t_calc = time.time()
pending = [
_ray_process_chunk.remote(chunk, terrain_ref, buildings_ref, osm_ref, config_ref)
for chunk in chunks
]
# ── Collect results with progress via ray.wait() ──
all_results: List[Dict] = []
total_chunks = len(pending)
remaining = list(pending)
completed_chunks = 0
while remaining:
# Check cancellation before waiting
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Cancelled — aborting {len(remaining)} remaining Ray chunks")
for ref in remaining:
try:
ray.cancel(ref, force=True)
except Exception:
pass
break
# Wait for at least 1 result, batch up to ~10% for progress logging
batch = max(1, min(len(remaining), total_chunks // 10 or 1))
done, remaining = ray.wait(remaining, num_returns=batch, timeout=30)
for ref in done:
try:
chunk_results = ray.get(ref)
all_results.extend(chunk_results)
except Exception as e:
log_fn(f"Chunk error: {e}")
completed_chunks += len(done)
pct = completed_chunks * 100 // total_chunks
elapsed = time.time() - t_calc
pts = len(all_results)
rate = pts / elapsed if elapsed > 0 else 0
eta = (total_points - pts) / rate if rate > 0 else 0
log_fn(f"Progress: {completed_chunks}/{total_chunks} chunks ({pct}%) — "
f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")
if progress_fn:
# Map chunk progress to 40%-95% range
progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / total_chunks))
calc_time = time.time() - t_calc
log_fn(f"Ray done: {calc_time:.1f}s, {len(all_results)} results "
f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")
# Force garbage collection after Ray computation
gc.collect()
timing = {
"parallel_total": calc_time,
"ray_put": put_time,
"workers": num_workers,
"backend": "ray",
}
return all_results, timing
# ── ProcessPoolExecutor fallback ──
def _pool_worker_process_chunk(args):
"""Worker function for ProcessPoolExecutor. Processes a chunk of points."""
chunk, terrain_cache, buildings, osm_data, config = args
from app.services.terrain_service import terrain_service
terrain_service._tile_cache = terrain_cache
spatial_idx = None
if buildings:
from app.services.spatial_index import SpatialIndex
spatial_idx = SpatialIndex()
spatial_idx.build(buildings)
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
site = SiteParams(**config['site_dict'])
settings = CoverageSettings(**config['settings_dict'])
svc = CoverageService()
timing = {
"los": 0.0, "buildings": 0.0, "antenna": 0.0,
"dominant_path": 0.0, "street_canyon": 0.0,
"reflection": 0.0, "vegetation": 0.0,
"lod_none": 0, "lod_simplified": 0, "lod_full": 0,
}
precomputed = config.get('precomputed')
results = []
for lat, lon, point_elev in chunk:
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, osm_data.get('streets', []),
spatial_idx, osm_data.get('water_bodies', []),
osm_data.get('vegetation_areas', []),
config['site_elevation'], point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
precomputed_has_los=pre.get('has_los') if pre else None,
precomputed_terrain_loss=pre.get('terrain_loss') if pre else None,
precomputed_antenna_loss=pre.get('antenna_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
return results
def _store_terrain_in_shm(terrain_cache: Dict[str, np.ndarray], log_fn) -> Tuple[list, Dict[str, dict]]:
"""Store terrain tile arrays in shared memory. Returns (shm_blocks, tile_refs).
tile_refs is a dict mapping tile_name -> {shm_name, shape, dtype_str}
that workers use to reconstruct numpy arrays from shared memory.
"""
import multiprocessing.shared_memory as shm_mod
blocks = []
refs = {}
for tile_name, arr in terrain_cache.items():
try:
block = shm_mod.SharedMemory(create=True, size=arr.nbytes)
blocks.append(block)
# Copy tile data to shared memory
shm_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=block.buf)
shm_arr[:] = arr[:]
refs[tile_name] = {
'shm_name': block.name,
'shape': arr.shape,
'dtype': str(arr.dtype),
}
except Exception as e:
log_fn(f"Failed to store tile {tile_name} in shm: {e}")
# Fallback: worker will have to use pickled copy
pass
return blocks, refs
def _store_pickle_in_shm(data, label: str, log_fn) -> Tuple[Optional[Any], Optional[dict]]:
"""Pickle arbitrary data into a SharedMemory block.
Returns (shm_block, ref_dict) where ref_dict = {shm_name, size}.
On failure returns (None, None) and caller should fall back to pickle.
"""
import multiprocessing.shared_memory as shm_mod
import pickle
try:
blob = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
size = len(blob)
block = shm_mod.SharedMemory(create=True, size=size)
block.buf[:size] = blob
mb = size / (1024 * 1024)
log_fn(f"{label} in shared memory: {mb:.1f} MB")
return block, {'shm_name': block.name, 'size': size}
except Exception as e:
log_fn(f"Failed to store {label} in shm: {e}")
return None, None
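# The SharedMemory round trip behind _store_terrain_in_shm and the shm worker
# functions below, in miniature (standalone sketch; in this module the "attach"
# half runs inside a pool worker):
#
#   from multiprocessing import shared_memory
#   src = np.arange(12, dtype=np.float32).reshape(3, 4)
#   blk = shared_memory.SharedMemory(create=True, size=src.nbytes)
#   np.ndarray(src.shape, dtype=src.dtype, buffer=blk.buf)[:] = src    # publish
#
#   view = shared_memory.SharedMemory(name=blk.name)                   # attach by name
#   arr = np.ndarray(src.shape, dtype=src.dtype, buffer=view.buf)      # zero-copy view
#   assert arr[2, 3] == 11.0
#
#   view.close()
#   blk.close(); blk.unlink()   # the creator unlinks once all readers are done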
def _pool_worker_shm_chunk(args):
"""Worker function that reads terrain from shared memory instead of pickle."""
import multiprocessing.shared_memory as shm_mod
chunk, terrain_shm_refs, buildings, osm_data, config = args
# Reconstruct terrain cache from shared memory (zero-copy numpy views)
terrain_cache = {}
for tile_name, ref in terrain_shm_refs.items():
try:
block = shm_mod.SharedMemory(name=ref['shm_name'])
terrain_cache[tile_name] = np.ndarray(
ref['shape'], dtype=ref['dtype'], buffer=block.buf,
)
except Exception:
pass
# Inject terrain cache
from app.services.terrain_service import terrain_service
terrain_service._tile_cache = terrain_cache
# Build spatial index
global _worker_spatial_idx, _worker_cache_key
cache_key = config.get('cache_key', '')
if _worker_cache_key != cache_key:
if buildings:
from app.services.spatial_index import SpatialIndex
_worker_spatial_idx = SpatialIndex()
_worker_spatial_idx.build(buildings)
else:
_worker_spatial_idx = None
_worker_cache_key = cache_key
# Process points
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
site = SiteParams(**config['site_dict'])
settings = CoverageSettings(**config['settings_dict'])
svc = CoverageService()
timing = {
"los": 0.0, "buildings": 0.0, "antenna": 0.0,
"dominant_path": 0.0, "street_canyon": 0.0,
"reflection": 0.0, "vegetation": 0.0,
"lod_none": 0, "lod_simplified": 0, "lod_full": 0,
}
precomputed = config.get('precomputed')
results = []
for lat, lon, point_elev in chunk:
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, osm_data.get('streets', []),
_worker_spatial_idx, osm_data.get('water_bodies', []),
osm_data.get('vegetation_areas', []),
config['site_elevation'], point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
precomputed_has_los=pre.get('has_los') if pre else None,
precomputed_terrain_loss=pre.get('terrain_loss') if pre else None,
precomputed_antenna_loss=pre.get('antenna_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
return results
_worker_chunk_count: int = 0 # per-worker chunk counter
def _pool_worker_shm_shared(args):
"""Worker: terrain + buildings + OSM all via shared memory.
Per-chunk args are tiny (~8 KB): just point coords, shm refs, and config.
Buildings and OSM data are unpickled from shared memory ONCE per worker
and cached in module globals for subsequent chunks.
"""
import multiprocessing.shared_memory as shm_mod
import pickle
global _worker_chunk_count
_worker_chunk_count += 1
pid = os.getpid()
t_worker_start = time.perf_counter()
chunk, terrain_shm_refs, shared_data_refs, config = args
# ── Reconstruct terrain from shared memory ──
t0 = time.perf_counter()
terrain_cache = {}
for tile_name, ref in terrain_shm_refs.items():
try:
block = shm_mod.SharedMemory(name=ref['shm_name'])
terrain_cache[tile_name] = np.ndarray(
ref['shape'], dtype=ref['dtype'], buffer=block.buf,
)
except Exception:
pass
from app.services.terrain_service import terrain_service
terrain_service._tile_cache = terrain_cache
t_terrain_shm = time.perf_counter() - t0
# ── Read buildings + OSM from shared memory (cached per worker) ──
global _worker_shared_buildings, _worker_shared_osm_data, _worker_shared_data_key
global _worker_spatial_idx, _worker_cache_key
data_key = config.get('cache_key', '')
cached = (_worker_shared_data_key == data_key)
t_unpickle_bld = 0.0
t_unpickle_osm = 0.0
t_spatial = 0.0
if not cached:
# First chunk for this calculation — unpickle from shm
buildings_ref = shared_data_refs.get('buildings')
osm_ref = shared_data_refs.get('osm_data')
if buildings_ref:
try:
t0 = time.perf_counter()
blk = shm_mod.SharedMemory(name=buildings_ref['shm_name'])
_worker_shared_buildings = pickle.loads(bytes(blk.buf[:buildings_ref['size']]))
t_unpickle_bld = time.perf_counter() - t0
except Exception:
_worker_shared_buildings = []
else:
_worker_shared_buildings = []
if osm_ref:
try:
t0 = time.perf_counter()
blk = shm_mod.SharedMemory(name=osm_ref['shm_name'])
_worker_shared_osm_data = pickle.loads(bytes(blk.buf[:osm_ref['size']]))
t_unpickle_osm = time.perf_counter() - t0
except Exception:
_worker_shared_osm_data = {}
else:
_worker_shared_osm_data = {}
_worker_shared_data_key = data_key
# Rebuild spatial index for new data
t0 = time.perf_counter()
if _worker_shared_buildings:
from app.services.spatial_index import SpatialIndex
_worker_spatial_idx = SpatialIndex()
_worker_spatial_idx.build(_worker_shared_buildings)
else:
_worker_spatial_idx = None
_worker_cache_key = data_key
t_spatial = time.perf_counter() - t0
print(
f"[WORKER {pid}] Init: terrain_shm={t_terrain_shm*1000:.1f}ms "
f"unpickle_bld={t_unpickle_bld*1000:.1f}ms "
f"unpickle_osm={t_unpickle_osm*1000:.1f}ms "
f"spatial={t_spatial*1000:.1f}ms "
f"buildings={len(_worker_shared_buildings or [])} "
f"tiles={len(terrain_cache)}",
flush=True,
)
print(
f"[WORKER {pid}] Processing chunk {_worker_chunk_count}, "
f"cached={cached}, points={len(chunk)}",
flush=True,
)
buildings = _worker_shared_buildings or []
osm_data = _worker_shared_osm_data or {}
# ── Imports + object creation (timed) ──
t0 = time.perf_counter()
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
t_import = time.perf_counter() - t0
t0 = time.perf_counter()
site = SiteParams(**config['site_dict'])
settings = CoverageSettings(**config['settings_dict'])
svc = CoverageService()
t_pydantic = time.perf_counter() - t0
timing = {
"los": 0.0, "buildings": 0.0, "antenna": 0.0,
"dominant_path": 0.0, "street_canyon": 0.0,
"reflection": 0.0, "vegetation": 0.0,
"lod_none": 0, "lod_simplified": 0, "lod_full": 0,
}
precomputed = config.get('precomputed')
streets = osm_data.get('streets', [])
water = osm_data.get('water_bodies', [])
veg = osm_data.get('vegetation_areas', [])
site_elev = config['site_elevation']
t_init_done = time.perf_counter()
init_ms = (t_init_done - t_worker_start) * 1000
# ── Process points with per-point profiling (first 3 only) ──
results = []
t_loop_start = time.perf_counter()
t_model_dump_total = 0.0
n_dumped = 0
for i, (lat, lon, point_elev) in enumerate(chunk):
pre = precomputed.get((lat, lon)) if precomputed else None
# Snapshot timing dict before call (for first 3 points)
if i < 3:
timing_before = {k: v for k, v in timing.items()}
t_pt = time.perf_counter()
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, streets,
_worker_spatial_idx, water, veg,
site_elev, point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
precomputed_has_los=pre.get('has_los') if pre else None,
precomputed_terrain_loss=pre.get('terrain_loss') if pre else None,
precomputed_antenna_loss=pre.get('antenna_loss') if pre else None,
)
if i < 3:
t_pt_done = time.perf_counter()
pt_ms = (t_pt_done - t_pt) * 1000
deltas = {k: (timing[k] - timing_before.get(k, 0)) * 1000 for k in timing}
parts = " ".join(f"{k}={v:.2f}" for k, v in deltas.items() if v > 0.001)
print(
f"[WORKER {pid}] Point {i}: {pt_ms:.2f}ms "
f"rsrp={point.rsrp:.1f} dist={point.distance:.0f}m "
f"breakdown=[{parts}]",
flush=True,
)
if point.rsrp >= settings.min_signal:
t_md = time.perf_counter()
results.append(point.model_dump())
t_model_dump_total += time.perf_counter() - t_md
n_dumped += 1
t_loop_done = time.perf_counter()
loop_ms = (t_loop_done - t_loop_start) * 1000
total_ms = (t_loop_done - t_worker_start) * 1000
avg_pt = loop_ms / len(chunk) if chunk else 0
avg_dump = (t_model_dump_total * 1000 / n_dumped) if n_dumped else 0
print(
f"[WORKER {pid}] Chunk done: total={total_ms:.0f}ms "
f"init={init_ms:.0f}ms loop={loop_ms:.0f}ms "
f"avg_pt={avg_pt:.2f}ms model_dump={avg_dump:.2f}ms×{n_dumped} "
f"import={t_import*1000:.1f}ms pydantic={t_pydantic*1000:.1f}ms "
f"terrain_shm={t_terrain_shm*1000:.1f}ms "
f"results={len(results)}/{len(chunk)}",
flush=True,
)
return results
def _calculate_with_process_pool(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn, cancel_token=None, precomputed=None,
progress_fn=None,
):
"""Execute using ProcessPoolExecutor.
Uses shared memory for terrain tiles (zero-copy numpy views), buildings,
and OSM data (pickle-once, read-many) to eliminate per-chunk serialization
overhead.
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
total_points = len(grid)
building_count = len(buildings)
data_items = building_count + len(streets) + len(water_bodies) + len(vegetation_areas)
log_fn(f"ProcessPool mode: {total_points} points, {num_workers} workers, "
f"{building_count} buildings, {data_items} total OSM items")
# Log memory at start
try:
with open('/proc/self/status') as f:
for line in f:
if line.startswith('VmRSS:'):
log_fn(f"Memory before calculation: {line.strip()}")
break
except Exception:
pass
# Store terrain tiles in shared memory
shm_blocks = []
terrain_shm_refs = {}
try:
shm_blocks, terrain_shm_refs = _store_terrain_in_shm(terrain_cache, log_fn)
if terrain_shm_refs:
tile_mb = sum(
np.prod(r['shape']) * np.dtype(r['dtype']).itemsize
for r in terrain_shm_refs.values()
) / (1024 * 1024)
log_fn(f"Stored {len(terrain_shm_refs)} terrain tiles in shared memory ({tile_mb:.0f} MB)")
use_shm = True
else:
use_shm = False
except Exception as e:
log_fn(f"Shared memory setup failed ({e}), using pickle fallback")
use_shm = False
# Store buildings + OSM data in shared memory (pickle once, read many)
shared_data_refs = {}
if use_shm:
bld_block, bld_ref = _store_pickle_in_shm(buildings, "Buildings", log_fn)
if bld_block:
shm_blocks.append(bld_block)
shared_data_refs['buildings'] = bld_ref
osm_data_dict = {
'streets': streets,
'water_bodies': water_bodies,
'vegetation_areas': vegetation_areas,
}
osm_block, osm_ref = _store_pickle_in_shm(osm_data_dict, "OSM data", log_fn)
if osm_block:
shm_blocks.append(osm_block)
shared_data_refs['osm_data'] = osm_ref
items = [
(lat, lon, point_elevations.get((lat, lon), 0.0))
for lat, lon in grid
]
# Target larger chunks to amortize IPC overhead (was num_workers*2)
chunk_size = max(1, min(400, len(items) // max(2, num_workers)))
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
log_fn(f"Submitting {len(chunks)} chunks of ~{chunk_size} points")
cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
config = {
'site_dict': site_dict,
'settings_dict': settings_dict,
'site_elevation': site_elevation,
'cache_key': cache_key,
}
if precomputed:
config['precomputed'] = precomputed
osm_data = {
'streets': streets,
'water_bodies': water_bodies,
'vegetation_areas': vegetation_areas,
}
t_calc = time.time()
all_results: List[Dict] = []
pool = None
try:
ctx = mp.get_context('spawn')
pool = ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx)
_set_active_pool(pool)
if use_shm and shared_data_refs:
# Full shared memory path: terrain + buildings + OSM all via shm
worker_fn = _pool_worker_shm_shared
futures = {
pool.submit(
worker_fn,
(chunk, terrain_shm_refs, shared_data_refs, config),
): i
for i, chunk in enumerate(chunks)
}
elif use_shm and data_items <= 2000:
# Terrain-only shm — buildings/OSM pickled per chunk.
# Only safe for small datasets; large datasets would OOM from
# pickle copies (num_chunks × pickle_size).
log_fn(f"Terrain-only shm (small data: {data_items} items)")
worker_fn = _pool_worker_shm_chunk
futures = {
pool.submit(
worker_fn,
(chunk, terrain_shm_refs, buildings, osm_data, config),
): i
for i, chunk in enumerate(chunks)
}
elif data_items <= 2000:
# Full pickle fallback — only safe for small datasets
log_fn(f"Full pickle path (small data: {data_items} items)")
futures = {
pool.submit(
_pool_worker_process_chunk,
(chunk, terrain_cache, buildings, osm_data, config),
): i
for i, chunk in enumerate(chunks)
}
else:
# Large dataset + shared memory failed → per-chunk pickle would OOM.
# Bail out; caller will fall back to sequential.
log_fn(f"Shared memory failed for large dataset ({data_items} items) "
f"— skipping ProcessPool to avoid OOM")
raise MemoryError(
f"Cannot safely pickle {data_items} OSM items per chunk"
)
completed_chunks = 0
for future in as_completed(futures):
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Cancelled — cancelling {len(futures) - completed_chunks - 1} pending futures")
for f in futures:
f.cancel()
break
try:
chunk_results = future.result()
all_results.extend(chunk_results)
except Exception as e:
log_fn(f"Chunk error: {e}")
completed_chunks += 1
pct = completed_chunks * 100 // len(chunks)
elapsed = time.time() - t_calc
pts = len(all_results)
rate = pts / elapsed if elapsed > 0 else 0
eta = (total_points - pts) / rate if rate > 0 else 0
log_fn(f"Progress: {completed_chunks}/{len(chunks)} chunks ({pct}%) — "
f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")
if progress_fn:
progress_fn("Calculating coverage", 0.40 + 0.55 * (completed_chunks / len(chunks)))
except MemoryError:
raise # Propagate to caller for sequential fallback
except Exception as e:
log_fn(f"ProcessPool error: {e}")
finally:
_clear_active_pool()
if pool:
pool.shutdown(wait=False, cancel_futures=True)
time.sleep(0.5)
killed = _kill_worker_processes()
if killed > 0:
log_fn(f"Force killed {killed} orphaned workers")
# Cleanup shared memory blocks
for block in shm_blocks:
try:
block.close()
block.unlink()
except Exception:
pass
# Record which data path was used before the references are released below
used_shm_full = bool(use_shm and shared_data_refs)
# Release large local references before GC
chunks = None  # noqa: F841
items = None  # noqa: F841
osm_data = None  # noqa: F841
shared_data_refs = None  # noqa: F841
# Force garbage collection to release memory from workers
gc.collect()
# Log memory after cleanup
try:
with open('/proc/self/status') as f:
for line in f:
if line.startswith('VmRSS:'):
log_fn(f"Memory after cleanup: {line.strip()}")
break
except Exception:
pass
calc_time = time.time() - t_calc
log_fn(f"ProcessPool done: {calc_time:.1f}s, {len(all_results)} results "
f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")
timing = {
"parallel_total": calc_time,
"workers": num_workers,
"backend": "process_pool" + (
"/shm_full" if (use_shm and shared_data_refs)
else "/shm_terrain" if use_shm
else "/pickle"
),
}
return all_results, timing
# ── Sequential fallback ──
def _calculate_sequential(
grid, point_elevations, site_dict, settings_dict,
buildings, streets, water_bodies, vegetation_areas,
site_elevation, log_fn, cancel_token=None, precomputed=None,
progress_fn=None,
):
"""Sequential fallback — no extra dependencies, runs in calling thread."""
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
from app.services.spatial_index import SpatialIndex
site = SiteParams(**site_dict)
settings = CoverageSettings(**settings_dict)
svc = CoverageService()
spatial_idx = None
if buildings:
spatial_idx = SpatialIndex()
spatial_idx.build(buildings)
total = len(grid)
log_interval = max(1, total // 20)
timing = {
"los": 0.0, "buildings": 0.0, "antenna": 0.0,
"dominant_path": 0.0, "street_canyon": 0.0,
"reflection": 0.0, "vegetation": 0.0,
"lod_none": 0, "lod_simplified": 0, "lod_full": 0,
}
t0 = time.time()
results = []
for i, (lat, lon) in enumerate(grid):
# Check cancellation
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Sequential cancelled at {i}/{total}")
break
if i % log_interval == 0:
log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)")
if progress_fn:
progress_fn("Calculating coverage", 0.40 + 0.55 * (i / total))
point_elev = point_elevations.get((lat, lon), 0.0)
# Use precomputed values if available
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, streets, spatial_idx,
water_bodies, vegetation_areas,
site_elevation, point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
precomputed_has_los=pre.get('has_los') if pre else None,
precomputed_terrain_loss=pre.get('terrain_loss') if pre else None,
precomputed_antenna_loss=pre.get('antenna_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
calc_time = time.time() - t0
log_fn(f"Sequential done: {calc_time:.1f}s, {len(results)} results "
f"({calc_time / max(1, total) * 1000:.1f}ms/point)")
# Force garbage collection after sequential computation
gc.collect()
timing["sequential_total"] = calc_time
timing["backend"] = "sequential"
return results, timing