Files
rfcp/docs/devlog/back/RFCP-Iteration-1.2-Terrain-Integration.md

20 KiB

RFCP Backend - Iteration 1.2: Terrain Integration

Date: January 30, 2025
Type: Backend Development
Estimated: 4-6 hours (Phase 2.1 + 2.2 from roadmap)
Location: /opt/rfcp/backend/


🎯 Goal

Add SRTM elevation data service and Line-of-Sight calculations for realistic RF coverage.


📋 Pre-reading (IMPORTANT)

Before starting, read these documents in project knowledge:

  1. RFCP-Backend-Roadmap-Complete.md — Phase 2 details (lines 312-600)
  2. RFCP-Iteration-1.1-Backend-Foundation.md — current state

📊 Current State

# Backend running with 1.1 complete
systemctl status rfcp-backend.service  # ✅ active

# Structure from 1.1
/opt/rfcp/backend/
├── app/
│   ├── main.py
│   ├── api/routes/          # health.py, projects.py
│   ├── core/                # config.py, database.py
│   └── models/              # project.py, site.py
├── venv/
└── requirements.txt

Tasks

1. Install Additional Dependencies

cd /opt/rfcp/backend
source venv/bin/activate

pip install aiofiles httpx
pip freeze > requirements.txt

2. Create Data Directory

mkdir -p /opt/rfcp/backend/data/srtm
chown -R root:root /opt/rfcp/backend/data

3. Create Terrain Service

app/services/init.py:

# empty file

app/services/terrain_service.py:

import struct
import asyncio
import aiofiles
import httpx
from pathlib import Path
from typing import List, Optional, Tuple
import numpy as np

