""" OSM water bodies service for RF reflection calculations. Water surfaces produce strong specular reflections that can boost or create multipath interference for RF signals. """ import os import asyncio import httpx import json from typing import List, Tuple, Optional from pydantic import BaseModel from pathlib import Path from datetime import datetime, timedelta class WaterBody(BaseModel): """Water body from OSM""" id: int geometry: List[Tuple[float, float]] # [(lon, lat), ...] water_type: str # river, lake, pond, reservoir name: Optional[str] = None class WaterCache: """Local cache for water body data with expiry""" CACHE_EXPIRY_DAYS = 30 def __init__(self): self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data')) self.cache_path = self.data_path / 'osm' / 'water' self.cache_path.mkdir(parents=True, exist_ok=True) def _get_cache_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" def _get_cache_file(self, cache_key: str) -> Path: return self.cache_path / f"{cache_key}.json" def get(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Optional[list]: cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon) cache_file = self._get_cache_file(cache_key) if not cache_file.exists(): return None try: data = json.loads(cache_file.read_text()) cached_at = datetime.fromisoformat(data.get('_cached_at', '2000-01-01')) if datetime.now() - cached_at > timedelta(days=self.CACHE_EXPIRY_DAYS): return None return data.get('data') except Exception as e: print(f"[WaterCache] Failed to read cache: {e}") return None def set(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, data): cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon) cache_file = self._get_cache_file(cache_key) try: cache_data = { '_cached_at': datetime.now().isoformat(), '_bbox': [min_lat, min_lon, max_lat, max_lon], 'data': data } cache_file.write_text(json.dumps(cache_data)) except Exception as e: print(f"[WaterCache] Failed to write cache: {e}") def clear(self): for f in self.cache_path.glob("*.json"): f.unlink() def get_size_mb(self) -> float: total = sum(f.stat().st_size for f in self.cache_path.glob("*.json")) return total / (1024 * 1024) class WaterService: """OSM water bodies for reflection calculations""" OVERPASS_URLS = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", ] # Reflection coefficients by water type REFLECTION_COEFF = { "lake": 0.8, "reservoir": 0.8, "river": 0.7, "pond": 0.75, "water": 0.7, } def __init__(self): self.cache = WaterCache() self._memory_cache: dict[str, List[WaterBody]] = {} async def fetch_water_bodies( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float ) -> List[WaterBody]: """Fetch water bodies in bounding box, using cache if available""" cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" # Memory cache if cache_key in self._memory_cache: return self._memory_cache[cache_key] # Disk cache with expiry cached = self.cache.get(min_lat, min_lon, max_lat, max_lon) if cached is not None: print(f"[Water] Cache hit for bbox") bodies = [WaterBody(**w) for w in cached] self._memory_cache[cache_key] = bodies return bodies # Fetch from Overpass print(f"[Water] Fetching from Overpass API...") query = f""" [out:json][timeout:30]; ( way["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); way["waterway"]({min_lat},{min_lon},{max_lat},{max_lon}); ); out body; >; out skel qt; """ data = None max_retries = 3 for attempt in range(max_retries): url = self.OVERPASS_URLS[attempt % len(self.OVERPASS_URLS)] try: timeout = 60.0 * (attempt + 1) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post(url, data={"data": query}) response.raise_for_status() data = response.json() break except Exception as e: print(f"[Water] Overpass attempt {attempt + 1}/{max_retries} failed ({url}): {e}") if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) else: print(f"[Water] All {max_retries} attempts failed") return [] bodies = self._parse_response(data) # Save to disk cache if bodies: self.cache.set(min_lat, min_lon, max_lat, max_lon, [w.model_dump() for w in bodies]) self._memory_cache[cache_key] = bodies return bodies def _parse_response(self, data: dict) -> List[WaterBody]: """Parse Overpass response""" nodes = {} for element in data.get("elements", []): if element["type"] == "node": nodes[element["id"]] = (element["lon"], element["lat"]) bodies = [] for element in data.get("elements", []): if element["type"] != "way": continue tags = element.get("tags", {}) water_type = tags.get("water", tags.get("waterway", tags.get("natural", "water"))) geometry = [] for node_id in element.get("nodes", []): if node_id in nodes: geometry.append(nodes[node_id]) if len(geometry) < 3: continue bodies.append(WaterBody( id=element["id"], geometry=geometry, water_type=water_type, name=tags.get("name") )) return bodies def get_reflection_coefficient(self, water_type: str) -> float: """Get reflection coefficient for water type""" return self.REFLECTION_COEFF.get(water_type, 0.7) def point_over_water( self, lat: float, lon: float, water_bodies: List[WaterBody] ) -> Optional[WaterBody]: """Check if point is over water""" for body in water_bodies: if self._point_in_polygon(lat, lon, body.geometry): return body return None @staticmethod def _point_in_polygon( lat: float, lon: float, polygon: List[Tuple[float, float]] ) -> bool: """Ray casting algorithm -- polygon coords are (lon, lat)""" n = len(polygon) inside = False j = n - 1 for i in range(n): xi, yi = polygon[i] # lon, lat xj, yj = polygon[j] if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi): inside = not inside j = i return inside water_service = WaterService()