From 58ee6b4163a911eaf2cce1200783c6c443eb1b62 Mon Sep 17 00:00:00 2001 From: mytec Date: Fri, 30 Jan 2026 23:39:17 +0200 Subject: [PATCH] @mytec: iter1.2 start --- RFCP-Iteration-1.2-Terrain-Integration.md | 653 ++++++++++++++++++++++ 1 file changed, 653 insertions(+) create mode 100644 RFCP-Iteration-1.2-Terrain-Integration.md diff --git a/RFCP-Iteration-1.2-Terrain-Integration.md b/RFCP-Iteration-1.2-Terrain-Integration.md new file mode 100644 index 0000000..8bbfe4c --- /dev/null +++ b/RFCP-Iteration-1.2-Terrain-Integration.md @@ -0,0 +1,653 @@ +# RFCP Backend - Iteration 1.2: Terrain Integration + +**Date:** January 30, 2025 +**Type:** Backend Development +**Estimated:** 4-6 hours (Phase 2.1 + 2.2 from roadmap) +**Location:** `/opt/rfcp/backend/` + +--- + +## 🎯 Goal + +Add SRTM elevation data service and Line-of-Sight calculations for realistic RF coverage. + +--- + +## 📋 Pre-reading (IMPORTANT) + +Before starting, read these documents in project knowledge: + +1. `RFCP-Backend-Roadmap-Complete.md` — Phase 2 details (lines 312-600) +2. `RFCP-Iteration-1.1-Backend-Foundation.md` — current state + +--- + +## 📊 Current State + +```bash +# Backend running with 1.1 complete +systemctl status rfcp-backend.service # ✅ active + +# Structure from 1.1 +/opt/rfcp/backend/ +├── app/ +│ ├── main.py +│ ├── api/routes/ # health.py, projects.py +│ ├── core/ # config.py, database.py +│ └── models/ # project.py, site.py +├── venv/ +└── requirements.txt +``` + +--- + +## ✅ Tasks + +### 1. Install Additional Dependencies + +```bash +cd /opt/rfcp/backend +source venv/bin/activate + +pip install aiofiles httpx +pip freeze > requirements.txt +``` + +--- + +### 2. Create Data Directory + +```bash +mkdir -p /opt/rfcp/backend/data/srtm +chown -R root:root /opt/rfcp/backend/data +``` + +--- + +### 3. Create Terrain Service + +**app/services/__init__.py:** +```python +# empty file +``` + +**app/services/terrain_service.py:** +```python +import struct +import asyncio +import aiofiles +import httpx +from pathlib import Path +from typing import List, Optional, Tuple +import numpy as np + +class TerrainService: + """ + SRTM elevation data service + - Downloads and caches .hgt tiles + - Provides elevation lookups + - Generates elevation profiles + """ + + # SRTM tile dimensions (1 arc-second = 3601x3601, 3 arc-second = 1201x1201) + TILE_SIZE = 3601 # 1 arc-second (30m resolution) + + # Mirror URLs for SRTM data (USGS requires login, use mirrors) + SRTM_MIRRORS = [ + "https://elevation-tiles-prod.s3.amazonaws.com/skadi/{lat_dir}/{tile_name}.hgt.gz", + "https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_dir}/{tile_name}.hgt.gz", + ] + + def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/srtm"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(exist_ok=True, parents=True) + self._tile_cache: dict[str, np.ndarray] = {} # In-memory cache + self._max_cached_tiles = 10 # Limit memory usage + + def get_tile_name(self, lat: float, lon: float) -> str: + """Convert lat/lon to SRTM tile name (e.g., N48E035)""" + lat_int = int(lat) if lat >= 0 else int(lat) - 1 + lon_int = int(lon) if lon >= 0 else int(lon) - 1 + + lat_letter = 'N' if lat_int >= 0 else 'S' + lon_letter = 'E' if lon_int >= 0 else 'W' + + return f"{lat_letter}{abs(lat_int):02d}{lon_letter}{abs(lon_int):03d}" + + def get_tile_path(self, tile_name: str) -> Path: + """Get local path for tile""" + return self.cache_dir / f"{tile_name}.hgt" + + async def download_tile(self, tile_name: str) -> bool: + """Download SRTM tile from mirror""" + import gzip + + tile_path = self.get_tile_path(tile_name) + if tile_path.exists(): + return True + + lat_dir = tile_name[:3] # e.g., "N48" + + async with httpx.AsyncClient(timeout=60.0) as client: + for mirror in self.SRTM_MIRRORS: + url = mirror.format(lat_dir=lat_dir, tile_name=tile_name) + try: + response = await client.get(url) + if response.status_code == 200: + # Decompress gzip + decompressed = gzip.decompress(response.content) + + async with aiofiles.open(tile_path, 'wb') as f: + await f.write(decompressed) + + print(f"Downloaded {tile_name} from {mirror}") + return True + except Exception as e: + print(f"Failed to download from {mirror}: {e}") + continue + + print(f"Failed to download tile {tile_name}") + return False + + async def load_tile(self, tile_name: str) -> Optional[np.ndarray]: + """Load tile into memory (with caching)""" + # Check memory cache + if tile_name in self._tile_cache: + return self._tile_cache[tile_name] + + tile_path = self.get_tile_path(tile_name) + + # Download if missing + if not tile_path.exists(): + success = await self.download_tile(tile_name) + if not success: + return None + + # Read HGT file (big-endian signed 16-bit integers) + try: + async with aiofiles.open(tile_path, 'rb') as f: + data = await f.read() + + # Parse as numpy array + arr = np.frombuffer(data, dtype='>i2').reshape(self.TILE_SIZE, self.TILE_SIZE) + + # Manage cache size + if len(self._tile_cache) >= self._max_cached_tiles: + # Remove oldest entry + oldest = next(iter(self._tile_cache)) + del self._tile_cache[oldest] + + self._tile_cache[tile_name] = arr + return arr + + except Exception as e: + print(f"Error loading tile {tile_name}: {e}") + return None + + async def get_elevation(self, lat: float, lon: float) -> float: + """Get elevation at specific coordinate (meters above sea level)""" + tile_name = self.get_tile_name(lat, lon) + tile = await self.load_tile(tile_name) + + if tile is None: + return 0.0 # No data, assume sea level + + # Calculate position within tile + lat_int = int(lat) if lat >= 0 else int(lat) - 1 + lon_int = int(lon) if lon >= 0 else int(lon) - 1 + + lat_frac = lat - lat_int + lon_frac = lon - lon_int + + # Row 0 = north edge, row 3600 = south edge + row = int((1 - lat_frac) * (self.TILE_SIZE - 1)) + col = int(lon_frac * (self.TILE_SIZE - 1)) + + # Clamp to valid range + row = max(0, min(row, self.TILE_SIZE - 1)) + col = max(0, min(col, self.TILE_SIZE - 1)) + + elevation = tile[row, col] + + # -32768 = void/no data + if elevation == -32768: + return 0.0 + + return float(elevation) + + async def get_elevation_profile( + self, + lat1: float, lon1: float, + lat2: float, lon2: float, + num_points: int = 100 + ) -> List[dict]: + """ + Get elevation profile between two points + + Returns list of {lat, lon, elevation, distance} dicts + """ + lats = np.linspace(lat1, lat2, num_points) + lons = np.linspace(lon1, lon2, num_points) + + # Calculate cumulative distances + total_distance = self.haversine_distance(lat1, lon1, lat2, lon2) + distances = np.linspace(0, total_distance, num_points) + + profile = [] + for i, (lat, lon, dist) in enumerate(zip(lats, lons, distances)): + elev = await self.get_elevation(lat, lon) + profile.append({ + "lat": float(lat), + "lon": float(lon), + "elevation": elev, + "distance": float(dist) + }) + + return profile + + @staticmethod + def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Calculate distance between two points in meters""" + EARTH_RADIUS = 6371000 # meters + + lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2]) + + dlat = lat2 - lat1 + dlon = lon2 - lon1 + + a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2 + c = 2 * np.arcsin(np.sqrt(a)) + + return EARTH_RADIUS * c + + +# Singleton instance +terrain_service = TerrainService() +``` + +--- + +### 4. Create Line-of-Sight Service + +**app/services/los_service.py:** +```python +import numpy as np +from typing import Tuple, List +from app.services.terrain_service import terrain_service, TerrainService + +class LineOfSightService: + """ + Line-of-Sight calculations with terrain + """ + + EARTH_RADIUS = 6371000 # meters + K_FACTOR = 4/3 # Standard atmospheric refraction + + def __init__(self, terrain: TerrainService = None): + self.terrain = terrain or terrain_service + + async def check_line_of_sight( + self, + tx_lat: float, tx_lon: float, tx_height: float, + rx_lat: float, rx_lon: float, rx_height: float = 1.5, + num_samples: int = 50 + ) -> dict: + """ + Check line-of-sight between transmitter and receiver + + Args: + tx_lat, tx_lon: Transmitter coordinates + tx_height: Transmitter antenna height above ground (meters) + rx_lat, rx_lon: Receiver coordinates + rx_height: Receiver height above ground (meters), default 1.5m (person) + num_samples: Number of points to sample along path + + Returns: + { + "has_los": bool, + "clearance": float, # minimum clearance in meters (negative = blocked) + "blocked_at": float | None, # distance where blocked (meters) + "profile": [...] # elevation profile with LOS line + } + """ + # Get elevation profile + profile = await self.terrain.get_elevation_profile( + tx_lat, tx_lon, rx_lat, rx_lon, num_samples + ) + + if not profile: + return {"has_los": True, "clearance": 0, "blocked_at": None, "profile": []} + + # Get endpoint elevations + tx_ground = profile[0]["elevation"] + rx_ground = profile[-1]["elevation"] + + tx_total = tx_ground + tx_height + rx_total = rx_ground + rx_height + + total_distance = profile[-1]["distance"] + + min_clearance = float('inf') + blocked_at = None + + # Check each point along path + for point in profile: + d = point["distance"] + terrain_elev = point["elevation"] + + if total_distance == 0: + los_height = tx_total + else: + # Linear interpolation of LOS line + los_height = tx_total + (rx_total - tx_total) * (d / total_distance) + + # Earth curvature correction (with atmospheric refraction) + # Effective Earth radius = K * actual radius + effective_radius = self.K_FACTOR * self.EARTH_RADIUS + curvature = (d * (total_distance - d)) / (2 * effective_radius) + + # LOS height after curvature correction + los_height_corrected = los_height - curvature + + # Clearance at this point + clearance = los_height_corrected - terrain_elev + + # Add to profile for visualization + point["los_height"] = los_height_corrected + point["clearance"] = clearance + + if clearance < min_clearance: + min_clearance = clearance + if clearance <= 0: + blocked_at = d + + has_los = min_clearance > 0 + + return { + "has_los": has_los, + "clearance": min_clearance, + "blocked_at": blocked_at, + "profile": profile + } + + async def calculate_fresnel_clearance( + self, + tx_lat: float, tx_lon: float, tx_height: float, + rx_lat: float, rx_lon: float, rx_height: float, + frequency_mhz: float, + num_samples: int = 50 + ) -> dict: + """ + Calculate Fresnel zone clearance + + 60% clearance of 1st Fresnel zone = good signal + + Returns: + { + "clearance_percent": float, # worst-case clearance as % of required + "has_adequate_clearance": bool, # >= 60% + "worst_point_distance": float, + "fresnel_profile": [...] + } + """ + profile = await self.terrain.get_elevation_profile( + tx_lat, tx_lon, rx_lat, rx_lon, num_samples + ) + + if not profile: + return { + "clearance_percent": 100.0, + "has_adequate_clearance": True, + "worst_point_distance": 0, + "fresnel_profile": [] + } + + tx_ground = profile[0]["elevation"] + rx_ground = profile[-1]["elevation"] + + tx_total = tx_ground + tx_height + rx_total = rx_ground + rx_height + + total_distance = profile[-1]["distance"] + + # Wavelength (λ = c / f) + wavelength = 300.0 / frequency_mhz # meters + + worst_clearance_pct = 100.0 + worst_distance = 0.0 + + for point in profile: + d = point["distance"] + terrain_elev = point["elevation"] + + if d == 0 or d == total_distance: + continue # Skip endpoints + + # LOS height at this point + if total_distance > 0: + los_height = tx_total + (rx_total - tx_total) * (d / total_distance) + else: + los_height = tx_total + + # 1st Fresnel zone radius at this point + d1 = d + d2 = total_distance - d + fresnel_radius = np.sqrt((wavelength * d1 * d2) / total_distance) + + # Required clearance (60% of 1st Fresnel zone) + required_clearance = 0.6 * fresnel_radius + + # Actual clearance + actual_clearance = los_height - terrain_elev + + # Clearance as percentage of required + if required_clearance > 0: + clearance_pct = (actual_clearance / required_clearance) * 100 + else: + clearance_pct = 100.0 + + # Add to profile + point["fresnel_radius"] = fresnel_radius + point["required_clearance"] = required_clearance + point["clearance_percent"] = clearance_pct + + if clearance_pct < worst_clearance_pct: + worst_clearance_pct = clearance_pct + worst_distance = d + + return { + "clearance_percent": worst_clearance_pct, + "has_adequate_clearance": worst_clearance_pct >= 60.0, + "worst_point_distance": worst_distance, + "fresnel_profile": profile + } + + +# Singleton instance +los_service = LineOfSightService() +``` + +--- + +### 5. Create Terrain API Routes + +**app/api/routes/terrain.py:** +```python +from fastapi import APIRouter, HTTPException, Query +from typing import Optional +from app.services.terrain_service import terrain_service +from app.services.los_service import los_service + +router = APIRouter() + + +@router.get("/elevation") +async def get_elevation( + lat: float = Query(..., ge=-90, le=90, description="Latitude"), + lon: float = Query(..., ge=-180, le=180, description="Longitude") +): + """Get elevation at a specific point""" + elevation = await terrain_service.get_elevation(lat, lon) + return { + "lat": lat, + "lon": lon, + "elevation": elevation, + "unit": "meters" + } + + +@router.get("/profile") +async def get_elevation_profile( + lat1: float = Query(..., description="Start latitude"), + lon1: float = Query(..., description="Start longitude"), + lat2: float = Query(..., description="End latitude"), + lon2: float = Query(..., description="End longitude"), + points: int = Query(100, ge=10, le=500, description="Number of sample points") +): + """Get elevation profile between two points""" + profile = await terrain_service.get_elevation_profile(lat1, lon1, lat2, lon2, points) + + return { + "start": {"lat": lat1, "lon": lon1}, + "end": {"lat": lat2, "lon": lon2}, + "num_points": len(profile), + "profile": profile + } + + +@router.get("/los") +async def check_line_of_sight( + tx_lat: float = Query(..., description="Transmitter latitude"), + tx_lon: float = Query(..., description="Transmitter longitude"), + tx_height: float = Query(..., ge=0, description="Transmitter height above ground (m)"), + rx_lat: float = Query(..., description="Receiver latitude"), + rx_lon: float = Query(..., description="Receiver longitude"), + rx_height: float = Query(1.5, ge=0, description="Receiver height above ground (m)") +): + """Check line-of-sight between transmitter and receiver""" + result = await los_service.check_line_of_sight( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height + ) + return result + + +@router.get("/fresnel") +async def check_fresnel_clearance( + tx_lat: float = Query(..., description="Transmitter latitude"), + tx_lon: float = Query(..., description="Transmitter longitude"), + tx_height: float = Query(..., ge=0, description="Transmitter height (m)"), + rx_lat: float = Query(..., description="Receiver latitude"), + rx_lon: float = Query(..., description="Receiver longitude"), + rx_height: float = Query(1.5, ge=0, description="Receiver height (m)"), + frequency: float = Query(..., ge=100, le=6000, description="Frequency (MHz)") +): + """Calculate Fresnel zone clearance""" + result = await los_service.calculate_fresnel_clearance( + tx_lat, tx_lon, tx_height, + rx_lat, rx_lon, rx_height, + frequency + ) + return result + + +@router.get("/tiles") +async def list_cached_tiles(): + """List cached SRTM tiles""" + tiles = list(terrain_service.cache_dir.glob("*.hgt")) + return { + "cache_dir": str(terrain_service.cache_dir), + "tiles": [t.stem for t in tiles], + "count": len(tiles) + } +``` + +--- + +### 6. Register Routes in main.py + +**Update app/main.py:** +```python +# Add import +from app.api.routes import health, projects, terrain + +# Add router +app.include_router(terrain.router, prefix="/api/terrain", tags=["terrain"]) +``` + +--- + +### 7. Restart & Test + +```bash +# Install deps +cd /opt/rfcp/backend +source venv/bin/activate +pip install aiofiles httpx + +# Restart service +sudo systemctl restart rfcp-backend.service + +# Check logs +journalctl -u rfcp-backend -f + +# Test endpoints +curl "https://api.rfcp.eliah.one/api/terrain/elevation?lat=48.5&lon=35.0" + +curl "https://api.rfcp.eliah.one/api/terrain/profile?lat1=48.5&lon1=35.0&lat2=48.6&lon2=35.1&points=50" + +curl "https://api.rfcp.eliah.one/api/terrain/los?tx_lat=48.5&tx_lon=35.0&tx_height=30&rx_lat=48.52&rx_lon=35.02" + +curl "https://api.rfcp.eliah.one/api/terrain/fresnel?tx_lat=48.5&tx_lon=35.0&tx_height=30&rx_lat=48.52&rx_lon=35.02&frequency=1800" +``` + +--- + +## ✅ Success Criteria + +- [ ] `/api/terrain/elevation` returns elevation for any Ukraine coordinate +- [ ] SRTM tiles auto-download to `/opt/rfcp/backend/data/srtm/` +- [ ] `/api/terrain/profile` returns elevation array between two points +- [ ] `/api/terrain/los` returns `has_los: true/false` with clearance +- [ ] `/api/terrain/fresnel` returns clearance percentage +- [ ] In-memory tile cache working (≤10 tiles) +- [ ] Swagger docs show all new endpoints + +--- + +## 📁 Files Created + +``` +app/services/ +├── __init__.py # NEW +├── terrain_service.py # NEW - SRTM handling +└── los_service.py # NEW - LoS + Fresnel + +app/api/routes/ +└── terrain.py # NEW - API endpoints + +data/srtm/ +└── *.hgt # Auto-downloaded tiles +``` + +--- + +## 📝 Notes + +- SRTM tiles ~25MB each, auto-download on first request +- Ukraine needs ~20-30 tiles for full coverage +- First request to new area will be slow (download) +- Subsequent requests fast (cached in memory + disk) +- Earth curvature + atmospheric refraction (K=4/3) included + +--- + +## 🔜 Next: Iteration 1.3 + +- Integrate terrain into coverage calculation +- Add `use_terrain` flag to settings +- Enhanced path loss with LoS/Fresnel factors + +--- + +**Ready for Claude Code** 🚀