diff --git a/RFCP-Iteration-1.4-Advanced-Propagation.md b/RFCP-Iteration-1.4-Advanced-Propagation.md new file mode 100644 index 0000000..e2a8cab --- /dev/null +++ b/RFCP-Iteration-1.4-Advanced-Propagation.md @@ -0,0 +1,1574 @@ +# RFCP Backend - Iteration 1.4: Advanced Propagation Models + +**Date:** January 31, 2025 +**Type:** Backend Development +**Estimated:** 14-18 hours +**Location:** `/opt/rfcp/backend/` + +--- + +## ๐ŸŽฏ Goal + +Implement realistic RF propagation with building materials, dominant path analysis, street canyon model, and reflections. Add UI toggles for model selection. + +--- + +## ๐Ÿ“‹ Pre-reading + +1. `RFCP-Iteration-1.3-Coverage-OSM-Buildings.md` โ€” current state +2. ITU-R P.1411 โ€” Propagation for short-range outdoor +3. ITU-R P.2040 โ€” Building material properties + +--- + +## ๐Ÿ“Š Current State + +```bash +# Backend 1.3 complete +/opt/rfcp/backend/app/services/ +โ”œโ”€โ”€ terrain_service.py # SRTM elevation โœ… +โ”œโ”€โ”€ los_service.py # Line-of-sight + Fresnel โœ… +โ”œโ”€โ”€ buildings_service.py # OSM buildings fetch โœ… +โ””โ”€โ”€ coverage_service.py # Basic Okumura-Hata โœ… + +# Issues: +# - points_with_buildings: 0 (intersection bug) +# - No material consideration +# - No reflections +# - Slow (2 min for 1km radius) +``` + +--- + +## ๐Ÿ—๏ธ Architecture + +### Propagation Model Layers + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ RSRP Calculation โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ Base: Okumura-Hata / COST-231 โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ [1] Dominant Path - find best 2-3 rays โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ [2] Materials - wall penetration loss โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ [3] Street Canyon - signal along roads โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ [4] Reflections - bounced signals โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ Terrain (SRTM) + Buildings (OSM) โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +### Settings Model + +```python +class PropagationSettings(BaseModel): + # Existing + use_terrain: bool = True + use_buildings: bool = True + + # New toggles + use_materials: bool = True + use_dominant_path: bool = False # Slower but more accurate + use_street_canyon: bool = False # Requires road network + use_reflections: bool = False # Adds reflection paths + + # Presets + preset: str = "standard" # fast, standard, detailed, full + +# Presets mapping +PRESETS = { + "fast": { + "use_terrain": True, + "use_buildings": False, + "use_materials": False, + "use_dominant_path": False, + "use_street_canyon": False, + "use_reflections": False, + }, + "standard": { + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": False, + "use_street_canyon": False, + "use_reflections": False, + }, + "detailed": { + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": True, + "use_street_canyon": False, + "use_reflections": False, + }, + "full": { + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": True, + "use_street_canyon": True, + "use_reflections": True, + }, +} +``` + +--- + +## โœ… Tasks + +### Task 1: Building Materials Service (2-3 hours) + +**app/services/materials_service.py:** + +```python +from enum import Enum +from typing import Optional + +class BuildingMaterial(Enum): + """Building materials with RF properties""" + CONCRETE = "concrete" + BRICK = "brick" + GLASS = "glass" + WOOD = "wood" + METAL = "metal" + STONE = "stone" + PLASTER = "plaster" + UNKNOWN = "unknown" + +# ITU-R P.2040 based attenuation (dB per wall at 1-3 GHz) +MATERIAL_LOSS = { + BuildingMaterial.CONCRETE: 15.0, + BuildingMaterial.BRICK: 10.0, + BuildingMaterial.GLASS: 3.0, + BuildingMaterial.WOOD: 5.0, + BuildingMaterial.METAL: 25.0, # Or full reflection + BuildingMaterial.STONE: 12.0, + BuildingMaterial.PLASTER: 4.0, + BuildingMaterial.UNKNOWN: 10.0, # Default assumption +} + +# Reflection coefficient (0-1, portion of signal reflected) +MATERIAL_REFLECTION = { + BuildingMaterial.CONCRETE: 0.6, + BuildingMaterial.BRICK: 0.5, + BuildingMaterial.GLASS: 0.3, + BuildingMaterial.WOOD: 0.2, + BuildingMaterial.METAL: 0.9, + BuildingMaterial.STONE: 0.55, + BuildingMaterial.PLASTER: 0.3, + BuildingMaterial.UNKNOWN: 0.4, +} + +class MaterialsService: + """Building material detection and RF properties""" + + # OSM building:material tag mapping + OSM_MATERIAL_MAP = { + "concrete": BuildingMaterial.CONCRETE, + "brick": BuildingMaterial.BRICK, + "glass": BuildingMaterial.GLASS, + "wood": BuildingMaterial.WOOD, + "metal": BuildingMaterial.METAL, + "steel": BuildingMaterial.METAL, + "stone": BuildingMaterial.STONE, + "plaster": BuildingMaterial.PLASTER, + "cement_block": BuildingMaterial.CONCRETE, + "timber": BuildingMaterial.WOOD, + } + + # Fallback by building type + BUILDING_TYPE_MATERIAL = { + "industrial": BuildingMaterial.METAL, + "warehouse": BuildingMaterial.METAL, + "garage": BuildingMaterial.METAL, + "shed": BuildingMaterial.WOOD, + "house": BuildingMaterial.BRICK, + "residential": BuildingMaterial.CONCRETE, + "apartments": BuildingMaterial.CONCRETE, + "commercial": BuildingMaterial.GLASS, # Often glass facades + "office": BuildingMaterial.GLASS, + "retail": BuildingMaterial.GLASS, + "church": BuildingMaterial.STONE, + "cathedral": BuildingMaterial.STONE, + "school": BuildingMaterial.BRICK, + "hospital": BuildingMaterial.CONCRETE, + "university": BuildingMaterial.CONCRETE, + } + + def detect_material(self, building_tags: dict) -> BuildingMaterial: + """Detect building material from OSM tags""" + + # Direct material tag + if "building:material" in building_tags: + material_str = building_tags["building:material"].lower() + if material_str in self.OSM_MATERIAL_MAP: + return self.OSM_MATERIAL_MAP[material_str] + + # Facade material (often more relevant for RF) + if "building:facade:material" in building_tags: + material_str = building_tags["building:facade:material"].lower() + if material_str in self.OSM_MATERIAL_MAP: + return self.OSM_MATERIAL_MAP[material_str] + + # Fallback by building type + building_type = building_tags.get("building", "yes").lower() + if building_type in self.BUILDING_TYPE_MATERIAL: + return self.BUILDING_TYPE_MATERIAL[building_type] + + return BuildingMaterial.UNKNOWN + + def get_penetration_loss(self, material: BuildingMaterial, frequency_mhz: float = 1800) -> float: + """ + Get RF penetration loss through wall + + Frequency correction: +2dB per octave above 1GHz + """ + base_loss = MATERIAL_LOSS[material] + + # Frequency correction (simplified) + freq_factor = max(0, (frequency_mhz - 1000) / 1000) * 2 + + return base_loss + freq_factor + + def get_reflection_coefficient(self, material: BuildingMaterial) -> float: + """Get reflection coefficient (0-1)""" + return MATERIAL_REFLECTION[material] + + def get_reflection_loss(self, material: BuildingMaterial) -> float: + """Get loss due to reflection (dB)""" + coeff = MATERIAL_REFLECTION[material] + if coeff <= 0: + return 30.0 # Effectively no reflection + + # Reflection loss in dB = -10 * log10(coefficient) + import math + return -10 * math.log10(coeff) + + +materials_service = MaterialsService() +``` + +**Update buildings_service.py:** + +```python +# Add to Building model +class Building(BaseModel): + id: int + geometry: List[List[float]] + height: float + levels: Optional[int] = None + building_type: Optional[str] = None + material: Optional[str] = None # NEW + tags: dict = {} # NEW - store all tags for material detection +``` + +--- + +### Task 2: Dominant Path Service (4-5 hours) + +**app/services/dominant_path_service.py:** + +```python +import numpy as np +from typing import List, Tuple, Optional +from dataclasses import dataclass +from app.services.terrain_service import terrain_service +from app.services.buildings_service import buildings_service, Building +from app.services.materials_service import materials_service, BuildingMaterial + +@dataclass +class RayPath: + """Single ray path from TX to RX""" + path_type: str # "direct", "reflected", "diffracted", "street" + total_distance: float # meters + path_loss: float # dB + reflection_points: List[Tuple[float, float]] # [(lat, lon), ...] + materials_crossed: List[BuildingMaterial] + is_valid: bool # Does this path exist? + +class DominantPathService: + """ + Find dominant propagation paths (2-3 strongest) + + Path types: + 1. Direct (LoS if available) + 2. Single reflection off building + 3. Over-roof diffraction + 4. Around-corner diffraction + """ + + MAX_REFLECTIONS = 2 + MAX_PATHS = 3 + + async def find_dominant_paths( + self, + tx_lat: float, tx_lon: float, tx_height: float, + rx_lat: float, rx_lon: float, rx_height: float, + frequency_mhz: float, + buildings: List[Building] + ) -> List[RayPath]: + """Find the dominant propagation paths""" + + paths = [] + + # 1. Try direct path + direct = await self._check_direct_path( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, buildings + ) + if direct: + paths.append(direct) + + # 2. Try single-bounce reflections + reflections = await self._find_reflection_paths( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, buildings + ) + paths.extend(reflections[:2]) # Max 2 reflection paths + + # 3. Try over-roof diffraction (if direct blocked) + if not direct or not direct.is_valid: + diffracted = await self._find_diffraction_path( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, buildings + ) + if diffracted: + paths.append(diffracted) + + # Sort by path loss (best first) and return top N + paths.sort(key=lambda p: p.path_loss) + return paths[:self.MAX_PATHS] + + async def _check_direct_path( + self, + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, + buildings: List[Building] + ) -> Optional[RayPath]: + """Check if direct LoS path exists""" + from app.services.los_service import los_service + + # Check terrain LoS + los_result = await los_service.check_line_of_sight( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height + ) + + if not los_result["has_los"]: + return RayPath( + path_type="direct", + total_distance=los_result.get("distance", 0), + path_loss=float('inf'), + reflection_points=[], + materials_crossed=[], + is_valid=False + ) + + # Check building intersections + materials_crossed = [] + for building in buildings: + intersection = self._line_intersects_building_3d( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + building + ) + if intersection: + material = materials_service.detect_material(building.tags) + materials_crossed.append(material) + + # Calculate path loss + distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) + path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) + + # Add material penetration losses + for material in materials_crossed: + path_loss += materials_service.get_penetration_loss(material, frequency_mhz) + + return RayPath( + path_type="direct", + total_distance=distance, + path_loss=path_loss, + reflection_points=[], + materials_crossed=materials_crossed, + is_valid=len(materials_crossed) < 3 # Too many walls = not viable + ) + + async def _find_reflection_paths( + self, + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, + buildings: List[Building] + ) -> List[RayPath]: + """Find viable single-bounce reflection paths""" + + reflection_paths = [] + + for building in buildings: + # Find potential reflection points on building walls + reflection_point = self._find_reflection_point( + tx_lat, tx_lon, rx_lat, rx_lon, building + ) + + if not reflection_point: + continue + + ref_lat, ref_lon = reflection_point + + # Check if both segments are clear + # TX -> Reflection point + dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) + # Reflection point -> RX + dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) + + total_distance = dist1 + dist2 + + # Don't consider if much longer than direct path + direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) + if total_distance > direct_distance * 2: + continue + + # Calculate path loss + path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height) + + # Add reflection loss + material = materials_service.detect_material(building.tags) + path_loss += materials_service.get_reflection_loss(material) + + reflection_paths.append(RayPath( + path_type="reflected", + total_distance=total_distance, + path_loss=path_loss, + reflection_points=[(ref_lat, ref_lon)], + materials_crossed=[], + is_valid=True + )) + + return reflection_paths + + async def _find_diffraction_path( + self, + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, + buildings: List[Building] + ) -> Optional[RayPath]: + """Find over-roof diffraction path""" + + # Find highest obstacle between TX and RX + max_height = 0 + obstacle_lat, obstacle_lon = None, None + + # Sample points along direct path + num_samples = 20 + for i in range(1, num_samples - 1): + t = i / num_samples + lat = tx_lat + t * (rx_lat - tx_lat) + lon = tx_lon + t * (rx_lon - tx_lon) + + # Check terrain + terrain_elev = await terrain_service.get_elevation(lat, lon) + if terrain_elev > max_height: + max_height = terrain_elev + obstacle_lat, obstacle_lon = lat, lon + + # Check buildings at this point + for building in buildings: + if buildings_service.point_in_building(lat, lon, building): + if building.height > max_height: + max_height = building.height + obstacle_lat, obstacle_lon = lat, lon + + if not obstacle_lat: + return None + + # Calculate diffraction loss (simplified knife-edge) + distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) + + # Fresnel parameter + tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon) + rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon) + + tx_total = tx_elev + tx_height + rx_total = rx_elev + rx_height + + # Height of LoS at obstacle point + d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon) + los_height = tx_total + (rx_total - tx_total) * (d1 / distance) + + clearance = los_height - max_height + + # Knife-edge diffraction loss + diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1) + + path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) + path_loss += diffraction_loss + + return RayPath( + path_type="diffracted", + total_distance=distance, + path_loss=path_loss, + reflection_points=[(obstacle_lat, obstacle_lon)], + materials_crossed=[], + is_valid=True + ) + + def _find_reflection_point( + self, + tx_lat: float, tx_lon: float, + rx_lat: float, rx_lon: float, + building: Building + ) -> Optional[Tuple[float, float]]: + """Find specular reflection point on building wall""" + + # Simplified: find closest wall segment and calculate reflection + geometry = building.geometry + + best_point = None + best_score = float('inf') + + for i in range(len(geometry) - 1): + wall_start = geometry[i] + wall_end = geometry[i + 1] + + # Find reflection point on this wall segment + ref_point = self._specular_reflection( + tx_lon, tx_lat, rx_lon, rx_lat, + wall_start[0], wall_start[1], + wall_end[0], wall_end[1] + ) + + if ref_point: + # Score by total path length + d1 = np.sqrt((ref_point[0] - tx_lon)**2 + (ref_point[1] - tx_lat)**2) + d2 = np.sqrt((ref_point[0] - rx_lon)**2 + (ref_point[1] - rx_lat)**2) + score = d1 + d2 + + if score < best_score: + best_score = score + best_point = (ref_point[1], ref_point[0]) # Return as (lat, lon) + + return best_point + + def _specular_reflection( + self, + tx_x, tx_y, rx_x, rx_y, + wall_x1, wall_y1, wall_x2, wall_y2 + ) -> Optional[Tuple[float, float]]: + """Calculate specular reflection point on wall segment""" + + # Wall vector + wall_dx = wall_x2 - wall_x1 + wall_dy = wall_y2 - wall_y1 + wall_len = np.sqrt(wall_dx**2 + wall_dy**2) + + if wall_len < 1e-10: + return None + + # Wall normal + normal_x = -wall_dy / wall_len + normal_y = wall_dx / wall_len + + # Mirror TX across wall + # Project TX onto wall + tx_rel_x = tx_x - wall_x1 + tx_rel_y = tx_y - wall_y1 + + dot = tx_rel_x * normal_x + tx_rel_y * normal_y + + mirror_x = tx_x - 2 * dot * normal_x + mirror_y = tx_y - 2 * dot * normal_y + + # Find intersection of (mirror -> RX) with wall + # Parametric line: mirror + t * (rx - mirror) + dx = rx_x - mirror_x + dy = rx_y - mirror_y + + # Wall parametric: wall1 + s * (wall2 - wall1) + denom = dx * wall_dy - dy * wall_dx + + if abs(denom) < 1e-10: + return None # Parallel + + t = ((wall_x1 - mirror_x) * wall_dy - (wall_y1 - mirror_y) * wall_dx) / denom + s = ((wall_x1 - mirror_x) * dy - (wall_y1 - mirror_y) * dx) / (-denom) + + # Check if intersection is on wall segment and between mirror and RX + if 0 <= s <= 1 and 0 <= t <= 1: + ref_x = mirror_x + t * dx + ref_y = mirror_y + t * dy + return (ref_x, ref_y) + + return None + + def _line_intersects_building_3d( + self, + lat1, lon1, height1, + lat2, lon2, height2, + building: Building + ) -> bool: + """Check if 3D line intersects building volume""" + # Sample along line + for t in np.linspace(0, 1, 20): + lat = lat1 + t * (lat2 - lat1) + lon = lon1 + t * (lon2 - lon1) + height = height1 + t * (height2 - height1) + + if buildings_service.point_in_building(lat, lon, building): + if height < building.height: + return True + return False + + def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float: + """Okumura-Hata path loss""" + d_km = max(distance / 1000, 0.1) + + a_hm = (1.1 * np.log10(frequency_mhz) - 0.7) * rx_height - (1.56 * np.log10(frequency_mhz) - 0.8) + + L = (69.55 + 26.16 * np.log10(frequency_mhz) - 13.82 * np.log10(tx_height) - a_hm + + (44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km)) + + return L + + def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float: + """Knife-edge diffraction loss""" + if clearance >= 0: + return 0.0 + + wavelength = 300 / frequency_mhz + d2 = total_distance - d1 + + # Fresnel parameter v + v = abs(clearance) * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2)) + + # Lee's approximation + if v <= -0.78: + return 0 + elif v < 0: + return 6.02 + 9.11 * v - 1.27 * v**2 + elif v < 2.4: + return 6.02 + 9.11 * v + 1.27 * v**2 + else: + return 13 + 20 * np.log10(v) + + +dominant_path_service = DominantPathService() +``` + +--- + +### Task 3: Street Canyon Service (4-5 hours) + +**app/services/street_canyon_service.py:** + +```python +import numpy as np +from typing import List, Tuple, Optional +from dataclasses import dataclass +import httpx +from pathlib import Path +import json + +@dataclass +class Street: + """Street segment from OSM""" + id: int + name: Optional[str] + geometry: List[Tuple[float, float]] # [(lat, lon), ...] + width: float # meters + highway_type: str # residential, primary, secondary, etc. + +class StreetCanyonService: + """ + Street canyon propagation model (ITU-R P.1411) + + Signal propagates along streets with reflections from building walls. + Loss increases at corners/turns. + """ + + OVERPASS_URL = "https://overpass-api.de/api/interpreter" + + # Default street widths by type + STREET_WIDTHS = { + "motorway": 25.0, + "trunk": 20.0, + "primary": 15.0, + "secondary": 12.0, + "tertiary": 10.0, + "residential": 8.0, + "unclassified": 6.0, + "service": 5.0, + "footway": 2.0, + "path": 1.5, + } + + # Corner/turn loss + CORNER_LOSS_90 = 10.0 # dB for 90-degree turn + CORNER_LOSS_45 = 4.0 # dB for 45-degree turn + + def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/streets"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(exist_ok=True, parents=True) + self._cache: dict[str, List[Street]] = {} + + async def fetch_streets( + self, + min_lat: float, min_lon: float, + max_lat: float, max_lon: float + ) -> List[Street]: + """Fetch street network from OSM""" + + cache_key = f"{min_lat:.3f}_{min_lon:.3f}_{max_lat:.3f}_{max_lon:.3f}" + + # Check cache + if cache_key in self._cache: + return self._cache[cache_key] + + cache_file = self.cache_dir / f"{cache_key}.json" + if cache_file.exists(): + with open(cache_file) as f: + data = json.load(f) + streets = [Street(**s) for s in data] + self._cache[cache_key] = streets + return streets + + # Fetch from Overpass + query = f""" + [out:json][timeout:30]; + way["highway"]({min_lat},{min_lon},{max_lat},{max_lon}); + out body; + >; + out skel qt; + """ + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post(self.OVERPASS_URL, data={"data": query}) + response.raise_for_status() + data = response.json() + except Exception as e: + print(f"Street fetch error: {e}") + return [] + + streets = self._parse_streets(data) + + # Cache + with open(cache_file, 'w') as f: + json.dump([{ + "id": s.id, + "name": s.name, + "geometry": s.geometry, + "width": s.width, + "highway_type": s.highway_type + } for s in streets], f) + + self._cache[cache_key] = streets + return streets + + def _parse_streets(self, data: dict) -> List[Street]: + """Parse Overpass response into Street objects""" + + nodes = {} + for element in data.get("elements", []): + if element["type"] == "node": + nodes[element["id"]] = (element["lat"], element["lon"]) + + streets = [] + for element in data.get("elements", []): + if element["type"] != "way": + continue + + tags = element.get("tags", {}) + if "highway" not in tags: + continue + + highway_type = tags["highway"] + + # Skip non-road types + if highway_type in ["bus_stop", "crossing", "traffic_signals"]: + continue + + geometry = [] + for node_id in element.get("nodes", []): + if node_id in nodes: + geometry.append(nodes[node_id]) + + if len(geometry) < 2: + continue + + # Get width + width = self._get_street_width(tags) + + streets.append(Street( + id=element["id"], + name=tags.get("name"), + geometry=geometry, + width=width, + highway_type=highway_type + )) + + return streets + + def _get_street_width(self, tags: dict) -> float: + """Estimate street width from OSM tags""" + + # Explicit width + if "width" in tags: + try: + return float(tags["width"].replace("m", "").strip()) + except: + pass + + # Calculate from lanes + if "lanes" in tags: + try: + lanes = int(tags["lanes"]) + return lanes * 3.5 # ~3.5m per lane + except: + pass + + # Default by highway type + highway_type = tags.get("highway", "residential") + return self.STREET_WIDTHS.get(highway_type, 8.0) + + async def calculate_street_canyon_loss( + self, + tx_lat: float, tx_lon: float, tx_height: float, + rx_lat: float, rx_lon: float, rx_height: float, + frequency_mhz: float, + streets: List[Street] + ) -> Tuple[float, List[Tuple[float, float]]]: + """ + Calculate path loss through street canyon + + Returns: + (path_loss_db, street_path as list of points) + """ + + # Find path along streets from TX to RX + street_path = self._find_street_path(tx_lat, tx_lon, rx_lat, rx_lon, streets) + + if not street_path: + return float('inf'), [] # No street path found + + # Calculate loss along path + total_loss = 0.0 + total_distance = 0.0 + + for i in range(len(street_path) - 1): + p1 = street_path[i] + p2 = street_path[i + 1] + + # Segment distance + from app.services.terrain_service import TerrainService + segment_dist = TerrainService.haversine_distance(p1[0], p1[1], p2[0], p2[1]) + total_distance += segment_dist + + # Street canyon loss (ITU-R P.1411 simplified) + # L = 32.4 + 20*log10(f_MHz) + 20*log10(d_km) + if segment_dist > 0: + segment_loss = 32.4 + 20 * np.log10(frequency_mhz) + 20 * np.log10(segment_dist / 1000 + 0.001) + total_loss += segment_loss * (segment_dist / total_distance) if total_distance > 0 else 0 + + # Corner loss + if i > 0: + corner_angle = self._calculate_corner_angle( + street_path[i-1], p1, p2 + ) + corner_loss = self._corner_loss(corner_angle) + total_loss += corner_loss + + return total_loss, street_path + + def _find_street_path( + self, + start_lat: float, start_lon: float, + end_lat: float, end_lon: float, + streets: List[Street] + ) -> List[Tuple[float, float]]: + """ + Find path along streets (simplified A* / greedy) + + Returns list of (lat, lon) waypoints + """ + + # Find nearest street point to start and end + start_point = self._nearest_street_point(start_lat, start_lon, streets) + end_point = self._nearest_street_point(end_lat, end_lon, streets) + + if not start_point or not end_point: + return [] + + # Simplified: just return direct street segments + # Full implementation would use A* pathfinding + path = [(start_lat, start_lon), start_point] + + # Add intermediate points along streets toward destination + current = start_point + visited = set() + + for _ in range(50): # Max iterations + if self._distance(current, end_point) < 50: # Within 50m + break + + # Find next street segment toward destination + next_point = self._next_street_point(current, end_point, streets, visited) + if not next_point: + break + + path.append(next_point) + visited.add((round(current[0], 5), round(current[1], 5))) + current = next_point + + path.append(end_point) + path.append((end_lat, end_lon)) + + return path + + def _nearest_street_point( + self, + lat: float, lon: float, + streets: List[Street] + ) -> Optional[Tuple[float, float]]: + """Find nearest point on any street""" + + best_point = None + best_dist = float('inf') + + for street in streets: + for point in street.geometry: + dist = self._distance((lat, lon), point) + if dist < best_dist: + best_dist = dist + best_point = point + + return best_point if best_dist < 200 else None # Max 200m to street + + def _next_street_point( + self, + current: Tuple[float, float], + target: Tuple[float, float], + streets: List[Street], + visited: set + ) -> Optional[Tuple[float, float]]: + """Find next street point toward target""" + + best_point = None + best_score = float('inf') + + for street in streets: + for i, point in enumerate(street.geometry): + if (round(point[0], 5), round(point[1], 5)) in visited: + continue + + dist_from_current = self._distance(current, point) + dist_to_target = self._distance(point, target) + + # Must be close to current position + if dist_from_current > 100: + continue + + # Score: prefer points closer to target + score = dist_to_target + dist_from_current * 0.5 + + if score < best_score: + best_score = score + best_point = point + + return best_point + + def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float: + """Quick distance approximation (meters)""" + lat_diff = (p1[0] - p2[0]) * 111000 + lon_diff = (p1[1] - p2[1]) * 111000 * np.cos(np.radians(p1[0])) + return np.sqrt(lat_diff**2 + lon_diff**2) + + def _calculate_corner_angle( + self, + p1: Tuple[float, float], + p2: Tuple[float, float], + p3: Tuple[float, float] + ) -> float: + """Calculate angle at corner (degrees)""" + + v1 = (p1[0] - p2[0], p1[1] - p2[1]) + v2 = (p3[0] - p2[0], p3[1] - p2[1]) + + dot = v1[0] * v2[0] + v1[1] * v2[1] + mag1 = np.sqrt(v1[0]**2 + v1[1]**2) + mag2 = np.sqrt(v2[0]**2 + v2[1]**2) + + if mag1 * mag2 < 1e-10: + return 180.0 + + cos_angle = dot / (mag1 * mag2) + cos_angle = max(-1, min(1, cos_angle)) + + return np.degrees(np.arccos(cos_angle)) + + def _corner_loss(self, angle_degrees: float) -> float: + """Calculate loss due to corner/turn""" + + # Straight = 180ยฐ, right angle = 90ยฐ + turn_angle = abs(180 - angle_degrees) + + if turn_angle < 15: + return 0.0 + elif turn_angle < 45: + return self.CORNER_LOSS_45 * (turn_angle / 45) + elif turn_angle < 90: + return self.CORNER_LOSS_45 + (self.CORNER_LOSS_90 - self.CORNER_LOSS_45) * ((turn_angle - 45) / 45) + else: + return self.CORNER_LOSS_90 + (turn_angle - 90) * 0.2 # Extra loss for sharp turns + + +street_canyon_service = StreetCanyonService() +``` + +--- + +### Task 4: Reflection Service (3-4 hours) + +**app/services/reflection_service.py:** + +```python +import numpy as np +from typing import List, Tuple, Optional +from dataclasses import dataclass +from app.services.buildings_service import Building +from app.services.materials_service import materials_service + +@dataclass +class ReflectionPath: + """A reflection path with one or more bounces""" + points: List[Tuple[float, float]] # [TX, reflection1, reflection2, ..., RX] + total_distance: float + total_loss: float + reflection_count: int + materials: List[str] + +class ReflectionService: + """ + Calculate reflection paths for RF propagation + + - Single bounce (most common) + - Double bounce (around corners) + - Ground reflection + """ + + MAX_BOUNCES = 2 + GROUND_REFLECTION_COEFF = 0.3 # Depends on surface + + async def find_reflection_paths( + self, + tx_lat: float, tx_lon: float, tx_height: float, + rx_lat: float, rx_lon: float, rx_height: float, + frequency_mhz: float, + buildings: List[Building], + include_ground: bool = True + ) -> List[ReflectionPath]: + """Find all viable reflection paths""" + + paths = [] + + # Single-bounce building reflections + for building in buildings: + path = self._find_single_bounce( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, building + ) + if path: + paths.append(path) + + # Ground reflection + if include_ground: + ground_path = self._calculate_ground_reflection( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz + ) + if ground_path: + paths.append(ground_path) + + # Sort by loss (best first) + paths.sort(key=lambda p: p.total_loss) + + return paths[:5] # Return top 5 + + def _find_single_bounce( + self, + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz, + building: Building + ) -> Optional[ReflectionPath]: + """Find single-bounce reflection off building""" + + # Find reflection point on building walls + geometry = building.geometry + + for i in range(len(geometry) - 1): + wall_start = geometry[i] + wall_end = geometry[i + 1] + + ref_point = self._specular_reflection_point( + (tx_lon, tx_lat), (rx_lon, rx_lat), + wall_start, wall_end + ) + + if not ref_point: + continue + + ref_lat, ref_lon = ref_point[1], ref_point[0] + + # Calculate distances + from app.services.terrain_service import TerrainService + d1 = TerrainService.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) + d2 = TerrainService.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) + total_dist = d1 + d2 + + # Direct distance check - reflection shouldn't be much longer + direct_dist = TerrainService.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) + if total_dist > direct_dist * 1.5: + continue + + # Path loss + path_loss = self._free_space_loss(total_dist, frequency_mhz) + + # Reflection loss + material = materials_service.detect_material(building.tags) + reflection_loss = materials_service.get_reflection_loss(material) + + total_loss = path_loss + reflection_loss + + return ReflectionPath( + points=[(tx_lat, tx_lon), (ref_lat, ref_lon), (rx_lat, rx_lon)], + total_distance=total_dist, + total_loss=total_loss, + reflection_count=1, + materials=[material.value] + ) + + return None + + def _calculate_ground_reflection( + self, + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency_mhz + ) -> Optional[ReflectionPath]: + """Calculate ground reflection path""" + + from app.services.terrain_service import TerrainService + + # Reflection point (simplified - midpoint for flat ground) + mid_lat = (tx_lat + rx_lat) / 2 + mid_lon = (tx_lon + rx_lon) / 2 + + # Path lengths + d1 = TerrainService.haversine_distance(tx_lat, tx_lon, mid_lat, mid_lon) + d2 = TerrainService.haversine_distance(mid_lat, mid_lon, rx_lat, rx_lon) + + # Actual path length considering heights + path1 = np.sqrt(d1**2 + tx_height**2) + path2 = np.sqrt(d2**2 + rx_height**2) + total_dist = path1 + path2 + + # Path loss + path_loss = self._free_space_loss(total_dist, frequency_mhz) + + # Ground reflection loss (~5-10 dB typically) + ground_reflection_loss = -10 * np.log10(self.GROUND_REFLECTION_COEFF) + + # Phase difference can cause constructive or destructive interference + # Simplified: assume average case + total_loss = path_loss + ground_reflection_loss + + return ReflectionPath( + points=[(tx_lat, tx_lon), (mid_lat, mid_lon), (rx_lat, rx_lon)], + total_distance=total_dist, + total_loss=total_loss, + reflection_count=1, + materials=["ground"] + ) + + def _specular_reflection_point( + self, + tx: Tuple[float, float], # (lon, lat) + rx: Tuple[float, float], + wall_start: List[float], # [lon, lat] + wall_end: List[float] + ) -> Optional[Tuple[float, float]]: + """Calculate specular reflection point on wall""" + + # Wall vector + wx = wall_end[0] - wall_start[0] + wy = wall_end[1] - wall_start[1] + wall_len = np.sqrt(wx**2 + wy**2) + + if wall_len < 1e-10: + return None + + # Normalize + wx /= wall_len + wy /= wall_len + + # Wall normal (perpendicular) + nx = -wy + ny = wx + + # Vector from wall start to TX + tx_rel_x = tx[0] - wall_start[0] + tx_rel_y = tx[1] - wall_start[1] + + # Distance from TX to wall line + dist_to_wall = tx_rel_x * nx + tx_rel_y * ny + + # Mirror TX across wall + mirror_x = tx[0] - 2 * dist_to_wall * nx + mirror_y = tx[1] - 2 * dist_to_wall * ny + + # Line from mirror to RX + dx = rx[0] - mirror_x + dy = rx[1] - mirror_y + + # Find intersection with wall + # Parametric: wall_start + t * wall_dir + # Parametric: mirror + s * (rx - mirror) + + denom = dx * wy - dy * wx + if abs(denom) < 1e-10: + return None + + t = ((wall_start[0] - mirror_x) * wy - (wall_start[1] - mirror_y) * wx) / denom + s = ((wall_start[0] - mirror_x) * dy - (wall_start[1] - mirror_y) * dx) / (-denom) if denom != 0 else 0 + + # Check if on wall segment and between mirror and RX + if 0 <= s <= 1 and 0 <= t <= 1: + ref_x = mirror_x + t * dx + ref_y = mirror_y + t * dy + return (ref_x, ref_y) + + return None + + def _free_space_loss(self, distance: float, frequency_mhz: float) -> float: + """Free space path loss (dB)""" + if distance <= 0: + distance = 1 + + # FSPL = 20*log10(d) + 20*log10(f) + 20*log10(4*pi/c) + # Simplified: FSPL = 32.45 + 20*log10(f_MHz) + 20*log10(d_km) + d_km = distance / 1000 + return 32.45 + 20 * np.log10(frequency_mhz) + 20 * np.log10(d_km + 0.001) + + def combine_paths( + self, + direct_power_dbm: float, + reflection_paths: List[ReflectionPath], + tx_power_dbm: float + ) -> float: + """ + Combine direct and reflected signals (power sum) + + Returns total received power in dBm + """ + + # Convert to linear power + powers = [] + + if direct_power_dbm > -150: # Valid direct signal + powers.append(10 ** (direct_power_dbm / 10)) + + for path in reflection_paths: + reflected_power_dbm = tx_power_dbm - path.total_loss + if reflected_power_dbm > -150: + powers.append(10 ** (reflected_power_dbm / 10)) + + if not powers: + return -150.0 # No signal + + # Sum powers (incoherent addition - conservative estimate) + total_power = sum(powers) + + return 10 * np.log10(total_power) + + +reflection_service = ReflectionService() +``` + +--- + +### Task 5: Update Coverage Service (2-3 hours) + +**Update app/services/coverage_service.py:** + +```python +# Add imports +from app.services.materials_service import materials_service +from app.services.dominant_path_service import dominant_path_service +from app.services.street_canyon_service import street_canyon_service +from app.services.reflection_service import reflection_service + +# Update CoverageSettings +class CoverageSettings(BaseModel): + radius: float = 10000 + resolution: float = 200 + min_signal: float = -120 + + # Layer toggles + use_terrain: bool = True + use_buildings: bool = True + use_materials: bool = True + use_dominant_path: bool = False + use_street_canyon: bool = False + use_reflections: bool = False + + # Preset + preset: Optional[str] = None # fast, standard, detailed, full + +# Add preset application +PRESETS = { + "fast": {...}, + "standard": {...}, + "detailed": {...}, + "full": {...}, +} + +def apply_preset(settings: CoverageSettings) -> CoverageSettings: + if settings.preset and settings.preset in PRESETS: + for key, value in PRESETS[settings.preset].items(): + setattr(settings, key, value) + return settings + +# Update _calculate_point method +async def _calculate_point(self, site, lat, lon, settings, buildings, streets): + """Enhanced point calculation with all propagation models""" + + # ... existing code ... + + # Material-aware building loss + if settings.use_buildings and settings.use_materials: + for building in buildings: + if self._intersects_building(site, lat, lon, building): + material = materials_service.detect_material(building.tags) + building_loss += materials_service.get_penetration_loss( + material, site.frequency + ) + + # Dominant path (find best route) + if settings.use_dominant_path: + paths = await dominant_path_service.find_dominant_paths( + site.lat, site.lon, site.height, + lat, lon, 1.5, + site.frequency, buildings + ) + if paths: + best_path = paths[0] + # Use best path's loss instead of simple calculation + path_loss = best_path.path_loss + + # Street canyon + if settings.use_street_canyon and streets: + canyon_loss, street_path = await street_canyon_service.calculate_street_canyon_loss( + site.lat, site.lon, site.height, + lat, lon, 1.5, + site.frequency, streets + ) + # Use canyon loss if better than direct + if canyon_loss < path_loss + terrain_loss + building_loss: + path_loss = canyon_loss + terrain_loss = 0 + building_loss = 0 + + # Reflections + reflection_gain = 0.0 + if settings.use_reflections: + reflection_paths = await reflection_service.find_reflection_paths( + site.lat, site.lon, site.height, + lat, lon, 1.5, + site.frequency, buildings + ) + if reflection_paths: + # Combine direct and reflected + direct_rsrp = site.power + site.gain - path_loss - terrain_loss - building_loss + combined_rsrp = reflection_service.combine_paths( + direct_rsrp, reflection_paths, site.power + site.gain + ) + reflection_gain = combined_rsrp - direct_rsrp + + # Final RSRP + rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + reflection_gain + + return CoveragePoint( + lat=lat, lon=lon, rsrp=rsrp, + distance=distance, has_los=has_los, + terrain_loss=terrain_loss, + building_loss=building_loss, + reflection_gain=reflection_gain # NEW + ) +``` + +--- + +### Task 6: Update API & Add Presets Endpoint (1-2 hours) + +**Update app/api/routes/coverage.py:** + +```python +@router.get("/presets") +async def get_presets(): + """Get available propagation model presets""" + return { + "presets": { + "fast": { + "description": "Quick calculation - terrain only", + "use_terrain": True, + "use_buildings": False, + "use_materials": False, + "use_dominant_path": False, + "use_street_canyon": False, + "use_reflections": False, + "estimated_speed": "~5 seconds for 5km radius" + }, + "standard": { + "description": "Balanced - terrain + buildings with materials", + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": False, + "use_street_canyon": False, + "use_reflections": False, + "estimated_speed": "~30 seconds for 5km radius" + }, + "detailed": { + "description": "Accurate - adds dominant path analysis", + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": True, + "use_street_canyon": False, + "use_reflections": False, + "estimated_speed": "~2 minutes for 5km radius" + }, + "full": { + "description": "Maximum realism - all models enabled", + "use_terrain": True, + "use_buildings": True, + "use_materials": True, + "use_dominant_path": True, + "use_street_canyon": True, + "use_reflections": True, + "estimated_speed": "~5 minutes for 5km radius" + } + } + } + +# Update CoverageResponse +class CoverageResponse(BaseModel): + points: List[CoveragePoint] + count: int + settings: CoverageSettings + stats: dict + computation_time: float # NEW - seconds + models_used: List[str] # NEW - which models were active +``` + +--- + +## ๐Ÿ“ Files Summary + +### New Files: +``` +app/services/ +โ”œโ”€โ”€ materials_service.py # Building material RF properties +โ”œโ”€โ”€ dominant_path_service.py # Multi-path ray analysis +โ”œโ”€โ”€ street_canyon_service.py # Street network propagation +โ””โ”€โ”€ reflection_service.py # Reflection calculations + +data/streets/ # Cached street network +``` + +### Modified Files: +``` +app/services/ +โ”œโ”€โ”€ buildings_service.py # Add material & tags fields +โ””โ”€โ”€ coverage_service.py # Integrate all models + +app/api/routes/ +โ””โ”€โ”€ coverage.py # Add presets endpoint, timing + +app/models/ +โ””โ”€โ”€ coverage.py # Extended settings model +``` + +--- + +## โœ… Success Criteria + +- [ ] `/api/coverage/presets` returns all 4 presets with descriptions +- [ ] `preset: "full"` enables all toggles +- [ ] Material detection working (check logs for detected materials) +- [ ] Reflection paths found (check `reflection_gain` in response) +- [ ] Street canyon reduces loss along roads +- [ ] `computation_time` reported in response +- [ ] `models_used` shows active models +- [ ] Performance: "fast" < 10s, "standard" < 1min, "detailed" < 3min, "full" < 10min for 5km + +--- + +## ๐Ÿงช Test Commands + +```bash +# Test presets endpoint +curl https://api.rfcp.eliah.one/api/coverage/presets | jq + +# Fast preset +curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \ + -H "Content-Type: application/json" \ + -d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":2000,"resolution":100,"preset":"fast"}}' | jq '.stats, .computation_time, .models_used' + +# Full preset (will be slow!) +curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \ + -H "Content-Type: application/json" \ + -d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":1000,"resolution":100,"preset":"full"}}' | jq '.stats, .computation_time' + +# Check material detection +curl "https://api.rfcp.eliah.one/api/coverage/buildings?min_lat=48.455&min_lon=35.045&max_lat=48.465&max_lon=35.055" | jq '.buildings[:3] | .[].material' +``` + +--- + +## ๐Ÿ“ Notes + +- Street canyon requires road network fetch (additional Overpass query) +- Reflection calculations are CPU-intensive โ€” consider caching +- Full model on large areas may timeout โ€” implement background tasks later (1.4.1) +- Material detection fallback chain: explicit tag โ†’ facade tag โ†’ building type โ†’ default + +--- + +## ๐Ÿ”œ Next Iterations + +**1.4.1 โ€” Enhanced Environment (Should have):** +- Spatial indexing (R-tree) for buildings +- Ground reflection improvements +- Water body reflection (OSM natural=water) +- Vegetation loss (OSM landuse=forest) + +**1.4.2 โ€” Extra Factors (Could have):** +- Weather/rain attenuation (ITU-R P.838) +- Indoor penetration layer +- Seasonal vegetation toggle + +--- + +**Ready for Claude Code** ๐Ÿš€