@mytec: iter2.4.2 start
This commit is contained in:
@@ -84,9 +84,15 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
cancel_token.cancel()
|
||||
raise HTTPException(408, "Calculation timeout (5 min) — try smaller radius or lower resolution")
|
||||
# Force cleanup orphaned worker processes
|
||||
from app.services.parallel_coverage_service import _kill_worker_processes
|
||||
killed = _kill_worker_processes()
|
||||
detail = f"Calculation timeout (5 min). Cleaned up {killed} workers." if killed else "Calculation timeout (5 min) — try smaller radius or lower resolution"
|
||||
raise HTTPException(408, detail)
|
||||
except asyncio.CancelledError:
|
||||
cancel_token.cancel()
|
||||
from app.services.parallel_coverage_service import _kill_worker_processes
|
||||
_kill_worker_processes()
|
||||
raise HTTPException(499, "Client disconnected")
|
||||
|
||||
computation_time = time.time() - start_time
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import os
|
||||
import asyncio
|
||||
import multiprocessing as mp
|
||||
from fastapi import APIRouter
|
||||
|
||||
@@ -42,3 +44,17 @@ async def get_system_info():
|
||||
"gpu": gpu_info,
|
||||
"gpu_available": gpu_info.get("available", False),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/shutdown")
async def shutdown():
    """Kill worker processes, reply, then hard-exit the server process.

    Returns:
        dict: ``{"status": "shutting down", "workers_killed": <int>}`` where
        the integer is whatever ``_kill_worker_processes()`` reports.

    NOTE(review): no authentication is visible on this route from here --
    confirm it is only reachable by trusted callers.
    """
    from app.services.parallel_coverage_service import _kill_worker_processes

    # We are inside a coroutine, so a loop is guaranteed to be running;
    # get_running_loop() returns it and never creates one implicitly.
    loop = asyncio.get_running_loop()

    # The cleanup helper blocks while it waits for child processes to exit,
    # so run it in the default thread pool instead of stalling the event loop.
    killed = await loop.run_in_executor(None, _kill_worker_processes)

    # Schedule the hard exit slightly in the future so this response can be
    # sent first. os._exit() terminates immediately without running normal
    # interpreter cleanup.
    loop.call_later(0.5, lambda: os._exit(0))

    return {"status": "shutting down", "workers_killed": killed}
|
||||
|
||||
@@ -21,6 +21,45 @@ class RayPath:
|
||||
is_valid: bool # Does this path exist?
|
||||
|
||||
|
||||
MAX_BUILDINGS_FOR_LINE = 100
MAX_BUILDINGS_FOR_REFLECTION = 100
MAX_DISTANCE_FROM_PATH = 500  # meters


