@mytec: 1.2iter ready for test

This commit is contained in:
2026-01-30 23:47:17 +02:00
parent 58ee6b4163
commit d13233d8e3
7 changed files with 493 additions and 15 deletions

View File

@@ -0,0 +1,101 @@
import os
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import FileResponse
from typing import Optional
from app.core.config import settings
from app.services.terrain_service import terrain_service
from app.services.los_service import los_service
router = APIRouter()
@router.get("/elevation")
async def get_elevation(
lat: float = Query(..., ge=-90, le=90, description="Latitude"),
lon: float = Query(..., ge=-180, le=180, description="Longitude")
):
"""Get elevation at a specific point"""
elevation = await terrain_service.get_elevation(lat, lon)
return {
"lat": lat,
"lon": lon,
"elevation": elevation,
"unit": "meters"
}
@router.get("/profile")
async def get_elevation_profile(
lat1: float = Query(..., description="Start latitude"),
lon1: float = Query(..., description="Start longitude"),
lat2: float = Query(..., description="End latitude"),
lon2: float = Query(..., description="End longitude"),
points: int = Query(100, ge=10, le=500, description="Number of sample points")
):
"""Get elevation profile between two points"""
profile = await terrain_service.get_elevation_profile(lat1, lon1, lat2, lon2, points)
return {
"start": {"lat": lat1, "lon": lon1},
"end": {"lat": lat2, "lon": lon2},
"num_points": len(profile),
"profile": profile
}
@router.get("/los")
async def check_line_of_sight(
tx_lat: float = Query(..., description="Transmitter latitude"),
tx_lon: float = Query(..., description="Transmitter longitude"),
tx_height: float = Query(..., ge=0, description="Transmitter height above ground (m)"),
rx_lat: float = Query(..., description="Receiver latitude"),
rx_lon: float = Query(..., description="Receiver longitude"),
rx_height: float = Query(1.5, ge=0, description="Receiver height above ground (m)")
):
"""Check line-of-sight between transmitter and receiver"""
result = await los_service.check_line_of_sight(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height
)
return result
@router.get("/fresnel")
async def check_fresnel_clearance(
tx_lat: float = Query(..., description="Transmitter latitude"),
tx_lon: float = Query(..., description="Transmitter longitude"),
tx_height: float = Query(..., ge=0, description="Transmitter height (m)"),
rx_lat: float = Query(..., description="Receiver latitude"),
rx_lon: float = Query(..., description="Receiver longitude"),
rx_height: float = Query(1.5, ge=0, description="Receiver height (m)"),
frequency: float = Query(..., ge=100, le=6000, description="Frequency (MHz)")
):
"""Calculate Fresnel zone clearance"""
result = await los_service.calculate_fresnel_clearance(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency
)
return result
@router.get("/tiles")
async def list_cached_tiles():
"""List cached SRTM tiles"""
tiles = list(terrain_service.cache_dir.glob("*.hgt"))
return {
"cache_dir": str(terrain_service.cache_dir),
"tiles": [t.stem for t in tiles],
"count": len(tiles)
}
@router.get("/file/{region}")
async def get_terrain_file(region: str):
"""Serve raw SRTM terrain .hgt files (legacy compatibility)."""
terrain_path = os.path.join(settings.TERRAIN_DATA_DIR, f"{region}.hgt")
if os.path.exists(terrain_path):
return FileResponse(terrain_path)
raise HTTPException(status_code=404, detail=f"Region '{region}' not found")

View File

