# RFCP Backend - Iteration 1.4: Advanced Propagation Models **Date:** January 31, 2025 **Type:** Backend Development **Estimated:** 14-18 hours **Location:** `/opt/rfcp/backend/` --- ## ๐ŸŽฏ Goal Implement realistic RF propagation with building materials, dominant path analysis, street canyon model, and reflections. Add UI toggles for model selection. --- ## ๐Ÿ“‹ Pre-reading 1. `RFCP-Iteration-1.3-Coverage-OSM-Buildings.md` โ€” current state 2. ITU-R P.1411 โ€” Propagation for short-range outdoor 3. ITU-R P.2040 โ€” Building material properties --- ## ๐Ÿ“Š Current State ```bash # Backend 1.3 complete /opt/rfcp/backend/app/services/ โ”œโ”€โ”€ terrain_service.py # SRTM elevation โœ… โ”œโ”€โ”€ los_service.py # Line-of-sight + Fresnel โœ… โ”œโ”€โ”€ buildings_service.py # OSM buildings fetch โœ… โ””โ”€โ”€ coverage_service.py # Basic Okumura-Hata โœ… # Issues: # - points_with_buildings: 0 (intersection bug) # - No material consideration # - No reflections # - Slow (2 min for 1km radius) ``` --- ## ๐Ÿ—๏ธ Architecture ### Propagation Model Layers ``` โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ RSRP Calculation โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ Base: Okumura-Hata / COST-231 โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ [1] Dominant Path - find best 2-3 rays โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ [2] Materials - wall penetration loss โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ [3] Street Canyon - signal along roads โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ [4] Reflections - bounced signals โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”‚ Terrain (SRTM) + Buildings (OSM) โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ ``` ### Settings Model ```python class PropagationSettings(BaseModel): # Existing use_terrain: bool = True use_buildings: bool = True # New toggles use_materials: bool = True use_dominant_path: bool = False # Slower but more accurate use_street_canyon: bool = False # Requires road network use_reflections: bool = False # Adds reflection paths # Presets preset: str = "standard" # fast, standard, detailed, full # Presets mapping PRESETS = { "fast": { "use_terrain": True, "use_buildings": False, "use_materials": False, "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, }, "standard": { "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, }, "detailed": { "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": True, "use_street_canyon": False, "use_reflections": False, }, "full": { "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": True, "use_street_canyon": True, "use_reflections": True, }, } ``` --- ## โœ… Tasks ### Task 1: Building Materials Service (2-3 hours) **app/services/materials_service.py:** ```python from enum import Enum from typing import Optional class BuildingMaterial(Enum): """Building materials with RF properties""" CONCRETE = "concrete" BRICK = "brick" GLASS = "glass" WOOD = "wood" METAL = "metal" STONE = "stone" PLASTER = "plaster" UNKNOWN = "unknown" # ITU-R P.2040 based attenuation (dB per wall at 1-3 GHz) MATERIAL_LOSS = { BuildingMaterial.CONCRETE: 15.0, BuildingMaterial.BRICK: 10.0, BuildingMaterial.GLASS: 3.0, BuildingMaterial.WOOD: 5.0, BuildingMaterial.METAL: 25.0, # Or full reflection BuildingMaterial.STONE: 12.0, BuildingMaterial.PLASTER: 4.0, BuildingMaterial.UNKNOWN: 10.0, # Default assumption } # Reflection coefficient (0-1, portion of signal reflected) MATERIAL_REFLECTION = { BuildingMaterial.CONCRETE: 0.6, BuildingMaterial.BRICK: 0.5, BuildingMaterial.GLASS: 0.3, BuildingMaterial.WOOD: 0.2, BuildingMaterial.METAL: 0.9, BuildingMaterial.STONE: 0.55, BuildingMaterial.PLASTER: 0.3, BuildingMaterial.UNKNOWN: 0.4, } class MaterialsService: """Building material detection and RF properties""" # OSM building:material tag mapping OSM_MATERIAL_MAP = { "concrete": BuildingMaterial.CONCRETE, "brick": BuildingMaterial.BRICK, "glass": BuildingMaterial.GLASS, "wood": BuildingMaterial.WOOD, "metal": BuildingMaterial.METAL, "steel": BuildingMaterial.METAL, "stone": BuildingMaterial.STONE, "plaster": BuildingMaterial.PLASTER, "cement_block": BuildingMaterial.CONCRETE, "timber": BuildingMaterial.WOOD, } # Fallback by building type BUILDING_TYPE_MATERIAL = { "industrial": BuildingMaterial.METAL, "warehouse": BuildingMaterial.METAL, "garage": BuildingMaterial.METAL, "shed": BuildingMaterial.WOOD, "house": BuildingMaterial.BRICK, "residential": BuildingMaterial.CONCRETE, "apartments": BuildingMaterial.CONCRETE, "commercial": BuildingMaterial.GLASS, # Often glass facades "office": BuildingMaterial.GLASS, "retail": BuildingMaterial.GLASS, "church": BuildingMaterial.STONE, "cathedral": BuildingMaterial.STONE, "school": BuildingMaterial.BRICK, "hospital": BuildingMaterial.CONCRETE, "university": BuildingMaterial.CONCRETE, } def detect_material(self, building_tags: dict) -> BuildingMaterial: """Detect building material from OSM tags""" # Direct material tag if "building:material" in building_tags: material_str = building_tags["building:material"].lower() if material_str in self.OSM_MATERIAL_MAP: return self.OSM_MATERIAL_MAP[material_str] # Facade material (often more relevant for RF) if "building:facade:material" in building_tags: material_str = building_tags["building:facade:material"].lower() if material_str in self.OSM_MATERIAL_MAP: return self.OSM_MATERIAL_MAP[material_str] # Fallback by building type building_type = building_tags.get("building", "yes").lower() if building_type in self.BUILDING_TYPE_MATERIAL: return self.BUILDING_TYPE_MATERIAL[building_type] return BuildingMaterial.UNKNOWN def get_penetration_loss(self, material: BuildingMaterial, frequency_mhz: float = 1800) -> float: """ Get RF penetration loss through wall Frequency correction: +2dB per octave above 1GHz """ base_loss = MATERIAL_LOSS[material] # Frequency correction (simplified) freq_factor = max(0, (frequency_mhz - 1000) / 1000) * 2 return base_loss + freq_factor def get_reflection_coefficient(self, material: BuildingMaterial) -> float: """Get reflection coefficient (0-1)""" return MATERIAL_REFLECTION[material] def get_reflection_loss(self, material: BuildingMaterial) -> float: """Get loss due to reflection (dB)""" coeff = MATERIAL_REFLECTION[material] if coeff <= 0: return 30.0 # Effectively no reflection # Reflection loss in dB = -10 * log10(coefficient) import math return -10 * math.log10(coeff) materials_service = MaterialsService() ``` **Update buildings_service.py:** ```python # Add to Building model class Building(BaseModel): id: int geometry: List[List[float]] height: float levels: Optional[int] = None building_type: Optional[str] = None material: Optional[str] = None # NEW tags: dict = {} # NEW - store all tags for material detection ``` --- ### Task 2: Dominant Path Service (4-5 hours) **app/services/dominant_path_service.py:** ```python import numpy as np from typing import List, Tuple, Optional from dataclasses import dataclass from app.services.terrain_service import terrain_service from app.services.buildings_service import buildings_service, Building from app.services.materials_service import materials_service, BuildingMaterial @dataclass class RayPath: """Single ray path from TX to RX""" path_type: str # "direct", "reflected", "diffracted", "street" total_distance: float # meters path_loss: float # dB reflection_points: List[Tuple[float, float]] # [(lat, lon), ...] materials_crossed: List[BuildingMaterial] is_valid: bool # Does this path exist? class DominantPathService: """ Find dominant propagation paths (2-3 strongest) Path types: 1. Direct (LoS if available) 2. Single reflection off building 3. Over-roof diffraction 4. Around-corner diffraction """ MAX_REFLECTIONS = 2 MAX_PATHS = 3 async def find_dominant_paths( self, tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, buildings: List[Building] ) -> List[RayPath]: """Find the dominant propagation paths""" paths = [] # 1. Try direct path direct = await self._check_direct_path( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) if direct: paths.append(direct) # 2. Try single-bounce reflections reflections = await self._find_reflection_paths( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) paths.extend(reflections[:2]) # Max 2 reflection paths # 3. Try over-roof diffraction (if direct blocked) if not direct or not direct.is_valid: diffracted = await self._find_diffraction_path( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) if diffracted: paths.append(diffracted) # Sort by path loss (best first) and return top N paths.sort(key=lambda p: p.path_loss) return paths[:self.MAX_PATHS] async def _check_direct_path( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> Optional[RayPath]: """Check if direct LoS path exists""" from app.services.los_service import los_service # Check terrain LoS los_result = await los_service.check_line_of_sight( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height ) if not los_result["has_los"]: return RayPath( path_type="direct", total_distance=los_result.get("distance", 0), path_loss=float('inf'), reflection_points=[], materials_crossed=[], is_valid=False ) # Check building intersections materials_crossed = [] for building in buildings: intersection = self._line_intersects_building_3d( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, building ) if intersection: material = materials_service.detect_material(building.tags) materials_crossed.append(material) # Calculate path loss distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) # Add material penetration losses for material in materials_crossed: path_loss += materials_service.get_penetration_loss(material, frequency_mhz) return RayPath( path_type="direct", total_distance=distance, path_loss=path_loss, reflection_points=[], materials_crossed=materials_crossed, is_valid=len(materials_crossed) < 3 # Too many walls = not viable ) async def _find_reflection_paths( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> List[RayPath]: """Find viable single-bounce reflection paths""" reflection_paths = [] for building in buildings: # Find potential reflection points on building walls reflection_point = self._find_reflection_point( tx_lat, tx_lon, rx_lat, rx_lon, building ) if not reflection_point: continue ref_lat, ref_lon = reflection_point # Check if both segments are clear # TX -> Reflection point dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) # Reflection point -> RX dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) total_distance = dist1 + dist2 # Don't consider if much longer than direct path direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) if total_distance > direct_distance * 2: continue # Calculate path loss path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height) # Add reflection loss material = materials_service.detect_material(building.tags) path_loss += materials_service.get_reflection_loss(material) reflection_paths.append(RayPath( path_type="reflected", total_distance=total_distance, path_loss=path_loss, reflection_points=[(ref_lat, ref_lon)], materials_crossed=[], is_valid=True )) return reflection_paths async def _find_diffraction_path( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> Optional[RayPath]: """Find over-roof diffraction path""" # Find highest obstacle between TX and RX max_height = 0 obstacle_lat, obstacle_lon = None, None # Sample points along direct path num_samples = 20 for i in range(1, num_samples - 1): t = i / num_samples lat = tx_lat + t * (rx_lat - tx_lat) lon = tx_lon + t * (rx_lon - tx_lon) # Check terrain terrain_elev = await terrain_service.get_elevation(lat, lon) if terrain_elev > max_height: max_height = terrain_elev obstacle_lat, obstacle_lon = lat, lon # Check buildings at this point for building in buildings: if buildings_service.point_in_building(lat, lon, building): if building.height > max_height: max_height = building.height obstacle_lat, obstacle_lon = lat, lon if not obstacle_lat: return None # Calculate diffraction loss (simplified knife-edge) distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) # Fresnel parameter tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon) rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon) tx_total = tx_elev + tx_height rx_total = rx_elev + rx_height # Height of LoS at obstacle point d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon) los_height = tx_total + (rx_total - tx_total) * (d1 / distance) clearance = los_height - max_height # Knife-edge diffraction loss diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) path_loss += diffraction_loss return RayPath( path_type="diffracted", total_distance=distance, path_loss=path_loss, reflection_points=[(obstacle_lat, obstacle_lon)], materials_crossed=[], is_valid=True ) def _find_reflection_point( self, tx_lat: float, tx_lon: float, rx_lat: float, rx_lon: float, building: Building ) -> Optional[Tuple[float, float]]: """Find specular reflection point on building wall""" # Simplified: find closest wall segment and calculate reflection geometry = building.geometry best_point = None best_score = float('inf') for i in range(len(geometry) - 1): wall_start = geometry[i] wall_end = geometry[i + 1] # Find reflection point on this wall segment ref_point = self._specular_reflection( tx_lon, tx_lat, rx_lon, rx_lat, wall_start[0], wall_start[1], wall_end[0], wall_end[1] ) if ref_point: # Score by total path length d1 = np.sqrt((ref_point[0] - tx_lon)**2 + (ref_point[1] - tx_lat)**2) d2 = np.sqrt((ref_point[0] - rx_lon)**2 + (ref_point[1] - rx_lat)**2) score = d1 + d2 if score < best_score: best_score = score best_point = (ref_point[1], ref_point[0]) # Return as (lat, lon) return best_point def _specular_reflection( self, tx_x, tx_y, rx_x, rx_y, wall_x1, wall_y1, wall_x2, wall_y2 ) -> Optional[Tuple[float, float]]: """Calculate specular reflection point on wall segment""" # Wall vector wall_dx = wall_x2 - wall_x1 wall_dy = wall_y2 - wall_y1 wall_len = np.sqrt(wall_dx**2 + wall_dy**2) if wall_len < 1e-10: return None # Wall normal normal_x = -wall_dy / wall_len normal_y = wall_dx / wall_len # Mirror TX across wall # Project TX onto wall tx_rel_x = tx_x - wall_x1 tx_rel_y = tx_y - wall_y1 dot = tx_rel_x * normal_x + tx_rel_y * normal_y mirror_x = tx_x - 2 * dot * normal_x mirror_y = tx_y - 2 * dot * normal_y # Find intersection of (mirror -> RX) with wall # Parametric line: mirror + t * (rx - mirror) dx = rx_x - mirror_x dy = rx_y - mirror_y # Wall parametric: wall1 + s * (wall2 - wall1) denom = dx * wall_dy - dy * wall_dx if abs(denom) < 1e-10: return None # Parallel t = ((wall_x1 - mirror_x) * wall_dy - (wall_y1 - mirror_y) * wall_dx) / denom s = ((wall_x1 - mirror_x) * dy - (wall_y1 - mirror_y) * dx) / (-denom) # Check if intersection is on wall segment and between mirror and RX if 0 <= s <= 1 and 0 <= t <= 1: ref_x = mirror_x + t * dx ref_y = mirror_y + t * dy return (ref_x, ref_y) return None def _line_intersects_building_3d( self, lat1, lon1, height1, lat2, lon2, height2, building: Building ) -> bool: """Check if 3D line intersects building volume""" # Sample along line for t in np.linspace(0, 1, 20): lat = lat1 + t * (lat2 - lat1) lon = lon1 + t * (lon2 - lon1) height = height1 + t * (height2 - height1) if buildings_service.point_in_building(lat, lon, building): if height < building.height: return True return False def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float: """Okumura-Hata path loss""" d_km = max(distance / 1000, 0.1) a_hm = (1.1 * np.log10(frequency_mhz) - 0.7) * rx_height - (1.56 * np.log10(frequency_mhz) - 0.8) L = (69.55 + 26.16 * np.log10(frequency_mhz) - 13.82 * np.log10(tx_height) - a_hm + (44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km)) return L def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float: """Knife-edge diffraction loss""" if clearance >= 0: return 0.0 wavelength = 300 / frequency_mhz d2 = total_distance - d1 # Fresnel parameter v v = abs(clearance) * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2)) # Lee's approximation if v <= -0.78: return 0 elif v < 0: return 6.02 + 9.11 * v - 1.27 * v**2 elif v < 2.4: return 6.02 + 9.11 * v + 1.27 * v**2 else: return 13 + 20 * np.log10(v) dominant_path_service = DominantPathService() ``` --- ### Task 3: Street Canyon Service (4-5 hours) **app/services/street_canyon_service.py:** ```python import numpy as np from typing import List, Tuple, Optional from dataclasses import dataclass import httpx from pathlib import Path import json @dataclass class Street: """Street segment from OSM""" id: int name: Optional[str] geometry: List[Tuple[float, float]] # [(lat, lon), ...] width: float # meters highway_type: str # residential, primary, secondary, etc. class StreetCanyonService: """ Street canyon propagation model (ITU-R P.1411) Signal propagates along streets with reflections from building walls. Loss increases at corners/turns. """ OVERPASS_URL = "https://overpass-api.de/api/interpreter" # Default street widths by type STREET_WIDTHS = { "motorway": 25.0, "trunk": 20.0, "primary": 15.0, "secondary": 12.0, "tertiary": 10.0, "residential": 8.0, "unclassified": 6.0, "service": 5.0, "footway": 2.0, "path": 1.5, } # Corner/turn loss CORNER_LOSS_90 = 10.0 # dB for 90-degree turn CORNER_LOSS_45 = 4.0 # dB for 45-degree turn def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/streets"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True, parents=True) self._cache: dict[str, List[Street]] = {} async def fetch_streets( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float ) -> List[Street]: """Fetch street network from OSM""" cache_key = f"{min_lat:.3f}_{min_lon:.3f}_{max_lat:.3f}_{max_lon:.3f}" # Check cache if cache_key in self._cache: return self._cache[cache_key] cache_file = self.cache_dir / f"{cache_key}.json" if cache_file.exists(): with open(cache_file) as f: data = json.load(f) streets = [Street(**s) for s in data] self._cache[cache_key] = streets return streets # Fetch from Overpass query = f""" [out:json][timeout:30]; way["highway"]({min_lat},{min_lon},{max_lat},{max_lon}); out body; >; out skel qt; """ try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(self.OVERPASS_URL, data={"data": query}) response.raise_for_status() data = response.json() except Exception as e: print(f"Street fetch error: {e}") return [] streets = self._parse_streets(data) # Cache with open(cache_file, 'w') as f: json.dump([{ "id": s.id, "name": s.name, "geometry": s.geometry, "width": s.width, "highway_type": s.highway_type } for s in streets], f) self._cache[cache_key] = streets return streets def _parse_streets(self, data: dict) -> List[Street]: """Parse Overpass response into Street objects""" nodes = {} for element in data.get("elements", []): if element["type"] == "node": nodes[element["id"]] = (element["lat"], element["lon"]) streets = [] for element in data.get("elements", []): if element["type"] != "way": continue tags = element.get("tags", {}) if "highway" not in tags: continue highway_type = tags["highway"] # Skip non-road types if highway_type in ["bus_stop", "crossing", "traffic_signals"]: continue geometry = [] for node_id in element.get("nodes", []): if node_id in nodes: geometry.append(nodes[node_id]) if len(geometry) < 2: continue # Get width width = self._get_street_width(tags) streets.append(Street( id=element["id"], name=tags.get("name"), geometry=geometry, width=width, highway_type=highway_type )) return streets def _get_street_width(self, tags: dict) -> float: """Estimate street width from OSM tags""" # Explicit width if "width" in tags: try: return float(tags["width"].replace("m", "").strip()) except: pass # Calculate from lanes if "lanes" in tags: try: lanes = int(tags["lanes"]) return lanes * 3.5 # ~3.5m per lane except: pass # Default by highway type highway_type = tags.get("highway", "residential") return self.STREET_WIDTHS.get(highway_type, 8.0) async def calculate_street_canyon_loss( self, tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, streets: List[Street] ) -> Tuple[float, List[Tuple[float, float]]]: """ Calculate path loss through street canyon Returns: (path_loss_db, street_path as list of points) """ # Find path along streets from TX to RX street_path = self._find_street_path(tx_lat, tx_lon, rx_lat, rx_lon, streets) if not street_path: return float('inf'), [] # No street path found # Calculate loss along path total_loss = 0.0 total_distance = 0.0 for i in range(len(street_path) - 1): p1 = street_path[i] p2 = street_path[i + 1] # Segment distance from app.services.terrain_service import TerrainService segment_dist = TerrainService.haversine_distance(p1[0], p1[1], p2[0], p2[1]) total_distance += segment_dist # Street canyon loss (ITU-R P.1411 simplified) # L = 32.4 + 20*log10(f_MHz) + 20*log10(d_km) if segment_dist > 0: segment_loss = 32.4 + 20 * np.log10(frequency_mhz) + 20 * np.log10(segment_dist / 1000 + 0.001) total_loss += segment_loss * (segment_dist / total_distance) if total_distance > 0 else 0 # Corner loss if i > 0: corner_angle = self._calculate_corner_angle( street_path[i-1], p1, p2 ) corner_loss = self._corner_loss(corner_angle) total_loss += corner_loss return total_loss, street_path def _find_street_path( self, start_lat: float, start_lon: float, end_lat: float, end_lon: float, streets: List[Street] ) -> List[Tuple[float, float]]: """ Find path along streets (simplified A* / greedy) Returns list of (lat, lon) waypoints """ # Find nearest street point to start and end start_point = self._nearest_street_point(start_lat, start_lon, streets) end_point = self._nearest_street_point(end_lat, end_lon, streets) if not start_point or not end_point: return [] # Simplified: just return direct street segments # Full implementation would use A* pathfinding path = [(start_lat, start_lon), start_point] # Add intermediate points along streets toward destination current = start_point visited = set() for _ in range(50): # Max iterations if self._distance(current, end_point) < 50: # Within 50m break # Find next street segment toward destination next_point = self._next_street_point(current, end_point, streets, visited) if not next_point: break path.append(next_point) visited.add((round(current[0], 5), round(current[1], 5))) current = next_point path.append(end_point) path.append((end_lat, end_lon)) return path def _nearest_street_point( self, lat: float, lon: float, streets: List[Street] ) -> Optional[Tuple[float, float]]: """Find nearest point on any street""" best_point = None best_dist = float('inf') for street in streets: for point in street.geometry: dist = self._distance((lat, lon), point) if dist < best_dist: best_dist = dist best_point = point return best_point if best_dist < 200 else None # Max 200m to street def _next_street_point( self, current: Tuple[float, float], target: Tuple[float, float], streets: List[Street], visited: set ) -> Optional[Tuple[float, float]]: """Find next street point toward target""" best_point = None best_score = float('inf') for street in streets: for i, point in enumerate(street.geometry): if (round(point[0], 5), round(point[1], 5)) in visited: continue dist_from_current = self._distance(current, point) dist_to_target = self._distance(point, target) # Must be close to current position if dist_from_current > 100: continue # Score: prefer points closer to target score = dist_to_target + dist_from_current * 0.5 if score < best_score: best_score = score best_point = point return best_point def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float: """Quick distance approximation (meters)""" lat_diff = (p1[0] - p2[0]) * 111000 lon_diff = (p1[1] - p2[1]) * 111000 * np.cos(np.radians(p1[0])) return np.sqrt(lat_diff**2 + lon_diff**2) def _calculate_corner_angle( self, p1: Tuple[float, float], p2: Tuple[float, float], p3: Tuple[float, float] ) -> float: """Calculate angle at corner (degrees)""" v1 = (p1[0] - p2[0], p1[1] - p2[1]) v2 = (p3[0] - p2[0], p3[1] - p2[1]) dot = v1[0] * v2[0] + v1[1] * v2[1] mag1 = np.sqrt(v1[0]**2 + v1[1]**2) mag2 = np.sqrt(v2[0]**2 + v2[1]**2) if mag1 * mag2 < 1e-10: return 180.0 cos_angle = dot / (mag1 * mag2) cos_angle = max(-1, min(1, cos_angle)) return np.degrees(np.arccos(cos_angle)) def _corner_loss(self, angle_degrees: float) -> float: """Calculate loss due to corner/turn""" # Straight = 180ยฐ, right angle = 90ยฐ turn_angle = abs(180 - angle_degrees) if turn_angle < 15: return 0.0 elif turn_angle < 45: return self.CORNER_LOSS_45 * (turn_angle / 45) elif turn_angle < 90: return self.CORNER_LOSS_45 + (self.CORNER_LOSS_90 - self.CORNER_LOSS_45) * ((turn_angle - 45) / 45) else: return self.CORNER_LOSS_90 + (turn_angle - 90) * 0.2 # Extra loss for sharp turns street_canyon_service = StreetCanyonService() ``` --- ### Task 4: Reflection Service (3-4 hours) **app/services/reflection_service.py:** ```python import numpy as np from typing import List, Tuple, Optional from dataclasses import dataclass from app.services.buildings_service import Building from app.services.materials_service import materials_service @dataclass class ReflectionPath: """A reflection path with one or more bounces""" points: List[Tuple[float, float]] # [TX, reflection1, reflection2, ..., RX] total_distance: float total_loss: float reflection_count: int materials: List[str] class ReflectionService: """ Calculate reflection paths for RF propagation - Single bounce (most common) - Double bounce (around corners) - Ground reflection """ MAX_BOUNCES = 2 GROUND_REFLECTION_COEFF = 0.3 # Depends on surface async def find_reflection_paths( self, tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, buildings: List[Building], include_ground: bool = True ) -> List[ReflectionPath]: """Find all viable reflection paths""" paths = [] # Single-bounce building reflections for building in buildings: path = self._find_single_bounce( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, building ) if path: paths.append(path) # Ground reflection if include_ground: ground_path = self._calculate_ground_reflection( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz ) if ground_path: paths.append(ground_path) # Sort by loss (best first) paths.sort(key=lambda p: p.total_loss) return paths[:5] # Return top 5 def _find_single_bounce( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, building: Building ) -> Optional[ReflectionPath]: """Find single-bounce reflection off building""" # Find reflection point on building walls geometry = building.geometry for i in range(len(geometry) - 1): wall_start = geometry[i] wall_end = geometry[i + 1] ref_point = self._specular_reflection_point( (tx_lon, tx_lat), (rx_lon, rx_lat), wall_start, wall_end ) if not ref_point: continue ref_lat, ref_lon = ref_point[1], ref_point[0] # Calculate distances from app.services.terrain_service import TerrainService d1 = TerrainService.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) d2 = TerrainService.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) total_dist = d1 + d2 # Direct distance check - reflection shouldn't be much longer direct_dist = TerrainService.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) if total_dist > direct_dist * 1.5: continue # Path loss path_loss = self._free_space_loss(total_dist, frequency_mhz) # Reflection loss material = materials_service.detect_material(building.tags) reflection_loss = materials_service.get_reflection_loss(material) total_loss = path_loss + reflection_loss return ReflectionPath( points=[(tx_lat, tx_lon), (ref_lat, ref_lon), (rx_lat, rx_lon)], total_distance=total_dist, total_loss=total_loss, reflection_count=1, materials=[material.value] ) return None def _calculate_ground_reflection( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz ) -> Optional[ReflectionPath]: """Calculate ground reflection path""" from app.services.terrain_service import TerrainService # Reflection point (simplified - midpoint for flat ground) mid_lat = (tx_lat + rx_lat) / 2 mid_lon = (tx_lon + rx_lon) / 2 # Path lengths d1 = TerrainService.haversine_distance(tx_lat, tx_lon, mid_lat, mid_lon) d2 = TerrainService.haversine_distance(mid_lat, mid_lon, rx_lat, rx_lon) # Actual path length considering heights path1 = np.sqrt(d1**2 + tx_height**2) path2 = np.sqrt(d2**2 + rx_height**2) total_dist = path1 + path2 # Path loss path_loss = self._free_space_loss(total_dist, frequency_mhz) # Ground reflection loss (~5-10 dB typically) ground_reflection_loss = -10 * np.log10(self.GROUND_REFLECTION_COEFF) # Phase difference can cause constructive or destructive interference # Simplified: assume average case total_loss = path_loss + ground_reflection_loss return ReflectionPath( points=[(tx_lat, tx_lon), (mid_lat, mid_lon), (rx_lat, rx_lon)], total_distance=total_dist, total_loss=total_loss, reflection_count=1, materials=["ground"] ) def _specular_reflection_point( self, tx: Tuple[float, float], # (lon, lat) rx: Tuple[float, float], wall_start: List[float], # [lon, lat] wall_end: List[float] ) -> Optional[Tuple[float, float]]: """Calculate specular reflection point on wall""" # Wall vector wx = wall_end[0] - wall_start[0] wy = wall_end[1] - wall_start[1] wall_len = np.sqrt(wx**2 + wy**2) if wall_len < 1e-10: return None # Normalize wx /= wall_len wy /= wall_len # Wall normal (perpendicular) nx = -wy ny = wx # Vector from wall start to TX tx_rel_x = tx[0] - wall_start[0] tx_rel_y = tx[1] - wall_start[1] # Distance from TX to wall line dist_to_wall = tx_rel_x * nx + tx_rel_y * ny # Mirror TX across wall mirror_x = tx[0] - 2 * dist_to_wall * nx mirror_y = tx[1] - 2 * dist_to_wall * ny # Line from mirror to RX dx = rx[0] - mirror_x dy = rx[1] - mirror_y # Find intersection with wall # Parametric: wall_start + t * wall_dir # Parametric: mirror + s * (rx - mirror) denom = dx * wy - dy * wx if abs(denom) < 1e-10: return None t = ((wall_start[0] - mirror_x) * wy - (wall_start[1] - mirror_y) * wx) / denom s = ((wall_start[0] - mirror_x) * dy - (wall_start[1] - mirror_y) * dx) / (-denom) if denom != 0 else 0 # Check if on wall segment and between mirror and RX if 0 <= s <= 1 and 0 <= t <= 1: ref_x = mirror_x + t * dx ref_y = mirror_y + t * dy return (ref_x, ref_y) return None def _free_space_loss(self, distance: float, frequency_mhz: float) -> float: """Free space path loss (dB)""" if distance <= 0: distance = 1 # FSPL = 20*log10(d) + 20*log10(f) + 20*log10(4*pi/c) # Simplified: FSPL = 32.45 + 20*log10(f_MHz) + 20*log10(d_km) d_km = distance / 1000 return 32.45 + 20 * np.log10(frequency_mhz) + 20 * np.log10(d_km + 0.001) def combine_paths( self, direct_power_dbm: float, reflection_paths: List[ReflectionPath], tx_power_dbm: float ) -> float: """ Combine direct and reflected signals (power sum) Returns total received power in dBm """ # Convert to linear power powers = [] if direct_power_dbm > -150: # Valid direct signal powers.append(10 ** (direct_power_dbm / 10)) for path in reflection_paths: reflected_power_dbm = tx_power_dbm - path.total_loss if reflected_power_dbm > -150: powers.append(10 ** (reflected_power_dbm / 10)) if not powers: return -150.0 # No signal # Sum powers (incoherent addition - conservative estimate) total_power = sum(powers) return 10 * np.log10(total_power) reflection_service = ReflectionService() ``` --- ### Task 5: Update Coverage Service (2-3 hours) **Update app/services/coverage_service.py:** ```python # Add imports from app.services.materials_service import materials_service from app.services.dominant_path_service import dominant_path_service from app.services.street_canyon_service import street_canyon_service from app.services.reflection_service import reflection_service # Update CoverageSettings class CoverageSettings(BaseModel): radius: float = 10000 resolution: float = 200 min_signal: float = -120 # Layer toggles use_terrain: bool = True use_buildings: bool = True use_materials: bool = True use_dominant_path: bool = False use_street_canyon: bool = False use_reflections: bool = False # Preset preset: Optional[str] = None # fast, standard, detailed, full # Add preset application PRESETS = { "fast": {...}, "standard": {...}, "detailed": {...}, "full": {...}, } def apply_preset(settings: CoverageSettings) -> CoverageSettings: if settings.preset and settings.preset in PRESETS: for key, value in PRESETS[settings.preset].items(): setattr(settings, key, value) return settings # Update _calculate_point method async def _calculate_point(self, site, lat, lon, settings, buildings, streets): """Enhanced point calculation with all propagation models""" # ... existing code ... # Material-aware building loss if settings.use_buildings and settings.use_materials: for building in buildings: if self._intersects_building(site, lat, lon, building): material = materials_service.detect_material(building.tags) building_loss += materials_service.get_penetration_loss( material, site.frequency ) # Dominant path (find best route) if settings.use_dominant_path: paths = await dominant_path_service.find_dominant_paths( site.lat, site.lon, site.height, lat, lon, 1.5, site.frequency, buildings ) if paths: best_path = paths[0] # Use best path's loss instead of simple calculation path_loss = best_path.path_loss # Street canyon if settings.use_street_canyon and streets: canyon_loss, street_path = await street_canyon_service.calculate_street_canyon_loss( site.lat, site.lon, site.height, lat, lon, 1.5, site.frequency, streets ) # Use canyon loss if better than direct if canyon_loss < path_loss + terrain_loss + building_loss: path_loss = canyon_loss terrain_loss = 0 building_loss = 0 # Reflections reflection_gain = 0.0 if settings.use_reflections: reflection_paths = await reflection_service.find_reflection_paths( site.lat, site.lon, site.height, lat, lon, 1.5, site.frequency, buildings ) if reflection_paths: # Combine direct and reflected direct_rsrp = site.power + site.gain - path_loss - terrain_loss - building_loss combined_rsrp = reflection_service.combine_paths( direct_rsrp, reflection_paths, site.power + site.gain ) reflection_gain = combined_rsrp - direct_rsrp # Final RSRP rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + reflection_gain return CoveragePoint( lat=lat, lon=lon, rsrp=rsrp, distance=distance, has_los=has_los, terrain_loss=terrain_loss, building_loss=building_loss, reflection_gain=reflection_gain # NEW ) ``` --- ### Task 6: Update API & Add Presets Endpoint (1-2 hours) **Update app/api/routes/coverage.py:** ```python @router.get("/presets") async def get_presets(): """Get available propagation model presets""" return { "presets": { "fast": { "description": "Quick calculation - terrain only", "use_terrain": True, "use_buildings": False, "use_materials": False, "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, "estimated_speed": "~5 seconds for 5km radius" }, "standard": { "description": "Balanced - terrain + buildings with materials", "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, "estimated_speed": "~30 seconds for 5km radius" }, "detailed": { "description": "Accurate - adds dominant path analysis", "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": True, "use_street_canyon": False, "use_reflections": False, "estimated_speed": "~2 minutes for 5km radius" }, "full": { "description": "Maximum realism - all models enabled", "use_terrain": True, "use_buildings": True, "use_materials": True, "use_dominant_path": True, "use_street_canyon": True, "use_reflections": True, "estimated_speed": "~5 minutes for 5km radius" } } } # Update CoverageResponse class CoverageResponse(BaseModel): points: List[CoveragePoint] count: int settings: CoverageSettings stats: dict computation_time: float # NEW - seconds models_used: List[str] # NEW - which models were active ``` --- ## ๐Ÿ“ Files Summary ### New Files: ``` app/services/ โ”œโ”€โ”€ materials_service.py # Building material RF properties โ”œโ”€โ”€ dominant_path_service.py # Multi-path ray analysis โ”œโ”€โ”€ street_canyon_service.py # Street network propagation โ””โ”€โ”€ reflection_service.py # Reflection calculations data/streets/ # Cached street network ``` ### Modified Files: ``` app/services/ โ”œโ”€โ”€ buildings_service.py # Add material & tags fields โ””โ”€โ”€ coverage_service.py # Integrate all models app/api/routes/ โ””โ”€โ”€ coverage.py # Add presets endpoint, timing app/models/ โ””โ”€โ”€ coverage.py # Extended settings model ``` --- ## โœ… Success Criteria - [ ] `/api/coverage/presets` returns all 4 presets with descriptions - [ ] `preset: "full"` enables all toggles - [ ] Material detection working (check logs for detected materials) - [ ] Reflection paths found (check `reflection_gain` in response) - [ ] Street canyon reduces loss along roads - [ ] `computation_time` reported in response - [ ] `models_used` shows active models - [ ] Performance: "fast" < 10s, "standard" < 1min, "detailed" < 3min, "full" < 10min for 5km --- ## ๐Ÿงช Test Commands ```bash # Test presets endpoint curl https://api.rfcp.eliah.one/api/coverage/presets | jq # Fast preset curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \ -H "Content-Type: application/json" \ -d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":2000,"resolution":100,"preset":"fast"}}' | jq '.stats, .computation_time, .models_used' # Full preset (will be slow!) curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \ -H "Content-Type: application/json" \ -d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":1000,"resolution":100,"preset":"full"}}' | jq '.stats, .computation_time' # Check material detection curl "https://api.rfcp.eliah.one/api/coverage/buildings?min_lat=48.455&min_lon=35.045&max_lat=48.465&max_lon=35.055" | jq '.buildings[:3] | .[].material' ``` --- ## ๐Ÿ“ Notes - Street canyon requires road network fetch (additional Overpass query) - Reflection calculations are CPU-intensive โ€” consider caching - Full model on large areas may timeout โ€” implement background tasks later (1.4.1) - Material detection fallback chain: explicit tag โ†’ facade tag โ†’ building type โ†’ default --- ## ๐Ÿ”œ Next Iterations **1.4.1 โ€” Enhanced Environment (Should have):** - Spatial indexing (R-tree) for buildings - Ground reflection improvements - Water body reflection (OSM natural=water) - Vegetation loss (OSM landuse=forest) **1.4.2 โ€” Extra Factors (Could have):** - Weather/rain attenuation (ITU-R P.838) - Indoor penetration layer - Seasonal vegetation toggle --- **Ready for Claude Code** ๐Ÿš€