def _filter_buildings_by_distance(buildings, tx_point, rx_point, max_count=100, max_distance=500):
    """Cap a building list to the entries nearest the TX-RX midpoint.

    Distance is measured from each building's centroid to the *midpoint* of
    the TX-RX segment, not to the segment itself. Squared distances are
    compared throughout, so no square root is taken.

    Args:
        buildings: Sequence of objects exposing ``.geometry``, a sequence of
            points read as ``p[0]`` = longitude and ``p[1]`` = latitude.
        tx_point: ``(lat, lon)`` of the transmitter.
        rx_point: ``(lat, lon)`` of the receiver.
        max_count: Maximum number of buildings to return.
        max_distance: Radius around the midpoint, in approximate metres.

    Returns:
        ``buildings`` itself (same object, unfiltered) when it already holds
        at most ``max_count`` entries. Otherwise a new list of at most
        ``max_count`` buildings within ``max_distance`` of the midpoint,
        nearest first. A non-positive ``max_count`` yields an empty list.

    NOTE(review): for links longer than ``2 * max_distance`` this drops
    buildings next to the TX and RX themselves -- confirm that is intended.
    NOTE(review): the 0.7 longitude factor is a fixed approximation; it is
    not derived from the actual latitude.
    """
    # Already small enough: skip all distance work (the radius filter is
    # deliberately not applied in this case).
    if len(buildings) <= max_count:
        return buildings

    # A negative cap would otherwise reach the final slice and drop only the
    # last element instead of returning nothing.
    if max_count <= 0:
        return []

    mid_lat = (tx_point[0] + rx_point[0]) / 2
    mid_lon = (tx_point[1] + rx_point[1]) / 2
    limit_sq = max_distance * max_distance

    def centroid_dist_sq(building):
        """Squared approximate distance (m^2) from centroid to the midpoint."""
        geom = building.geometry
        if geom:
            c_lat = sum(p[1] for p in geom) / len(geom)
            c_lon = sum(p[0] for p in geom) / len(geom)
        else:
            # No geometry: treated as sitting exactly on the midpoint.
            c_lat, c_lon = mid_lat, mid_lon
        d_lat = (c_lat - mid_lat) * 111000
        d_lon = (c_lon - mid_lon) * 111000 * 0.7  # rough cos correction
        return d_lat * d_lat + d_lon * d_lon

    # Drop out-of-range buildings first so only survivors are sorted.
    in_range = []
    for building in buildings:
        d_sq = centroid_dist_sq(building)
        if d_sq <= limit_sq:
            in_range.append((d_sq, building))

    # Stable sort on distance keeps input order for equal distances.
    in_range.sort(key=lambda pair: pair[0])
    return [building for _, building in in_range[:max_count]]
