import asyncio
import math
import time
from collections import Counter
from typing import List, Optional

from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel

from app.services.coverage_service import (
    coverage_service,
    CoverageSettings,
    SiteParams,
    CoveragePoint,
    apply_preset,
    PRESETS,
    select_propagation_model,
)
from app.services.parallel_coverage_service import CancellationToken
from app.services.boundary_service import calculate_coverage_boundary

router = APIRouter()


class CoverageRequest(BaseModel):
    """Request body for coverage calculation"""

    sites: List[SiteParams]
    settings: CoverageSettings = CoverageSettings()


class BoundaryPoint(BaseModel):
    """Single boundary coordinate"""

    lat: float
    lon: float


class CoverageResponse(BaseModel):
    """Coverage calculation response"""

    points: List[CoveragePoint]
    count: int
    settings: CoverageSettings
    stats: dict
    computation_time: float  # seconds
    models_used: List[str]  # which models were active
    boundary: Optional[List[BoundaryPoint]] = None  # coverage boundary polygon


def _basic_rsrp_stats(points) -> dict:
    """Return min/max/avg RSRP and LOS percentage for *points*.

    Every value is 0 when *points* is empty, so callers never divide by zero.
    """
    rsrp_values = [p.rsrp for p in points]
    los_count = sum(1 for p in points if p.has_los)
    return {
        "min_rsrp": min(rsrp_values) if rsrp_values else 0,
        "max_rsrp": max(rsrp_values) if rsrp_values else 0,
        "avg_rsrp": sum(rsrp_values) / len(rsrp_values) if rsrp_values else 0,
        "los_percentage": (los_count / len(points) * 100) if points else 0,
    }


def _timeout_for_radius(radius_m: float) -> float:
    """Return the calculation timeout in seconds for a coverage radius in metres.

    Larger radii need more time for tiled processing.
    """
    if radius_m > 30_000:
        return 600.0  # 10 min for 30-50km
    if radius_m > 10_000:
        return 480.0  # 8 min for 10-30km
    return 300.0  # 5 min for ≤10km


def _require_positive_number(value, field: str):
    """Return *value* unchanged if it is a finite number > 0, else raise HTTP 400.

    Guards values taken from untyped JSON bodies before they reach
    ``math.log10`` or a division, where zero, negative or non-numeric input
    would otherwise surface as an unhandled 500.
    """
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if (
        not is_number
        or (isinstance(value, float) and not math.isfinite(value))
        or value <= 0
    ):
        raise HTTPException(400, f"{field} must be a positive number")
    return value


