import os import re import asyncio import httpx import json from typing import List, Optional from pydantic import BaseModel from pathlib import Path from datetime import datetime, timedelta class Building(BaseModel): """Single building footprint""" id: int geometry: List[List[float]] # [[lon, lat], ...] height: float # meters levels: Optional[int] = None building_type: Optional[str] = None material: Optional[str] = None # Detected material type tags: dict = {} # Store all OSM tags for material detection class OSMCache: """Local cache for OSM data with expiry""" CACHE_EXPIRY_DAYS = 30 def __init__(self, cache_type: str): self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data')) self.cache_path = self.data_path / 'osm' / cache_type self.cache_path.mkdir(parents=True, exist_ok=True) def _get_cache_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: """Generate cache key from bbox (rounded to 0.01 degree grid)""" return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" def _get_cache_file(self, cache_key: str) -> Path: return self.cache_path / f"{cache_key}.json" def get(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Optional[dict]: """Get cached data if available and not expired""" cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon) cache_file = self._get_cache_file(cache_key) if not cache_file.exists(): return None try: data = json.loads(cache_file.read_text()) # Check expiry cached_at = datetime.fromisoformat(data.get('_cached_at', '2000-01-01')) if datetime.now() - cached_at > timedelta(days=self.CACHE_EXPIRY_DAYS): return None return data.get('data') except Exception as e: print(f"[OSMCache] Failed to read cache: {e}") return None def set(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, data): """Save data to cache""" cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon) cache_file = self._get_cache_file(cache_key) try: cache_data = { '_cached_at': datetime.now().isoformat(), '_bbox': [min_lat, min_lon, max_lat, max_lon], 'data': data } cache_file.write_text(json.dumps(cache_data)) except Exception as e: print(f"[OSMCache] Failed to write cache: {e}") def clear(self): """Clear all cached data""" for f in self.cache_path.glob("*.json"): f.unlink() def get_size_mb(self) -> float: """Get cache size in MB""" total = sum(f.stat().st_size for f in self.cache_path.glob("*.json")) return total / (1024 * 1024) class BuildingsService: """ OpenStreetMap buildings via Overpass API with local caching. """ OVERPASS_URLS = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", ] DEFAULT_LEVEL_HEIGHT = 3.0 # meters per floor DEFAULT_BUILDING_HEIGHT = 9.0 # 3 floors if unknown def __init__(self): self.cache = OSMCache('buildings') self._memory_cache: dict[str, List[Building]] = {} self._max_cache_size = 50 @staticmethod def _safe_int(value) -> Optional[int]: """Safely parse int from OSM tag (handles '1a', '2-3', '5+', etc.)""" if not value: return None try: return int(value) except (ValueError, TypeError): match = re.search(r'\d+', str(value)) if match: return int(match.group()) return None @staticmethod def _safe_float(value) -> Optional[float]: """Safely parse float from OSM tag (handles '10 m', '~12', '10m')""" if not value: return None try: cleaned = str(value).lower().replace('m', '').replace('~', '').strip() return float(cleaned) except (ValueError, TypeError): match = re.search(r'[\d.]+', str(value)) if match: return float(match.group()) return None def _bbox_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: """Generate memory cache key for bbox""" return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}" async def fetch_buildings( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, use_cache: bool = True ) -> List[Building]: """Fetch buildings in bounding box from OSM, using cache if available""" bbox_key = self._bbox_key(min_lat, min_lon, max_lat, max_lon) # Check memory cache if use_cache and bbox_key in self._memory_cache: return self._memory_cache[bbox_key] # Check disk cache (OSMCache with expiry) if use_cache: cached = self.cache.get(min_lat, min_lon, max_lat, max_lon) if cached is not None: print(f"[Buildings] Cache hit for bbox") buildings = [Building(**b) for b in cached] self._memory_cache[bbox_key] = buildings return buildings # Fetch from Overpass API with retry print(f"[Buildings] Fetching from Overpass API...") query = f""" [out:json][timeout:30]; ( way["building"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["building"]({min_lat},{min_lon},{max_lat},{max_lon}); ); out body; >; out skel qt; """ data = None max_retries = 3 for attempt in range(max_retries): url = self.OVERPASS_URLS[attempt % len(self.OVERPASS_URLS)] try: timeout = 60.0 * (attempt + 1) # 60s, 120s, 180s async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post(url, data={"data": query}) response.raise_for_status() data = response.json() break except Exception as e: print(f"[Buildings] Overpass attempt {attempt + 1}/{max_retries} failed ({url}): {e}") if attempt < max_retries - 1: wait_time = 2 ** attempt # 1s, 2s print(f"[Buildings] Retrying in {wait_time}s...") await asyncio.sleep(wait_time) else: print(f"[Buildings] All {max_retries} attempts failed") return [] buildings = self._parse_overpass_response(data) # Save to disk cache if buildings: self.cache.set(min_lat, min_lon, max_lat, max_lon, [b.model_dump() for b in buildings]) # Memory cache with size limit if len(self._memory_cache) >= self._max_cache_size: oldest = next(iter(self._memory_cache)) del self._memory_cache[oldest] self._memory_cache[bbox_key] = buildings return buildings def _parse_overpass_response(self, data: dict) -> List[Building]: """Parse Overpass JSON response into Building objects""" buildings = [] # Build node lookup nodes = {} for element in data.get("elements", []): if element["type"] == "node": nodes[element["id"]] = (element["lon"], element["lat"]) # Process ways (building footprints) for element in data.get("elements", []): if element["type"] != "way": continue tags = element.get("tags", {}) if "building" not in tags: continue geometry = [] for node_id in element.get("nodes", []): if node_id in nodes: geometry.append(list(nodes[node_id])) if len(geometry) < 3: continue height = self._estimate_height(tags) material_str = None if "building:material" in tags: material_str = tags["building:material"] elif "building:facade:material" in tags: material_str = tags["building:facade:material"] buildings.append(Building( id=element["id"], geometry=geometry, height=height, levels=self._safe_int(tags.get("building:levels")), building_type=tags.get("building"), material=material_str, tags=tags )) return buildings def _estimate_height(self, tags: dict) -> float: """Estimate building height from OSM tags""" if "height" in tags: h = self._safe_float(tags["height"]) if h is not None and h > 0: return h if "building:levels" in tags: levels = self._safe_int(tags["building:levels"]) if levels is not None and levels > 0: return levels * self.DEFAULT_LEVEL_HEIGHT building_type = tags.get("building", "yes") type_heights = { "house": 6.0, "residential": 12.0, "apartments": 18.0, "commercial": 12.0, "industrial": 8.0, "warehouse": 6.0, "garage": 3.0, "shed": 2.5, "roof": 3.0, "church": 15.0, "cathedral": 30.0, "hospital": 15.0, "school": 12.0, "university": 15.0, "office": 20.0, "retail": 6.0, } return type_heights.get(building_type, self.DEFAULT_BUILDING_HEIGHT) def point_in_building(self, lat: float, lon: float, building: Building) -> bool: """Check if point is inside building footprint (ray casting)""" x, y = lon, lat polygon = building.geometry n = len(polygon) inside = False j = n - 1 for i in range(n): xi, yi = polygon[i] xj, yj = polygon[j] if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi): inside = not inside j = i return inside def line_intersects_building( self, lat1: float, lon1: float, height1: float, lat2: float, lon2: float, height2: float, building: Building ) -> Optional[float]: """Check if line segment intersects building. Returns distance along path where intersection occurs, or None.""" from app.services.terrain_service import TerrainService num_samples = 20 for i in range(num_samples): t = i / num_samples lat = lat1 + t * (lat2 - lat1) lon = lon1 + t * (lon2 - lon1) height = height1 + t * (height2 - height1) if self.point_in_building(lat, lon, building): if height < building.height: dist = t * TerrainService.haversine_distance(lat1, lon1, lat2, lon2) return dist return None # Singleton instance buildings_service = BuildingsService()