Files
rfcp/backend/app/services/water_service.py
mytec defa3ad440 @mytec: feat: Phase 3.0 Architecture Refactor
Major refactoring of RFCP backend:
- Modular propagation models (8 models)
- SharedMemoryManager for terrain data
- ProcessPoolExecutor parallel processing
- WebSocket progress streaming
- Building filtering pipeline (351k → 15k)
- 82 unit tests

Performance: Standard preset 38s → 5s (7.6x speedup)

Known issue: Detailed preset timeout (fix in 3.1.0)
2026-02-01 23:12:26 +02:00

234 lines
7.5 KiB
Python

"""
OSM water bodies service for RF reflection calculations.
Water surfaces produce strong specular reflections that can either boost
an RF signal or cause multipath interference.
"""
import os
import asyncio
import httpx
import json
from typing import List, Tuple, Optional
from pydantic import BaseModel
from pathlib import Path
from datetime import datetime, timedelta
class WaterBody(BaseModel):
    """A single water feature parsed from an OSM (Overpass) response."""

    # OSM element id of the way this body was built from.
    id: int
    # Outline vertices in way order, each stored as (lon, lat).
    geometry: List[Tuple[float, float]]
    # Kind of water, e.g. river, lake, pond, reservoir.
    water_type: str
    # Value of the OSM "name" tag, when the way has one.
    name: Optional[str] = None