|
||||
|
||||
|
||||
class DominantPathService:
|
||||
"""
|
||||
Find dominant propagation paths (2-3 strongest)
|
||||
@@ -420,6 +459,15 @@ class DominantPathService:
|
||||
else:
|
||||
line_buildings = buildings
|
||||
|
||||
# Filter to limit building count — prevents 600+ buildings per point
|
||||
original_line_count = len(line_buildings)
|
||||
line_buildings = _filter_buildings_by_distance(
|
||||
line_buildings,
|
||||
(tx_lat, tx_lon), (rx_lat, rx_lon),
|
||||
max_count=MAX_BUILDINGS_FOR_LINE,
|
||||
max_distance=MAX_DISTANCE_FROM_PATH,
|
||||
)
|
||||
|
||||
direct = self._check_direct_path_sync(
|
||||
tx_lat, tx_lon, tx_height,
|
||||
rx_lat, rx_lon, rx_height,
|
||||
@@ -442,13 +490,22 @@ class DominantPathService:
|
||||
else:
|
||||
reflection_buildings = buildings
|
||||
|
||||
# Filter reflection buildings to limit count
|
||||
original_refl_count = len(reflection_buildings)
|
||||
reflection_buildings = _filter_buildings_by_distance(
|
||||
reflection_buildings,
|
||||
(tx_lat, tx_lon), (rx_lat, rx_lon),
|
||||
max_count=MAX_BUILDINGS_FOR_REFLECTION,
|
||||
max_distance=MAX_DISTANCE_FROM_PATH,
|
||||
)
|
||||
|
||||
# Log building counts for first 3 points so user can verify filtering
|
||||
DominantPathService._log_count += 1
|
||||
if DominantPathService._log_count <= 3:
|
||||
import sys
|
||||
msg = (f"[DOMINANT_PATH] Point #{DominantPathService._log_count}: "
|
||||
f"line_bldgs={len(line_buildings)}, "
|
||||
f"refl_bldgs={len(reflection_buildings)}, "
|
||||
f"line_bldgs={len(line_buildings)} (from {original_line_count}), "
|
||||
f"refl_bldgs={len(reflection_buildings)} (from {original_refl_count}), "
|
||||
f"total_available={len(buildings)}, "
|
||||
f"spatial_idx={'YES' if spatial_idx else 'NO'}, "
|
||||
f"early_exit={'YES' if direct and direct.is_valid and not direct.materials_crossed else 'NO'}")
|
||||
|
||||
@@ -28,6 +28,7 @@ import threading
|
||||
import multiprocessing as mp
|
||||
from typing import List, Dict, Tuple, Any, Optional, Callable
|
||||
import numpy as np
|
||||
import psutil
|
||||
|
||||
|
||||
# ── Cancellation token ──
|
||||
@@ -46,6 +47,46 @@ class CancellationToken:
|
||||
return self._event.is_set()
|
||||
|
||||
|
||||
# ── Worker process cleanup ──
|
||||
|
||||
def _kill_worker_processes() -> int:
    """Terminate every descendant process of the current process.

    Each child is first asked to exit with ``terminate()``. Children still
    alive after a 3 second grace period are sent ``kill()`` and then waited
    on briefly so they are reaped and their exit can be confirmed.

    Returns:
        The number of child processes that were signalled by this call and
        are confirmed gone. Children that had already exited, or that this
        process is not permitted to signal, are not counted.

    NOTE(review): ``children(recursive=True)`` selects *all* descendants,
    not only pool workers -- confirm nothing else is spawned by this process
    that must survive (e.g. workers of another in-flight calculation).
    NOTE(review): this call blocks for up to roughly 4 seconds; callers on an
    event loop may want to run it in a thread.
    """
    try:
        children = psutil.Process(os.getpid()).children(recursive=True)
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return 0

    # Graceful phase: only track children we actually managed to signal, so
    # we neither wait on nor count processes we cannot affect.
    signalled = []
    for child in children:
        try:
            child.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        signalled.append(child)

    if not signalled:
        return 0

    # Wait up to 3 seconds for graceful exit.
    gone, alive = psutil.wait_procs(signalled, timeout=3)
    stopped = len(gone)

    # Forced phase for survivors.
    force_killed = []
    for proc in alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            # Exited between the wait and the kill: our terminate() worked.
            stopped += 1
        except psutil.AccessDenied:
            continue
        else:
            force_killed.append(proc)

    if force_killed:
        # Wait once more so killed children are reaped and only confirmed
        # exits are counted.
        gone, _still_alive = psutil.wait_procs(force_killed, timeout=1)
        stopped += len(gone)

    return stopped
|
||||
|
||||
|
||||
# ── Try to import Ray ──
|
||||
|
||||
RAY_AVAILABLE = False
|
||||
@@ -426,10 +467,12 @@ def _calculate_with_process_pool(
|
||||
|
||||
t_calc = time.time()
|
||||
all_results: List[Dict] = []
|
||||
pool = None
|
||||
|
||||
with ProcessPoolExecutor(max_workers=num_workers) as executor:
|
||||
try:
|
||||
pool = ProcessPoolExecutor(max_workers=num_workers)
|
||||
futures = {
|
||||
executor.submit(
|
||||
pool.submit(
|
||||
_pool_worker_process_chunk,
|
||||
(chunk, terrain_cache, buildings, osm_data, config),
|
||||
): i
|
||||
@@ -460,6 +503,17 @@ def _calculate_with_process_pool(
|
||||
log_fn(f"Progress: {completed_chunks}/{len(chunks)} chunks ({pct}%) — "
|
||||
f"{pts} pts, {rate:.0f} pts/s, ETA {eta:.0f}s")
|
||||
|
||||
except Exception as e:
|
||||
log_fn(f"ProcessPool error: {e}")
|
||||
|
||||
finally:
|
||||
# CRITICAL: Always cleanup pool and orphaned workers
|
||||
if pool:
|
||||
pool.shutdown(wait=False, cancel_futures=True)
|
||||
killed = _kill_worker_processes()
|
||||
if killed > 0:
|
||||
log_fn(f"Killed {killed} orphaned worker processes")
|
||||
|
||||
calc_time = time.time() - t_calc
|
||||
log_fn(f"ProcessPool done: {calc_time:.1f}s, {len(all_results)} results "
|
||||
f"({calc_time / max(1, total_points) * 1000:.1f}ms/point)")
|
||||
|
||||
Reference in New Issue
Block a user