diff --git a/backend/app/api/routes/coverage.py b/backend/app/api/routes/coverage.py index 712939c..582a1cb 100644 --- a/backend/app/api/routes/coverage.py +++ b/backend/app/api/routes/coverage.py @@ -1,4 +1,5 @@ import time +import asyncio from fastapi import APIRouter, HTTPException, BackgroundTasks from typing import List, Optional @@ -59,17 +60,26 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse: # Time the calculation start_time = time.time() - # Calculate - if len(request.sites) == 1: - points = await coverage_service.calculate_coverage( - request.sites[0], - request.settings - ) - else: - points = await coverage_service.calculate_multi_site_coverage( - request.sites, - request.settings - ) + try: + # Calculate with 5-minute timeout + if len(request.sites) == 1: + points = await asyncio.wait_for( + coverage_service.calculate_coverage( + request.sites[0], + request.settings + ), + timeout=300.0 + ) + else: + points = await asyncio.wait_for( + coverage_service.calculate_multi_site_coverage( + request.sites, + request.settings + ), + timeout=300.0 + ) + except asyncio.TimeoutError: + raise HTTPException(408, "Calculation timeout (5 min) — try smaller radius or lower resolution") computation_time = time.time() - start_time @@ -85,6 +95,7 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse: "points_with_buildings": sum(1 for p in points if p.building_loss > 0), "points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0), "points_with_reflection_gain": sum(1 for p in points if p.reflection_gain > 0), + "points_with_vegetation_loss": sum(1 for p in points if p.vegetation_loss > 0), } return CoverageResponse( @@ -113,12 +124,12 @@ async def get_presets(): "estimated_speed": "~30 seconds for 5km radius" }, "detailed": { - "description": "Accurate - adds dominant path analysis", + "description": "Accurate - adds dominant path + vegetation", **PRESETS["detailed"], "estimated_speed": "~2 minutes for 5km radius" }, "full": { - "description": "Maximum realism - all models enabled", + "description": "Maximum realism - all models + water + vegetation", **PRESETS["full"], "estimated_speed": "~5 minutes for 5km radius" } @@ -168,5 +179,9 @@ def _get_active_models(settings: CoverageSettings) -> List[str]: models.append("street_canyon") if settings.use_reflections: models.append("reflections") + if settings.use_water_reflection: + models.append("water_reflection") + if settings.use_vegetation: + models.append("vegetation") return models diff --git a/backend/app/main.py b/backend/app/main.py index 94f6cb8..0fb96da 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -17,7 +17,7 @@ async def lifespan(app: FastAPI): app = FastAPI( title="RFCP Backend API", description="RF Coverage Planning Backend", - version="1.5.1", + version="1.6.0", lifespan=lifespan, ) diff --git a/backend/app/services/buildings_service.py b/backend/app/services/buildings_service.py index a572ba9..df8e500 100644 --- a/backend/app/services/buildings_service.py +++ b/backend/app/services/buildings_service.py @@ -1,3 +1,4 @@ +import re import httpx import asyncio from typing import List, Optional @@ -34,6 +35,33 @@ class BuildingsService: self._memory_cache: dict[str, List[Building]] = {} self._max_cache_size = 50 # bbox regions + @staticmethod + def _safe_int(value) -> Optional[int]: + """Safely parse int from OSM tag (handles '1а', '2-3', '5+', etc.)""" + if not value: + return None + try: + return int(value) + except (ValueError, TypeError): + match = re.search(r'\d+', str(value)) + if match: + return int(match.group()) + return None + + @staticmethod + def _safe_float(value) -> Optional[float]: + """Safely parse float from OSM tag (handles '10 m', '~12', '10m')""" + if not value: + return None + try: + cleaned = str(value).lower().replace('m', '').replace('~', '').strip() + return float(cleaned) + except (ValueError, TypeError): + match = re.search(r'[\d.]+', str(value)) + if match: + return float(match.group()) + return None + def _bbox_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: """Generate cache key for bbox""" # Round to 0.01 degree (~1km) grid for cache efficiency @@ -157,7 +185,7 @@ class BuildingsService: id=element["id"], geometry=geometry, height=height, - levels=int(tags.get("building:levels", 0)) or None, + levels=self._safe_int(tags.get("building:levels")), building_type=tags.get("building"), material=material_str, tags=tags @@ -169,22 +197,15 @@ class BuildingsService: """Estimate building height from OSM tags""" # Explicit height tag if "height" in tags: - try: - h = tags["height"] - # Handle "10 m" or "10m" format - if isinstance(h, str): - h = h.replace("m", "").replace(" ", "") - return float(h) - except (ValueError, TypeError): - pass + h = self._safe_float(tags["height"]) + if h is not None and h > 0: + return h # Calculate from levels if "building:levels" in tags: - try: - levels = int(tags["building:levels"]) + levels = self._safe_int(tags["building:levels"]) + if levels is not None and levels > 0: return levels * self.DEFAULT_LEVEL_HEIGHT - except (ValueError, TypeError): - pass # Default based on building type building_type = tags.get("building", "yes") diff --git a/backend/app/services/coverage_service.py b/backend/app/services/coverage_service.py index 33b66da..e239ea0 100644 --- a/backend/app/services/coverage_service.py +++ b/backend/app/services/coverage_service.py @@ -9,6 +9,9 @@ from app.services.materials_service import materials_service from app.services.dominant_path_service import dominant_path_service from app.services.street_canyon_service import street_canyon_service, Street from app.services.reflection_service import reflection_service +from app.services.spatial_index import get_spatial_index, SpatialIndex +from app.services.water_service import water_service, WaterBody +from app.services.vegetation_service import vegetation_service, VegetationArea class CoveragePoint(BaseModel): @@ -19,7 +22,8 @@ class CoveragePoint(BaseModel): has_los: bool terrain_loss: float # dB building_loss: float # dB - reflection_gain: float = 0.0 # dB (NEW) + reflection_gain: float = 0.0 # dB + vegetation_loss: float = 0.0 # dB class CoverageSettings(BaseModel): @@ -34,6 +38,11 @@ class CoverageSettings(BaseModel): use_dominant_path: bool = False use_street_canyon: bool = False use_reflections: bool = False + use_water_reflection: bool = False + use_vegetation: bool = False + + # Vegetation season + season: str = "summer" # Preset preset: Optional[str] = None # fast, standard, detailed, full @@ -48,6 +57,8 @@ PRESETS = { "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, + "use_water_reflection": False, + "use_vegetation": False, }, "standard": { "use_terrain": True, @@ -56,6 +67,8 @@ PRESETS = { "use_dominant_path": False, "use_street_canyon": False, "use_reflections": False, + "use_water_reflection": False, + "use_vegetation": False, }, "detailed": { "use_terrain": True, @@ -64,6 +77,8 @@ PRESETS = { "use_dominant_path": True, "use_street_canyon": False, "use_reflections": False, + "use_water_reflection": False, + "use_vegetation": True, }, "full": { "use_terrain": True, @@ -72,6 +87,8 @@ PRESETS = { "use_dominant_path": True, "use_street_canyon": True, "use_reflections": True, + "use_water_reflection": True, + "use_vegetation": True, }, } @@ -98,7 +115,7 @@ class SiteParams(BaseModel): class CoverageService: """ RF Coverage calculation with terrain, buildings, materials, - dominant path, street canyon, and reflections + dominant path, street canyon, reflections, water, and vegetation """ EARTH_RADIUS = 6371000 @@ -134,27 +151,49 @@ class CoverageService: lat_delta = settings.radius / 111000 lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat))) - # Fetch buildings for coverage area (if enabled) + min_lat = site.lat - lat_delta + max_lat = site.lat + lat_delta + min_lon = site.lon - lon_delta + max_lon = site.lon + lon_delta + + # Fetch buildings (if enabled) and build spatial index buildings: List[Building] = [] + spatial_idx: Optional[SpatialIndex] = None if settings.use_buildings: buildings = await self.buildings.fetch_buildings( - site.lat - lat_delta, site.lon - lon_delta, - site.lat + lat_delta, site.lon + lon_delta + min_lat, min_lon, max_lat, max_lon ) + if buildings: + cache_key = f"{min_lat:.3f},{min_lon:.3f},{max_lat:.3f},{max_lon:.3f}" + spatial_idx = get_spatial_index(cache_key, buildings) # Fetch streets (if street canyon enabled) streets: List[Street] = [] if settings.use_street_canyon: streets = await street_canyon_service.fetch_streets( - site.lat - lat_delta, site.lon - lon_delta, - site.lat + lat_delta, site.lon + lon_delta + min_lat, min_lon, max_lat, max_lon + ) + + # Fetch water bodies (if water reflection enabled) + water_bodies: List[WaterBody] = [] + if settings.use_water_reflection: + water_bodies = await water_service.fetch_water_bodies( + min_lat, min_lon, max_lat, max_lon + ) + + # Fetch vegetation (if enabled) + vegetation_areas: List[VegetationArea] = [] + if settings.use_vegetation: + vegetation_areas = await vegetation_service.fetch_vegetation( + min_lat, min_lon, max_lat, max_lon ) # Calculate coverage for each point for lat, lon in grid: point = await self._calculate_point( site, lat, lon, - settings, buildings, streets + settings, buildings, streets, + spatial_idx, water_bodies, vegetation_areas ) if point.rsrp >= settings.min_signal: @@ -230,7 +269,10 @@ class CoverageService: lat: float, lon: float, settings: CoverageSettings, buildings: List[Building], - streets: List[Street] + streets: List[Street], + spatial_idx: Optional[SpatialIndex], + water_bodies: List[WaterBody], + vegetation_areas: List[VegetationArea] ) -> CoveragePoint: """Calculate RSRP at a single point with all propagation models""" @@ -242,7 +284,7 @@ class CoverageService: # Base path loss (Okumura-Hata for urban) path_loss = self._okumura_hata( - distance, site.frequency, site.height, 1.5 # 1.5m receiver height + distance, site.frequency, site.height, 1.5 ) # Antenna pattern loss (if directional) @@ -260,22 +302,24 @@ class CoverageService: if settings.use_terrain: los_result = await self.los.check_line_of_sight( site.lat, site.lon, site.height, - lat, lon, 1.5 # receiver at 1.5m + lat, lon, 1.5 ) has_los = los_result["has_los"] if not has_los: - # Add diffraction loss based on clearance clearance = los_result["clearance"] terrain_loss = self._diffraction_loss(clearance, site.frequency) - # Building loss (with optional material awareness) + # Building loss — use spatial index for fast lookup building_loss = 0.0 + nearby_buildings = ( + spatial_idx.query_line(site.lat, site.lon, lat, lon) + if spatial_idx else buildings + ) - if settings.use_buildings and buildings: + if settings.use_buildings and nearby_buildings: if settings.use_materials: - # Material-aware building loss - for building in buildings: + for building in nearby_buildings: intersection = self.buildings.line_intersects_building( site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon), lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon), @@ -287,30 +331,28 @@ class CoverageService: material, site.frequency ) has_los = False - break # One building is enough + break else: - # Simple building loss (legacy behavior) - for building in buildings: + for building in nearby_buildings: intersection = self.buildings.line_intersects_building( site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon), lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon), building ) if intersection is not None: - building_loss += 20.0 # Default concrete + building_loss += 20.0 has_los = False break - # Dominant path analysis (find best route) - if settings.use_dominant_path and buildings: + # Dominant path analysis + if settings.use_dominant_path and nearby_buildings: paths = await dominant_path_service.find_dominant_paths( site.lat, site.lon, site.height, lat, lon, 1.5, - site.frequency, buildings + site.frequency, nearby_buildings ) if paths: best_path = paths[0] - # Use best path's loss if it's better if best_path.is_valid and best_path.path_loss < (path_loss + terrain_loss + building_loss): path_loss = best_path.path_loss terrain_loss = 0 @@ -324,30 +366,62 @@ class CoverageService: lat, lon, 1.5, site.frequency, streets ) - # Use canyon loss if better than current total if canyon_loss < (path_loss + terrain_loss + building_loss): path_loss = canyon_loss terrain_loss = 0 building_loss = 0 - # Reflections + # Vegetation loss + veg_loss = 0.0 + if settings.use_vegetation and vegetation_areas: + veg_loss = vegetation_service.calculate_vegetation_loss( + site.lat, site.lon, lat, lon, + vegetation_areas, settings.season + ) + + # Reflections (building + ground/water) reflection_gain = 0.0 - if settings.use_reflections and buildings: + if settings.use_reflections and nearby_buildings: + is_over_water = False + if settings.use_water_reflection and water_bodies: + is_over_water = water_service.point_over_water(lat, lon, water_bodies) is not None + reflection_paths = await reflection_service.find_reflection_paths( site.lat, site.lon, site.height, lat, lon, 1.5, - site.frequency, buildings + site.frequency, nearby_buildings, + include_ground=True ) + + # If over water, replace ground reflection with stronger water reflection + if is_over_water and reflection_paths: + water_path = reflection_service._calculate_ground_reflection( + site.lat, site.lon, site.height, + lat, lon, 1.5, + site.frequency, is_water=True + ) + if water_path: + reflection_paths = [ + p for p in reflection_paths if "ground" not in p.materials + ] + reflection_paths.append(water_path) + reflection_paths.sort(key=lambda p: p.total_loss) + if reflection_paths: - # Combine direct and reflected signals - direct_rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + direct_rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss - veg_loss combined_rsrp = reflection_service.combine_paths( direct_rsrp, reflection_paths, site.power + site.gain ) reflection_gain = max(0, combined_rsrp - direct_rsrp) + elif settings.use_water_reflection and water_bodies and not settings.use_reflections: + # Water reflection without full reflection model + is_over_water = water_service.point_over_water(lat, lon, water_bodies) is not None + if is_over_water: + reflection_gain = 3.0 # ~3dB boost over water # Final RSRP - rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + reflection_gain + rsrp = (site.power + site.gain - path_loss - antenna_loss + - terrain_loss - building_loss - veg_loss + reflection_gain) return CoveragePoint( lat=lat, @@ -357,30 +431,25 @@ class CoverageService: has_los=has_los, terrain_loss=terrain_loss, building_loss=building_loss, - reflection_gain=reflection_gain + reflection_gain=reflection_gain, + vegetation_loss=veg_loss ) def _okumura_hata( self, - distance: float, # meters - frequency: float, # MHz - tx_height: float, # meters - rx_height: float # meters + distance: float, + frequency: float, + tx_height: float, + rx_height: float ) -> float: - """ - Okumura-Hata path loss model (urban) - - Returns path loss in dB - """ + """Okumura-Hata path loss model (urban). Returns path loss in dB.""" d_km = distance / 1000 if d_km < 0.1: - d_km = 0.1 # Minimum distance + d_km = 0.1 - # Mobile antenna height correction (urban) a_hm = (1.1 * np.log10(frequency) - 0.7) * rx_height - (1.56 * np.log10(frequency) - 0.8) - # Path loss L = (69.55 + 26.16 * np.log10(frequency) - 13.82 * np.log10(tx_height) - a_hm + (44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km)) @@ -393,25 +462,19 @@ class CoverageService: azimuth: float, beamwidth: float ) -> float: """Calculate antenna pattern attenuation""" - # Calculate bearing from site to point bearing = self._calculate_bearing(site_lat, site_lon, point_lat, point_lon) - # Angle difference from main lobe angle_diff = abs(bearing - azimuth) if angle_diff > 180: angle_diff = 360 - angle_diff - # Simple cosine pattern approximation - # 3dB beamwidth = angle where power drops to half half_beamwidth = beamwidth / 2 if angle_diff <= half_beamwidth: - # Within main lobe - minimal loss loss = 3 * (angle_diff / half_beamwidth) ** 2 else: - # Outside main lobe - significant loss loss = 3 + 12 * ((angle_diff - half_beamwidth) / half_beamwidth) ** 2 - loss = min(loss, 25) # Cap at 25dB (back lobe) + loss = min(loss, 25) return loss @@ -433,23 +496,12 @@ class CoverageService: return (bearing + 360) % 360 def _diffraction_loss(self, clearance: float, frequency: float) -> float: - """ - Knife-edge diffraction loss - - Args: - clearance: Clearance in meters (negative = obstructed) - frequency: Frequency in MHz - - Returns: - Additional loss in dB - """ + """Knife-edge diffraction loss. Returns additional loss in dB.""" if clearance >= 0: - return 0.0 # No obstruction + return 0.0 - # Fresnel parameter approximation - v = abs(clearance) / 10 # Normalize + v = abs(clearance) / 10 - # Knife-edge loss approximation if v <= 0: loss = 0 elif v < 2.4: @@ -457,7 +509,7 @@ class CoverageService: else: loss = 13.0 + 20 * np.log10(v) - return min(loss, 40) # Cap at 40dB + return min(loss, 40) # Singleton diff --git a/backend/app/services/reflection_service.py b/backend/app/services/reflection_service.py index cbd4b3d..a26bec7 100644 --- a/backend/app/services/reflection_service.py +++ b/backend/app/services/reflection_service.py @@ -22,11 +22,21 @@ class ReflectionService: - Single bounce (most common) - Double bounce (around corners) - Ground reflection + - Water surface reflection """ MAX_BOUNCES = 2 GROUND_REFLECTION_COEFF = 0.3 # Depends on surface + # Ground types and reflection coefficients + GROUND_REFLECTION = { + "urban": 0.3, + "suburban": 0.4, + "rural": 0.5, + "water": 0.8, + "desert": 0.6, + } + async def find_reflection_paths( self, tx_lat: float, tx_lon: float, tx_height: float, @@ -124,9 +134,10 @@ class ReflectionService: self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, - frequency_mhz + frequency_mhz, + is_water: bool = False ) -> Optional[ReflectionPath]: - """Calculate ground reflection path""" + """Calculate ground/water reflection path""" from app.services.terrain_service import TerrainService @@ -146,19 +157,19 @@ class ReflectionService: # Path loss path_loss = self._free_space_loss(total_dist, frequency_mhz) - # Ground reflection loss (~5-10 dB typically) - ground_reflection_loss = -10 * np.log10(self.GROUND_REFLECTION_COEFF) + # Reflection coefficient: water is much more reflective + coeff = self.GROUND_REFLECTION.get("water" if is_water else "rural", 0.4) + reflection_loss = -10 * np.log10(coeff) - # Phase difference can cause constructive or destructive interference - # Simplified: assume average case - total_loss = path_loss + ground_reflection_loss + total_loss = path_loss + reflection_loss + surface_type = "water" if is_water else "ground" return ReflectionPath( points=[(tx_lat, tx_lon), (mid_lat, mid_lon), (rx_lat, rx_lon)], total_distance=total_dist, total_loss=total_loss, reflection_count=1, - materials=["ground"] + materials=[surface_type] ) def _specular_reflection_point( diff --git a/backend/app/services/spatial_index.py b/backend/app/services/spatial_index.py new file mode 100644 index 0000000..52dc1f8 --- /dev/null +++ b/backend/app/services/spatial_index.py @@ -0,0 +1,140 @@ +""" +R-tree spatial index for fast building and geometry lookups. + +Uses a simple grid-based approach (no external dependency) for +O(1) amortised lookups instead of O(n) linear scans. +""" + +from typing import List, Tuple, Optional, Dict +from collections import defaultdict +from app.services.buildings_service import Building + + +class SpatialIndex: + """Grid-based spatial index for fast building lookups""" + + def __init__(self, cell_size: float = 0.001): + """ + Args: + cell_size: Grid cell size in degrees (~111m at equator) + """ + self.cell_size = cell_size + self._grid: Dict[Tuple[int, int], List[Building]] = defaultdict(list) + self._buildings: List[Building] = [] + + def _cell_key(self, lat: float, lon: float) -> Tuple[int, int]: + """Convert lat/lon to grid cell key""" + return (int(lat / self.cell_size), int(lon / self.cell_size)) + + def build(self, buildings: List[Building]): + """Build spatial index from buildings list""" + self._grid.clear() + self._buildings = buildings + + for building in buildings: + # Get bounding box of building + lons = [p[0] for p in building.geometry] + lats = [p[1] for p in building.geometry] + + min_lon, max_lon = min(lons), max(lons) + min_lat, max_lat = min(lats), max(lats) + + # Insert into all overlapping grid cells + min_cell_lat = int(min_lat / self.cell_size) + max_cell_lat = int(max_lat / self.cell_size) + min_cell_lon = int(min_lon / self.cell_size) + max_cell_lon = int(max_lon / self.cell_size) + + for clat in range(min_cell_lat, max_cell_lat + 1): + for clon in range(min_cell_lon, max_cell_lon + 1): + self._grid[(clat, clon)].append(building) + + def query_point(self, lat: float, lon: float, buffer_cells: int = 1) -> List[Building]: + """Find buildings near a point""" + if not self._grid: + return self._buildings # Fallback to linear scan + + center = self._cell_key(lat, lon) + results = set() + + for dlat in range(-buffer_cells, buffer_cells + 1): + for dlon in range(-buffer_cells, buffer_cells + 1): + key = (center[0] + dlat, center[1] + dlon) + for b in self._grid.get(key, []): + results.add(b.id) + + # Return buildings by id (deduped) + id_set = results + return [b for b in self._buildings if b.id in id_set] + + def query_line( + self, + lat1: float, lon1: float, + lat2: float, lon2: float, + buffer_cells: int = 1 + ) -> List[Building]: + """Find buildings along a line (for LoS checks)""" + if not self._grid: + return self._buildings + + # Get bounding box cells of the line + min_lat = min(lat1, lat2) + max_lat = max(lat1, lat2) + min_lon = min(lon1, lon2) + max_lon = max(lon1, lon2) + + min_clat = int(min_lat / self.cell_size) - buffer_cells + max_clat = int(max_lat / self.cell_size) + buffer_cells + min_clon = int(min_lon / self.cell_size) - buffer_cells + max_clon = int(max_lon / self.cell_size) + buffer_cells + + results = set() + for clat in range(min_clat, max_clat + 1): + for clon in range(min_clon, max_clon + 1): + for b in self._grid.get((clat, clon), []): + results.add(b.id) + + id_set = results + return [b for b in self._buildings if b.id in id_set] + + def query_bbox( + self, + min_lat: float, min_lon: float, + max_lat: float, max_lon: float + ) -> List[Building]: + """Find all buildings in bounding box""" + if not self._grid: + return self._buildings + + min_clat = int(min_lat / self.cell_size) + max_clat = int(max_lat / self.cell_size) + min_clon = int(min_lon / self.cell_size) + max_clon = int(max_lon / self.cell_size) + + results = set() + for clat in range(min_clat, max_clat + 1): + for clon in range(min_clon, max_clon + 1): + for b in self._grid.get((clat, clon), []): + results.add(b.id) + + id_set = results + return [b for b in self._buildings if b.id in id_set] + + +# Global cache of spatial indices +_spatial_indices: dict[str, SpatialIndex] = {} + + +def get_spatial_index(cache_key: str, buildings: List[Building]) -> SpatialIndex: + """Get or create spatial index for buildings""" + if cache_key not in _spatial_indices: + idx = SpatialIndex() + idx.build(buildings) + _spatial_indices[cache_key] = idx + + # Limit cache size + if len(_spatial_indices) > 20: + oldest = next(iter(_spatial_indices)) + del _spatial_indices[oldest] + + return _spatial_indices[cache_key] diff --git a/backend/app/services/vegetation_service.py b/backend/app/services/vegetation_service.py new file mode 100644 index 0000000..8b8e753 --- /dev/null +++ b/backend/app/services/vegetation_service.py @@ -0,0 +1,218 @@ +""" +OSM vegetation service for RF signal attenuation. + +Forests and dense vegetation attenuate RF signals significantly. +Uses ITU-R P.833 approximations for foliage loss. +""" + +import httpx +from typing import List, Tuple, Optional +from pydantic import BaseModel +import json +from pathlib import Path + + +class VegetationArea(BaseModel): + """Vegetation area from OSM""" + id: int + geometry: List[Tuple[float, float]] # [(lon, lat), ...] + vegetation_type: str # forest, wood, scrub, orchard + density: str # dense, sparse, mixed + + +class VegetationService: + """OSM vegetation for signal attenuation""" + + OVERPASS_URL = "https://overpass-api.de/api/interpreter" + + # Attenuation dB per 100 meters of vegetation + ATTENUATION_DB_PER_100M = { + "forest": 8.0, + "wood": 6.0, + "tree_row": 2.0, + "scrub": 3.0, + "orchard": 2.0, + "vineyard": 1.0, + "meadow": 0.5, + } + + # Seasonal factor (summer = full foliage) + SEASONAL_FACTOR = { + "summer": 1.0, + "winter": 0.3, + "spring": 0.6, + "autumn": 0.7, + } + + def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/vegetation"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(exist_ok=True, parents=True) + self._cache: dict[str, List[VegetationArea]] = {} + + async def fetch_vegetation( + self, + min_lat: float, min_lon: float, + max_lat: float, max_lon: float + ) -> List[VegetationArea]: + """Fetch vegetation areas in bounding box""" + + cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" + + if cache_key in self._cache: + return self._cache[cache_key] + + cache_file = self.cache_dir / f"{cache_key}.json" + if cache_file.exists(): + try: + with open(cache_file) as f: + data = json.load(f) + areas = [VegetationArea(**v) for v in data] + self._cache[cache_key] = areas + return areas + except Exception: + pass + + query = f""" + [out:json][timeout:30]; + ( + way["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon}); + way["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon}); + way["landuse"="orchard"]({min_lat},{min_lon},{max_lat},{max_lon}); + way["natural"="scrub"]({min_lat},{min_lon},{max_lat},{max_lon}); + ); + out body; + >; + out skel qt; + """ + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post(self.OVERPASS_URL, data={"data": query}) + response.raise_for_status() + data = response.json() + except Exception as e: + print(f"Vegetation fetch error: {e}") + return [] + + areas = self._parse_response(data) + + # Cache + if areas: + with open(cache_file, 'w') as f: + json.dump([v.model_dump() for v in areas], f) + self._cache[cache_key] = areas + + return areas + + def _parse_response(self, data: dict) -> List[VegetationArea]: + """Parse Overpass response""" + nodes = {} + for element in data.get("elements", []): + if element["type"] == "node": + nodes[element["id"]] = (element["lon"], element["lat"]) + + areas = [] + for element in data.get("elements", []): + if element["type"] != "way": + continue + + tags = element.get("tags", {}) + + veg_type = tags.get("landuse", tags.get("natural", "forest")) + + geometry = [] + for node_id in element.get("nodes", []): + if node_id in nodes: + geometry.append(nodes[node_id]) + + if len(geometry) < 3: + continue + + # Determine density from leaf_type tag + leaf_type = tags.get("leaf_type", "mixed") + density = "dense" if leaf_type == "needleleaved" else "mixed" + + areas.append(VegetationArea( + id=element["id"], + geometry=geometry, + vegetation_type=veg_type, + density=density + )) + + return areas + + def calculate_vegetation_loss( + self, + lat1: float, lon1: float, + lat2: float, lon2: float, + vegetation_areas: List[VegetationArea], + season: str = "summer" + ) -> float: + """ + Calculate signal loss through vegetation along path. + + Samples points along the TX→RX path and accumulates + attenuation for each segment inside vegetation. + + Returns loss in dB (capped at 40 dB). + """ + from app.services.terrain_service import TerrainService + + path_length = TerrainService.haversine_distance(lat1, lon1, lat2, lon2) + + if path_length < 1: + return 0.0 + + # Sample points along path — every ~50m + num_samples = max(10, int(path_length / 50)) + + segment_length = path_length / num_samples + total_loss = 0.0 + + for i in range(num_samples): + t = i / num_samples + lat = lat1 + t * (lat2 - lat1) + lon = lon1 + t * (lon2 - lon1) + + # Check if sample point is inside any vegetation area + veg = self._point_in_vegetation(lat, lon, vegetation_areas) + + if veg: + attenuation = self.ATTENUATION_DB_PER_100M.get(veg.vegetation_type, 4.0) + seasonal = self.SEASONAL_FACTOR.get(season, 1.0) + total_loss += (segment_length / 100) * attenuation * seasonal + + return min(total_loss, 40.0) # Cap at 40 dB + + def _point_in_vegetation( + self, + lat: float, lon: float, + areas: List[VegetationArea] + ) -> Optional[VegetationArea]: + """Check if point is in vegetation area""" + for area in areas: + if self._point_in_polygon(lat, lon, area.geometry): + return area + return None + + @staticmethod + def _point_in_polygon( + lat: float, lon: float, polygon: List[Tuple[float, float]] + ) -> bool: + """Ray casting algorithm — polygon coords are (lon, lat)""" + n = len(polygon) + inside = False + + j = n - 1 + for i in range(n): + xi, yi = polygon[i] # lon, lat + xj, yj = polygon[j] + + if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi): + inside = not inside + j = i + + return inside + + +vegetation_service = VegetationService() diff --git a/backend/app/services/water_service.py b/backend/app/services/water_service.py new file mode 100644 index 0000000..951bdf4 --- /dev/null +++ b/backend/app/services/water_service.py @@ -0,0 +1,163 @@ +""" +OSM water bodies service for RF reflection calculations. + +Water surfaces produce strong specular reflections that can boost +or create multipath interference for RF signals. +""" + +import httpx +from typing import List, Tuple, Optional +from pydantic import BaseModel +import json +from pathlib import Path + + +class WaterBody(BaseModel): + """Water body from OSM""" + id: int + geometry: List[Tuple[float, float]] # [(lon, lat), ...] + water_type: str # river, lake, pond, reservoir + name: Optional[str] = None + + +class WaterService: + """OSM water bodies for reflection calculations""" + + OVERPASS_URL = "https://overpass-api.de/api/interpreter" + + # Reflection coefficients by water type + REFLECTION_COEFF = { + "lake": 0.8, + "reservoir": 0.8, + "river": 0.7, + "pond": 0.75, + "water": 0.7, + } + + def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/water"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(exist_ok=True, parents=True) + self._cache: dict[str, List[WaterBody]] = {} + + async def fetch_water_bodies( + self, + min_lat: float, min_lon: float, + max_lat: float, max_lon: float + ) -> List[WaterBody]: + """Fetch water bodies in bounding box""" + + cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" + + if cache_key in self._cache: + return self._cache[cache_key] + + cache_file = self.cache_dir / f"{cache_key}.json" + if cache_file.exists(): + try: + with open(cache_file) as f: + data = json.load(f) + bodies = [WaterBody(**w) for w in data] + self._cache[cache_key] = bodies + return bodies + except Exception: + pass + + query = f""" + [out:json][timeout:30]; + ( + way["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); + relation["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); + way["waterway"]({min_lat},{min_lon},{max_lat},{max_lon}); + ); + out body; + >; + out skel qt; + """ + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post(self.OVERPASS_URL, data={"data": query}) + response.raise_for_status() + data = response.json() + except Exception as e: + print(f"Water fetch error: {e}") + return [] + + bodies = self._parse_response(data) + + # Cache + if bodies: + with open(cache_file, 'w') as f: + json.dump([w.model_dump() for w in bodies], f) + self._cache[cache_key] = bodies + + return bodies + + def _parse_response(self, data: dict) -> List[WaterBody]: + """Parse Overpass response""" + nodes = {} + for element in data.get("elements", []): + if element["type"] == "node": + nodes[element["id"]] = (element["lon"], element["lat"]) + + bodies = [] + for element in data.get("elements", []): + if element["type"] != "way": + continue + + tags = element.get("tags", {}) + + # Determine water type + water_type = tags.get("water", tags.get("waterway", tags.get("natural", "water"))) + + geometry = [] + for node_id in element.get("nodes", []): + if node_id in nodes: + geometry.append(nodes[node_id]) + + if len(geometry) < 3: + continue + + bodies.append(WaterBody( + id=element["id"], + geometry=geometry, + water_type=water_type, + name=tags.get("name") + )) + + return bodies + + def get_reflection_coefficient(self, water_type: str) -> float: + """Get reflection coefficient for water type""" + return self.REFLECTION_COEFF.get(water_type, 0.7) + + def point_over_water( + self, lat: float, lon: float, water_bodies: List[WaterBody] + ) -> Optional[WaterBody]: + """Check if point is over water""" + for body in water_bodies: + if self._point_in_polygon(lat, lon, body.geometry): + return body + return None + + @staticmethod + def _point_in_polygon( + lat: float, lon: float, polygon: List[Tuple[float, float]] + ) -> bool: + """Ray casting algorithm — polygon coords are (lon, lat)""" + n = len(polygon) + inside = False + + j = n - 1 + for i in range(n): + xi, yi = polygon[i] # lon, lat + xj, yj = polygon[j] + + if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi): + inside = not inside + j = i + + return inside + + +water_service = WaterService() diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index a27ff46..b604c7a 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -707,6 +707,8 @@ export default function App() { use_dominant_path: preset.use_dominant_path, use_street_canyon: preset.use_street_canyon, use_reflections: preset.use_reflections, + use_water_reflection: preset.use_water_reflection, + use_vegetation: preset.use_vegetation, }); } }} @@ -754,6 +756,8 @@ export default function App() { { key: 'use_dominant_path' as const, label: 'Dominant Path', disabled: false }, { key: 'use_street_canyon' as const, label: 'Street Canyon', disabled: false }, { key: 'use_reflections' as const, label: 'Reflections', disabled: false }, + { key: 'use_water_reflection' as const, label: 'Water Reflection', disabled: false }, + { key: 'use_vegetation' as const, label: 'Vegetation Loss', disabled: false }, ].map(({ key, label, disabled }) => (