class WaterCache:
    """On-disk cache for water body data, one JSON file per bounding box.

    Files live under ``$RFCP_DATA_PATH/osm/water`` (``./data`` when the
    variable is unset) and are treated as missing once they are older than
    ``CACHE_EXPIRY_DAYS``.
    """

    CACHE_EXPIRY_DAYS = 30

    def __init__(self):
        self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data'))
        self.cache_path = self.data_path / 'osm' / 'water'
        self.cache_path.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
        """Return the cache key for a bbox; coordinates are rounded to 2 decimals."""
        return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"

    def _get_cache_file(self, cache_key: str) -> Path:
        """Return the path of the JSON file that stores ``cache_key``."""
        return self.cache_path / f"{cache_key}.json"

    def get(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Optional[list]:
        """Return the cached payload for the bbox, or ``None``.

        ``None`` is returned when no file exists, when the entry is older
        than ``CACHE_EXPIRY_DAYS``, or when the file cannot be read/parsed
        (the latter is reported on stdout).
        """
        cache_file = self._get_cache_file(
            self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
        )
        try:
            payload = json.loads(cache_file.read_text(encoding="utf-8"))
            # Entries without a timestamp are treated as long expired.
            cached_at = datetime.fromisoformat(payload.get('_cached_at', '2000-01-01'))
            if datetime.now() - cached_at > timedelta(days=self.CACHE_EXPIRY_DAYS):
                return None
            return payload.get('data')
        except FileNotFoundError:
            # A plain cache miss; not worth logging.
            return None
        except Exception as e:
            print(f"[WaterCache] Failed to read cache: {e}")
            return None

    def set(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, data):
        """Store ``data`` (must be JSON-serialisable) for the bbox.

        The payload is written to a per-process temporary file and then
        moved over the final name, so readers never observe a half-written
        cache file. Failures are reported on stdout and otherwise ignored.
        """
        cache_file = self._get_cache_file(
            self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
        )
        # with_name (not with_suffix): the key itself contains dots.
        tmp_file = cache_file.with_name(f"{cache_file.name}.{os.getpid()}.tmp")
        try:
            cache_data = {
                '_cached_at': datetime.now().isoformat(),
                '_bbox': [min_lat, min_lon, max_lat, max_lon],
                'data': data,
            }
            tmp_file.write_text(json.dumps(cache_data), encoding="utf-8")
            tmp_file.replace(cache_file)
        except Exception as e:
            print(f"[WaterCache] Failed to write cache: {e}")
            try:
                tmp_file.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup only; the write failure was already reported.
                pass

    def clear(self):
        """Delete every ``*.json`` cache file in the cache directory."""
        for cache_file in self.cache_path.glob("*.json"):
            # Another process may have removed the file since glob() listed it.
            cache_file.unlink(missing_ok=True)

    def get_size_mb(self) -> float:
        """Return the combined size of all ``*.json`` cache files in MiB."""
        total = 0
        for cache_file in self.cache_path.glob("*.json"):
            try:
                total += cache_file.stat().st_size
            except FileNotFoundError:
                # Removed between glob() and stat(); it no longer counts.
                continue
        return total / (1024 * 1024)
class WaterService:
    """OSM water bodies for reflection calculations.

    Lookups go through three layers in order: an in-process dict, the
    on-disk ``WaterCache``, and finally the Overpass API.
    """

    # Overpass endpoints; successive retry attempts alternate between them.
    OVERPASS_URLS = [
        "https://overpass-api.de/api/interpreter",
        "https://overpass.kumi.systems/api/interpreter",
    ]

    # Reflection coefficients by water type
    REFLECTION_COEFF = {
        "lake": 0.8,
        "reservoir": 0.8,
        "river": 0.7,
        "pond": 0.75,
        "water": 0.7,
    }

    def __init__(self):
        self.cache = WaterCache()
        # Parsed results, keyed by the same bbox key the disk cache uses.
        self._memory_cache: dict[str, List[WaterBody]] = {}

    async def fetch_water_bodies(
        self,
        min_lat: float, min_lon: float,
        max_lat: float, max_lon: float
    ) -> List[WaterBody]:
        """Fetch water bodies in bounding box, using cache if available.

        Returns the parsed bodies (possibly an empty list). When every
        Overpass attempt fails an empty list is returned and nothing is
        cached, so a later call retries the download.
        """
        # Reuse the disk cache's key so both cache layers always agree.
        cache_key = self.cache._get_cache_key(min_lat, min_lon, max_lat, max_lon)

        in_memory = self._memory_cache.get(cache_key)
        if in_memory is not None:
            return in_memory

        bodies = self._load_from_disk(min_lat, min_lon, max_lat, max_lon)
        if bodies is None:
            print("[Water] Fetching from Overpass API...")
            query = self._build_query(min_lat, min_lon, max_lat, max_lon)
            data = await self._query_overpass(query)
            if data is None:
                return []
            bodies = self._parse_response(data)
            # Empty results are kept in memory only, never written to disk.
            if bodies:
                self.cache.set(min_lat, min_lon, max_lat, max_lon,
                               [w.model_dump() for w in bodies])

        self._memory_cache[cache_key] = bodies
        return bodies

    def _load_from_disk(
        self,
        min_lat: float, min_lon: float,
        max_lat: float, max_lon: float
    ) -> Optional[List[WaterBody]]:
        """Return bodies rebuilt from the disk cache, or None on miss/bad entry."""
        cached = self.cache.get(min_lat, min_lon, max_lat, max_lon)
        if cached is None:
            return None
        try:
            bodies = [WaterBody(**w) for w in cached]
        except (TypeError, ValueError) as e:
            # A cache file with an unexpected shape must not break the
            # lookup; fall through to a fresh download instead.
            # (pydantic's ValidationError is a ValueError subclass.)
            print(f"[Water] Ignoring unreadable cache entry: {e}")
            return None
        print("[Water] Cache hit for bbox")
        return bodies

    @staticmethod
    def _build_query(
        min_lat: float, min_lon: float,
        max_lat: float, max_lon: float
    ) -> str:
        """Build the Overpass QL query for water ways/relations in the bbox."""
        bbox = f"({min_lat},{min_lon},{max_lat},{max_lon})"
        return (
            "\n"
            "[out:json][timeout:30];\n"
            "(\n"
            f'way["natural"="water"]{bbox};\n'
            f'relation["natural"="water"]{bbox};\n'
            f'way["waterway"]{bbox};\n'
            ");\n"
            "out body;\n"
            ">;\n"
            "out skel qt;\n"
        )

    async def _query_overpass(self, query: str) -> Optional[dict]:
        """POST ``query`` to Overpass with retries; return decoded JSON or None.

        Up to three attempts are made, alternating endpoints, with a longer
        HTTP timeout on each attempt and an exponentially growing pause
        between attempts.
        """
        max_retries = 3
        for attempt in range(max_retries):
            url = self.OVERPASS_URLS[attempt % len(self.OVERPASS_URLS)]
            try:
                timeout = 60.0 * (attempt + 1)
                async with httpx.AsyncClient(timeout=timeout) as client:
                    response = await client.post(url, data={"data": query})
                    response.raise_for_status()
                    return response.json()
            except Exception as e:
                # Deliberately broad: any transport, HTTP-status or decode
                # failure should just move on to the next attempt.
                print(f"[Water] Overpass attempt {attempt + 1}/{max_retries} failed ({url}): {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
        print(f"[Water] All {max_retries} attempts failed")
        return None

    def _parse_response(self, data: dict) -> List[WaterBody]:
        """Parse Overpass response.

        Only ``way`` elements become water bodies; ways with fewer than
        three resolvable nodes are skipped.
        """
        elements = data.get("elements", [])

        # Index node coordinates first: a way only lists node ids, and the
        # node elements are not guaranteed to precede the ways using them.
        nodes = {
            element["id"]: (element["lon"], element["lat"])
            for element in elements
            if element.get("type") == "node"
        }

        # NOTE(review): ways matched by `way["waterway"]` can be open
        # polylines (e.g. river centre lines); _point_in_polygon implicitly
        # closes them. Confirm that treating them as areas is intended.
        bodies = []
        for element in elements:
            if element.get("type") != "way":
                continue
            tags = element.get("tags", {})
            # Most specific tag wins: water=*, then waterway=*, then natural=*.
            water_type = tags.get("water", tags.get("waterway", tags.get("natural", "water")))
            geometry = [
                nodes[node_id]
                for node_id in element.get("nodes", [])
                if node_id in nodes
            ]
            if len(geometry) < 3:
                continue
            bodies.append(WaterBody(
                id=element["id"],
                geometry=geometry,
                water_type=water_type,
                name=tags.get("name"),
            ))
        return bodies

    def get_reflection_coefficient(self, water_type: str) -> float:
        """Get reflection coefficient for water type (0.7 for unknown types)."""
        return self.REFLECTION_COEFF.get(water_type, 0.7)

    def point_over_water(
        self, lat: float, lon: float, water_bodies: List[WaterBody]
    ) -> Optional[WaterBody]:
        """Return the first body whose outline contains the point, else None."""
        for body in water_bodies:
            if self._point_in_polygon(lat, lon, body.geometry):
                return body
        return None

    @staticmethod
    def _point_in_polygon(
        lat: float, lon: float, polygon: List[Tuple[float, float]]
    ) -> bool:
        """Ray casting algorithm -- polygon coords are (lon, lat).

        The last vertex is implicitly joined to the first. An empty polygon
        contains nothing.
        """
        if not polygon:
            return False
        inside = False
        prev_lon, prev_lat = polygon[-1]
        for cur_lon, cur_lat in polygon:
            # Only edges that straddle the point's latitude can be crossed;
            # this also guarantees prev_lat != cur_lat for the division below.
            if (cur_lat > lat) != (prev_lat > lat):
                crossing_lon = (prev_lon - cur_lon) * (lat - cur_lat) / (prev_lat - cur_lat) + cur_lon
                if lon < crossing_lon:
                    inside = not inside
            prev_lon, prev_lat = cur_lon, cur_lat
        return inside
# Shared module-level service instance. It is constructed at import time,
# which also creates the on-disk cache directory (see WaterCache.__init__).
water_service = WaterService()