"""
Managed process pool with automatic cleanup.
"""

import os
import sys
import subprocess
import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Dict, Tuple, Optional, Callable


class ManagedProcessPool:
    """
    Process pool wrapper with:
    - Automatic cleanup on exit
    - Worker process kill on failure
    - Progress reporting
    """

    # Hard upper bound on the number of worker processes.
    _MAX_WORKERS = 6

    def __init__(self, max_workers: int = 6):
        """
        Args:
            max_workers: Requested worker count. Clamped to the range
                1.._MAX_WORKERS so that a zero or negative request cannot
                make ProcessPoolExecutor raise ValueError later on.
        """
        self.max_workers = max(1, min(max_workers, self._MAX_WORKERS))
        self._pool: Optional[ProcessPoolExecutor] = None

    def map_chunks(
        self,
        worker_fn: Callable,
        chunks: List[tuple],
        log_fn: Optional[Callable] = None,
    ) -> List[Dict]:
        """
        Submit chunks to the pool and collect results.

        Each chunk is passed to ``worker_fn`` as a single positional
        argument. Workers are started with the 'spawn' start method, so
        ``worker_fn`` and the chunks have to be picklable.

        A chunk whose worker raises is logged and skipped; any other
        failure is logged as a pool error. In both cases the results
        gathered so far are still returned.

        Args:
            worker_fn: Function to call for each chunk
            chunks: List of (chunk_data, *args) tuples
            log_fn: Progress logging function (defaults to printing with
                a "[POOL]" prefix)

        Returns:
            Flattened list of result dicts, in completion order
        """
        if log_fn is None:
            log_fn = lambda msg: print(f"[POOL] {msg}", flush=True)

        all_results: List[Dict] = []
        total = len(chunks)

        try:
            ctx = mp.get_context('spawn')
            self._pool = ProcessPoolExecutor(
                max_workers=self.max_workers,
                mp_context=ctx,
            )
            futures = [self._pool.submit(worker_fn, chunk) for chunk in chunks]

            completed = 0
            for future in as_completed(futures):
                try:
                    all_results.extend(future.result())
                except Exception as e:
                    log_fn(f"Chunk error: {e}")
                completed += 1
                # total is non-zero here: the loop only runs when at least
                # one chunk was submitted.
                pct = completed * 100 // total
                log_fn(f"Progress: {completed}/{total} ({pct}%)")
        except Exception as e:
            log_fn(f"Pool error: {e}")
        finally:
            pool, self._pool = self._pool, None  # never keep a dead executor
            if pool is not None:
                pool.shutdown(wait=False, cancel_futures=True)
                # Give workers a moment to exit on their own before
                # force-killing whatever is left.
                time.sleep(0.5)
                killed = self._kill_orphans()
                if killed > 0:
                    log_fn(f"Cleaned up {killed} orphaned workers")

        return all_results

    @staticmethod
    def _tasklist_pids(output: str) -> List[int]:
        """Extract PIDs of rfcp-server.exe rows from `tasklist /FO CSV /NH` output."""
        pids: List[int] = []
        for line in output.strip().splitlines():
            if 'rfcp-server.exe' not in line:
                continue  # e.g. the "INFO: No tasks are running..." notice
            parts = line.split(',')
            if len(parts) < 2:
                continue
            # The PID is the second CSV field; later fields may contain
            # commas of their own but never shift the first two.
            try:
                pids.append(int(parts[1].strip().strip('"')))
            except ValueError:
                continue
        return pids

    @staticmethod
    def _pgrep_pids(output: str) -> List[int]:
        """Extract PIDs from `pgrep` output (one PID per line)."""
        pids: List[int] = []
        for pid_str in output.strip().splitlines():
            if not pid_str:
                continue
            try:
                pids.append(int(pid_str))
            except ValueError:
                continue
        return pids

    @staticmethod
    def _kill_orphans() -> int:
        """
        Kill orphaned rfcp-server worker processes.

        Every process matching the rfcp-server name except the current one
        is force-killed: via tasklist/taskkill on Windows, via pgrep and
        signal 9 elsewhere.

        Returns:
            Number of processes that were actually killed.
        """
        my_pid = os.getpid()
        killed = 0

        try:
            if sys.platform == 'win32':
                listing = subprocess.run(
                    ['tasklist', '/FI', 'IMAGENAME eq rfcp-server.exe',
                     '/FO', 'CSV', '/NH'],
                    capture_output=True, text=True, timeout=5,
                )
                for pid in ManagedProcessPool._tasklist_pids(listing.stdout):
                    if pid == my_pid:
                        continue
                    try:
                        outcome = subprocess.run(
                            ['taskkill', '/F', '/PID', str(pid)],
                            capture_output=True, timeout=5,
                        )
                    except subprocess.TimeoutExpired:
                        continue
                    # Only count kills that taskkill reports as successful.
                    if outcome.returncode == 0:
                        killed += 1
            else:
                listing = subprocess.run(
                    ['pgrep', '-f', 'rfcp-server'],
                    capture_output=True, text=True, timeout=5,
                )
                for pid in ManagedProcessPool._pgrep_pids(listing.stdout):
                    if pid == my_pid:
                        continue
                    try:
                        os.kill(pid, 9)  # 9 == SIGKILL
                    except (ProcessLookupError, PermissionError):
                        continue
                    killed += 1
        except Exception:
            # Deliberately best-effort: this runs from map_chunks' finally
            # clause, so a missing tool or a timeout must never propagate.
            pass

        return killed