"""
OSM vegetation service for RF signal attenuation.

Forests and dense vegetation attenuate RF signals significantly.
Uses ITU-R P.833 approximations for foliage loss.
"""

import json
import logging
from pathlib import Path
from typing import List, Optional, Tuple

import httpx
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class VegetationArea(BaseModel):
    """Vegetation area from OSM."""

    id: int
    geometry: List[Tuple[float, float]]  # [(lon, lat), ...]
    vegetation_type: str  # forest, wood, scrub, orchard
    density: str  # dense, sparse, mixed


class VegetationService:
    """OSM vegetation lookup and vegetation-induced path loss estimation.

    Vegetation polygons are fetched from the Overpass API and cached twice:
    in memory for the lifetime of the instance and as JSON files under
    ``cache_dir``.
    """

    OVERPASS_URL = "https://overpass-api.de/api/interpreter"

    # Attenuation dB per 100 meters of vegetation
    ATTENUATION_DB_PER_100M = {
        "forest": 8.0,
        "wood": 6.0,
        "tree_row": 2.0,
        "scrub": 3.0,
        "orchard": 2.0,
        "vineyard": 1.0,
        "meadow": 0.5,
    }

    # Attenuation used for vegetation types missing from the table above
    DEFAULT_ATTENUATION_DB_PER_100M = 4.0

    # Seasonal factor (summer = full foliage)
    SEASONAL_FACTOR = {
        "summer": 1.0,
        "winter": 0.3,
        "spring": 0.6,
        "autumn": 0.7,
    }

    # Upper bound on the loss reported for a single path
    MAX_LOSS_DB = 40.0

    # Path sampling: one sample per SAMPLE_SPACING_M of path length, but
    # never fewer than MIN_SAMPLES so short paths are still resolved.
    SAMPLE_SPACING_M = 50.0
    MIN_SAMPLES = 10

    def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/vegetation"):
        """Create the service and make sure the on-disk cache directory exists.

        Args:
            cache_dir: Directory holding one JSON cache file per bounding box.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True, parents=True)
        self._cache: dict[str, List[VegetationArea]] = {}

    async def fetch_vegetation(
        self,
        min_lat: float,
        min_lon: float,
        max_lat: float,
        max_lon: float
    ) -> List[VegetationArea]:
        """Fetch vegetation areas in bounding box.

        Lookup order is memory cache, disk cache, then the Overpass API.

        Args:
            min_lat: Southern edge of the bounding box.
            min_lon: Western edge of the bounding box.
            max_lat: Northern edge of the bounding box.
            max_lon: Eastern edge of the bounding box.

        Returns:
            The vegetation areas found. An empty list is returned (and not
            cached) when the Overpass request fails.
        """
        # NOTE: coordinates are rounded to two decimals for the key, so
        # bounding boxes that differ only beyond that share a cache entry.
        cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"

        cached = self._cache.get(cache_key)
        if cached is not None:
            return cached

        cache_file = self.cache_dir / f"{cache_key}.json"
        areas = self._read_disk_cache(cache_file)
        if areas is not None:
            self._cache[cache_key] = areas
            return areas

        bbox = f"{min_lat},{min_lon},{max_lat},{max_lon}"
        query = f"""
        [out:json][timeout:30];
        (
            way["landuse"="forest"]({bbox});
            way["natural"="wood"]({bbox});
            way["landuse"="orchard"]({bbox});
            way["natural"="scrub"]({bbox});
        );
        out body;
        >;
        out skel qt;
        """

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(self.OVERPASS_URL, data={"data": query})
                response.raise_for_status()
                data = response.json()
        except Exception as e:
            # Network boundary: vegetation data is optional, so any failure
            # degrades to "no vegetation" instead of failing the caller.
            logger.warning("Vegetation fetch error: %s", e)
            return []

        areas = self._parse_response(data)

        # Only non-empty results are persisted to disk; empty results are
        # kept in memory only.
        if areas:
            self._write_disk_cache(cache_file, areas)

        self._cache[cache_key] = areas
        return areas

    @staticmethod
    def _read_disk_cache(cache_file: Path) -> Optional[List[VegetationArea]]:
        """Load areas from a cache file; None if it is missing or unusable."""
        if not cache_file.exists():
            return None
        try:
            with open(cache_file, encoding="utf-8") as f:
                data = json.load(f)
            return [VegetationArea(**v) for v in data]
        except (OSError, ValueError, TypeError) as e:
            # A corrupt or unreadable cache file is not fatal: report it and
            # let the caller fall back to a fresh fetch.
            logger.warning("Ignoring unreadable vegetation cache %s: %s", cache_file, e)
            return None

    @staticmethod
    def _write_disk_cache(cache_file: Path, areas: List[VegetationArea]) -> None:
        """Persist areas to a cache file; failures are logged, not raised."""
        try:
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump([v.model_dump() for v in areas], f)
        except OSError as e:
            # Losing the disk cache must not discard a successful fetch.
            logger.warning("Could not write vegetation cache %s: %s", cache_file, e)

    def _parse_response(self, data: dict) -> List[VegetationArea]:
        """Parse Overpass response.

        Node elements provide coordinates; way elements reference nodes by id
        and carry the tags. Ways that resolve to fewer than three known nodes
        are dropped, as are elements missing the fields needed to use them.

        Args:
            data: Decoded Overpass JSON payload.

        Returns:
            One VegetationArea per usable way, in response order.
        """
        elements = data.get("elements", []) if isinstance(data, dict) else []

        # First pass: index node coordinates as (lon, lat) by node id.
        nodes = {}
        for element in elements:
            if element.get("type") != "node":
                continue
            try:
                nodes[element["id"]] = (element["lon"], element["lat"])
            except KeyError:
                continue

        # Second pass: build one polygon per way.
        areas = []
        for element in elements:
            if element.get("type") != "way":
                continue

            way_id = element.get("id")
            if way_id is None:
                continue

            geometry = [
                nodes[node_id]
                for node_id in element.get("nodes", [])
                if node_id in nodes
            ]
            if len(geometry) < 3:
                continue

            tags = element.get("tags", {})

            # Determine density from leaf_type tag
            leaf_type = tags.get("leaf_type", "mixed")
            density = "dense" if leaf_type == "needleleaved" else "mixed"

            areas.append(VegetationArea(
                id=way_id,
                geometry=geometry,
                vegetation_type=self._vegetation_type(tags),
                density=density
            ))

        return areas

    def _vegetation_type(self, tags: dict) -> str:
        """Pick the vegetation type from a way's landuse/natural tags."""
        landuse = tags.get("landuse")
        natural = tags.get("natural")

        # Prefer whichever tag names a type with a known attenuation, so an
        # unrelated landuse value cannot shadow e.g. natural=wood.
        for value in (landuse, natural):
            if value in self.ATTENUATION_DB_PER_100M:
                return value

        return landuse or natural or "forest"

    def calculate_vegetation_loss(
        self,
        lat1: float,
        lon1: float,
        lat2: float,
        lon2: float,
        vegetation_areas: List[VegetationArea],
        season: str = "summer"
    ) -> float:
        """
        Calculate signal loss through vegetation along path.

        Samples points along the TX→RX path and accumulates attenuation
        for each segment inside vegetation.

        Args:
            lat1: Latitude of the first endpoint.
            lon1: Longitude of the first endpoint.
            lat2: Latitude of the second endpoint.
            lon2: Longitude of the second endpoint.
            vegetation_areas: Candidate vegetation polygons.
            season: Key into SEASONAL_FACTOR; unknown values use factor 1.0.

        Returns:
            Loss in dB (capped at MAX_LOSS_DB, 40 dB by default).
        """
        if not vegetation_areas:
            return 0.0

        from app.services.terrain_service import TerrainService

        # NOTE(review): path length is treated as metres below — confirm
        # against TerrainService.haversine_distance.
        path_length = TerrainService.haversine_distance(lat1, lon1, lat2, lon2)

        if path_length < 1:
            return 0.0

        # Sample points along path — every ~50m
        num_samples = max(self.MIN_SAMPLES, int(path_length / self.SAMPLE_SPACING_M))
        segment_length = path_length / num_samples
        seasonal = self.SEASONAL_FACTOR.get(season, 1.0)

        total_loss = 0.0

        for i in range(num_samples):
            # Sample the midpoint of each segment so that the result does not
            # depend on which endpoint is passed first.
            t = (i + 0.5) / num_samples
            lat = lat1 + t * (lat2 - lat1)
            lon = lon1 + t * (lon2 - lon1)

            # Check if sample point is inside any vegetation area
            veg = self._point_in_vegetation(lat, lon, vegetation_areas)
            if veg is None:
                continue

            attenuation = self.ATTENUATION_DB_PER_100M.get(
                veg.vegetation_type, self.DEFAULT_ATTENUATION_DB_PER_100M
            )
            total_loss += (segment_length / 100) * attenuation * seasonal

            # Contributions are non-negative, so once the cap is reached the
            # remaining samples cannot change the result.
            if total_loss >= self.MAX_LOSS_DB:
                break

        return min(total_loss, self.MAX_LOSS_DB)

    def _point_in_vegetation(
        self,
        lat: float,
        lon: float,
        areas: List[VegetationArea]
    ) -> Optional[VegetationArea]:
        """Return the first area containing the point, or None if there is none."""
        for area in areas:
            if self._point_in_polygon(lat, lon, area.geometry):
                return area
        return None

    @staticmethod
    def _point_in_polygon(
        lat: float,
        lon: float,
        polygon: List[Tuple[float, float]]
    ) -> bool:
        """Ray casting algorithm — polygon coords are (lon, lat)."""
        n = len(polygon)
        inside = False

        j = n - 1
        for i in range(n):
            xi, yi = polygon[i]  # lon, lat
            xj, yj = polygon[j]

            # The first condition guarantees yi != yj, so the division in the
            # second condition cannot be by zero.
            if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
                inside = not inside
            j = i

        return inside


vegetation_service = VegetationService()