Files
rfcp/backend/app/services/buildings_service.py
mytec defa3ad440 @mytec: feat: Phase 3.0 Architecture Refactor
Major refactoring of RFCP backend:
- Modular propagation models (8 models)
- SharedMemoryManager for terrain data
- ProcessPoolExecutor parallel processing
- WebSocket progress streaming
- Building filtering pipeline (351k → 15k)
- 82 unit tests

Performance: Standard preset 38s → 5s (7.6x speedup)

Known issue: Detailed preset timeout (fix in 3.1.0)
2026-02-01 23:12:26 +02:00

335 lines
11 KiB
Python

import os
import re
import asyncio
import httpx
import json
from typing import List, Optional
from pydantic import BaseModel
from pathlib import Path
from datetime import datetime, timedelta
class Building(BaseModel):
"""Single building footprint"""
id: int
geometry: List[List[float]] # [[lon, lat], ...]
height: float # meters
levels: Optional[int] = None
building_type: Optional[str] = None
material: Optional[str] = None # Detected material type
tags: dict = {} # Store all OSM tags for material detection
class OSMCache:
"""Local cache for OSM data with expiry"""
CACHE_EXPIRY_DAYS = 30
def __init__(self, cache_type: str):
self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data'))
self.cache_path = self.data_path / 'osm' / cache_type
self.cache_path.mkdir(parents=True, exist_ok=True)
def _get_cache_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
"""Generate cache key from bbox (rounded to 0.01 degree grid)"""
return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"
def _get_cache_file(self, cache_key: str) -> Path:
return self.cache_path / f"{cache_key}.json"
def get(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Optional[dict]:
"""Get cached data if available and not expired"""
cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
cache_file = self._get_cache_file(cache_key)
if not cache_file.exists():
return None
try:
data = json.loads(cache_file.read_text())
# Check expiry
cached_at = datetime.fromisoformat(data.get('_cached_at', '2000-01-01'))
if datetime.now() - cached_at > timedelta(days=self.CACHE_EXPIRY_DAYS):
return None
return data.get('data')
except Exception as e:
print(f"[OSMCache] Failed to read cache: {e}")
return None
def set(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, data):
"""Save data to cache"""
cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
cache_file = self._get_cache_file(cache_key)
try:
cache_data = {
'_cached_at': datetime.now().isoformat(),
'_bbox': [min_lat, min_lon, max_lat, max_lon],
'data': data
}
cache_file.write_text(json.dumps(cache_data))
except Exception as e:
print(f"[OSMCache] Failed to write cache: {e}")
def clear(self):
"""Clear all cached data"""
for f in self.cache_path.glob("*.json"):
f.unlink()
def get_size_mb(self) -> float:
"""Get cache size in MB"""
total = sum(f.stat().st_size for f in self.cache_path.glob("*.json"))
return total / (1024 * 1024)
class BuildingsService:
"""
OpenStreetMap buildings via Overpass API with local caching.
"""
OVERPASS_URLS = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
]
DEFAULT_LEVEL_HEIGHT = 3.0 # meters per floor
DEFAULT_BUILDING_HEIGHT = 9.0 # 3 floors if unknown
def __init__(self):
self.cache = OSMCache('buildings')
self._memory_cache: dict[str, List[Building]] = {}
self._max_cache_size = 50
@staticmethod
def _safe_int(value) -> Optional[int]:
"""Safely parse int from OSM tag (handles '1a', '2-3', '5+', etc.)"""
if not value:
return None
try:
return int(value)
except (ValueError, TypeError):
match = re.search(r'\d+', str(value))
if match:
return int(match.group())
return None
@staticmethod
def _safe_float(value) -> Optional[float]:
"""Safely parse float from OSM tag (handles '10 m', '~12', '10m')"""
if not value:
return None
try:
cleaned = str(value).lower().replace('m', '').replace('~', '').strip()
return float(cleaned)
except (ValueError, TypeError):
match = re.search(r'[\d.]+', str(value))
if match:
return float(match.group())
return None
def _bbox_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
"""Generate memory cache key for bbox"""
return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"
async def fetch_buildings(
self,
min_lat: float, min_lon: float,
max_lat: float, max_lon: float,
use_cache: bool = True
) -> List[Building]:
"""Fetch buildings in bounding box from OSM, using cache if available"""
bbox_key = self._bbox_key(min_lat, min_lon, max_lat, max_lon)
# Check memory cache
if use_cache and bbox_key in self._memory_cache:
return self._memory_cache[bbox_key]
# Check disk cache (OSMCache with expiry)
if use_cache:
cached = self.cache.get(min_lat, min_lon, max_lat, max_lon)
if cached is not None:
print(f"[Buildings] Cache hit for bbox")
buildings = [Building(**b) for b in cached]
self._memory_cache[bbox_key] = buildings
return buildings
# Fetch from Overpass API with retry
print(f"[Buildings] Fetching from Overpass API...")
query = f"""
[out:json][timeout:30];
(
way["building"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["building"]({min_lat},{min_lon},{max_lat},{max_lon});
);
out body;
>;
out skel qt;
"""
data = None
max_retries = 3
for attempt in range(max_retries):
url = self.OVERPASS_URLS[attempt % len(self.OVERPASS_URLS)]
try:
timeout = 60.0 * (attempt + 1) # 60s, 120s, 180s
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, data={"data": query})
response.raise_for_status()
data = response.json()
break
except Exception as e:
print(f"[Buildings] Overpass attempt {attempt + 1}/{max_retries} failed ({url}): {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 1s, 2s
print(f"[Buildings] Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
else:
print(f"[Buildings] All {max_retries} attempts failed")
return []
buildings = self._parse_overpass_response(data)
# Save to disk cache
if buildings:
self.cache.set(min_lat, min_lon, max_lat, max_lon,
[b.model_dump() for b in buildings])
# Memory cache with size limit
if len(self._memory_cache) >= self._max_cache_size:
oldest = next(iter(self._memory_cache))
del self._memory_cache[oldest]
self._memory_cache[bbox_key] = buildings
return buildings
def _parse_overpass_response(self, data: dict) -> List[Building]:
"""Parse Overpass JSON response into Building objects"""
buildings = []
# Build node lookup
nodes = {}
for element in data.get("elements", []):
if element["type"] == "node":
nodes[element["id"]] = (element["lon"], element["lat"])
# Process ways (building footprints)
for element in data.get("elements", []):
if element["type"] != "way":
continue
tags = element.get("tags", {})
if "building" not in tags:
continue
geometry = []
for node_id in element.get("nodes", []):
if node_id in nodes:
geometry.append(list(nodes[node_id]))
if len(geometry) < 3:
continue
height = self._estimate_height(tags)
material_str = None
if "building:material" in tags:
material_str = tags["building:material"]
elif "building:facade:material" in tags:
material_str = tags["building:facade:material"]
buildings.append(Building(
id=element["id"],
geometry=geometry,
height=height,
levels=self._safe_int(tags.get("building:levels")),
building_type=tags.get("building"),
material=material_str,
tags=tags
))
return buildings
def _estimate_height(self, tags: dict) -> float:
"""Estimate building height from OSM tags"""
if "height" in tags:
h = self._safe_float(tags["height"])
if h is not None and h > 0:
return h
if "building:levels" in tags:
levels = self._safe_int(tags["building:levels"])
if levels is not None and levels > 0:
return levels * self.DEFAULT_LEVEL_HEIGHT
building_type = tags.get("building", "yes")
type_heights = {
"house": 6.0,
"residential": 12.0,
"apartments": 18.0,
"commercial": 12.0,
"industrial": 8.0,
"warehouse": 6.0,
"garage": 3.0,
"shed": 2.5,
"roof": 3.0,
"church": 15.0,
"cathedral": 30.0,
"hospital": 15.0,
"school": 12.0,
"university": 15.0,
"office": 20.0,
"retail": 6.0,
}
return type_heights.get(building_type, self.DEFAULT_BUILDING_HEIGHT)
def point_in_building(self, lat: float, lon: float, building: Building) -> bool:
"""Check if point is inside building footprint (ray casting)"""
x, y = lon, lat
polygon = building.geometry
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def line_intersects_building(
self,
lat1: float, lon1: float, height1: float,
lat2: float, lon2: float, height2: float,
building: Building
) -> Optional[float]:
"""Check if line segment intersects building.
Returns distance along path where intersection occurs, or None."""
from app.services.terrain_service import TerrainService
num_samples = 20
for i in range(num_samples):
t = i / num_samples
lat = lat1 + t * (lat2 - lat1)
lon = lon1 + t * (lon2 - lon1)
height = height1 + t * (height2 - height1)
if self.point_in_building(lat, lon, building):
if height < building.height:
dist = t * TerrainService.haversine_distance(lat1, lon1, lat2, lon2)
return dist
return None
# Singleton instance
buildings_service = BuildingsService()