@router.post("/calculate")
async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
    """
    Calculate RF coverage for one or more sites

    Returns grid of RSRP values with terrain and building effects.
    Supports propagation model presets: fast, standard, detailed, full.

    Raises:
        HTTPException 400: no sites, more than 10 sites, radius above 50 km,
            or resolution below 50 m.
        HTTPException 408: the calculation exceeded the radius-based timeout.
        HTTPException 499: the request task was cancelled.
    """
    if not request.sites:
        raise HTTPException(400, "At least one site required")

    if len(request.sites) > 10:
        raise HTTPException(400, "Maximum 10 sites per request")

    # Validate settings
    if request.settings.radius > 50000:
        raise HTTPException(400, "Maximum radius 50km")

    if request.settings.resolution < 50:
        raise HTTPException(400, "Minimum resolution 50m")

    # Apply preset and determine active models
    effective_settings = apply_preset(request.settings.model_copy())
    models_used = _get_active_models(effective_settings)

    # Add the selected propagation model for the first site's frequency
    env = getattr(effective_settings, 'environment', 'urban')
    primary_model = select_propagation_model(request.sites[0].frequency, env)
    if primary_model.name not in models_used:
        models_used.insert(0, primary_model.name)

    # Time the calculation with a monotonic clock so that system clock
    # adjustments cannot distort the reported duration.
    start_time = time.perf_counter()
    cancel_token = CancellationToken()

    # Dynamic timeout based on radius
    calc_timeout = _timeout_for_radius(request.settings.radius)

    # NOTE(review): the service receives request.settings, not
    # effective_settings — presumably it applies the preset itself; confirm.
    try:
        if len(request.sites) == 1:
            points = await asyncio.wait_for(
                coverage_service.calculate_coverage(
                    request.sites[0],
                    request.settings,
                    cancel_token,
                ),
                timeout=calc_timeout,
            )
        else:
            points = await asyncio.wait_for(
                coverage_service.calculate_multi_site_coverage(
                    request.sites,
                    request.settings,
                    cancel_token,
                ),
                timeout=calc_timeout,
            )
    except asyncio.TimeoutError:
        cancel_token.cancel()
        # Force cleanup orphaned worker processes
        from app.services.parallel_coverage_service import _kill_worker_processes

        killed = _kill_worker_processes()
        timeout_min = int(calc_timeout / 60)
        if killed:
            detail = f"Calculation timeout ({timeout_min} min). Cleaned up {killed} workers."
        else:
            detail = f"Calculation timeout ({timeout_min} min) — try smaller radius or lower resolution"
        raise HTTPException(408, detail)
    except asyncio.CancelledError:
        # NOTE(review): CancelledError is converted to an HTTP error instead of
        # being re-raised, so the task is no longer seen as cancelled by
        # asyncio. Kept as-is; confirm this is intended.
        cancel_token.cancel()
        from app.services.parallel_coverage_service import _kill_worker_processes

        _kill_worker_processes()
        raise HTTPException(499, "Client disconnected")

    computation_time = time.perf_counter() - start_time

    # Calculate stats: shared RSRP/LOS summary plus per-effect point counts
    stats = _basic_rsrp_stats(points)
    stats.update({
        "points_with_buildings": sum(1 for p in points if p.building_loss > 0),
        "points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0),
        "points_with_reflection_gain": sum(1 for p in points if p.reflection_gain > 0),
        "points_with_vegetation_loss": sum(1 for p in points if p.vegetation_loss > 0),
        "points_with_rain_loss": sum(1 for p in points if p.rain_loss > 0),
        "points_with_indoor_loss": sum(1 for p in points if p.indoor_loss > 0),
        "points_with_atmospheric_loss": sum(1 for p in points if p.atmospheric_loss > 0),
    })

    # Calculate coverage boundary
    boundary = None
    if points:
        boundary_coords = calculate_coverage_boundary(
            [p.model_dump() for p in points],
            threshold_dbm=request.settings.min_signal,
        )
        if boundary_coords:
            boundary = [BoundaryPoint(**c) for c in boundary_coords]

    return CoverageResponse(
        points=points,
        count=len(points),
        settings=effective_settings,
        stats=stats,
        computation_time=round(computation_time, 2),
        models_used=models_used,
        boundary=boundary,
    )


