""" Dedicated OpenStreetMap Overpass API client. Handles: - Building footprint queries - Vegetation area queries - Water body queries - Response parsing and error handling - Rate limiting (Overpass requires courtesy) """ import time import asyncio from typing import List, Optional, Dict, Any import httpx # Overpass API endpoints (primary + mirror) OVERPASS_ENDPOINTS = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", ] # Minimum seconds between requests to same endpoint RATE_LIMIT_SECONDS = 1.0 class OSMClient: """ OpenStreetMap Overpass API client with rate limiting and automatic failover between endpoints. """ def __init__(self, timeout: float = 60.0): self.timeout = timeout self._last_request_time: float = 0 self._current_endpoint = 0 async def _rate_limit(self): """Enforce rate limiting between requests.""" elapsed = time.monotonic() - self._last_request_time if elapsed < RATE_LIMIT_SECONDS: await asyncio.sleep(RATE_LIMIT_SECONDS - elapsed) self._last_request_time = time.monotonic() async def query(self, overpass_ql: str) -> Optional[Dict[str, Any]]: """ Execute an Overpass QL query with automatic failover. Returns parsed JSON response or None on failure. """ await self._rate_limit() for i in range(len(OVERPASS_ENDPOINTS)): idx = (self._current_endpoint + i) % len(OVERPASS_ENDPOINTS) endpoint = OVERPASS_ENDPOINTS[idx] try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( endpoint, data={"data": overpass_ql}, ) if response.status_code == 429: # Rate limited — try next endpoint print(f"[OSM] Rate limited by {endpoint}, trying next...") continue response.raise_for_status() self._current_endpoint = idx return response.json() except httpx.TimeoutException: print(f"[OSM] Timeout from {endpoint}") continue except httpx.HTTPStatusError as e: print(f"[OSM] HTTP error from {endpoint}: {e.response.status_code}") continue except Exception as e: print(f"[OSM] Error from {endpoint}: {e}") continue print("[OSM] All endpoints failed") return None async def fetch_buildings( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, ) -> List[Dict[str, Any]]: """ Fetch building footprints in a bounding box. Returns list of raw OSM elements (ways and relations). """ query = f""" [out:json][timeout:30]; ( way["building"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["building"]({min_lat},{min_lon},{max_lat},{max_lon}); ); out body; >; out skel qt; """ data = await self.query(query) if data is None: return [] return data.get("elements", []) async def fetch_vegetation( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, ) -> List[Dict[str, Any]]: """Fetch vegetation areas (forests, parks, etc.).""" query = f""" [out:json][timeout:30]; ( way["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon}); way["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon}); way["natural"="tree_row"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon}); ); out body; >; out skel qt; """ data = await self.query(query) if data is None: return [] return data.get("elements", []) async def fetch_water( self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, ) -> List[Dict[str, Any]]: """Fetch water bodies (rivers, lakes, etc.).""" query = f""" [out:json][timeout:30]; ( way["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); way["waterway"]({min_lat},{min_lon},{max_lat},{max_lon}); relation["natural"="water"]({min_lat},{min_lon},{max_lat},{max_lon}); ); out body; >; out skel qt; """ data = await self.query(query) if data is None: return [] return data.get("elements", []) # Singleton osm_client = OSMClient()