Files
rfcp/backend/app/parallel/pool.py
mytec defa3ad440 @mytec: feat: Phase 3.0 Architecture Refactor
Major refactoring of RFCP backend:
- Modular propagation models (8 models)
- SharedMemoryManager for terrain data
- ProcessPoolExecutor parallel processing
- WebSocket progress streaming
- Building filtering pipeline (351k → 15k)
- 82 unit tests

Performance: Standard preset 38s → 5s (7.6x speedup)

Known issue: Detailed preset timeout (fix in 3.1.0)
2026-02-01 23:12:26 +02:00

137 lines
4.3 KiB
Python

"""
Managed process pool with automatic cleanup.
"""
import multiprocessing as mp
import os
import signal
import subprocess
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Dict, List, Optional, Tuple
class ManagedProcessPool:
    """
    Process pool wrapper with:
    - Automatic cleanup on exit (shutdown + orphan kill in ``finally``)
    - Worker process kill on failure
    - Progress reporting through an optional log callback
    """

    # Hard ceiling on worker count (resource-safety cap).
    _WORKER_CAP = 6

    def __init__(self, max_workers: int = 6):
        # Clamp to [1, _WORKER_CAP]. The previous code only capped the upper
        # bound, so 0/negative values were stored and made
        # ProcessPoolExecutor raise ValueError later at submit time.
        self.max_workers = max(1, min(max_workers, self._WORKER_CAP))
        self._pool: Optional[ProcessPoolExecutor] = None

    def map_chunks(
        self,
        worker_fn: Callable,
        chunks: List[tuple],
        log_fn: Optional[Callable] = None,
    ) -> List[Dict]:
        """
        Submit chunks to the pool and collect results.

        Args:
            worker_fn: Function to call for each chunk. Must be picklable,
                since the pool uses the 'spawn' start method. Expected to
                return an iterable of result dicts.
            chunks: List of (chunk_data, *args) tuples.
            log_fn: Progress logging function; defaults to printing
                ``[POOL] ...`` lines to stdout.

        Returns:
            Flattened list of result dicts. Failed chunks are logged and
            skipped, so the output may cover fewer chunks than the input.
        """
        if log_fn is None:
            # Named function rather than a lambda assignment (PEP 8 / E731).
            def log_fn(msg: str) -> None:
                print(f"[POOL] {msg}", flush=True)

        all_results: List[Dict] = []
        total = len(chunks)
        try:
            # 'spawn' starts workers in a fresh interpreter instead of
            # forking the parent's full state.
            ctx = mp.get_context('spawn')
            self._pool = ProcessPoolExecutor(
                max_workers=self.max_workers, mp_context=ctx,
            )
            futures = {
                self._pool.submit(worker_fn, chunk): i
                for i, chunk in enumerate(chunks)
            }
            completed = 0
            for future in as_completed(futures):
                try:
                    all_results.extend(future.result())
                except Exception as e:
                    # Best-effort: a failed chunk is logged, not fatal.
                    log_fn(f"Chunk error: {e}")
                completed += 1
                pct = completed * 100 // total
                log_fn(f"Progress: {completed}/{total} ({pct}%)")
        except Exception as e:
            log_fn(f"Pool error: {e}")
        finally:
            if self._pool:
                # Don't block on in-flight work; cancel whatever is pending.
                self._pool.shutdown(wait=False, cancel_futures=True)
                # Drop the stale executor so the wrapper can be reused
                # (previously the dead pool reference lingered forever).
                self._pool = None
            # Give workers a moment to exit before force-killing stragglers.
            time.sleep(0.5)
            killed = self._kill_orphans()
            if killed > 0:
                log_fn(f"Cleaned up {killed} orphaned workers")
        return all_results

    @staticmethod
    def _kill_orphans() -> int:
        """
        Force-kill leftover ``rfcp-server`` worker processes.

        Best-effort by design: every lookup or kill failure is swallowed so
        cleanup can never crash the caller.

        Returns:
            Number of processes actually killed.
        """
        my_pid = os.getpid()
        killed = 0
        if sys.platform == 'win32':
            try:
                result = subprocess.run(
                    ['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe', '/FO', 'CSV', '/NH'],
                    capture_output=True, text=True, timeout=5,
                )
                for line in result.stdout.strip().split('\n'):
                    if 'rfcp-server.exe' not in line:
                        continue
                    # CSV row: "name","pid",... — PID is the quoted 2nd field.
                    parts = line.split(',')
                    if len(parts) >= 2:
                        pid_str = parts[1].strip().strip('"')
                        try:
                            pid = int(pid_str)
                            if pid != my_pid:
                                res = subprocess.run(
                                    ['taskkill', '/F', '/PID', str(pid)],
                                    capture_output=True, timeout=5,
                                )
                                # Fix: only count processes taskkill actually
                                # killed; the original counted every attempt.
                                if res.returncode == 0:
                                    killed += 1
                        except (ValueError, subprocess.TimeoutExpired):
                            pass
            except Exception:
                pass
        else:
            try:
                result = subprocess.run(
                    ['pgrep', '-f', 'rfcp-server'],
                    capture_output=True, text=True, timeout=5,
                )
                for pid_str in result.stdout.strip().split('\n'):
                    if not pid_str:
                        continue
                    try:
                        pid = int(pid_str)
                        if pid != my_pid:
                            # Named constant instead of magic number 9.
                            os.kill(pid, signal.SIGKILL)
                            killed += 1
                    except (ValueError, ProcessLookupError, PermissionError):
                        pass
            except Exception:
                pass
        return killed