# RFCP - Iteration 3.3.0: Performance Architecture Refactor

## Overview

Major refactoring based on research into professional RF tools (Signal-Server, SPLAT!, CloudRF SLEIPNIR, Sionna RT).

**Root cause identified:** Pickle serialization overhead dominates computation time.

- DP_TIMING shows: 0.6-0.9ms per point (actual calculation)
- Real throughput: 258ms per point
- **99% of the time is IPC overhead, not calculation!**

**Target:** Reduce 5km Detailed from timeout (300s) to <30s

---

## Part 1: Eliminate Pickle Overhead (CRITICAL)

### 1.1 Shared Memory for Buildings

Terrain is already in shared memory, but **15,000 buildings are pickled for every chunk**.

**File:** `backend/app/services/parallel_coverage_service.py`

```python
import os
from multiprocessing import shared_memory

import numpy as np


def buildings_to_shared_memory(buildings: list) -> dict:
    """
    Convert buildings to numpy arrays and store them in shared memory.

    Returns:
        Dict of (shm_name, shape, dtype) tuples for reconstruction in workers.
    """
    # Extract building data into numpy arrays.
    # For each building we need: lat, lon, height, num_vertices, vertices_flat.
    # Simplified: store as a structured array.
    building_data = []
    all_vertices = []
    vertex_offsets = [0]

    for b in buildings:
        coords = extract_coords(b)  # existing helper: GeoJSON feature -> [(lon, lat), ...]
        height = b.get('properties', {}).get('height', 10.0)
        building_data.append({
            'lat': np.mean([c[1] for c in coords]),
            'lon': np.mean([c[0] for c in coords]),
            'height': height,
            'vertex_start': len(all_vertices),
            'vertex_count': len(coords)
        })
        all_vertices.extend(coords)
        vertex_offsets.append(len(all_vertices))

    # Create numpy arrays
    buildings_arr = np.array([
        (b['lat'], b['lon'], b['height'], b['vertex_start'], b['vertex_count'])
        for b in building_data
    ], dtype=[
        ('lat', 'f8'), ('lon', 'f8'), ('height', 'f4'),
        ('vertex_start', 'i4'), ('vertex_count', 'i2')
    ])
    vertices_arr = np.array(all_vertices, dtype=[('lon', 'f8'), ('lat', 'f8')])

    # Store in shared memory
    shm_buildings = shared_memory.SharedMemory(
        create=True, size=buildings_arr.nbytes,
        name=f"rfcp_buildings_{os.getpid()}"
    )
    shm_vertices = shared_memory.SharedMemory(
        create=True, size=vertices_arr.nbytes,
        name=f"rfcp_vertices_{os.getpid()}"
    )

    # Copy data
    np.ndarray(buildings_arr.shape, dtype=buildings_arr.dtype, buffer=shm_buildings.buf)[:] = buildings_arr
    np.ndarray(vertices_arr.shape, dtype=vertices_arr.dtype, buffer=shm_vertices.buf)[:] = vertices_arr

    return {
        'buildings': (shm_buildings.name, buildings_arr.shape, buildings_arr.dtype),
        'vertices': (shm_vertices.name, vertices_arr.shape, vertices_arr.dtype)
    }


def buildings_from_shared_memory(shm_info: dict) -> tuple:
    """Reconstruct building arrays from shared memory in a worker."""
    shm_b = shared_memory.SharedMemory(name=shm_info['buildings'][0])
    shm_v = shared_memory.SharedMemory(name=shm_info['vertices'][0])
    buildings = np.ndarray(
        shm_info['buildings'][1], dtype=shm_info['buildings'][2], buffer=shm_b.buf
    )
    vertices = np.ndarray(
        shm_info['vertices'][1], dtype=shm_info['vertices'][2], buffer=shm_v.buf
    )
    # Return the SharedMemory handles too: the worker must keep them alive
    # (and close them later), or the arrays become dangling views.
    return buildings, vertices, shm_b, shm_v
```

### 1.2 Increase Batch Size

**Current:** 7 chunks of ~144 points = high IPC overhead per point
**Target:** 2-3 chunks of ~300-400 points = amortize IPC cost

```python
# In parallel_coverage_service.py

def calculate_optimal_chunk_size(total_points: int, num_workers: int) -> int:
    """
    Calculate chunk size to minimize IPC overhead.

    Rule: computation_time should be 10-100x serialization_time.
    For RF calculations: ~1ms compute per point, ~50ms serialization per
    dispatch, so batch at least a few hundred points so compute dominates.
    """
    min_chunk = 300    # Minimum to amortize IPC
    max_chunk = 1000   # Maximum for memory
    ideal_chunks = max(2, num_workers)  # One chunk per worker, at least 2 total
    ideal_size = total_points // ideal_chunks
    return max(min_chunk, min(max_chunk, ideal_size))
```

### 1.3 Pre-build Spatial Index Once

Currently the spatial index may be rebuilt per chunk. Build it once and share the reference.
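Before wiring the index in, the shared-memory hand-off that 1.1 and 1.3 depend on can be exercised in isolation. A minimal, self-contained sketch of the same structured-array round trip (the coordinates and heights are made-up illustration values, not project data):

```python
import numpy as np
from multiprocessing import shared_memory

# "Parent" side: pack a structured array into shared memory (same pattern as 1.1).
buildings_arr = np.array(
    [(52.520, 13.405, 10.0, 0, 5),
     (52.531, 13.412, 22.5, 5, 4)],
    dtype=[('lat', 'f8'), ('lon', 'f8'), ('height', 'f4'),
           ('vertex_start', 'i4'), ('vertex_count', 'i2')],
)
shm = shared_memory.SharedMemory(create=True, size=buildings_arr.nbytes)
np.ndarray(buildings_arr.shape, dtype=buildings_arr.dtype, buffer=shm.buf)[:] = buildings_arr

# "Worker" side: attach by name -- no pickling, no copy of the payload.
attached = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(buildings_arr.shape, dtype=buildings_arr.dtype, buffer=attached.buf)
h1 = float(view['height'][1])
print(h1)  # 22.5

# Cleanup: drop views first, then every attacher closes; exactly one unlinks.
del view
attached.close()
shm.close()
shm.unlink()
```

The `del view` before `close()` matters: closing a `SharedMemory` while numpy still holds a view of its buffer raises `BufferError`, which is the kind of cleanup bug to watch for in the worker teardown.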
```python
from collections import defaultdict  # needed for the grid buckets


class SharedSpatialIndex:
    """Spatial index over buildings held in shared memory (grid built per worker)."""

    def __init__(self, buildings_shm_info: dict):
        # Keep the SharedMemory handles on self so the arrays stay valid.
        self.buildings, self.vertices, self._shm_b, self._shm_v = \
            buildings_from_shared_memory(buildings_shm_info)
        self._build_grid()

    def _build_grid(self):
        """Build a simple grid-based spatial index."""
        # Grid cells of ~100m
        self.cell_size = 0.001  # ~111m in degrees of latitude
        self.grid = defaultdict(list)
        for i, b in enumerate(self.buildings):
            cell_x = int(b['lon'] / self.cell_size)
            cell_y = int(b['lat'] / self.cell_size)
            self.grid[(cell_x, cell_y)].append(i)

    def query_radius(self, lat: float, lon: float, radius_m: float) -> list:
        """Get building indices within radius."""
        radius_deg = radius_m / 111000
        cells_to_check = int(radius_deg / self.cell_size) + 1
        center_x = int(lon / self.cell_size)
        center_y = int(lat / self.cell_size)
        result = []
        for dx in range(-cells_to_check, cells_to_check + 1):
            for dy in range(-cells_to_check, cells_to_check + 1):
                result.extend(self.grid.get((center_x + dx, center_y + dy), []))
        return result
```

---

## Part 2: Radial Calculation Pattern (Signal-Server style)

Instead of a grid, calculate along radial spokes for faster coverage estimation.

### 2.1 Radial Engine

**File:** `backend/app/services/radial_coverage_service.py` (NEW)

```python
"""
Radial coverage calculation engine inspired by Signal-Server/SPLAT!

Instead of calculating every grid point independently:
1. Cast rays from TX in all directions (0-360°)
2. Sample terrain along each ray (profile)
3. Apply propagation model to profile
4. Interpolate between rays for final grid

This is 10-50x faster because:
- Terrain profiles are linear (cache-friendly)
- No building geometry per point (use clutter model)
- Embarrassingly parallel by azimuth
"""
import math

import numpy as np
from concurrent.futures import ThreadPoolExecutor


class RadialCoverageEngine:

    def __init__(self, terrain_service, propagation_model):
        self.terrain = terrain_service
        self.model = propagation_model

    def calculate_coverage(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        radius_m: float, frequency_mhz: float, tx_power_dbm: float,
        num_radials: int = 360,        # 1° resolution
        samples_per_radial: int = 100,
        num_threads: int = 8
    ) -> dict:
        """
        Calculate coverage using radial ray-casting.

        Returns dict with raw 'radials'; grid interpolation is a separate step.
        """
        # Pre-load terrain tiles
        self._preload_terrain(tx_lat, tx_lon, radius_m)

        # Calculate radials in parallel (by azimuth sectors)
        sector_size = num_radials // num_threads
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = []
            for i in range(num_threads):
                start_az = i * sector_size
                end_az = (i + 1) * sector_size if i < num_threads - 1 else num_radials
                futures.append(executor.submit(
                    self._calculate_sector,
                    tx_lat, tx_lon, tx_height, radius_m,
                    frequency_mhz, tx_power_dbm,
                    start_az, end_az, samples_per_radial
                ))

            # Collect results
            all_radials = []
            for f in futures:
                all_radials.extend(f.result())

        return {
            'radials': all_radials,
            'center': (tx_lat, tx_lon),
            'radius': radius_m,
            'num_radials': num_radials
        }

    def _calculate_sector(
        self, tx_lat, tx_lon, tx_height, radius_m,
        frequency_mhz, tx_power_dbm,
        start_az, end_az, samples_per_radial
    ) -> list:
        """Calculate radials for one azimuth sector."""
        results = []
        for az in range(start_az, end_az):
            radial = self._calculate_radial(
                tx_lat, tx_lon, tx_height, radius_m,
                frequency_mhz, tx_power_dbm,
                az, samples_per_radial
            )
            results.append(radial)
        return results

    def _calculate_radial(
        self, tx_lat, tx_lon, tx_height, radius_m,
        frequency_mhz, tx_power_dbm,
        azimuth_deg, num_samples
    ) -> dict:
        """
        Calculate signal strength along one radial.

        Uses terrain profile + Longley-Rice style calculation.
        """
        az_rad = math.radians(azimuth_deg)
        cos_lat = math.cos(math.radians(tx_lat))

        # Sample points along the radial
        distances = np.linspace(100, radius_m, num_samples)

        # Calculate lat/lon for each sample
        lat_offsets = (distances / 111000) * math.cos(az_rad)
        lon_offsets = (distances / (111000 * cos_lat)) * math.sin(az_rad)
        lats = tx_lat + lat_offsets
        lons = tx_lon + lon_offsets

        # Get terrain profile
        elevations = np.array([
            self.terrain.get_elevation_sync(lat, lon)
            for lat, lon in zip(lats, lons)
        ])
        tx_elevation = self.terrain.get_elevation_sync(tx_lat, tx_lon)

        # Calculate path loss for each point
        rsrp_values = []
        los_flags = []
        for i, (dist, elev) in enumerate(zip(distances, elevations)):
            # Simple LOS check using the terrain profile up to this point
            profile = elevations[:i+1]
            has_los = self._check_los_profile(
                tx_elevation + tx_height,
                elev + 1.5,  # RX height
                profile,
                distances[:i+1]
            )

            # Path loss (using the configured model)
            path_loss = self.model.calculate_path_loss(
                frequency_mhz, dist, tx_height, 1.5, has_los=has_los
            )

            # Add diffraction loss if NLOS
            if not has_los:
                diff_loss = self._calculate_diffraction_loss(
                    tx_elevation + tx_height, elev + 1.5,
                    profile, distances[:i+1], frequency_mhz
                )
                path_loss += diff_loss

            rsrp = tx_power_dbm - path_loss
            rsrp_values.append(rsrp)
            los_flags.append(has_los)

        return {
            'azimuth': azimuth_deg,
            'distances': distances.tolist(),
            'lats': lats.tolist(),
            'lons': lons.tolist(),
            'rsrp': rsrp_values,
            'has_los': los_flags
        }

    def _check_los_profile(self, tx_h, rx_h, profile, distances) -> bool:
        """Check LOS using the terrain profile (Fresnel zone clearance)."""
        if len(profile) < 2:
            return True
        total_dist = distances[-1]
        # Line from TX to RX
        for i in range(1, len(profile) - 1):
            d = distances[i]
            # Expected height on the LOS line
            expected_h = tx_h + (rx_h - tx_h) * (d / total_dist)
            # Actual terrain height
            actual_h = profile[i]
            if actual_h > expected_h - 0.6:  # Small clearance margin
                return False
        return True

    def _calculate_diffraction_loss(self, tx_h, rx_h, profile, distances, freq_mhz) -> float:
        """Calculate knife-edge diffraction loss (Deygout-style main obstacle)."""
        # Find the main obstacle
        max_v = -999
        max_idx = -1
        total_dist = distances[-1]
        wavelength = 300 / freq_mhz  # meters

        for i in range(1, len(profile) - 1):
            d1 = distances[i]
            d2 = total_dist - d1
            # Height of the LOS line at this point
            los_h = tx_h + (rx_h - tx_h) * (d1 / total_dist)
            # Obstacle height above LOS
            h = profile[i] - los_h
            if h > 0:
                # Fresnel-Kirchhoff diffraction parameter
                v = h * math.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2))
                if v > max_v:
                    max_v = v
                    max_idx = i

        if max_v < -0.78:
            return 0.0

        # Knife-edge diffraction loss (piecewise approximation of ITU-R P.526 J(v))
        if max_v < 0:
            loss = 6.02 + 9.0 * max_v + 1.65 * max_v * max_v
        elif max_v < 2.4:
            loss = 6.02 + 9.11 * max_v - 1.27 * max_v * max_v
        else:
            loss = 12.953 + 20 * math.log10(max_v)
        return max(0, loss)
```

---

## Part 3: Propagation Model Updates

### 3.1 Add Longley-Rice ITM Support

**File:** `backend/app/services/propagation_models/itm_model.py` (NEW)

```python
"""
Longley-Rice Irregular Terrain Model (ITM)

Best for: VHF/UHF terrain-based propagation (20 MHz - 20 GHz)
Based on: itmlogic Python package

Key parameters:
- Earth dielectric constant (eps): 4-81 (15 typical for ground)
- Ground conductivity (sgm): 0.001-5.0 S/m
- Atmospheric refractivity (ens): 250-400 N-units (301 typical)
- Climate: 1=Equatorial, 2=Continental Subtropical, etc.
"""
import numpy as np  # used by the FSPL fallback

try:
    from itmlogic import itmlogic_p2p
    HAS_ITMLOGIC = True
except ImportError:
    HAS_ITMLOGIC = False

from .base_model import BasePropagationModel, PropagationInput, PropagationResult


class LongleyRiceModel(BasePropagationModel):
    """Longley-Rice ITM propagation model."""

    name = "Longley-Rice-ITM"
    frequency_range = (20, 20000)      # MHz
    distance_range = (1000, 2000000)   # meters

    # Default ITM parameters
    DEFAULT_PARAMS = {
        'eps': 15.0,    # Earth dielectric constant
        'sgm': 0.005,   # Ground conductivity (S/m)
        'ens': 301.0,   # Atmospheric refractivity (N-units)
        'pol': 0,       # Polarization: 0=horizontal, 1=vertical
        'mdvar': 12,    # Mode of variability
        'klim': 5,      # Climate: 5=Continental Temperate
    }

    # Ground parameters by type
    GROUND_PARAMS = {
        'average':     {'eps': 15.0, 'sgm': 0.005},
        'poor':        {'eps': 4.0,  'sgm': 0.001},
        'good':        {'eps': 25.0, 'sgm': 0.020},
        'fresh_water': {'eps': 81.0, 'sgm': 0.010},
        'sea_water':   {'eps': 81.0, 'sgm': 5.0},
        'forest':      {'eps': 12.0, 'sgm': 0.003},
    }

    def __init__(self, ground_type: str = 'average', climate: int = 5):
        if not HAS_ITMLOGIC:
            raise ImportError("itmlogic package required: pip install itmlogic")
        self.params = self.DEFAULT_PARAMS.copy()
        if ground_type in self.GROUND_PARAMS:
            self.params.update(self.GROUND_PARAMS[ground_type])
        self.params['klim'] = climate

    def calculate(self, input: PropagationInput) -> PropagationResult:
        """Calculate path loss using ITM point-to-point mode."""
        # ITM requires a terrain profile
        if not hasattr(input, 'terrain_profile') or input.terrain_profile is None:
            # Fall back to free space if no terrain is available
            return self._free_space_fallback(input)

        result = itmlogic_p2p(
            input.terrain_profile,  # Elevation samples
            input.frequency_mhz,
            input.tx_height_m,
            input.rx_height_m,
            self.params['eps'],
            self.params['sgm'],
            self.params['ens'],
            self.params['pol'],
            self.params['mdvar'],
            self.params['klim']
        )

        return PropagationResult(
            path_loss_db=result['dbloss'],
            model_name=self.name,
            details={
                'mode': result.get('propmode', 'unknown'),
                'variability': result.get('var', 0),
            }
        )

    def _free_space_fallback(self, input: PropagationInput) -> PropagationResult:
        """Free-space path loss when no terrain is available."""
        # FSPL(dB) = 20*log10(d_m) + 20*log10(f_MHz) - 27.55
        fspl = 20 * np.log10(input.distance_m) + 20 * np.log10(input.frequency_mhz) - 27.55
        return PropagationResult(
            path_loss_db=fspl,
            model_name=f"{self.name} (FSPL fallback)",
            details={'mode': 'free_space'}
        )
```

### 3.2 Add VHF/UHF Model Selection

**File:** `backend/app/services/propagation_models/model_selector.py`

```python
"""
Automatic propagation model selection based on frequency and environment.
"""

def select_model_for_frequency(
    frequency_mhz: float,
    environment: str = 'urban',
    has_terrain: bool = True
) -> BasePropagationModel:
    """
    Select an appropriate propagation model.

    Frequency bands:
    - VHF: 30-300 MHz (tactical radios, FM broadcast)
    - UHF: 300-3000 MHz (tactical radios, TV, early cellular)
    - Cellular: 700-2600 MHz (LTE bands)
    - mmWave: 24-100 GHz (5G)

    Decision tree:
    1. VHF/UHF with terrain → Longley-Rice ITM
    2. Urban cellular → COST-231 Hata
    3. Suburban/rural cellular → Okumura-Hata
    4. mmWave → 3GPP 38.901
    """
    # VHF (30-300 MHz)
    if 30 <= frequency_mhz <= 300:
        if has_terrain:
            return LongleyRiceModel(ground_type='average', climate=5)
        else:
            return FreeSpaceModel()  # Fallback

    # UHF (300-1000 MHz)
    elif 300 < frequency_mhz <= 1000:
        if has_terrain:
            return LongleyRiceModel(ground_type='average', climate=5)
        else:
            return OkumuraHataModel(environment=environment)

    # Cellular (1000-2600 MHz)
    elif 1000 < frequency_mhz <= 2600:
        if environment == 'urban':
            # NB: COST-231 Hata is specified for 1500-2000 MHz; above that it extrapolates
            return Cost231HataModel()
        else:
            return OkumuraHataModel(environment=environment)

    # Higher frequencies
    else:
        return FreeSpaceModel()  # Or implement 3GPP 38.901


# Frequency band constants for UI
FREQUENCY_BANDS = {
    'VHF_LOW':  (30, 88,     "VHF Low (30-88 MHz) - Military/Public Safety"),
    'VHF_HIGH': (136, 174,   "VHF High (136-174 MHz) - Marine/Aviation"),
    'UHF_LOW':  (400, 512,   "UHF (400-512 MHz) - Public Safety/Tactical"),
    'UHF_TV':   (470, 862,   "UHF TV (470-862 MHz)"),
    'LTE_700':  (700, 800,   "LTE Band 28/20 (700-800 MHz)"),
    'LTE_900':  (880, 960,   "LTE Band 8 (900 MHz)"),
    'LTE_1800': (1710, 1880, "LTE Band 3 (1800 MHz)"),
    'LTE_2100': (1920, 2170, "LTE Band 1 (2100 MHz)"),
    'LTE_2600': (2500, 2690, "LTE Band 7 (2600 MHz)"),
}
```

---

## Part 4: Progress Bar Fix (WebSocket)

### 4.1 Proper Progress Streaming

The 5% bug persists because WebSocket messages aren't reaching the frontend.

**Debug approach:**

```python
# In the coverage calculation, add explicit progress logging
async def calculate_with_progress(self, ...):
    total_points = len(points)
    for i, chunk_result in enumerate(chunk_results):
        progress = int((i + 1) / total_chunks * 100)

        # Log to console AND send via WebSocket
        logger.info(f"[PROGRESS] {progress}% - chunk {i+1}/{total_chunks}")
        if progress_callback:
            await progress_callback(progress, f"Calculating... {i+1}/{total_chunks}")

        await asyncio.sleep(0)  # Yield to the event loop
```

**Frontend fix - check the WebSocket subscription:**

```typescript
// In App.tsx or CoverageStore
useEffect(() => {
  const ws = new WebSocket('ws://localhost:8888/ws/coverage');
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('[WS] Received:', data); // DEBUG
    if (data.type === 'progress') {
      setProgress(data.progress);
      setProgressStatus(data.status);
    }
  };
  ws.onerror = (e) => console.error('[WS] Error:', e);
  ws.onclose = () => console.log('[WS] Closed');
  return () => ws.close();
}, []);
```

---

## Part 5: Testing & Validation

### 5.1 Performance Benchmarks

Expected performance after refactoring:

| Scenario | Before | After | Speedup |
|----------|--------|-------|---------|
| 5km Standard | 5s | 3s | 1.7x |
| 5km Detailed | timeout | 25s | 12x |
| 10km Standard | 30s | 10s | 3x |
| 10km Detailed | timeout | 60s | 5x |

### 5.2 Test Commands

```powershell
# Quick test
cd D:\root\rfcp\installer
.\test-detailed-quick.bat

# Check for [PROGRESS] logs in output
# Check for [DP_TIMING] logs
# Verify shared memory cleanup
# Check Task Manager for memory after calculation
```

---

## Implementation Order

1. **Shared Memory for Buildings** (biggest impact) - Part 1.1
2. **Increase Batch Size** - Part 1.2
3. **Progress Bar Debug** - Part 4
4. **Radial Engine** (optional, for preview mode) - Part 2
5. **Longley-Rice ITM** (for VHF/UHF) - Part 3

---

## Dependencies to Add

```
# requirements.txt additions
itmlogic>=0.1.0    # Longley-Rice ITM implementation
```

---

## Commit Message

```
feat: Iteration 3.3.0 - Performance Architecture Refactor

Performance:
- Add shared memory for buildings (eliminate pickle overhead)
- Increase batch size to 300-500 points (amortize IPC)
- Add radial coverage engine (Signal-Server style)

Propagation Models:
- Add Longley-Rice ITM for VHF/UHF (20 MHz - 20 GHz)
- Add automatic model selection by frequency
- Add frequency band constants for UI

Bug Fixes:
- Debug and fix WebSocket progress (5% stuck bug)

Expected: 5km Detailed from timeout → ~25s (12x speedup)
```

---

## Notes for Claude Code

This is a significant refactoring. Approach it step by step:

1. First implement shared memory for buildings
2. Test that alone - it should produce the major speedup
3. Then increase batch size
4. Test again
5. Then tackle the progress bar
6. The radial engine and ITM can be separate iterations if needed

The key insight: **99% of time is IPC overhead, not calculation**. Fixing pickle serialization is the #1 priority.

---

*"Fast per-point means nothing if IPC eats your lunch"* 🍽️
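One quick numerical guard worth running while implementing Part 2: the piecewise knife-edge approximation in `_calculate_diffraction_loss` is easy to mistranscribe (a common slip swaps the `v < 0` and `0 ≤ v < 2.4` coefficients), and a branch-continuity check catches that immediately. A throwaway sketch, not project code:

```python
import math

def knife_edge_loss(v: float) -> float:
    """Knife-edge diffraction loss (dB), piecewise approximation of ITU-R P.526 J(v)."""
    if v < -0.78:
        return 0.0
    if v < 0:
        loss = 6.02 + 9.0 * v + 1.65 * v * v
    elif v < 2.4:
        loss = 6.02 + 9.11 * v - 1.27 * v * v
    else:
        loss = 12.953 + 20 * math.log10(v)
    return max(0.0, loss)

# The branches should join smoothly; a transcription slip shows up as a jump.
for v in (0.0, 2.4):
    below = knife_edge_loss(v - 1e-6)
    above = knife_edge_loss(v + 1e-6)
    assert abs(below - above) < 0.05, (v, below, above)

# Loss should also grow monotonically with obstruction depth.
samples = [knife_edge_loss(v) for v in (-0.5, 0.0, 1.0, 2.0, 3.0)]
assert samples == sorted(samples)
```

With swapped coefficients the two quadratic branches disagree by roughly 17 dB at `v = 2.4`, so this assertion fails loudly rather than silently corrupting NLOS coverage.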