@router.post("/preview")
async def calculate_preview(request: CoverageRequest) -> CoverageResponse:
    """
    Fast radial preview using terrain-only along 360 spokes.

    Returns coverage points much faster than full calculation
    by skipping building/OSM data and using radial spokes instead of grid.
    Only the first site in the request is used.

    Raises:
        HTTPException 400: no sites were supplied.
        HTTPException 408: the preview did not finish within 30 seconds.
    """
    if not request.sites:
        raise HTTPException(400, "At least one site required")

    site = request.sites[0]
    effective_settings = apply_preset(request.settings.model_copy())

    env = getattr(effective_settings, 'environment', 'urban')
    primary_model = select_propagation_model(site.frequency, env)
    models_used = ["terrain_los", primary_model.name]

    start_time = time.perf_counter()

    try:
        points = await asyncio.wait_for(
            coverage_service.calculate_radial_preview(
                site,
                request.settings,
            ),
            timeout=30.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(408, "Preview timeout (30s)")

    computation_time = time.perf_counter() - start_time

    stats = _basic_rsrp_stats(points)
    stats["mode"] = "radial_preview"

    return CoverageResponse(
        points=points,
        count=len(points),
        settings=effective_settings,
        stats=stats,
        computation_time=round(computation_time, 2),
        models_used=models_used,
    )


@router.get("/presets")
async def get_presets():
    """Get available propagation model presets"""
    return {
        "presets": {
            "fast": {
                "description": "Quick calculation - terrain only",
                **PRESETS["fast"],
                "estimated_speed": "~5 seconds for 5km radius",
            },
            "standard": {
                "description": "Balanced - terrain + buildings with materials",
                **PRESETS["standard"],
                "estimated_speed": "~30 seconds for 5km radius",
            },
            "detailed": {
                "description": "Accurate - adds dominant path + vegetation",
                **PRESETS["detailed"],
                "estimated_speed": "~2 minutes for 5km radius",
            },
            "full": {
                "description": "Maximum realism - all models + water + vegetation",
                **PRESETS["full"],
                "estimated_speed": "~5 minutes for 5km radius",
            },
        }
    }


@router.get("/buildings")
async def get_buildings(
    min_lat: float,
    min_lon: float,
    max_lat: float,
    max_lon: float
):
    """
    Get buildings in bounding box (for debugging/visualization)

    Raises:
        HTTPException 400: the box spans more than 0.1 degrees in either axis.
    """
    from app.services.buildings_service import buildings_service

    # Limit bbox size
    if (max_lat - min_lat) > 0.1 or (max_lon - min_lon) > 0.1:
        raise HTTPException(400, "Bbox too large (max 0.1 degrees)")

    buildings = await buildings_service.fetch_buildings(
        min_lat, min_lon, max_lat, max_lon
    )

    return {
        "count": len(buildings),
        "buildings": [b.model_dump() for b in buildings],
    }


@router.post("/link-budget")
async def calculate_link_budget(request: dict):
    """Calculate point-to-point link budget.

    Body: {
        "tx_lat": 48.46, "tx_lon": 35.04,
        "tx_power_dbm": 43, "tx_gain_dbi": 18, "tx_cable_loss_db": 2,
        "tx_height_m": 30,
        "rx_lat": 48.50, "rx_lon": 35.10,
        "rx_gain_dbi": 0, "rx_cable_loss_db": 0, "rx_sensitivity_dbm": -100,
        "rx_height_m": 1.5,
        "frequency_mhz": 1800
    }

    Every key is optional; the values above are the defaults.

    Raises:
        HTTPException 400: frequency_mhz is not a positive finite number.
    """
    from app.services.terrain_service import terrain_service

    # Extract parameters with defaults
    tx_lat = request.get("tx_lat", 48.46)
    tx_lon = request.get("tx_lon", 35.04)
    tx_power_dbm = request.get("tx_power_dbm", 43)
    tx_gain_dbi = request.get("tx_gain_dbi", 18)
    tx_cable_loss_db = request.get("tx_cable_loss_db", 2)
    tx_height_m = request.get("tx_height_m", 30)

    rx_lat = request.get("rx_lat", 48.50)
    rx_lon = request.get("rx_lon", 35.10)
    rx_gain_dbi = request.get("rx_gain_dbi", 0)
    rx_cable_loss_db = request.get("rx_cable_loss_db", 0)
    rx_sensitivity_dbm = request.get("rx_sensitivity_dbm", -100)
    rx_height_m = request.get("rx_height_m", 1.5)

    # Reject bad frequencies up front: log10() of a non-positive value raises
    # and would otherwise become an unhandled 500 after the terrain lookups.
    freq = _require_positive_number(
        request.get("frequency_mhz", 1800), "frequency_mhz"
    )

    # Calculate distance
    distance_m = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
    distance_km = distance_m / 1000

    # Get elevations
    tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
    rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)

    # EIRP
    eirp_dbm = tx_power_dbm + tx_gain_dbi - tx_cable_loss_db

    # Free space path loss (distance in km, frequency in MHz)
    if distance_km > 0:
        fspl_db = 20 * math.log10(distance_km) + 20 * math.log10(freq) + 32.45
    else:
        fspl_db = 0

    # Terrain profile for LOS check
    profile = await terrain_service.get_elevation_profile(
        tx_lat, tx_lon, rx_lat, rx_lon, num_points=100
    )

    # LOS check - does terrain block line of sight?
    tx_total_height = tx_elev + tx_height_m
    rx_total_height = rx_elev + rx_height_m

    terrain_loss_db = 0.0
    los_clear = True
    obstructions = []

    last_index = len(profile) - 1
    for i, point in enumerate(profile):
        # The endpoints are the antennas themselves, not obstructions
        if i == 0 or i == last_index:
            continue

        # Linear interpolation of LOS line at this point
        fraction = i / last_index
        los_height = tx_total_height + fraction * (rx_total_height - tx_total_height)

        if point["elevation"] > los_height:
            los_clear = False
            obstruction_height = point["elevation"] - los_height
            obstructions.append({
                "distance_m": point["distance"],
                "height_above_los_m": round(obstruction_height, 1),
            })
            # Knife-edge diffraction estimate: at most 6 dB per blocking sample
            terrain_loss_db += min(6.0, obstruction_height * 0.3)

    # Cap terrain loss at reasonable max
    terrain_loss_db = min(terrain_loss_db, 40.0)

    total_path_loss = fspl_db + terrain_loss_db

    # Received power
    rx_power_dbm = eirp_dbm - total_path_loss + rx_gain_dbi - rx_cable_loss_db

    # Link margin
    margin_db = rx_power_dbm - rx_sensitivity_dbm

    return {
        "distance_km": round(distance_km, 2),
        "distance_m": round(distance_m, 1),
        "tx_elevation_m": round(tx_elev, 1),
        "rx_elevation_m": round(rx_elev, 1),
        "eirp_dbm": round(eirp_dbm, 1),
        "fspl_db": round(fspl_db, 1),
        "terrain_loss_db": round(terrain_loss_db, 1),
        "total_path_loss_db": round(total_path_loss, 1),
        "los_clear": los_clear,
        "obstructions": obstructions,
        "rx_power_dbm": round(rx_power_dbm, 1),
        "margin_db": round(margin_db, 1),
        "status": "OK" if margin_db >= 0 else "FAIL",
        "link_budget": {
            "tx_power_dbm": tx_power_dbm,
            "tx_gain_dbi": tx_gain_dbi,
            "tx_cable_loss_db": tx_cable_loss_db,
            "rx_gain_dbi": rx_gain_dbi,
            "rx_cable_loss_db": rx_cable_loss_db,
            "rx_sensitivity_dbm": rx_sensitivity_dbm,
        },
    }


