Major refactoring of RFCP backend:
- Modular propagation models (8 models)
- SharedMemoryManager for terrain data
- ProcessPoolExecutor parallel processing
- WebSocket progress streaming
- Building filtering pipeline (351k → 15k)
- 82 unit tests

Performance: Standard preset 38s → 5s (7.6x speedup)
Known issue: Detailed preset timeout (fix in 3.1.0)
137 lines · 4.3 KiB · Python
"""
|
|
Managed process pool with automatic cleanup.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import time
|
|
import multiprocessing as mp
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from typing import List, Dict, Tuple, Optional, Callable
|
|
|
|
|
|
class ManagedProcessPool:
    """
    Process pool wrapper with:
    - Automatic cleanup on exit
    - Worker process kill on failure
    - Progress reporting
    """

    # Hard upper bound on worker processes, regardless of what callers request.
    MAX_WORKERS_CAP = 6

    def __init__(self, max_workers: int = 6):
        # Cap the worker count so a misconfigured caller cannot
        # oversubscribe the host.
        self.max_workers = min(max_workers, self.MAX_WORKERS_CAP)
        self._pool: Optional[ProcessPoolExecutor] = None

    def map_chunks(
        self,
        worker_fn: Callable,
        chunks: List[tuple],
        log_fn: Optional[Callable] = None,
    ) -> List[Dict]:
        """
        Submit chunks to the pool and collect results.

        Args:
            worker_fn: Function to call for each chunk. Must be picklable
                (workers run in 'spawn'-context processes) and return an
                iterable of result dicts.
            chunks: List of (chunk_data, *args) tuples.
            log_fn: Progress logging function; defaults to printing
                "[POOL] ..." to stdout.

        Returns:
            Flattened list of result dicts. Failed chunks are logged and
            skipped (best-effort), so the result may cover fewer chunks
            than were submitted.
        """
        if log_fn is None:
            # PEP 8 (E731): use def rather than assigning a lambda.
            def log_fn(msg: str) -> None:
                print(f"[POOL] {msg}", flush=True)

        all_results: List[Dict] = []
        total = len(chunks)
        if total == 0:
            # Nothing to do; skip pool startup and the orphan sweep entirely.
            return all_results

        try:
            # 'spawn' gives each worker a clean interpreter state and is the
            # start method that behaves consistently across platforms.
            ctx = mp.get_context('spawn')
            self._pool = ProcessPoolExecutor(
                max_workers=self.max_workers, mp_context=ctx,
            )

            # The original mapped futures to chunk indices that were never
            # read; a plain list is sufficient for as_completed().
            futures = [self._pool.submit(worker_fn, chunk) for chunk in chunks]

            completed = 0

            for future in as_completed(futures):
                try:
                    all_results.extend(future.result())
                except Exception as e:
                    # Best-effort semantics: a failed chunk is logged,
                    # not fatal to the whole run.
                    log_fn(f"Chunk error: {e}")

                completed += 1
                pct = completed * 100 // total
                log_fn(f"Progress: {completed}/{total} ({pct}%)")

        except Exception as e:
            # Pool construction/submission failure; partial results (if any)
            # are still returned below.
            log_fn(f"Pool error: {e}")

        finally:
            if self._pool:
                # Don't wait for in-flight work; cancel what hasn't started.
                self._pool.shutdown(wait=False, cancel_futures=True)
                self._pool = None  # drop the stale executor reference
                # Give workers a moment to exit on their own before the
                # force-kill sweep.
                time.sleep(0.5)
                killed = self._kill_orphans()
                if killed > 0:
                    log_fn(f"Cleaned up {killed} orphaned workers")

        return all_results

    @staticmethod
    def _kill_orphans() -> int:
        """Kill orphaned rfcp-server worker processes.

        Best-effort: enumeration or kill failures are swallowed. The current
        process is always excluded.

        Returns:
            Number of processes successfully killed.
        """
        my_pid = os.getpid()
        killed = 0

        if sys.platform == 'win32':
            try:
                result = subprocess.run(
                    ['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe', '/FO', 'CSV', '/NH'],
                    capture_output=True, text=True, timeout=5,
                )
                for line in result.stdout.strip().split('\n'):
                    if 'rfcp-server.exe' not in line:
                        continue
                    # CSV row: "name","pid",... — PID is the second field.
                    parts = line.split(',')
                    if len(parts) >= 2:
                        pid_str = parts[1].strip().strip('"')
                        try:
                            pid = int(pid_str)
                            if pid != my_pid:
                                kill_result = subprocess.run(
                                    ['taskkill', '/F', '/PID', str(pid)],
                                    capture_output=True, timeout=5,
                                )
                                # BUG FIX: only count kills that actually
                                # succeeded (original counted every attempt).
                                if kill_result.returncode == 0:
                                    killed += 1
                        except (ValueError, subprocess.TimeoutExpired):
                            pass
            except Exception:
                pass
        else:
            try:
                # NOTE(review): pgrep -f matches any process whose command
                # line contains 'rfcp-server' — presumably only orphaned
                # workers at this point; verify the main server process
                # cannot match here.
                result = subprocess.run(
                    ['pgrep', '-f', 'rfcp-server'],
                    capture_output=True, text=True, timeout=5,
                )
                for pid_str in result.stdout.strip().split('\n'):
                    if not pid_str:
                        continue
                    try:
                        pid = int(pid_str)
                        if pid != my_pid:
                            os.kill(pid, 9)  # 9 == SIGKILL
                            killed += 1
                    except (ValueError, ProcessLookupError, PermissionError):
                        pass
            except Exception:
                pass

        return killed
|