rfcp/RFCP-Iteration-3.3.0-Architecture-Refactor.md
2026-02-02 13:48:30 +02:00

RFCP - Iteration 3.3.0: Performance Architecture Refactor

Overview

Major refactoring based on research into professional RF tools (Signal-Server, SPLAT!, CloudRF SLEIPNIR, Sionna RT).

Root cause identified: Pickle serialization overhead dominates computation time.

  • DP_TIMING shows: 0.6-0.9ms per point (actual calculation)
  • Real throughput: 258ms per point
  • 99% of time is IPC overhead, not calculation!

Target: Reduce 5km Detailed from timeout (300s) to <30s
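The serialization claim is cheap to sanity-check before refactoring. A hypothetical micro-benchmark (the building dicts are synthetic stand-ins for the real GeoJSON features) that times pickling the payload every chunk dispatch currently pays:

```python
# Hypothetical micro-benchmark: time the pickle cost of a buildings payload.
# The dicts below are synthetic stand-ins, not the real GeoJSON features.
import pickle
import time

buildings = [
    {"properties": {"height": 10.0},
     "geometry": {"coordinates": [[(34.78 + i * 1e-5, 32.08 + i * 1e-5)] * 8]}}
    for i in range(15_000)
]

t0 = time.perf_counter()
blob = pickle.dumps(buildings)
serialize_ms = (time.perf_counter() - t0) * 1000

print(f"payload: {len(blob) / 1024:.0f} KiB, pickle time: {serialize_ms:.1f} ms")
```

If this per-dispatch cost is comparable to an entire chunk's compute time, shared memory (Part 1.1) is the right fix.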


Part 1: Eliminate Pickle Overhead (CRITICAL)

1.1 Shared Memory for Buildings

Currently terrain is in shared memory, but the ~15,000 buildings are re-pickled and sent to workers for every chunk.

File: backend/app/services/parallel_coverage_service.py

import os
from multiprocessing import shared_memory
import numpy as np

def buildings_to_shared_memory(buildings: list) -> dict:
    """
    Convert buildings to numpy arrays and store in shared memory.

    Returns a dict mapping 'buildings'/'vertices' to (shm_name, shape, dtype)
    tuples for reconstruction in workers. The caller owns the segments and
    must close()/unlink() them once the calculation finishes.
    """
    # Extract building data into numpy arrays.
    # For each building we need: lat, lon, height, vertex_start, vertex_count
    building_data = []
    all_vertices = []

    for b in buildings:
        coords = extract_coords(b)  # existing helper: polygon ring as (lon, lat)
        height = b.get('properties', {}).get('height', 10.0)

        building_data.append({
            'lat': np.mean([c[1] for c in coords]),
            'lon': np.mean([c[0] for c in coords]),
            'height': height,
            'vertex_start': len(all_vertices),
            'vertex_count': len(coords)
        })
        all_vertices.extend(coords)
    
    # Create numpy arrays
    buildings_arr = np.array([
        (b['lat'], b['lon'], b['height'], b['vertex_start'], b['vertex_count'])
        for b in building_data
    ], dtype=[
        ('lat', 'f8'), ('lon', 'f8'), ('height', 'f4'),
        ('vertex_start', 'i4'), ('vertex_count', 'i2')
    ])
    
    vertices_arr = np.array(all_vertices, dtype=[('lon', 'f8'), ('lat', 'f8')])
    
    # Store in shared memory
    shm_buildings = shared_memory.SharedMemory(
        create=True, 
        size=buildings_arr.nbytes,
        name=f"rfcp_buildings_{os.getpid()}"
    )
    shm_vertices = shared_memory.SharedMemory(
        create=True,
        size=vertices_arr.nbytes,
        name=f"rfcp_vertices_{os.getpid()}"
    )
    
    # Copy data
    np.ndarray(buildings_arr.shape, dtype=buildings_arr.dtype, 
               buffer=shm_buildings.buf)[:] = buildings_arr
    np.ndarray(vertices_arr.shape, dtype=vertices_arr.dtype,
               buffer=shm_vertices.buf)[:] = vertices_arr
    
    return {
        'buildings': (shm_buildings.name, buildings_arr.shape, buildings_arr.dtype),
        'vertices': (shm_vertices.name, vertices_arr.shape, vertices_arr.dtype)
    }


def buildings_from_shared_memory(shm_info: dict) -> tuple:
    """Reconstruct buildings arrays from shared memory in worker."""
    shm_b = shared_memory.SharedMemory(name=shm_info['buildings'][0])
    shm_v = shared_memory.SharedMemory(name=shm_info['vertices'][0])
    
    buildings = np.ndarray(
        shm_info['buildings'][1], 
        dtype=shm_info['buildings'][2],
        buffer=shm_b.buf
    )
    vertices = np.ndarray(
        shm_info['vertices'][1],
        dtype=shm_info['vertices'][2], 
        buffer=shm_v.buf
    )
    
    return buildings, vertices, shm_b, shm_v

1.2 Increase Batch Size

Current: 7 chunks of ~144 points = high IPC overhead per point
Target: 2-3 chunks of ~300-400 points = amortized IPC cost

# In parallel_coverage_service.py
def calculate_optimal_chunk_size(total_points: int, num_workers: int) -> int:
    """
    Calculate chunk size to minimize IPC overhead.

    Rule: computation time should be 10-100x serialization time.
    For RF calculations: ~1ms compute per point vs tens of ms of
    serialization per dispatch, so batch at least a few hundred points
    (the 300-point floor below) to make compute dominate.
    """
    min_chunk = 300   # Minimum to amortize IPC
    max_chunk = 1000  # Maximum to bound per-chunk memory

    ideal_chunks = max(2, num_workers)  # One chunk per worker, at least 2 total
    ideal_size = total_points // ideal_chunks

    return max(min_chunk, min(max_chunk, ideal_size))
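A worked example of the clamping logic (same body as above, restated so it runs standalone) for the current 7-worker setup:

```python
def calculate_optimal_chunk_size(total_points: int, num_workers: int) -> int:
    min_chunk, max_chunk = 300, 1000
    ideal_chunks = max(2, num_workers)
    ideal_size = total_points // ideal_chunks
    return max(min_chunk, min(max_chunk, ideal_size))

# 1000 points / 7 workers -> ideal 142, clamped up to the 300 floor
print(calculate_optimal_chunk_size(1000, 7))    # 300
# 5000 points / 7 workers -> ideal 714, within bounds
print(calculate_optimal_chunk_size(5000, 7))    # 714
# 20000 points / 7 workers -> ideal 2857, clamped down to the 1000 ceiling
print(calculate_optimal_chunk_size(20000, 7))   # 1000
```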

1.3 Pre-build Spatial Index Once

Currently spatial index may be rebuilt per-chunk. Build once and share reference.

import math
from collections import defaultdict

class SharedSpatialIndex:
    """Spatial index that can be shared across processes via shared memory."""

    def __init__(self, buildings_shm_info: dict):
        # Keep the SharedMemory handles alive; discarding them can invalidate
        # the numpy views that point into their buffers.
        self.buildings, self.vertices, self._shm_b, self._shm_v = \
            buildings_from_shared_memory(buildings_shm_info)
        self._build_grid()

    def _build_grid(self):
        """Build a simple grid-based spatial index."""
        # Grid cells of ~100m
        self.cell_size = 0.001  # ~111m in degrees of latitude
        self.grid = defaultdict(list)

        for i, b in enumerate(self.buildings):
            # math.floor (not int) so negative coordinates bucket consistently
            cell_x = math.floor(b['lon'] / self.cell_size)
            cell_y = math.floor(b['lat'] / self.cell_size)
            self.grid[(cell_x, cell_y)].append(i)

    def query_radius(self, lat: float, lon: float, radius_m: float) -> list:
        """Get building indices within radius (coarse: returns whole cells)."""
        radius_deg = radius_m / 111000
        cells_to_check = int(radius_deg / self.cell_size) + 1

        center_x = math.floor(lon / self.cell_size)
        center_y = math.floor(lat / self.cell_size)

        result = []
        for dx in range(-cells_to_check, cells_to_check + 1):
            for dy in range(-cells_to_check, cells_to_check + 1):
                result.extend(self.grid.get((center_x + dx, center_y + dy), []))

        return result
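The cell bucketing can be exercised in isolation. A standalone sketch (plain dicts, no shared memory) using math.floor so that cells stay consistent across negative coordinates:

```python
import math
from collections import defaultdict

CELL = 0.001  # ~111 m in degrees of latitude

def cell_of(lat: float, lon: float) -> tuple:
    # floor (not int) so e.g. lon = -0.0005 lands in cell -1, not cell 0
    return (math.floor(lon / CELL), math.floor(lat / CELL))

grid = defaultdict(list)
points = [(32.0805, 34.7818), (32.0806, 34.7819), (32.2000, 34.9000)]
for i, (lat, lon) in enumerate(points):
    grid[cell_of(lat, lon)].append(i)

print(cell_of(*points[0]) == cell_of(*points[1]))  # True  (~10 m apart)
print(cell_of(*points[0]) == cell_of(*points[2]))  # False (far outside the cell)
```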

Part 2: Radial Calculation Pattern (Signal-Server style)

Instead of grid, calculate along radial spokes for faster coverage estimation.

2.1 Radial Engine

File: backend/app/services/radial_coverage_service.py (NEW)

"""
Radial coverage calculation engine inspired by Signal-Server/SPLAT!

Instead of calculating every grid point independently:
1. Cast rays from TX in all directions (0-360°)
2. Sample terrain along each ray (profile)
3. Apply propagation model to profile
4. Interpolate between rays for final grid

This is 10-50x faster because:
- Terrain profiles are linear (cache-friendly)
- No building geometry per-point (use clutter model)
- Embarrassingly parallel by azimuth (note: with ThreadPoolExecutor the
  pure-Python math is still GIL-bound; threads mainly overlap terrain I/O)
"""

import numpy as np
from concurrent.futures import ThreadPoolExecutor
import math

class RadialCoverageEngine:
    def __init__(self, terrain_service, propagation_model):
        self.terrain = terrain_service
        self.model = propagation_model
    
    def calculate_coverage(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        radius_m: float,
        frequency_mhz: float,
        tx_power_dbm: float,
        num_radials: int = 360,  # 1° resolution
        samples_per_radial: int = 100,
        num_threads: int = 8
    ) -> dict:
        """
        Calculate coverage using radial ray-casting.
        
        Returns dict with 'radials' (raw data) and 'grid' (interpolated).
        """
        # Pre-load terrain tiles
        self._preload_terrain(tx_lat, tx_lon, radius_m)
        
        # Calculate radials in parallel (by azimuth sectors)
        sector_size = num_radials // num_threads
        
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = []
            for i in range(num_threads):
                start_az = i * sector_size
                end_az = (i + 1) * sector_size if i < num_threads - 1 else num_radials
                
                futures.append(executor.submit(
                    self._calculate_sector,
                    tx_lat, tx_lon, tx_height,
                    radius_m, frequency_mhz, tx_power_dbm,
                    start_az, end_az, samples_per_radial
                ))
            
            # Collect results
            all_radials = []
            for f in futures:
                all_radials.extend(f.result())
        
        return {
            'radials': all_radials,
            'center': (tx_lat, tx_lon),
            'radius': radius_m,
            'num_radials': num_radials
        }
    
    def _calculate_sector(
        self,
        tx_lat, tx_lon, tx_height,
        radius_m, frequency_mhz, tx_power_dbm,
        start_az, end_az, samples_per_radial
    ) -> list:
        """Calculate radials for one azimuth sector."""
        results = []
        
        for az in range(start_az, end_az):
            radial = self._calculate_radial(
                tx_lat, tx_lon, tx_height,
                radius_m, frequency_mhz, tx_power_dbm,
                az, samples_per_radial
            )
            results.append(radial)
        
        return results
    
    def _calculate_radial(
        self,
        tx_lat, tx_lon, tx_height,
        radius_m, frequency_mhz, tx_power_dbm,
        azimuth_deg, num_samples
    ) -> dict:
        """
        Calculate signal strength along one radial.
        
        Uses terrain profile + Longley-Rice style calculation.
        """
        az_rad = math.radians(azimuth_deg)
        cos_lat = math.cos(math.radians(tx_lat))
        
        # Sample points along radial
        distances = np.linspace(100, radius_m, num_samples)
        
        # Calculate lat/lon for each sample
        lat_offsets = (distances / 111000) * math.cos(az_rad)
        lon_offsets = (distances / (111000 * cos_lat)) * math.sin(az_rad)
        
        lats = tx_lat + lat_offsets
        lons = tx_lon + lon_offsets
        
        # Get terrain profile
        elevations = np.array([
            self.terrain.get_elevation_sync(lat, lon)
            for lat, lon in zip(lats, lons)
        ])
        
        tx_elevation = self.terrain.get_elevation_sync(tx_lat, tx_lon)
        
        # Calculate path loss for each point
        rsrp_values = []
        los_flags = []
        
        for i, (dist, elev) in enumerate(zip(distances, elevations)):
            # Simple LOS check using terrain profile up to this point
            profile = elevations[:i+1]
            has_los = self._check_los_profile(
                tx_elevation + tx_height,
                elev + 1.5,  # RX height
                profile,
                distances[:i+1]
            )
            
            # Path loss (using configured model)
            path_loss = self.model.calculate_path_loss(
                frequency_mhz, dist, tx_height, 1.5,
                has_los=has_los
            )
            
            # Add diffraction loss if NLOS
            if not has_los:
                diff_loss = self._calculate_diffraction_loss(
                    tx_elevation + tx_height,
                    elev + 1.5,
                    profile,
                    distances[:i+1],
                    frequency_mhz
                )
                path_loss += diff_loss
            
            rsrp = tx_power_dbm - path_loss
            rsrp_values.append(rsrp)
            los_flags.append(has_los)
        
        return {
            'azimuth': azimuth_deg,
            'distances': distances.tolist(),
            'lats': lats.tolist(),
            'lons': lons.tolist(),
            'rsrp': rsrp_values,
            'has_los': los_flags
        }
    
    def _check_los_profile(self, tx_h, rx_h, profile, distances) -> bool:
        """Check LOS using terrain profile (Fresnel zone clearance)."""
        if len(profile) < 2:
            return True
        
        total_dist = distances[-1]
        
        # Line from TX to RX
        for i in range(1, len(profile) - 1):
            d = distances[i]
            # Expected height on LOS line
            expected_h = tx_h + (rx_h - tx_h) * (d / total_dist)
            # Actual terrain height
            actual_h = profile[i]
            
            if actual_h > expected_h - 0.6:  # Small clearance margin
                return False
        
        return True
    
    def _calculate_diffraction_loss(self, tx_h, rx_h, profile, distances, freq_mhz) -> float:
        """Calculate diffraction loss using Deygout method."""
        # Find main obstacle
        max_v = -999
        max_idx = -1
        total_dist = distances[-1]
        wavelength = 300 / freq_mhz  # meters
        
        for i in range(1, len(profile) - 1):
            d1 = distances[i]
            d2 = total_dist - d1
            
            # Height of LOS line at this point
            los_h = tx_h + (rx_h - tx_h) * (d1 / total_dist)
            
            # Obstacle height above LOS
            h = profile[i] - los_h
            
            if h > 0:
                # Fresnel parameter
                v = h * math.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2))
                if v > max_v:
                    max_v = v
                    max_idx = i
        
        if max_v < -0.78:
            return 0.0
        
        # Knife-edge diffraction loss (ITU-R P.526 piecewise approximation)
        if max_v < 0:
            loss = 6.02 + 9.0 * max_v + 1.65 * max_v * max_v
        elif max_v < 2.4:
            loss = 6.02 + 9.11 * max_v - 1.27 * max_v * max_v
        else:
            loss = 12.953 + 20 * math.log10(max_v)
        
        return max(0, loss)
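Step 4 of the plan (interpolate between rays for the final grid) is not implemented above. A minimal nearest-sample lookup under the same flat-earth offsets; `sample_grid_point` and the four-spoke radial set are illustrative, not part of the engine:

```python
import math

def sample_grid_point(lat, lon, tx_lat, tx_lon, radials, radius_m):
    """Nearest-radial, nearest-sample lookup for one grid point."""
    cos_lat = math.cos(math.radians(tx_lat))
    dy = (lat - tx_lat) * 111000
    dx = (lon - tx_lon) * 111000 * cos_lat
    dist = math.hypot(dx, dy)
    if dist > radius_m:
        return None
    az = math.degrees(math.atan2(dx, dy)) % 360     # azimuth from north
    step = 360 / len(radials)
    radial = radials[round(az / step) % len(radials)]  # nearest spoke
    i = min(range(len(radial['distances'])),           # nearest sample on it
            key=lambda k: abs(radial['distances'][k] - dist))
    return radial['rsrp'][i]

# Synthetic 4-spoke radial set (0/90/180/270 degrees), two samples per spoke
radials = [{'azimuth': a, 'distances': [500, 1000],
            'rsrp': [-70 - a / 10, -90 - a / 10]} for a in (0, 90, 180, 270)]

# ~500 m due east of the TX -> 90 degree spoke, first sample
print(sample_grid_point(32.08, 34.7853, 32.08, 34.78, radials, 2000))  # -79.0
```

A production version would blend the two neighboring spokes and samples; nearest-sample is enough for a preview layer.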

Part 3: Propagation Model Updates

3.1 Add Longley-Rice ITM Support

File: backend/app/services/propagation_models/itm_model.py (NEW)

"""
Longley-Rice Irregular Terrain Model (ITM)

Best for: VHF/UHF terrain-based propagation (20 MHz - 20 GHz)
Based on: itmlogic Python package

Key parameters:
- Earth dielectric constant (eps): 4-81 (15 typical for ground)
- Ground conductivity (sgm): 0.001-5.0 S/m
- Atmospheric refractivity (ens): 250-400 N-units (301 typical)
- Climate: 1=Equatorial, 2=Continental Subtropical, etc.
"""

import numpy as np  # used by the FSPL fallback below

try:
    from itmlogic import itmlogic_p2p
    HAS_ITMLOGIC = True
except ImportError:
    HAS_ITMLOGIC = False

from .base_model import BasePropagationModel, PropagationInput, PropagationResult

class LongleyRiceModel(BasePropagationModel):
    """Longley-Rice ITM propagation model."""
    
    name = "Longley-Rice-ITM"
    frequency_range = (20, 20000)  # MHz
    distance_range = (1000, 2000000)  # meters
    
    # Default ITM parameters
    DEFAULT_PARAMS = {
        'eps': 15.0,      # Earth dielectric constant
        'sgm': 0.005,     # Ground conductivity (S/m)
        'ens': 301.0,     # Atmospheric refractivity (N-units)
        'pol': 0,         # Polarization: 0=horizontal, 1=vertical
        'mdvar': 12,      # Mode of variability
        'klim': 5,        # Climate: 5=Continental Temperate
    }
    
    # Ground parameters by type
    GROUND_PARAMS = {
        'average':    {'eps': 15.0, 'sgm': 0.005},
        'poor':       {'eps': 4.0,  'sgm': 0.001},
        'good':       {'eps': 25.0, 'sgm': 0.020},
        'fresh_water': {'eps': 81.0, 'sgm': 0.010},
        'sea_water':  {'eps': 81.0, 'sgm': 5.0},
        'forest':     {'eps': 12.0, 'sgm': 0.003},
    }
    
    def __init__(self, ground_type: str = 'average', climate: int = 5):
        if not HAS_ITMLOGIC:
            raise ImportError("itmlogic package required: pip install itmlogic")
        
        self.params = self.DEFAULT_PARAMS.copy()
        if ground_type in self.GROUND_PARAMS:
            self.params.update(self.GROUND_PARAMS[ground_type])
        self.params['klim'] = climate
    
    def calculate(self, input: PropagationInput) -> PropagationResult:
        """Calculate path loss using ITM point-to-point mode."""
        
        # ITM requires terrain profile
        if not hasattr(input, 'terrain_profile') or input.terrain_profile is None:
            # Fallback to free-space if no terrain
            return self._free_space_fallback(input)
        
        result = itmlogic_p2p(
            input.terrain_profile,  # Elevation samples
            input.frequency_mhz,
            input.tx_height_m,
            input.rx_height_m,
            self.params['eps'],
            self.params['sgm'],
            self.params['ens'],
            self.params['pol'],
            self.params['mdvar'],
            self.params['klim']
        )
        
        return PropagationResult(
            path_loss_db=result['dbloss'],
            model_name=self.name,
            details={
                'mode': result.get('propmode', 'unknown'),
                'variability': result.get('var', 0),
            }
        )
    
    def _free_space_fallback(self, input: PropagationInput) -> PropagationResult:
        """Free-space path loss when no terrain available."""
        fspl = 20 * np.log10(input.distance_m) + 20 * np.log10(input.frequency_mhz) - 27.55
        return PropagationResult(
            path_loss_db=fspl,
            model_name=f"{self.name} (FSPL fallback)",
            details={'mode': 'free_space'}
        )

3.2 Add VHF/UHF Model Selection

File: backend/app/services/propagation_models/model_selector.py

"""
Automatic propagation model selection based on frequency and environment.
"""

def select_model_for_frequency(
    frequency_mhz: float,
    environment: str = 'urban',
    has_terrain: bool = True
) -> BasePropagationModel:
    """
    Select appropriate propagation model.
    
    Frequency bands:
    - VHF: 30-300 MHz (tactical radios, FM broadcast)
    - UHF: 300-3000 MHz (tactical radios, TV, early cellular)
    - Cellular: 700-2600 MHz (LTE bands)
    - mmWave: 24-100 GHz (5G)
    
    Decision tree:
    1. VHF/UHF with terrain → Longley-Rice ITM
    2. Urban cellular → COST-231 Hata
    3. Suburban/rural cellular → Okumura-Hata
    4. mmWave → 3GPP 38.901
    """
    
    # VHF (30-300 MHz)
    if 30 <= frequency_mhz <= 300:
        if has_terrain:
            return LongleyRiceModel(ground_type='average', climate=5)
        else:
            return FreeSpaceModel()  # Fallback
    
    # UHF (300-1000 MHz)
    elif 300 < frequency_mhz <= 1000:
        if has_terrain:
            return LongleyRiceModel(ground_type='average', climate=5)
        else:
            return OkumuraHataModel(environment=environment)
    
    # Cellular (1000-2600 MHz)
    elif 1000 < frequency_mhz <= 2600:
        if environment == 'urban':
            return Cost231HataModel()
        else:
            return OkumuraHataModel(environment=environment)
    
    # Higher frequencies
    else:
        return FreeSpaceModel()  # Or implement 3GPP 38.901
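The decision tree can be checked without importing any model class. A standalone restatement that maps frequency to a model name, for eyeballing the band boundaries:

```python
# Standalone restatement of the selector's decision tree as
# frequency -> model name (names only, no model classes imported)
def model_name_for(frequency_mhz, environment='urban', has_terrain=True):
    if 30 <= frequency_mhz <= 1000:          # VHF and UHF
        if has_terrain:
            return 'Longley-Rice-ITM'
        return 'FreeSpace' if frequency_mhz <= 300 else 'Okumura-Hata'
    if 1000 < frequency_mhz <= 2600:         # cellular bands
        return 'COST-231-Hata' if environment == 'urban' else 'Okumura-Hata'
    return 'FreeSpace'                       # everything else

print(model_name_for(150))                       # Longley-Rice-ITM
print(model_name_for(1800))                      # COST-231-Hata
print(model_name_for(2100, environment='rural')) # Okumura-Hata
```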


# Frequency band constants for UI
FREQUENCY_BANDS = {
    'VHF_LOW': (30, 88, "VHF Low (30-88 MHz) - Military/Public Safety"),
    'VHF_HIGH': (136, 174, "VHF High (136-174 MHz) - Marine/Aviation"),
    'UHF_LOW': (400, 512, "UHF (400-512 MHz) - Public Safety/Tactical"),
    'UHF_TV': (470, 862, "UHF TV (470-862 MHz)"),
    'LTE_700': (700, 800, "LTE Band 28/20 (700-800 MHz)"),
    'LTE_900': (880, 960, "LTE Band 8 (900 MHz)"),
    'LTE_1800': (1710, 1880, "LTE Band 3 (1800 MHz)"),
    'LTE_2100': (1920, 2170, "LTE Band 1 (2100 MHz)"),
    'LTE_2600': (2500, 2690, "LTE Band 7 (2600 MHz)"),
}
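A small lookup sketch over the band table; the three entries here are a trimmed copy of FREQUENCY_BANDS, illustrating that overlapping ranges (UHF_LOW vs UHF_TV) both match:

```python
# Trimmed copy of FREQUENCY_BANDS, for illustration only
FREQUENCY_BANDS = {
    'UHF_LOW': (400, 512, "UHF (400-512 MHz) - Public Safety/Tactical"),
    'UHF_TV':  (470, 862, "UHF TV (470-862 MHz)"),
    'LTE_900': (880, 960, "LTE Band 8 (900 MHz)"),
}

def bands_for(freq_mhz: float) -> list:
    """All band keys whose [low, high] range contains freq_mhz."""
    return [key for key, (lo, hi, _label) in FREQUENCY_BANDS.items()
            if lo <= freq_mhz <= hi]

print(bands_for(480))  # ['UHF_LOW', 'UHF_TV'] - the UI should offer both
print(bands_for(900))  # ['LTE_900']
print(bands_for(50))   # [] - not in this trimmed table
```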

Part 4: Progress Bar Fix (WebSocket)

4.1 Proper Progress Streaming

The 5% bug persists because WebSocket progress messages aren't reaching the frontend.

Debug approach:

# In coverage calculation, add explicit progress logging
async def calculate_with_progress(self, ...):
    total_chunks = len(chunk_results)

    for i, chunk_result in enumerate(chunk_results):
        progress = int((i + 1) / total_chunks * 100)

        # Log to console AND send via WebSocket
        logger.info(f"[PROGRESS] {progress}% - chunk {i+1}/{total_chunks}")

        if progress_callback:
            await progress_callback(progress, f"Calculating... {i+1}/{total_chunks}")
            await asyncio.sleep(0)  # Yield to event loop so the send flushes
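The callback pattern can be verified without a browser. A runnable sketch that collects the values a WebSocket sender would stream; the `await asyncio.sleep(0)` is what lets queued sends run instead of arriving in one burst at the end:

```python
import asyncio

async def calculate_with_progress(total_chunks, progress_callback):
    for i in range(total_chunks):
        # ... chunk work would happen here ...
        progress = int((i + 1) / total_chunks * 100)
        await progress_callback(progress, f"chunk {i + 1}/{total_chunks}")
        await asyncio.sleep(0)  # yield so a WebSocket send task can run

async def main():
    seen = []
    async def on_progress(pct, status):
        seen.append(pct)
    await calculate_with_progress(4, on_progress)
    return seen

print(asyncio.run(main()))  # [25, 50, 75, 100]
```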

Frontend fix - check WebSocket subscription:

// In App.tsx or CoverageStore
useEffect(() => {
    const ws = new WebSocket('ws://localhost:8888/ws/coverage');
    
    ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        console.log('[WS] Received:', data);  // DEBUG
        
        if (data.type === 'progress') {
            setProgress(data.progress);
            setProgressStatus(data.status);
        }
    };
    
    ws.onerror = (e) => console.error('[WS] Error:', e);
    ws.onclose = () => console.log('[WS] Closed');
    
    return () => ws.close();
}, []);

Part 5: Testing & Validation

5.1 Performance Benchmarks

After refactoring, expected performance:

Scenario        Before          After   Speedup
5km Standard    5s              3s      1.7x
5km Detailed    timeout (300s)  25s     12x
10km Standard   30s             10s     3x
10km Detailed   timeout (300s)  60s     5x

5.2 Test Commands

# Quick test
cd D:\root\rfcp\installer
.\test-detailed-quick.bat

# Check for [PROGRESS] logs in output
# Check for [DP_TIMING] logs

# Verify shared memory cleanup
# Check Task Manager for memory after calculation

Implementation Order

  1. Shared Memory for Buildings (biggest impact) - Part 1.1
  2. Increase Batch Size - Part 1.2
  3. Progress Bar Debug - Part 4
  4. Radial Engine (optional, for preview mode) - Part 2
  5. Longley-Rice ITM (for VHF/UHF) - Part 3

Dependencies to Add

# requirements.txt additions
itmlogic>=0.1.0  # Longley-Rice ITM implementation

Commit Message

feat: Iteration 3.3.0 - Performance Architecture Refactor

Performance:
- Add shared memory for buildings (eliminate pickle overhead)
- Increase batch size to 300-500 points (amortize IPC)
- Add radial coverage engine (Signal-Server style)

Propagation Models:
- Add Longley-Rice ITM for VHF/UHF (20 MHz - 20 GHz)
- Add automatic model selection by frequency
- Add frequency band constants for UI

Bug Fixes:
- Debug and fix WebSocket progress (5% stuck bug)

Expected: 5km Detailed from timeout → ~25s (12x speedup)

Notes for Claude Code

This is a significant refactoring. Approach step by step:

  1. First implement shared memory for buildings
  2. Test that alone - should see major speedup
  3. Then increase batch size
  4. Test again
  5. Then tackle progress bar
  6. Radial engine and ITM can be separate iterations if needed

The key insight: 99% of time is IPC overhead, not calculation. Fixing pickle serialization is the #1 priority.


"Fast per-point means nothing if IPC eats your lunch" 🍽️