@router.post("/fresnel-profile")
async def fresnel_profile(request: dict):
    """Calculate terrain profile with Fresnel zone boundaries.

    Body: {
        "tx_lat": 48.46, "tx_lon": 35.04, "tx_height_m": 30,
        "rx_lat": 48.50, "rx_lon": 35.10, "rx_height_m": 1.5,
        "frequency_mhz": 1800,
        "num_points": 100
    }

    Every key is optional; the values above are the defaults. When no terrain
    profile can be produced the response is ``{"error": ...}``.

    Raises:
        HTTPException 400: frequency_mhz is not a positive finite number.
    """
    from app.services.terrain_service import terrain_service

    tx_lat = request.get("tx_lat", 48.46)
    tx_lon = request.get("tx_lon", 35.04)
    rx_lat = request.get("rx_lat", 48.50)
    rx_lon = request.get("rx_lon", 35.10)
    tx_height = request.get("tx_height_m", 30)
    rx_height = request.get("rx_height_m", 1.5)
    num_points = request.get("num_points", 100)

    # Reject bad frequencies up front: the wavelength below divides by it.
    freq = _require_positive_number(
        request.get("frequency_mhz", 1800), "frequency_mhz"
    )

    # Get terrain profile
    profile = await terrain_service.get_elevation_profile(
        tx_lat, tx_lon, rx_lat, rx_lon, num_points
    )

    if not profile:
        return {"error": "Could not generate terrain profile"}

    total_distance = profile[-1]["distance"]

    # Get endpoint elevations
    tx_elev = profile[0]["elevation"]
    rx_elev = profile[-1]["elevation"]
    tx_total = tx_elev + tx_height
    rx_total = rx_elev + rx_height

    wavelength = 300.0 / freq  # meters (frequency in MHz)

    # Calculate Fresnel zone at each profile point
    fresnel_data = []
    los_blocked = False
    fresnel_blocked = False
    worst_clearance = float('inf')
    fresnel_intrusion_count = 0

    for point in profile:
        d1 = point["distance"]  # distance from tx
        d2 = total_distance - d1  # distance to rx

        # LOS height at this point (linear interpolation)
        fraction = d1 / total_distance if total_distance > 0 else 0
        los_height = tx_total + fraction * (rx_total - tx_total)

        # First Fresnel zone radius (zero at the endpoints)
        if d1 > 0 and d2 > 0 and total_distance > 0:
            f1_radius = math.sqrt((1 * wavelength * d1 * d2) / total_distance)
        else:
            f1_radius = 0

        # Fresnel zone boundaries (height above sea level)
        fresnel_top = los_height + f1_radius
        fresnel_bottom = los_height - f1_radius

        # Clearance: how much space between terrain and Fresnel bottom
        clearance = fresnel_bottom - point["elevation"]

        if clearance < worst_clearance:
            worst_clearance = clearance

        if point["elevation"] > los_height:
            los_blocked = True
        if point["elevation"] > fresnel_bottom:
            fresnel_blocked = True
            fresnel_intrusion_count += 1

        fresnel_data.append({
            "distance": round(point["distance"], 1),
            "lat": point["lat"],
            "lon": point["lon"],
            "terrain_elevation": round(point["elevation"], 1),
            "los_height": round(los_height, 1),
            "fresnel_top": round(fresnel_top, 1),
            "fresnel_bottom": round(fresnel_bottom, 1),
            "f1_radius": round(f1_radius, 1),
            "clearance": round(clearance, 1),
        })

    # Calculate Fresnel clearance percentage (profile is non-empty here)
    fresnel_clear_pct = round(
        100 * (1 - fresnel_intrusion_count / len(profile)), 1
    )

    # Estimate additional loss due to Fresnel obstruction
    if los_blocked:
        estimated_loss_db = 10 + abs(worst_clearance) * 0.5  # rough estimate
    elif fresnel_blocked:
        estimated_loss_db = 3 + (100 - fresnel_clear_pct) * 0.06  # 3-6 dB typical
    else:
        estimated_loss_db = 0

    if not fresnel_blocked:
        recommendation = "Clear — excellent link"
    elif not los_blocked:
        recommendation = "Fresnel zone partially blocked — expect 3-6 dB additional loss"
    else:
        recommendation = "LOS blocked — significant diffraction loss expected"

    return {
        "profile": fresnel_data,
        "total_distance_m": round(total_distance, 1),
        "tx_elevation": round(tx_elev, 1),
        "rx_elevation": round(rx_elev, 1),
        "frequency_mhz": freq,
        "wavelength_m": round(wavelength, 4),
        "los_clear": not los_blocked,
        "fresnel_clear": not fresnel_blocked,
        "fresnel_clear_pct": fresnel_clear_pct,
        "worst_clearance_m": round(worst_clearance, 1),
        "estimated_loss_db": round(estimated_loss_db, 1),
        "recommendation": recommendation,
    }