class TerrainService:
    """
    SRTM elevation data service
    - Downloads and caches .hgt tiles
    - Provides elevation lookups
    - Generates elevation profiles
    """
    
    # SRTM tile dimensions (1 arc-second = 3601x3601, 3 arc-second = 1201x1201)
    TILE_SIZE = 3601  # 1 arc-second (30m resolution)
    
    # Mirror URLs for SRTM data (USGS requires login, use mirrors)
    SRTM_MIRRORS = [
        "https://elevation-tiles-prod.s3.amazonaws.com/skadi/{lat_dir}/{tile_name}.hgt.gz",
        "https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_dir}/{tile_name}.hgt.gz",
    ]
    
    def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/srtm"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True, parents=True)
        self._tile_cache: dict[str, np.ndarray] = {}  # In-memory cache
        self._max_cached_tiles = 10  # Limit memory usage
    
    def get_tile_name(self, lat: float, lon: float) -> str:
        """Convert lat/lon to SRTM tile name (e.g., N48E035)"""
        lat_int = int(lat) if lat >= 0 else int(lat) - 1
        lon_int = int(lon) if lon >= 0 else int(lon) - 1
        
        lat_letter = 'N' if lat_int >= 0 else 'S'
        lon_letter = 'E' if lon_int >= 0 else 'W'
        
        return f"{lat_letter}{abs(lat_int):02d}{lon_letter}{abs(lon_int):03d}"
    
    def get_tile_path(self, tile_name: str) -> Path:
        """Get local path for tile"""
        return self.cache_dir / f"{tile_name}.hgt"
    
    async def download_tile(self, tile_name: str) -> bool:
        """Download SRTM tile from mirror"""
        import gzip
        
        tile_path = self.get_tile_path(tile_name)
        if tile_path.exists():
            return True
        
        lat_dir = tile_name[:3]  # e.g., "N48"
        
        async with httpx.AsyncClient(timeout=60.0) as client:
            for mirror in self.SRTM_MIRRORS:
                url = mirror.format(lat_dir=lat_dir, tile_name=tile_name)
                try:
                    response = await client.get(url)
                    if response.status_code == 200:
                        # Decompress gzip
                        decompressed = gzip.decompress(response.content)
                        
                        async with aiofiles.open(tile_path, 'wb') as f:
                            await f.write(decompressed)
                        
                        print(f"Downloaded {tile_name} from {mirror}")
                        return True
                except Exception as e:
                    print(f"Failed to download from {mirror}: {e}")
                    continue
        
        print(f"Failed to download tile {tile_name}")
        return False
    
    async def load_tile(self, tile_name: str) -> Optional[np.ndarray]:
        """Load tile into memory (with caching)"""
        # Check memory cache
        if tile_name in self._tile_cache:
            return self._tile_cache[tile_name]
        
        tile_path = self.get_tile_path(tile_name)
        
        # Download if missing
        if not tile_path.exists():
            success = await self.download_tile(tile_name)
            if not success:
                return None
        
        # Read HGT file (big-endian signed 16-bit integers)
        try:
            async with aiofiles.open(tile_path, 'rb') as f:
                data = await f.read()
            
            # Parse as numpy array
            arr = np.frombuffer(data, dtype='>i2').reshape(self.TILE_SIZE, self.TILE_SIZE)
            
            # Manage cache size
            if len(self._tile_cache) >= self._max_cached_tiles:
                # Remove oldest entry
                oldest = next(iter(self._tile_cache))
                del self._tile_cache[oldest]
            
            self._tile_cache[tile_name] = arr
            return arr
            
        except Exception as e:
            print(f"Error loading tile {tile_name}: {e}")
            return None
    
    async def get_elevation(self, lat: float, lon: float) -> float:
        """Get elevation at specific coordinate (meters above sea level)"""
        tile_name = self.get_tile_name(lat, lon)
        tile = await self.load_tile(tile_name)
        
        if tile is None:
            return 0.0  # No data, assume sea level
        
        # Calculate position within tile
        lat_int = int(lat) if lat >= 0 else int(lat) - 1
        lon_int = int(lon) if lon >= 0 else int(lon) - 1
        
        lat_frac = lat - lat_int
        lon_frac = lon - lon_int
        
        # Row 0 = north edge, row 3600 = south edge
        row = int((1 - lat_frac) * (self.TILE_SIZE - 1))
        col = int(lon_frac * (self.TILE_SIZE - 1))
        
        # Clamp to valid range
        row = max(0, min(row, self.TILE_SIZE - 1))
        col = max(0, min(col, self.TILE_SIZE - 1))
        
        elevation = tile[row, col]
        
        # -32768 = void/no data
        if elevation == -32768:
            return 0.0
        
        return float(elevation)
    
    async def get_elevation_profile(
        self, 
        lat1: float, lon1: float, 
        lat2: float, lon2: float, 
        num_points: int = 100
    ) -> List[dict]:
        """
        Get elevation profile between two points
        
        Returns list of {lat, lon, elevation, distance} dicts
        """
        lats = np.linspace(lat1, lat2, num_points)
        lons = np.linspace(lon1, lon2, num_points)
        
        # Calculate cumulative distances
        total_distance = self.haversine_distance(lat1, lon1, lat2, lon2)
        distances = np.linspace(0, total_distance, num_points)
        
        profile = []
        for i, (lat, lon, dist) in enumerate(zip(lats, lons, distances)):
            elev = await self.get_elevation(lat, lon)
            profile.append({
                "lat": float(lat),
                "lon": float(lon),
                "elevation": elev,
                "distance": float(dist)
            })
        
        return profile
    
    @staticmethod
    def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Calculate distance between two points in meters"""
        EARTH_RADIUS = 6371000  # meters
        
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        
        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        c = 2 * np.arcsin(np.sqrt(a))
        
        return EARTH_RADIUS * c


# Singleton instance
terrain_service = TerrainService()

4. Create Line-of-Sight Service

app/services/los_service.py:

import numpy as np
from typing import Tuple, List
from app.services.terrain_service import terrain_service, TerrainService

class LineOfSightService:
    """
    Line-of-Sight calculations with terrain
    """
    
    EARTH_RADIUS = 6371000  # meters
    K_FACTOR = 4/3  # Standard atmospheric refraction
    
    def __init__(self, terrain: TerrainService = None):
        self.terrain = terrain or terrain_service
    
    async def check_line_of_sight(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        rx_lat: float, rx_lon: float, rx_height: float = 1.5,
        num_samples: int = 50
    ) -> dict:
        """
        Check line-of-sight between transmitter and receiver
        
        Args:
            tx_lat, tx_lon: Transmitter coordinates
            tx_height: Transmitter antenna height above ground (meters)
            rx_lat, rx_lon: Receiver coordinates  
            rx_height: Receiver height above ground (meters), default 1.5m (person)
            num_samples: Number of points to sample along path
        
        Returns:
            {
                "has_los": bool,
                "clearance": float,  # minimum clearance in meters (negative = blocked)
                "blocked_at": float | None,  # distance where blocked (meters)
                "profile": [...]  # elevation profile with LOS line
            }
        """
        # Get elevation profile
        profile = await self.terrain.get_elevation_profile(
            tx_lat, tx_lon, rx_lat, rx_lon, num_samples
        )
        
        if not profile:
            return {"has_los": True, "clearance": 0, "blocked_at": None, "profile": []}
        
        # Get endpoint elevations
        tx_ground = profile[0]["elevation"]
        rx_ground = profile[-1]["elevation"]
        
        tx_total = tx_ground + tx_height
        rx_total = rx_ground + rx_height
        
        total_distance = profile[-1]["distance"]
        
        min_clearance = float('inf')
        blocked_at = None
        
        # Check each point along path
        for point in profile:
            d = point["distance"]
            terrain_elev = point["elevation"]
            
            if total_distance == 0:
                los_height = tx_total
            else:
                # Linear interpolation of LOS line
                los_height = tx_total + (rx_total - tx_total) * (d / total_distance)
            
            # Earth curvature correction (with atmospheric refraction)
            # Effective Earth radius = K * actual radius
            effective_radius = self.K_FACTOR * self.EARTH_RADIUS
            curvature = (d * (total_distance - d)) / (2 * effective_radius)
            
            # LOS height after curvature correction
            los_height_corrected = los_height - curvature
            
            # Clearance at this point
            clearance = los_height_corrected - terrain_elev
            
            # Add to profile for visualization
            point["los_height"] = los_height_corrected
            point["clearance"] = clearance
            
            if clearance < min_clearance:
                min_clearance = clearance
                if clearance <= 0:
                    blocked_at = d
        
        has_los = min_clearance > 0
        
        return {
            "has_los": has_los,
            "clearance": min_clearance,
            "blocked_at": blocked_at,
            "profile": profile
        }
    
    async def calculate_fresnel_clearance(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        rx_lat: float, rx_lon: float, rx_height: float,
        frequency_mhz: float,
        num_samples: int = 50
    ) -> dict:
        """
        Calculate Fresnel zone clearance
        
        60% clearance of 1st Fresnel zone = good signal
        
        Returns:
            {
                "clearance_percent": float,  # worst-case clearance as % of required
                "has_adequate_clearance": bool,  # >= 60%
                "worst_point_distance": float,
                "fresnel_profile": [...]
            }
        """
        profile = await self.terrain.get_elevation_profile(
            tx_lat, tx_lon, rx_lat, rx_lon, num_samples
        )
        
        if not profile:
            return {
                "clearance_percent": 100.0,
                "has_adequate_clearance": True,
                "worst_point_distance": 0,
                "fresnel_profile": []
            }
        
        tx_ground = profile[0]["elevation"]
        rx_ground = profile[-1]["elevation"]
        
        tx_total = tx_ground + tx_height
        rx_total = rx_ground + rx_height
        
        total_distance = profile[-1]["distance"]
        
        # Wavelength (λ = c / f)
        wavelength = 300.0 / frequency_mhz  # meters
        
        worst_clearance_pct = 100.0
        worst_distance = 0.0
        
        for point in profile:
            d = point["distance"]
            terrain_elev = point["elevation"]
            
            if d == 0 or d == total_distance:
                continue  # Skip endpoints
            
            # LOS height at this point
            if total_distance > 0:
                los_height = tx_total + (rx_total - tx_total) * (d / total_distance)
            else:
                los_height = tx_total
            
            # 1st Fresnel zone radius at this point
            d1 = d
            d2 = total_distance - d
            fresnel_radius = np.sqrt((wavelength * d1 * d2) / total_distance)
            
            # Required clearance (60% of 1st Fresnel zone)
            required_clearance = 0.6 * fresnel_radius
            
            # Actual clearance
            actual_clearance = los_height - terrain_elev
            
            # Clearance as percentage of required
            if required_clearance > 0:
                clearance_pct = (actual_clearance / required_clearance) * 100
            else:
                clearance_pct = 100.0
            
            # Add to profile
            point["fresnel_radius"] = fresnel_radius
            point["required_clearance"] = required_clearance
            point["clearance_percent"] = clearance_pct
            
            if clearance_pct < worst_clearance_pct:
                worst_clearance_pct = clearance_pct
                worst_distance = d
        
        return {
            "clearance_percent": worst_clearance_pct,
            "has_adequate_clearance": worst_clearance_pct >= 60.0,
            "worst_point_distance": worst_distance,
            "fresnel_profile": profile
        }


# Singleton instance  
los_service = LineOfSightService()

5. Create Terrain API Routes

app/api/routes/terrain.py:

from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from app.services.terrain_service import terrain_service
from app.services.los_service import los_service

router = APIRouter()


@router.get("/elevation")
async def get_elevation(
    lat: float = Query(..., ge=-90, le=90, description="Latitude"),
    lon: float = Query(..., ge=-180, le=180, description="Longitude")
):
    """Get elevation at a specific point"""
    elevation = await terrain_service.get_elevation(lat, lon)
    return {
        "lat": lat,
        "lon": lon,
        "elevation": elevation,
        "unit": "meters"
    }


@router.get("/profile")
async def get_elevation_profile(
    lat1: float = Query(..., description="Start latitude"),
    lon1: float = Query(..., description="Start longitude"),
    lat2: float = Query(..., description="End latitude"),
    lon2: float = Query(..., description="End longitude"),
    points: int = Query(100, ge=10, le=500, description="Number of sample points")
):
    """Get elevation profile between two points"""
    profile = await terrain_service.get_elevation_profile(lat1, lon1, lat2, lon2, points)
    
    return {
        "start": {"lat": lat1, "lon": lon1},
        "end": {"lat": lat2, "lon": lon2},
        "num_points": len(profile),
        "profile": profile
    }


@router.get("/los")
async def check_line_of_sight(
    tx_lat: float = Query(..., description="Transmitter latitude"),
    tx_lon: float = Query(..., description="Transmitter longitude"),
    tx_height: float = Query(..., ge=0, description="Transmitter height above ground (m)"),
    rx_lat: float = Query(..., description="Receiver latitude"),
    rx_lon: float = Query(..., description="Receiver longitude"),
    rx_height: float = Query(1.5, ge=0, description="Receiver height above ground (m)")
):
    """Check line-of-sight between transmitter and receiver"""
    result = await los_service.check_line_of_sight(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height
    )
    return result


@router.get("/fresnel")
async def check_fresnel_clearance(
    tx_lat: float = Query(..., description="Transmitter latitude"),
    tx_lon: float = Query(..., description="Transmitter longitude"),
    tx_height: float = Query(..., ge=0, description="Transmitter height (m)"),
    rx_lat: float = Query(..., description="Receiver latitude"),
    rx_lon: float = Query(..., description="Receiver longitude"),
    rx_height: float = Query(1.5, ge=0, description="Receiver height (m)"),
    frequency: float = Query(..., ge=100, le=6000, description="Frequency (MHz)")
):
    """Calculate Fresnel zone clearance"""
    result = await los_service.calculate_fresnel_clearance(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height,
        frequency
    )
    return result


@router.get("/tiles")
async def list_cached_tiles():
    """List cached SRTM tiles"""
    tiles = list(terrain_service.cache_dir.glob("*.hgt"))
    return {
        "cache_dir": str(terrain_service.cache_dir),
        "tiles": [t.stem for t in tiles],
        "count": len(tiles)
    }

6. Register Routes in main.py

Update app/main.py:

# Add import
from app.api.routes import health, projects, terrain

# Add router
app.include_router(terrain.router, prefix="/api/terrain", tags=["terrain"])

7. Restart & Test

# Install deps
cd /opt/rfcp/backend
source venv/bin/activate
pip install aiofiles httpx

# Restart service
sudo systemctl restart rfcp-backend.service

# Check logs
journalctl -u rfcp-backend -f

# Test endpoints
curl "https://api.rfcp.eliah.one/api/terrain/elevation?lat=48.5&lon=35.0"

curl "https://api.rfcp.eliah.one/api/terrain/profile?lat1=48.5&lon1=35.0&lat2=48.6&lon2=35.1&points=50"

curl "https://api.rfcp.eliah.one/api/terrain/los?tx_lat=48.5&tx_lon=35.0&tx_height=30&rx_lat=48.52&rx_lon=35.02"

curl "https://api.rfcp.eliah.one/api/terrain/fresnel?tx_lat=48.5&tx_lon=35.0&tx_height=30&rx_lat=48.52&rx_lon=35.02&frequency=1800"

Success Criteria

  • /api/terrain/elevation returns elevation for any Ukraine coordinate
  • SRTM tiles auto-download to /opt/rfcp/backend/data/srtm/
  • /api/terrain/profile returns elevation array between two points
  • /api/terrain/los returns has_los: true/false with clearance
  • /api/terrain/fresnel returns clearance percentage
  • In-memory tile cache working (≤10 tiles)
  • Swagger docs show all new endpoints

📁 Files Created

app/services/
├── __init__.py           # NEW
├── terrain_service.py    # NEW - SRTM handling
└── los_service.py        # NEW - LoS + Fresnel

app/api/routes/
└── terrain.py            # NEW - API endpoints

data/srtm/
└── *.hgt                 # Auto-downloaded tiles

📝 Notes

  • SRTM tiles ~25MB each, auto-download on first request
  • Ukraine needs ~20-30 tiles for full coverage
  • First request to new area will be slow (download)
  • Subsequent requests fast (cached in memory + disk)
  • Earth curvature + atmospheric refraction (K=4/3) included

🔜 Next: Iteration 1.3

  • Integrate terrain into coverage calculation
  • Add use_terrain flag to settings
  • Enhanced path loss with LoS/Fresnel factors

Ready for Claude Code 🚀