@mytec: iter2.4 ready for testing

This commit is contained in:
2026-02-01 10:48:23 +02:00
parent 7893c57bc9
commit 5488633e43
19 changed files with 1448 additions and 69 deletions

View File

@@ -24,11 +24,28 @@ Usage:
import os
import sys
import time
import threading
import multiprocessing as mp
from typing import List, Dict, Tuple, Any, Optional, Callable
import numpy as np
# ── Cancellation token ──
class CancellationToken:
    """Cooperative cancellation signal, safe to share across threads.

    Any thread may call :meth:`cancel`; workers poll :attr:`is_cancelled`
    between units of work and stop at the next checkpoint. Cancellation is
    one-way — once set, the token stays cancelled.
    """

    def __init__(self):
        # threading.Event provides atomic set/check semantics, so no extra
        # locking is needed around the flag.
        self._flag = threading.Event()

    def cancel(self):
        """Request that cooperating workers stop as soon as convenient."""
        self._flag.set()

    @property
    def is_cancelled(self) -> bool:
        """Whether :meth:`cancel` has been called at least once."""
        return self._flag.is_set()
# ── Try to import Ray ──
RAY_AVAILABLE = False
@@ -80,14 +97,19 @@ def _ray_process_chunk_impl(chunk, terrain_cache, buildings, osm_data, config):
"reflection": 0.0, "vegetation": 0.0,
}
precomputed = config.get('precomputed')
results = []
for lat, lon, point_elev in chunk:
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, osm_data.get('streets', []),
_worker_spatial_idx, osm_data.get('water_bodies', []),
osm_data.get('vegetation_areas', []),
config['site_elevation'], point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
@@ -162,13 +184,16 @@ def calculate_coverage_parallel(
site_elevation: float,
num_workers: Optional[int] = None,
log_fn: Optional[Callable[[str], None]] = None,
cancel_token: Optional[CancellationToken] = None,
precomputed: Optional[Dict] = None,
) -> Tuple[List[Dict], Dict[str, float]]:
"""Calculate coverage points in parallel.
Uses Ray if available (shared memory, zero-copy numpy), otherwise
falls back to sequential single-threaded calculation.
falls back to ProcessPoolExecutor or sequential single-threaded calculation.
Same signature as before — drop-in replacement.
cancel_token: cooperative cancellation — checked between chunks.
precomputed: dict mapping (lat, lon) -> {distance, path_loss} from GPU pre-computation.
"""
if log_fn is None:
log_fn = lambda msg: print(f"[PARALLEL] {msg}", flush=True)
@@ -185,7 +210,7 @@ def calculate_coverage_parallel(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn,
num_workers, log_fn, cancel_token, precomputed,
)
except Exception as e:
log_fn(f"Ray execution failed: {e} — falling back to sequential")
@@ -198,7 +223,7 @@ def calculate_coverage_parallel(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
pool_workers, log_fn,
pool_workers, log_fn, cancel_token, precomputed,
)
except Exception as e:
log_fn(f"ProcessPool failed: {e} — falling back to sequential")
@@ -208,7 +233,7 @@ def calculate_coverage_parallel(
return _calculate_sequential(
grid, point_elevations, site_dict, settings_dict,
buildings, streets, water_bodies, vegetation_areas,
site_elevation, log_fn,
site_elevation, log_fn, cancel_token, precomputed,
)
@@ -219,15 +244,13 @@ def _calculate_with_ray(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn,
num_workers, log_fn, cancel_token=None, precomputed=None,
):
"""Execute using Ray shared-memory object store."""
total_points = len(grid)
log_fn(f"Ray mode: {total_points} points, {num_workers} workers")
# ── Put large data into Ray object store ──
# Numpy arrays (terrain tiles) get zero-copy shared memory.
# Python objects (buildings) get serialized once, stored in plasma.
t_put = time.time()
terrain_ref = ray.put(terrain_cache)
@@ -239,12 +262,15 @@ def _calculate_with_ray(
})
cache_key = f"{site_dict['lat']:.4f},{site_dict['lon']:.4f},{len(buildings)}"
config_ref = ray.put({
config = {
'site_dict': site_dict,
'settings_dict': settings_dict,
'site_elevation': site_elevation,
'cache_key': cache_key,
})
}
if precomputed:
config['precomputed'] = precomputed
config_ref = ray.put(config)
put_time = time.time() - t_put
log_fn(f"ray.put() done in {put_time:.1f}s")
@@ -273,9 +299,19 @@ def _calculate_with_ray(
completed_chunks = 0
while remaining:
# Check cancellation before waiting
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Cancelled — aborting {len(remaining)} remaining Ray chunks")
for ref in remaining:
try:
ray.cancel(ref, force=True)
except Exception:
pass
break
# Wait for at least 1 result, batch up to ~10% for progress logging
batch = max(1, min(len(remaining), total_chunks // 10 or 1))
done, remaining = ray.wait(remaining, num_returns=batch, timeout=600)
done, remaining = ray.wait(remaining, num_returns=batch, timeout=30)
for ref in done:
try:
@@ -333,14 +369,19 @@ def _pool_worker_process_chunk(args):
"reflection": 0.0, "vegetation": 0.0,
}
precomputed = config.get('precomputed')
results = []
for lat, lon, point_elev in chunk:
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, osm_data.get('streets', []),
spatial_idx, osm_data.get('water_bodies', []),
osm_data.get('vegetation_areas', []),
config['site_elevation'], point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())
@@ -352,7 +393,7 @@ def _calculate_with_process_pool(
grid, point_elevations, site_dict, settings_dict,
terrain_cache, buildings, streets, water_bodies,
vegetation_areas, site_elevation,
num_workers, log_fn,
num_workers, log_fn, cancel_token=None, precomputed=None,
):
"""Execute using ProcessPoolExecutor with reduced workers to limit memory."""
from concurrent.futures import ProcessPoolExecutor, as_completed
@@ -375,6 +416,8 @@ def _calculate_with_process_pool(
'settings_dict': settings_dict,
'site_elevation': site_elevation,
}
if precomputed:
config['precomputed'] = precomputed
osm_data = {
'streets': streets,
'water_bodies': water_bodies,
@@ -395,6 +438,13 @@ def _calculate_with_process_pool(
completed_chunks = 0
for future in as_completed(futures):
# Check cancellation between chunks
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Cancelled — cancelling {len(futures) - completed_chunks - 1} pending futures")
for f in futures:
f.cancel()
break
try:
chunk_results = future.result()
all_results.extend(chunk_results)
@@ -428,7 +478,7 @@ def _calculate_with_process_pool(
def _calculate_sequential(
grid, point_elevations, site_dict, settings_dict,
buildings, streets, water_bodies, vegetation_areas,
site_elevation, log_fn,
site_elevation, log_fn, cancel_token=None, precomputed=None,
):
"""Sequential fallback — no extra dependencies, runs in calling thread."""
from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings
@@ -453,15 +503,26 @@ def _calculate_sequential(
t0 = time.time()
results = []
for i, (lat, lon) in enumerate(grid):
# Check cancellation
if cancel_token and cancel_token.is_cancelled:
log_fn(f"Sequential cancelled at {i}/{total}")
break
if i % log_interval == 0:
log_fn(f"Sequential: {i}/{total} ({i * 100 // total}%)")
point_elev = point_elevations.get((lat, lon), 0.0)
# Use precomputed values if available
pre = precomputed.get((lat, lon)) if precomputed else None
point = svc._calculate_point_sync(
site, lat, lon, settings,
buildings, streets, spatial_idx,
water_bodies, vegetation_areas,
site_elevation, point_elev, timing,
precomputed_distance=pre.get('distance') if pre else None,
precomputed_path_loss=pre.get('path_loss') if pre else None,
)
if point.rsrp >= settings.min_signal:
results.append(point.model_dump())