@@ -1,13 +1,10 @@
import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from app.core.config import settings
from app.core.database import connect_to_mongo, close_mongo_connection from app.core.database import connect_to_mongo, close_mongo_connection
from app.api.routes import health, projects from app.api.routes import health, projects, terrain
@asynccontextmanager @asynccontextmanager
@@ -20,7 +17,7 @@ async def lifespan(app: FastAPI):
app = FastAPI( app = FastAPI(
title="RFCP Backend API", title="RFCP Backend API",
description="RF Coverage Planning Backend", description="RF Coverage Planning Backend",
version="1.1.0", version="1.2.0",
lifespan=lifespan, lifespan=lifespan,
) )
@@ -36,20 +33,12 @@ app.add_middleware(
# Routes # Routes
app.include_router(health.router, prefix="/api/health", tags=["health"]) app.include_router(health.router, prefix="/api/health", tags=["health"])
app.include_router(projects.router, prefix="/api/projects", tags=["projects"]) app.include_router(projects.router, prefix="/api/projects", tags=["projects"])
app.include_router(terrain.router, prefix="/api/terrain", tags=["terrain"])
@app.get("/") @app.get("/")
async def root(): async def root():
return {"message": "RFCP Backend API", "version": "1.1.0"} return {"message": "RFCP Backend API", "version": "1.2.0"}
@app.get("/api/terrain/{region}")
async def get_terrain(region: str):
"""Serve SRTM terrain .hgt files."""
terrain_path = os.path.join(settings.TERRAIN_DATA_DIR, f"{region}.hgt")
if os.path.exists(terrain_path):
return FileResponse(terrain_path)
return {"error": "Region not found"}, 404
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1 @@
# Services package

View File

@@ -0,0 +1,195 @@
import numpy as np
from typing import Tuple, List
from app.services.terrain_service import terrain_service, TerrainService
class LineOfSightService:
"""
Line-of-Sight calculations with terrain
"""
EARTH_RADIUS = 6371000 # meters
K_FACTOR = 4 / 3 # Standard atmospheric refraction
def __init__(self, terrain: TerrainService = None):
self.terrain = terrain or terrain_service
async def check_line_of_sight(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float = 1.5,
num_samples: int = 50
) -> dict:
"""
Check line-of-sight between transmitter and receiver
Args:
tx_lat, tx_lon: Transmitter coordinates
tx_height: Transmitter antenna height above ground (meters)
rx_lat, rx_lon: Receiver coordinates
rx_height: Receiver height above ground (meters), default 1.5m (person)
num_samples: Number of points to sample along path
Returns:
{
"has_los": bool,
"clearance": float, # minimum clearance in meters (negative = blocked)
"blocked_at": float | None, # distance where blocked (meters)
"profile": [...] # elevation profile with LOS line
}
"""
# Get elevation profile
profile = await self.terrain.get_elevation_profile(
tx_lat, tx_lon, rx_lat, rx_lon, num_samples
)
if not profile:
return {"has_los": True, "clearance": 0, "blocked_at": None, "profile": []}
# Get endpoint elevations
tx_ground = profile[0]["elevation"]
rx_ground = profile[-1]["elevation"]
tx_total = tx_ground + tx_height
rx_total = rx_ground + rx_height
total_distance = profile[-1]["distance"]
min_clearance = float('inf')
blocked_at = None
# Check each point along path
for point in profile:
d = point["distance"]
terrain_elev = point["elevation"]
if total_distance == 0:
los_height = tx_total
else:
# Linear interpolation of LOS line
los_height = tx_total + (rx_total - tx_total) * (d / total_distance)
# Earth curvature correction (with atmospheric refraction)
# Effective Earth radius = K * actual radius
effective_radius = self.K_FACTOR * self.EARTH_RADIUS
curvature = (d * (total_distance - d)) / (2 * effective_radius)
# LOS height after curvature correction
los_height_corrected = los_height - curvature
# Clearance at this point
clearance = los_height_corrected - terrain_elev
# Add to profile for visualization
point["los_height"] = los_height_corrected
point["clearance"] = clearance
if clearance < min_clearance:
min_clearance = clearance
if clearance <= 0:
blocked_at = d
has_los = min_clearance > 0
return {
"has_los": has_los,
"clearance": min_clearance,
"blocked_at": blocked_at,
"profile": profile
}
async def calculate_fresnel_clearance(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
num_samples: int = 50
) -> dict:
"""
Calculate Fresnel zone clearance
60% clearance of 1st Fresnel zone = good signal
Returns:
{
"clearance_percent": float, # worst-case clearance as % of required
"has_adequate_clearance": bool, # >= 60%
"worst_point_distance": float,
"fresnel_profile": [...]
}
"""
profile = await self.terrain.get_elevation_profile(
tx_lat, tx_lon, rx_lat, rx_lon, num_samples
)
if not profile:
return {
"clearance_percent": 100.0,
"has_adequate_clearance": True,
"worst_point_distance": 0,
"fresnel_profile": []
}
tx_ground = profile[0]["elevation"]
rx_ground = profile[-1]["elevation"]
tx_total = tx_ground + tx_height
rx_total = rx_ground + rx_height
total_distance = profile[-1]["distance"]
# Wavelength (lambda = c / f)
wavelength = 300.0 / frequency_mhz # meters
worst_clearance_pct = 100.0
worst_distance = 0.0
for point in profile:
d = point["distance"]
terrain_elev = point["elevation"]
if d == 0 or d == total_distance:
continue # Skip endpoints
# LOS height at this point
if total_distance > 0:
los_height = tx_total + (rx_total - tx_total) * (d / total_distance)
else:
los_height = tx_total
# 1st Fresnel zone radius at this point
d1 = d
d2 = total_distance - d
fresnel_radius = np.sqrt((wavelength * d1 * d2) / total_distance)
# Required clearance (60% of 1st Fresnel zone)
required_clearance = 0.6 * fresnel_radius
# Actual clearance
actual_clearance = los_height - terrain_elev
# Clearance as percentage of required
if required_clearance > 0:
clearance_pct = (actual_clearance / required_clearance) * 100
else:
clearance_pct = 100.0
# Add to profile
point["fresnel_radius"] = fresnel_radius
point["required_clearance"] = required_clearance
point["clearance_percent"] = clearance_pct
if clearance_pct < worst_clearance_pct:
worst_clearance_pct = clearance_pct
worst_distance = d
return {
"clearance_percent": worst_clearance_pct,
"has_adequate_clearance": worst_clearance_pct >= 60.0,
"worst_point_distance": worst_distance,
"fresnel_profile": profile
}
# Singleton instance
los_service = LineOfSightService()

View File

@@ -0,0 +1,191 @@
import struct
import asyncio
import aiofiles
import httpx
from pathlib import Path
from typing import List, Optional, Tuple
import numpy as np
class TerrainService:
"""
SRTM elevation data service
- Downloads and caches .hgt tiles
- Provides elevation lookups
- Generates elevation profiles
"""
# SRTM tile dimensions (1 arc-second = 3601x3601, 3 arc-second = 1201x1201)
TILE_SIZE = 3601 # 1 arc-second (30m resolution)
# Mirror URLs for SRTM data (USGS requires login, use mirrors)
SRTM_MIRRORS = [
"https://elevation-tiles-prod.s3.amazonaws.com/skadi/{lat_dir}/{tile_name}.hgt.gz",
"https://s3.amazonaws.com/elevation-tiles-prod/skadi/{lat_dir}/{tile_name}.hgt.gz",
]
def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/srtm"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True, parents=True)
self._tile_cache: dict[str, np.ndarray] = {} # In-memory cache
self._max_cached_tiles = 10 # Limit memory usage
def get_tile_name(self, lat: float, lon: float) -> str:
"""Convert lat/lon to SRTM tile name (e.g., N48E035)"""
lat_int = int(lat) if lat >= 0 else int(lat) - 1
lon_int = int(lon) if lon >= 0 else int(lon) - 1
lat_letter = 'N' if lat_int >= 0 else 'S'
lon_letter = 'E' if lon_int >= 0 else 'W'
return f"{lat_letter}{abs(lat_int):02d}{lon_letter}{abs(lon_int):03d}"
def get_tile_path(self, tile_name: str) -> Path:
"""Get local path for tile"""
return self.cache_dir / f"{tile_name}.hgt"
async def download_tile(self, tile_name: str) -> bool:
"""Download SRTM tile from mirror"""
import gzip
tile_path = self.get_tile_path(tile_name)
if tile_path.exists():
return True
lat_dir = tile_name[:3] # e.g., "N48"
async with httpx.AsyncClient(timeout=60.0) as client:
for mirror in self.SRTM_MIRRORS:
url = mirror.format(lat_dir=lat_dir, tile_name=tile_name)
try:
response = await client.get(url)
if response.status_code == 200:
# Decompress gzip
decompressed = gzip.decompress(response.content)
async with aiofiles.open(tile_path, 'wb') as f:
await f.write(decompressed)
print(f"Downloaded {tile_name} from {mirror}")
return True
except Exception as e:
print(f"Failed to download from {mirror}: {e}")
continue
print(f"Failed to download tile {tile_name}")
return False
async def load_tile(self, tile_name: str) -> Optional[np.ndarray]:
"""Load tile into memory (with caching)"""
# Check memory cache
if tile_name in self._tile_cache:
return self._tile_cache[tile_name]
tile_path = self.get_tile_path(tile_name)
# Download if missing
if not tile_path.exists():
success = await self.download_tile(tile_name)
if not success:
return None
# Read HGT file (big-endian signed 16-bit integers)
try:
async with aiofiles.open(tile_path, 'rb') as f:
data = await f.read()
# Parse as numpy array
arr = np.frombuffer(data, dtype='>i2').reshape(self.TILE_SIZE, self.TILE_SIZE)
# Manage cache size
if len(self._tile_cache) >= self._max_cached_tiles:
# Remove oldest entry
oldest = next(iter(self._tile_cache))
del self._tile_cache[oldest]
self._tile_cache[tile_name] = arr
return arr
except Exception as e:
print(f"Error loading tile {tile_name}: {e}")
return None
async def get_elevation(self, lat: float, lon: float) -> float:
"""Get elevation at specific coordinate (meters above sea level)"""
tile_name = self.get_tile_name(lat, lon)
tile = await self.load_tile(tile_name)
if tile is None:
return 0.0 # No data, assume sea level
# Calculate position within tile
lat_int = int(lat) if lat >= 0 else int(lat) - 1
lon_int = int(lon) if lon >= 0 else int(lon) - 1
lat_frac = lat - lat_int
lon_frac = lon - lon_int
# Row 0 = north edge, row 3600 = south edge
row = int((1 - lat_frac) * (self.TILE_SIZE - 1))
col = int(lon_frac * (self.TILE_SIZE - 1))
# Clamp to valid range
row = max(0, min(row, self.TILE_SIZE - 1))
col = max(0, min(col, self.TILE_SIZE - 1))
elevation = tile[row, col]
# -32768 = void/no data
if elevation == -32768:
return 0.0
return float(elevation)
async def get_elevation_profile(
self,
lat1: float, lon1: float,
lat2: float, lon2: float,
num_points: int = 100
) -> List[dict]:
"""
Get elevation profile between two points
Returns list of {lat, lon, elevation, distance} dicts
"""
lats = np.linspace(lat1, lat2, num_points)
lons = np.linspace(lon1, lon2, num_points)
# Calculate cumulative distances
total_distance = self.haversine_distance(lat1, lon1, lat2, lon2)
distances = np.linspace(0, total_distance, num_points)
profile = []
for i, (lat, lon, dist) in enumerate(zip(lats, lons, distances)):
elev = await self.get_elevation(lat, lon)
profile.append({
"lat": float(lat),
"lon": float(lon),
"elevation": elev,
"distance": float(dist)
})
return profile
@staticmethod
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance between two points in meters"""
EARTH_RADIUS = 6371000 # meters
lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
return EARTH_RADIUS * c
# Singleton instance
terrain_service = TerrainService()

View File

@@ -8,3 +8,4 @@ pydantic-settings==2.1.0
numpy==1.26.4 numpy==1.26.4
scipy==1.12.0 scipy==1.12.0
requests==2.31.0 requests==2.31.0
httpx==0.27.0