@router.post("/interference")
async def calculate_interference(request: CoverageRequest):
    """Calculate C/I (carrier-to-interference) ratio for multi-site scenario.

    Uses the same request format as /calculate but returns interference
    analysis instead of raw coverage. Requires 2+ sites to be meaningful.

    Returns for each grid point:
    - C/I ratio (carrier to interference) in dB
    - Best server index
    - Best server RSRP

    Raises:
        HTTPException 400: fewer than 2 or more than 10 sites.
        HTTPException 408: a per-site calculation exceeded 2 minutes.
    """
    import numpy as np
    from app.services.gpu_service import gpu_service

    if len(request.sites) < 2:
        raise HTTPException(400, "At least 2 sites required for interference analysis")

    if len(request.sites) > 10:
        raise HTTPException(400, "Maximum 10 sites per request")

    # First calculate coverage for all sites
    start_time = time.perf_counter()
    cancel_token = CancellationToken()

    try:
        # Calculate coverage for each site individually
        site_results = []
        for site in request.sites:
            points = await asyncio.wait_for(
                coverage_service.calculate_coverage(
                    site,
                    request.settings,
                    cancel_token,
                ),
                timeout=120.0,  # 2 min per site
            )
            site_results.append(points)
    except asyncio.TimeoutError:
        cancel_token.cancel()
        raise HTTPException(408, "Calculation timeout")

    # Only the per-site coverage runs are timed, not the C/I post-processing
    computation_time = time.perf_counter() - start_time

    # Build coordinate -> RSRP mapping for each site
    # We need to align the grids (same points for all sites)
    coord_set = set()
    for points in site_results:
        for p in points:
            coord_set.add((round(p.lat, 6), round(p.lon, 6)))

    coord_list = sorted(coord_set)

    # Build RSRP arrays aligned to coord_list
    rsrp_grids = []
    frequencies = []

    for site, points in zip(request.sites, site_results):
        # Map coordinates to RSRP
        point_map = {(round(p.lat, 6), round(p.lon, 6)): p.rsrp for p in points}
        rsrp_array = np.array(
            [
                point_map.get(coord, -150)  # -150 dBm = no coverage
                for coord in coord_list
            ],
            dtype=np.float64,
        )
        rsrp_grids.append(rsrp_array)
        frequencies.append(site.frequency)

    # Calculate C/I using GPU service
    ci_ratio, best_server_idx, best_rsrp = gpu_service.calculate_interference_vectorized(
        rsrp_grids, frequencies
    )

    # Build result points with C/I data
    ci_points = [
        {
            "lat": lat,
            "lon": lon,
            "ci_ratio_db": round(float(ci_ratio[i]), 1),
            "best_server_idx": int(best_server_idx[i]),
            "best_server_rsrp": round(float(best_rsrp[i]), 1),
        }
        for i, (lat, lon) in enumerate(coord_list)
    ]

    # Calculate statistics
    ci_values = [p["ci_ratio_db"] for p in ci_points]
    total = len(ci_values)

    def pct(count: int) -> float:
        """Return *count* as a percentage of all grid points, 1 decimal."""
        return round(100 * count / total, 1)

    if ci_values:
        stats = {
            "min_ci_db": round(min(ci_values), 1),
            "max_ci_db": round(max(ci_values), 1),
            "avg_ci_db": round(sum(ci_values) / total, 1),
            "good_coverage_pct": pct(sum(1 for c in ci_values if c >= 10)),
            "marginal_coverage_pct": pct(sum(1 for c in ci_values if 0 <= c < 10)),
            "interference_dominant_pct": pct(sum(1 for c in ci_values if c < 0)),
        }
    else:
        stats = {
            "min_ci_db": 0,
            "max_ci_db": 0,
            "avg_ci_db": 0,
            "good_coverage_pct": 0,
            "marginal_coverage_pct": 0,
            "interference_dominant_pct": 0,
        }

    # Check for frequency groups: number of sites sharing each frequency
    freq_groups = dict(Counter(frequencies))
    has_co_channel = any(c > 1 for c in freq_groups.values())

    return {
        "points": ci_points,
        "count": len(ci_points),
        "stats": stats,
        "computation_time": round(computation_time, 2),
        "sites": [{"name": s.name, "frequency_mhz": s.frequency} for s in request.sites],
        "frequency_groups": freq_groups,
        "warning": None if has_co_channel else "All sites on different frequencies - no co-channel interference",
    }


def _get_active_models(settings: CoverageSettings) -> List[str]:
    """Determine which propagation models are active

    The base propagation model is not included here; the caller adds it via
    select_propagation_model().
    """
    # Boolean feature flags, in the order they are reported
    flag_models = (
        ("use_terrain", "terrain_los"),
        ("use_buildings", "buildings"),
        ("use_materials", "materials"),
        ("use_dominant_path", "dominant_path"),
        ("use_street_canyon", "street_canyon"),
        ("use_reflections", "reflections"),
        ("use_water_reflection", "water_reflection"),
        ("use_vegetation", "vegetation"),
    )
    models = [name for attr, name in flag_models if getattr(settings, attr)]

    if settings.rain_rate > 0:
        models.append("rain_attenuation")
    if settings.indoor_loss_type != "none":
        models.append("indoor_penetration")
    if settings.use_atmospheric:
        models.append("atmospheric")

    return models