Files
rfcp/docs/devlog/back/RFCP-Iteration-1.3-Coverage-OSM-Buildings.md

856 lines
25 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# RFCP Backend - Iteration 1.3: Coverage Calculation + OSM Buildings
**Date:** January 30, 2025
**Type:** Backend Development
**Estimated:** 6-8 hours
**Location:** `/opt/rfcp/backend/`
---
## 🎯 Goal
Implement server-side coverage calculation with terrain (SRTM) and building obstacles (OpenStreetMap) for realistic urban RF propagation.
---
## 📋 Pre-reading
1. `RFCP-Backend-Roadmap-Complete.md` — Phase 2 & 3 details
2. `RFCP-Iteration-1.2-Terrain-Integration.md` — current terrain services
---
## 📊 Current State
```bash
# Backend 1.2 complete
/opt/rfcp/backend/app/
├── services/
│ ├── terrain_service.py # SRTM elevation ✅
│ └── los_service.py # Line-of-sight + Fresnel ✅
├── api/routes/
│ └── terrain.py # /elevation, /profile, /los, /fresnel ✅
```
**What's missing:**
- Building data (OSM)
- Coverage grid calculation
- Integration of terrain + buildings into RF model
---
## ✅ Tasks
### 1. Create OSM Buildings Service
**app/services/buildings_service.py:**
```python
import httpx
import asyncio
from typing import List, Optional
from pydantic import BaseModel
from functools import lru_cache
import hashlib
import json
from pathlib import Path
class Building(BaseModel):
"""Single building footprint"""
id: int
geometry: List[List[float]] # [[lon, lat], ...]
height: float # meters
levels: Optional[int] = None
building_type: Optional[str] = None
class BuildingsService:
"""
OpenStreetMap buildings via Overpass API
"""
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
DEFAULT_LEVEL_HEIGHT = 3.0 # meters per floor
DEFAULT_BUILDING_HEIGHT = 9.0 # 3 floors if unknown
def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/buildings"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True, parents=True)
self._memory_cache: dict[str, List[Building]] = {}
self._max_cache_size = 50 # bbox regions
def _bbox_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
"""Generate cache key for bbox"""
# Round to 0.01 degree (~1km) grid for cache efficiency
key = f"{min_lat:.2f},{min_lon:.2f},{max_lat:.2f},{max_lon:.2f}"
return hashlib.md5(key.encode()).hexdigest()[:12]
async def fetch_buildings(
self,
min_lat: float, min_lon: float,
max_lat: float, max_lon: float,
use_cache: bool = True
) -> List[Building]:
"""
Fetch buildings in bounding box from OSM
Args:
min_lat, min_lon, max_lat, max_lon: Bounding box
use_cache: Whether to use cached results
Returns:
List of Building objects with height estimates
"""
cache_key = self._bbox_key(min_lat, min_lon, max_lat, max_lon)
# Check memory cache
if use_cache and cache_key in self._memory_cache:
return self._memory_cache[cache_key]
# Check disk cache
cache_file = self.cache_dir / f"{cache_key}.json"
if use_cache and cache_file.exists():
try:
with open(cache_file, 'r') as f:
data = json.load(f)
buildings = [Building(**b) for b in data]
self._memory_cache[cache_key] = buildings
return buildings
except Exception:
pass # Fetch fresh if cache corrupted
# Fetch from Overpass API
query = f"""
[out:json][timeout:30];
(
way["building"]({min_lat},{min_lon},{max_lat},{max_lon});
relation["building"]({min_lat},{min_lon},{max_lat},{max_lon});
);
out body;
>;
out skel qt;
"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
self.OVERPASS_URL,
data={"data": query}
)
response.raise_for_status()
data = response.json()
except Exception as e:
print(f"Overpass API error: {e}")
return []
# Parse response
buildings = self._parse_overpass_response(data)
# Cache results
if buildings:
# Disk cache
with open(cache_file, 'w') as f:
json.dump([b.model_dump() for b in buildings], f)
# Memory cache (with size limit)
if len(self._memory_cache) >= self._max_cache_size:
oldest = next(iter(self._memory_cache))
del self._memory_cache[oldest]
self._memory_cache[cache_key] = buildings
return buildings
def _parse_overpass_response(self, data: dict) -> List[Building]:
"""Parse Overpass JSON response into Building objects"""
buildings = []
# Build node lookup
nodes = {}
for element in data.get("elements", []):
if element["type"] == "node":
nodes[element["id"]] = (element["lon"], element["lat"])
# Process ways (building footprints)
for element in data.get("elements", []):
if element["type"] != "way":
continue
tags = element.get("tags", {})
if "building" not in tags:
continue
# Get geometry
geometry = []
for node_id in element.get("nodes", []):
if node_id in nodes:
geometry.append(list(nodes[node_id]))
if len(geometry) < 3:
continue # Invalid polygon
# Estimate height
height = self._estimate_height(tags)
buildings.append(Building(
id=element["id"],
geometry=geometry,
height=height,
levels=int(tags.get("building:levels", 0)) or None,
building_type=tags.get("building")
))
return buildings
def _estimate_height(self, tags: dict) -> float:
"""Estimate building height from OSM tags"""
# Explicit height tag
if "height" in tags:
try:
h = tags["height"]
# Handle "10 m" or "10m" format
if isinstance(h, str):
h = h.replace("m", "").replace(" ", "")
return float(h)
except (ValueError, TypeError):
pass
# Calculate from levels
if "building:levels" in tags:
try:
levels = int(tags["building:levels"])
return levels * self.DEFAULT_LEVEL_HEIGHT
except (ValueError, TypeError):
pass
# Default based on building type
building_type = tags.get("building", "yes")
type_heights = {
"house": 6.0,
"residential": 12.0,
"apartments": 18.0,
"commercial": 12.0,
"industrial": 8.0,
"warehouse": 6.0,
"garage": 3.0,
"shed": 2.5,
"roof": 3.0,
"church": 15.0,
"cathedral": 30.0,
"hospital": 15.0,
"school": 12.0,
"university": 15.0,
"office": 20.0,
"retail": 6.0,
}
return type_heights.get(building_type, self.DEFAULT_BUILDING_HEIGHT)
def point_in_building(self, lat: float, lon: float, building: Building) -> bool:
"""Check if point is inside building footprint (ray casting)"""
x, y = lon, lat
polygon = building.geometry
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def line_intersects_building(
self,
lat1: float, lon1: float, height1: float,
lat2: float, lon2: float, height2: float,
building: Building
) -> Optional[float]:
"""
Check if line segment intersects building
Returns:
Distance along path where intersection occurs, or None
"""
# Simplified 2D check + height comparison
# For accurate 3D intersection, would need proper ray-polygon intersection
from app.services.terrain_service import TerrainService
# Sample points along line
num_samples = 20
for i in range(num_samples):
t = i / num_samples
lat = lat1 + t * (lat2 - lat1)
lon = lon1 + t * (lon2 - lon1)
height = height1 + t * (height2 - height1)
if self.point_in_building(lat, lon, building):
# Check if signal height is below building
if height < building.height:
# Calculate distance
dist = t * TerrainService.haversine_distance(lat1, lon1, lat2, lon2)
return dist
return None
# Singleton instance
buildings_service = BuildingsService()
```
---
### 2. Create Coverage Calculation Service
**app/services/coverage_service.py:**
```python
import numpy as np
import asyncio
from typing import List, Optional, Tuple
from pydantic import BaseModel
from app.services.terrain_service import terrain_service, TerrainService
from app.services.los_service import los_service
from app.services.buildings_service import buildings_service, Building
class CoveragePoint(BaseModel):
lat: float
lon: float
rsrp: float # dBm
distance: float # meters from site
has_los: bool
terrain_loss: float # dB
building_loss: float # dB
class CoverageSettings(BaseModel):
radius: float = 10000 # meters
resolution: float = 200 # meters
min_signal: float = -120 # dBm threshold
use_terrain: bool = True
use_buildings: bool = True
class SiteParams(BaseModel):
lat: float
lon: float
height: float = 30 # antenna height meters
power: float = 43 # dBm (20W)
gain: float = 15 # dBi
frequency: float = 1800 # MHz
azimuth: Optional[float] = None # degrees, None = omni
beamwidth: Optional[float] = 65 # degrees
class CoverageService:
"""
RF Coverage calculation with terrain and buildings
"""
EARTH_RADIUS = 6371000
def __init__(self):
self.terrain = terrain_service
self.buildings = buildings_service
self.los = los_service
async def calculate_coverage(
self,
site: SiteParams,
settings: CoverageSettings
) -> List[CoveragePoint]:
"""
Calculate coverage grid for a single site
Returns list of CoveragePoint with RSRP values
"""
points = []
# Generate grid
grid = self._generate_grid(
site.lat, site.lon,
settings.radius,
settings.resolution
)
# Fetch buildings for coverage area (if enabled)
buildings = []
if settings.use_buildings:
# Calculate bbox with margin
lat_delta = settings.radius / 111000 # ~111km per degree
lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat)))
buildings = await self.buildings.fetch_buildings(
site.lat - lat_delta, site.lon - lon_delta,
site.lat + lat_delta, site.lon + lon_delta
)
# Calculate coverage for each point
for lat, lon in grid:
point = await self._calculate_point(
site, lat, lon,
settings, buildings
)
if point.rsrp >= settings.min_signal:
points.append(point)
return points
async def calculate_multi_site_coverage(
self,
sites: List[SiteParams],
settings: CoverageSettings
) -> List[CoveragePoint]:
"""
Calculate combined coverage from multiple sites
Best server (strongest signal) wins at each point
"""
if not sites:
return []
# Get all individual coverages
all_coverages = await asyncio.gather(*[
self.calculate_coverage(site, settings)
for site in sites
])
# Combine by best signal
point_map: dict[Tuple[float, float], CoveragePoint] = {}
for coverage in all_coverages:
for point in coverage:
key = (round(point.lat, 6), round(point.lon, 6))
if key not in point_map or point.rsrp > point_map[key].rsrp:
point_map[key] = point
return list(point_map.values())
def _generate_grid(
self,
center_lat: float, center_lon: float,
radius: float, resolution: float
) -> List[Tuple[float, float]]:
"""Generate coverage grid points"""
points = []
# Convert resolution to degrees
lat_step = resolution / 111000
lon_step = resolution / (111000 * np.cos(np.radians(center_lat)))
# Calculate grid bounds
lat_delta = radius / 111000
lon_delta = radius / (111000 * np.cos(np.radians(center_lat)))
lat = center_lat - lat_delta
while lat <= center_lat + lat_delta:
lon = center_lon - lon_delta
while lon <= center_lon + lon_delta:
# Check if within radius (circular, not square)
dist = TerrainService.haversine_distance(center_lat, center_lon, lat, lon)
if dist <= radius:
points.append((lat, lon))
lon += lon_step
lat += lat_step
return points
async def _calculate_point(
self,
site: SiteParams,
lat: float, lon: float,
settings: CoverageSettings,
buildings: List[Building]
) -> CoveragePoint:
"""Calculate RSRP at a single point"""
# Distance
distance = TerrainService.haversine_distance(site.lat, site.lon, lat, lon)
if distance < 1:
distance = 1 # Avoid division by zero
# Base path loss (Okumura-Hata for urban)
path_loss = self._okumura_hata(
distance, site.frequency, site.height, 1.5 # 1.5m receiver height
)
# Antenna pattern loss (if directional)
antenna_loss = 0.0
if site.azimuth is not None and site.beamwidth:
antenna_loss = self._antenna_pattern_loss(
site.lat, site.lon, lat, lon,
site.azimuth, site.beamwidth
)
# Terrain loss (LoS check)
terrain_loss = 0.0
has_los = True
if settings.use_terrain:
los_result = await self.los.check_line_of_sight(
site.lat, site.lon, site.height,
lat, lon, 1.5 # receiver at 1.5m
)
has_los = los_result["has_los"]
if not has_los:
# Add diffraction loss based on clearance
clearance = los_result["clearance"]
terrain_loss = self._diffraction_loss(clearance, site.frequency)
# Building loss
building_loss = 0.0
if settings.use_buildings and buildings:
for building in buildings:
intersection = self.buildings.line_intersects_building(
site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon),
lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon),
building
)
if intersection is not None:
# Building penetration loss (~20dB for concrete)
building_loss += 20.0
has_los = False
break # One building is enough
# Calculate RSRP
# RSRP = Tx Power + Tx Gain - Path Loss - Antenna Loss - Terrain Loss - Building Loss
rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss
return CoveragePoint(
lat=lat,
lon=lon,
rsrp=rsrp,
distance=distance,
has_los=has_los,
terrain_loss=terrain_loss,
building_loss=building_loss
)
def _okumura_hata(
self,
distance: float, # meters
frequency: float, # MHz
tx_height: float, # meters
rx_height: float # meters
) -> float:
"""
Okumura-Hata path loss model (urban)
Returns path loss in dB
"""
d_km = distance / 1000
if d_km < 0.1:
d_km = 0.1 # Minimum distance
# Mobile antenna height correction (urban)
a_hm = (1.1 * np.log10(frequency) - 0.7) * rx_height - (1.56 * np.log10(frequency) - 0.8)
# Path loss
L = (69.55 + 26.16 * np.log10(frequency) - 13.82 * np.log10(tx_height) - a_hm +
(44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km))
return L
def _antenna_pattern_loss(
self,
site_lat: float, site_lon: float,
point_lat: float, point_lon: float,
azimuth: float, beamwidth: float
) -> float:
"""Calculate antenna pattern attenuation"""
# Calculate bearing from site to point
bearing = self._calculate_bearing(site_lat, site_lon, point_lat, point_lon)
# Angle difference from main lobe
angle_diff = abs(bearing - azimuth)
if angle_diff > 180:
angle_diff = 360 - angle_diff
# Simple cosine pattern approximation
# 3dB beamwidth = angle where power drops to half
half_beamwidth = beamwidth / 2
if angle_diff <= half_beamwidth:
# Within main lobe - minimal loss
loss = 3 * (angle_diff / half_beamwidth) ** 2
else:
# Outside main lobe - significant loss
loss = 3 + 12 * ((angle_diff - half_beamwidth) / half_beamwidth) ** 2
loss = min(loss, 25) # Cap at 25dB (back lobe)
return loss
def _calculate_bearing(
self,
lat1: float, lon1: float,
lat2: float, lon2: float
) -> float:
"""Calculate bearing from point 1 to point 2 (degrees)"""
lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
dlon = lon2 - lon1
x = np.sin(dlon) * np.cos(lat2)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
bearing = np.degrees(np.arctan2(x, y))
return (bearing + 360) % 360
def _diffraction_loss(self, clearance: float, frequency: float) -> float:
"""
Knife-edge diffraction loss
Args:
clearance: Clearance in meters (negative = obstructed)
frequency: Frequency in MHz
Returns:
Additional loss in dB
"""
if clearance >= 0:
return 0.0 # No obstruction
# Fresnel parameter approximation
# v ≈ clearance * sqrt(2 / (λ * d))
# Simplified: use clearance directly
v = abs(clearance) / 10 # Normalize
# Knife-edge loss approximation
if v <= 0:
loss = 0
elif v < 2.4:
loss = 6.02 + 9.11 * v - 1.27 * v**2
else:
loss = 13.0 + 20 * np.log10(v)
return min(loss, 40) # Cap at 40dB
# Singleton
coverage_service = CoverageService()
```
---
### 3. Create Coverage API Routes
**app/api/routes/coverage.py:**
```python
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import List, Optional
from pydantic import BaseModel
from app.services.coverage_service import (
coverage_service,
CoverageSettings,
SiteParams,
CoveragePoint
)
router = APIRouter()
class CoverageRequest(BaseModel):
"""Request body for coverage calculation"""
sites: List[SiteParams]
settings: CoverageSettings = CoverageSettings()
class CoverageResponse(BaseModel):
"""Coverage calculation response"""
points: List[CoveragePoint]
count: int
settings: CoverageSettings
stats: dict
@router.post("/calculate")
async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
"""
Calculate RF coverage for one or more sites
Returns grid of RSRP values with terrain and building effects
"""
if not request.sites:
raise HTTPException(400, "At least one site required")
if len(request.sites) > 10:
raise HTTPException(400, "Maximum 10 sites per request")
# Validate settings
if request.settings.radius > 50000:
raise HTTPException(400, "Maximum radius 50km")
if request.settings.resolution < 50:
raise HTTPException(400, "Minimum resolution 50m")
# Calculate
if len(request.sites) == 1:
points = await coverage_service.calculate_coverage(
request.sites[0],
request.settings
)
else:
points = await coverage_service.calculate_multi_site_coverage(
request.sites,
request.settings
)
# Calculate stats
rsrp_values = [p.rsrp for p in points]
los_count = sum(1 for p in points if p.has_los)
stats = {
"min_rsrp": min(rsrp_values) if rsrp_values else 0,
"max_rsrp": max(rsrp_values) if rsrp_values else 0,
"avg_rsrp": sum(rsrp_values) / len(rsrp_values) if rsrp_values else 0,
"los_percentage": (los_count / len(points) * 100) if points else 0,
"points_with_buildings": sum(1 for p in points if p.building_loss > 0),
"points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0),
}
return CoverageResponse(
points=points,
count=len(points),
settings=request.settings,
stats=stats
)
@router.get("/buildings")
async def get_buildings(
min_lat: float,
min_lon: float,
max_lat: float,
max_lon: float
):
"""
Get buildings in bounding box (for debugging/visualization)
"""
from app.services.buildings_service import buildings_service
# Limit bbox size
if (max_lat - min_lat) > 0.1 or (max_lon - min_lon) > 0.1:
raise HTTPException(400, "Bbox too large (max 0.1 degrees)")
buildings = await buildings_service.fetch_buildings(
min_lat, min_lon, max_lat, max_lon
)
return {
"count": len(buildings),
"buildings": [b.model_dump() for b in buildings]
}
```
---
### 4. Register Routes
**Update app/main.py:**
```python
# Add import
from app.api.routes import health, projects, terrain, coverage
# Add router
app.include_router(coverage.router, prefix="/api/coverage", tags=["coverage"])
```
**Update version:**
```python
version="1.3.0"
```
---
### 5. Create Buildings Cache Directory
```bash
mkdir -p /opt/rfcp/backend/data/buildings
```
---
### 6. Test
```bash
# Restart
sudo systemctl restart rfcp-backend
# Test buildings endpoint
curl "https://api.rfcp.eliah.one/api/coverage/buildings?min_lat=48.45&min_lon=35.0&max_lat=48.47&max_lon=35.02"
# Test coverage calculation (single site)
curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \
-H "Content-Type: application/json" \
-d '{
"sites": [{
"lat": 48.46,
"lon": 35.05,
"height": 30,
"power": 43,
"gain": 15,
"frequency": 1800
}],
"settings": {
"radius": 2000,
"resolution": 100,
"use_terrain": true,
"use_buildings": true
}
}'
```
---
## ✅ Success Criteria
- [ ] `/api/coverage/buildings` returns OSM buildings with heights
- [ ] Buildings cached to disk (check `/opt/rfcp/backend/data/buildings/`)
- [ ] `/api/coverage/calculate` returns coverage grid
- [ ] Response includes `terrain_loss` and `building_loss` per point
- [ ] Stats show `los_percentage` and building/terrain impact
- [ ] Swagger docs show new endpoints
---
## 📁 Files Created
```
app/services/
├── buildings_service.py # NEW - OSM Overpass integration
└── coverage_service.py # NEW - RF coverage calculation
app/api/routes/
└── coverage.py # NEW - API endpoints
data/buildings/
└── *.json # Cached building data per bbox
```
---
## 📝 Notes
- Overpass API has rate limits (~10k requests/day) — caching critical
- Building height estimation: `levels × 3m` or defaults by type
- Building penetration loss: ~20dB for concrete (simplified)
- Diffraction uses knife-edge approximation
- Coverage calculation can be slow for large areas — consider async/background tasks later
---
## 🔜 Next: Iteration 1.4
- Frontend integration (replace browser calculation with API)
- Real-time coverage updates
- Progress indication for large calculations
---
**Ready for Claude Code** 🚀