Files
rfcp/backend/app/services/vegetation_service.py
2026-02-04 00:50:52 +02:00

324 lines
11 KiB
Python

"""
OSM vegetation service for RF signal attenuation.
Forests and dense vegetation attenuate RF signals significantly.
Uses ITU-R P.833 approximations for foliage loss.
"""
import os
import asyncio
import httpx
import json
from typing import List, Tuple, Optional
from pydantic import BaseModel
from pathlib import Path
from datetime import datetime, timedelta
class VegetationArea(BaseModel):
"""Vegetation area from OSM"""
id: int
geometry: List[Tuple[float, float]] # [(lon, lat), ...]
vegetation_type: str # forest, wood, scrub, orchard
density: str # dense, sparse, mixed
# Bounding box for fast rejection (computed from geometry)
min_lat: float = 0.0
max_lat: float = 0.0
min_lon: float = 0.0
max_lon: float = 0.0
class VegetationCache:
"""Local cache for vegetation data with expiry"""
CACHE_EXPIRY_DAYS = 30
def __init__(self):
self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data'))
self.cache_path = self.data_path / 'osm' / 'vegetation'
self.cache_path.mkdir(parents=True, exist_ok=True)
def _get_cache_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
return f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"
def _get_cache_file(self, cache_key: str) -> Path:
return self.cache_path / f"{cache_key}.json"
def get(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> Optional[list]:
cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
cache_file = self._get_cache_file(cache_key)
if not cache_file.exists():
return None
try:
data = json.loads(cache_file.read_text())
cached_at = datetime.fromisoformat(data.get('_cached_at', '2000-01-01'))
if datetime.now() - cached_at > timedelta(days=self.CACHE_EXPIRY_DAYS):
return None
return data.get('data')
except Exception as e:
print(f"[VegetationCache] Failed to read cache: {e}")
return None
def set(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float, data):
cache_key = self._get_cache_key(min_lat, min_lon, max_lat, max_lon)
cache_file = self._get_cache_file(cache_key)
try:
cache_data = {
'_cached_at': datetime.now().isoformat(),
'_bbox': [min_lat, min_lon, max_lat, max_lon],
'data': data
}
cache_file.write_text(json.dumps(cache_data))
except Exception as e:
print(f"[VegetationCache] Failed to write cache: {e}")
def clear(self):
for f in self.cache_path.glob("*.json"):
f.unlink()
def get_size_mb(self) -> float:
total = sum(f.stat().st_size for f in self.cache_path.glob("*.json"))
return total / (1024 * 1024)
class VegetationService:
"""OSM vegetation for signal attenuation"""
OVERPASS_URLS = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
]
# Attenuation dB per 100 meters of vegetation
ATTENUATION_DB_PER_100M = {
"forest": 8.0,
"wood": 6.0,
"tree_row": 2.0,
"scrub": 3.0,
"orchard": 2.0,
"vineyard": 1.0,
"meadow": 0.5,
}
# Seasonal factor (summer = full foliage)
SEASONAL_FACTOR = {
"summer": 1.0,
"winter": 0.3,
"spring": 0.6,
"autumn": 0.7,
}
def __init__(self):
self.cache = VegetationCache()
self._memory_cache: dict[str, List[VegetationArea]] = {}
async def fetch_vegetation(
self,
min_lat: float, min_lon: float,
max_lat: float, max_lon: float
) -> List[VegetationArea]:
"""Fetch vegetation areas in bounding box, using cache if available"""
cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"
# Memory cache
if cache_key in self._memory_cache:
return self._memory_cache[cache_key]
# Disk cache with expiry
cached = self.cache.get(min_lat, min_lon, max_lat, max_lon)
if cached is not None:
print(f"[Vegetation] Cache hit for bbox")
areas = []
for v in cached:
area = VegetationArea(**v)
# Recompute bbox if missing (backward compat with old cache)
if area.min_lat == 0.0 and area.max_lat == 0.0 and area.geometry:
lons = [p[0] for p in area.geometry]
lats = [p[1] for p in area.geometry]
area = VegetationArea(
id=area.id,
geometry=area.geometry,
vegetation_type=area.vegetation_type,
density=area.density,
min_lat=min(lats),
max_lat=max(lats),
min_lon=min(lons),
max_lon=max(lons),
)
areas.append(area)
self._memory_cache[cache_key] = areas
return areas
# Fetch from Overpass with retry
print(f"[Vegetation] Fetching from Overpass API...")
query = f"""
[out:json][timeout:30];
(
way["landuse"="forest"]({min_lat},{min_lon},{max_lat},{max_lon});
way["natural"="wood"]({min_lat},{min_lon},{max_lat},{max_lon});
way["landuse"="orchard"]({min_lat},{min_lon},{max_lat},{max_lon});
way["natural"="scrub"]({min_lat},{min_lon},{max_lat},{max_lon});
);
out body;
>;
out skel qt;
"""
data = None
max_retries = 3
for attempt in range(max_retries):
url = self.OVERPASS_URLS[attempt % len(self.OVERPASS_URLS)]
try:
timeout = 60.0 * (attempt + 1) # 60s, 120s, 180s
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, data={"data": query})
response.raise_for_status()
data = response.json()
break
except Exception as e:
print(f"[Vegetation] Overpass attempt {attempt + 1}/{max_retries} failed ({url}): {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 1s, 2s
print(f"[Vegetation] Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
else:
print(f"[Vegetation] All {max_retries} attempts failed")
return []
areas = self._parse_response(data)
# Save to disk cache
if areas:
self.cache.set(min_lat, min_lon, max_lat, max_lon,
[v.model_dump() for v in areas])
self._memory_cache[cache_key] = areas
return areas
def _parse_response(self, data: dict) -> List[VegetationArea]:
"""Parse Overpass response"""
nodes = {}
for element in data.get("elements", []):
if element["type"] == "node":
nodes[element["id"]] = (element["lon"], element["lat"])
areas = []
for element in data.get("elements", []):
if element["type"] != "way":
continue
tags = element.get("tags", {})
veg_type = tags.get("landuse", tags.get("natural", "forest"))
geometry = []
for node_id in element.get("nodes", []):
if node_id in nodes:
geometry.append(nodes[node_id])
if len(geometry) < 3:
continue
leaf_type = tags.get("leaf_type", "mixed")
density = "dense" if leaf_type == "needleleaved" else "mixed"
# Compute bounding box from geometry (lon, lat tuples)
lons = [p[0] for p in geometry]
lats = [p[1] for p in geometry]
areas.append(VegetationArea(
id=element["id"],
geometry=geometry,
vegetation_type=veg_type,
density=density,
min_lat=min(lats),
max_lat=max(lats),
min_lon=min(lons),
max_lon=max(lons),
))
return areas
def calculate_vegetation_loss(
self,
lat1: float, lon1: float,
lat2: float, lon2: float,
vegetation_areas: List[VegetationArea],
season: str = "summer"
) -> float:
"""
Calculate signal loss through vegetation along path.
Samples points along the TX->RX path and accumulates
attenuation for each segment inside vegetation.
Returns loss in dB (capped at 40 dB).
"""
from app.services.terrain_service import TerrainService
path_length = TerrainService.haversine_distance(lat1, lon1, lat2, lon2)
if path_length < 1:
return 0.0
num_samples = max(10, int(path_length / 50))
segment_length = path_length / num_samples
total_loss = 0.0
for i in range(num_samples):
t = i / num_samples
lat = lat1 + t * (lat2 - lat1)
lon = lon1 + t * (lon2 - lon1)
veg = self._point_in_vegetation(lat, lon, vegetation_areas)
if veg:
attenuation = self.ATTENUATION_DB_PER_100M.get(veg.vegetation_type, 4.0)
seasonal = self.SEASONAL_FACTOR.get(season, 1.0)
total_loss += (segment_length / 100) * attenuation * seasonal
return min(total_loss, 40.0)
def _point_in_vegetation(
self,
lat: float, lon: float,
areas: List[VegetationArea]
) -> Optional[VegetationArea]:
"""Check if point is in vegetation area (with bbox pre-filter)"""
for area in areas:
# Quick bbox reject - skips 95%+ of polygons
if not (area.min_lat <= lat <= area.max_lat and
area.min_lon <= lon <= area.max_lon):
continue
if self._point_in_polygon(lat, lon, area.geometry):
return area
return None
@staticmethod
def _point_in_polygon(
lat: float, lon: float, polygon: List[Tuple[float, float]]
) -> bool:
"""Ray casting algorithm -- polygon coords are (lon, lat)"""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i] # lon, lat
xj, yj = polygon[j]
if ((yi > lat) != (yj > lat)) and (lon < (xj - xi) * (lat - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
vegetation_service = VegetationService()