651 lines
22 KiB
Python
651 lines
22 KiB
Python
import time
|
|
import asyncio
|
|
|
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
|
from typing import List, Optional
|
|
from pydantic import BaseModel
|
|
from app.services.coverage_service import (
|
|
coverage_service,
|
|
CoverageSettings,
|
|
SiteParams,
|
|
CoveragePoint,
|
|
apply_preset,
|
|
PRESETS,
|
|
select_propagation_model,
|
|
)
|
|
from app.services.parallel_coverage_service import CancellationToken
|
|
from app.services.boundary_service import calculate_coverage_boundary
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class CoverageRequest(BaseModel):
|
|
"""Request body for coverage calculation"""
|
|
sites: List[SiteParams]
|
|
settings: CoverageSettings = CoverageSettings()
|
|
|
|
|
|
class BoundaryPoint(BaseModel):
|
|
"""Single boundary coordinate"""
|
|
lat: float
|
|
lon: float
|
|
|
|
|
|
class CoverageResponse(BaseModel):
|
|
"""Coverage calculation response"""
|
|
points: List[CoveragePoint]
|
|
count: int
|
|
settings: CoverageSettings
|
|
stats: dict
|
|
computation_time: float # seconds
|
|
models_used: List[str] # which models were active
|
|
boundary: Optional[List[BoundaryPoint]] = None # coverage boundary polygon
|
|
|
|
|
|
@router.post("/calculate")
|
|
async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
|
|
"""
|
|
Calculate RF coverage for one or more sites
|
|
|
|
Returns grid of RSRP values with terrain and building effects.
|
|
Supports propagation model presets: fast, standard, detailed, full.
|
|
"""
|
|
if not request.sites:
|
|
raise HTTPException(400, "At least one site required")
|
|
|
|
if len(request.sites) > 10:
|
|
raise HTTPException(400, "Maximum 10 sites per request")
|
|
|
|
# Validate settings
|
|
if request.settings.radius > 50000:
|
|
raise HTTPException(400, "Maximum radius 50km")
|
|
|
|
if request.settings.resolution < 50:
|
|
raise HTTPException(400, "Minimum resolution 50m")
|
|
|
|
# Apply preset and determine active models
|
|
effective_settings = apply_preset(request.settings.model_copy())
|
|
models_used = _get_active_models(effective_settings)
|
|
|
|
# Add the selected propagation model for the first site's frequency
|
|
env = getattr(effective_settings, 'environment', 'urban')
|
|
primary_model = select_propagation_model(request.sites[0].frequency, env)
|
|
if primary_model.name not in models_used:
|
|
models_used.insert(0, primary_model.name)
|
|
|
|
# Time the calculation
|
|
start_time = time.time()
|
|
cancel_token = CancellationToken()
|
|
|
|
# Dynamic timeout based on radius (large radius needs more time for tiled processing)
|
|
radius_m = request.settings.radius
|
|
if radius_m > 30_000:
|
|
calc_timeout = 600.0 # 10 min for 30-50km
|
|
elif radius_m > 10_000:
|
|
calc_timeout = 480.0 # 8 min for 10-30km
|
|
else:
|
|
calc_timeout = 300.0 # 5 min for ≤10km
|
|
|
|
try:
|
|
if len(request.sites) == 1:
|
|
points = await asyncio.wait_for(
|
|
coverage_service.calculate_coverage(
|
|
request.sites[0],
|
|
request.settings,
|
|
cancel_token,
|
|
),
|
|
timeout=calc_timeout,
|
|
)
|
|
else:
|
|
points = await asyncio.wait_for(
|
|
coverage_service.calculate_multi_site_coverage(
|
|
request.sites,
|
|
request.settings,
|
|
cancel_token,
|
|
),
|
|
timeout=calc_timeout,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
cancel_token.cancel()
|
|
# Force cleanup orphaned worker processes
|
|
from app.services.parallel_coverage_service import _kill_worker_processes
|
|
killed = _kill_worker_processes()
|
|
timeout_min = int(calc_timeout / 60)
|
|
detail = f"Calculation timeout ({timeout_min} min). Cleaned up {killed} workers." if killed else f"Calculation timeout ({timeout_min} min) — try smaller radius or lower resolution"
|
|
raise HTTPException(408, detail)
|
|
except asyncio.CancelledError:
|
|
cancel_token.cancel()
|
|
from app.services.parallel_coverage_service import _kill_worker_processes
|
|
_kill_worker_processes()
|
|
raise HTTPException(499, "Client disconnected")
|
|
|
|
computation_time = time.time() - start_time
|
|
|
|
# Calculate stats
|
|
rsrp_values = [p.rsrp for p in points]
|
|
los_count = sum(1 for p in points if p.has_los)
|
|
|
|
stats = {
|
|
"min_rsrp": min(rsrp_values) if rsrp_values else 0,
|
|
"max_rsrp": max(rsrp_values) if rsrp_values else 0,
|
|
"avg_rsrp": sum(rsrp_values) / len(rsrp_values) if rsrp_values else 0,
|
|
"los_percentage": (los_count / len(points) * 100) if points else 0,
|
|
"points_with_buildings": sum(1 for p in points if p.building_loss > 0),
|
|
"points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0),
|
|
"points_with_reflection_gain": sum(1 for p in points if p.reflection_gain > 0),
|
|
"points_with_vegetation_loss": sum(1 for p in points if p.vegetation_loss > 0),
|
|
"points_with_rain_loss": sum(1 for p in points if p.rain_loss > 0),
|
|
"points_with_indoor_loss": sum(1 for p in points if p.indoor_loss > 0),
|
|
"points_with_atmospheric_loss": sum(1 for p in points if p.atmospheric_loss > 0),
|
|
}
|
|
|
|
# Calculate coverage boundary
|
|
boundary = None
|
|
if points:
|
|
boundary_coords = calculate_coverage_boundary(
|
|
[p.model_dump() for p in points],
|
|
threshold_dbm=request.settings.min_signal,
|
|
)
|
|
if boundary_coords:
|
|
boundary = [BoundaryPoint(**c) for c in boundary_coords]
|
|
|
|
return CoverageResponse(
|
|
points=points,
|
|
count=len(points),
|
|
settings=effective_settings,
|
|
stats=stats,
|
|
computation_time=round(computation_time, 2),
|
|
models_used=models_used,
|
|
boundary=boundary,
|
|
)
|
|
|
|
|
|
@router.post("/preview")
|
|
async def calculate_preview(request: CoverageRequest) -> CoverageResponse:
|
|
"""
|
|
Fast radial preview using terrain-only along 360 spokes.
|
|
|
|
Returns coverage points much faster than full calculation
|
|
by skipping building/OSM data and using radial spokes instead of grid.
|
|
"""
|
|
if not request.sites:
|
|
raise HTTPException(400, "At least one site required")
|
|
|
|
site = request.sites[0]
|
|
effective_settings = apply_preset(request.settings.model_copy())
|
|
|
|
env = getattr(effective_settings, 'environment', 'urban')
|
|
primary_model = select_propagation_model(site.frequency, env)
|
|
models_used = ["terrain_los", primary_model.name]
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
points = await asyncio.wait_for(
|
|
coverage_service.calculate_radial_preview(
|
|
site, request.settings,
|
|
),
|
|
timeout=30.0,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
raise HTTPException(408, "Preview timeout (30s)")
|
|
|
|
computation_time = time.time() - start_time
|
|
|
|
rsrp_values = [p.rsrp for p in points]
|
|
los_count = sum(1 for p in points if p.has_los)
|
|
|
|
stats = {
|
|
"min_rsrp": min(rsrp_values) if rsrp_values else 0,
|
|
"max_rsrp": max(rsrp_values) if rsrp_values else 0,
|
|
"avg_rsrp": sum(rsrp_values) / len(rsrp_values) if rsrp_values else 0,
|
|
"los_percentage": (los_count / len(points) * 100) if points else 0,
|
|
"mode": "radial_preview",
|
|
}
|
|
|
|
return CoverageResponse(
|
|
points=points,
|
|
count=len(points),
|
|
settings=effective_settings,
|
|
stats=stats,
|
|
computation_time=round(computation_time, 2),
|
|
models_used=models_used,
|
|
)
|
|
|
|
|
|
@router.get("/presets")
|
|
async def get_presets():
|
|
"""Get available propagation model presets"""
|
|
return {
|
|
"presets": {
|
|
"fast": {
|
|
"description": "Quick calculation - terrain only",
|
|
**PRESETS["fast"],
|
|
"estimated_speed": "~5 seconds for 5km radius"
|
|
},
|
|
"standard": {
|
|
"description": "Balanced - terrain + buildings with materials",
|
|
**PRESETS["standard"],
|
|
"estimated_speed": "~30 seconds for 5km radius"
|
|
},
|
|
"detailed": {
|
|
"description": "Accurate - adds dominant path + vegetation",
|
|
**PRESETS["detailed"],
|
|
"estimated_speed": "~2 minutes for 5km radius"
|
|
},
|
|
"full": {
|
|
"description": "Maximum realism - all models + water + vegetation",
|
|
**PRESETS["full"],
|
|
"estimated_speed": "~5 minutes for 5km radius"
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@router.get("/buildings")
|
|
async def get_buildings(
|
|
min_lat: float,
|
|
min_lon: float,
|
|
max_lat: float,
|
|
max_lon: float
|
|
):
|
|
"""
|
|
Get buildings in bounding box (for debugging/visualization)
|
|
"""
|
|
from app.services.buildings_service import buildings_service
|
|
|
|
# Limit bbox size
|
|
if (max_lat - min_lat) > 0.1 or (max_lon - min_lon) > 0.1:
|
|
raise HTTPException(400, "Bbox too large (max 0.1 degrees)")
|
|
|
|
buildings = await buildings_service.fetch_buildings(
|
|
min_lat, min_lon, max_lat, max_lon
|
|
)
|
|
|
|
return {
|
|
"count": len(buildings),
|
|
"buildings": [b.model_dump() for b in buildings]
|
|
}
|
|
|
|
|
|
@router.post("/link-budget")
|
|
async def calculate_link_budget(request: dict):
|
|
"""Calculate point-to-point link budget.
|
|
|
|
Body: {
|
|
"tx_lat": 48.46, "tx_lon": 35.04,
|
|
"tx_power_dbm": 43, "tx_gain_dbi": 18, "tx_cable_loss_db": 2,
|
|
"tx_height_m": 30,
|
|
"rx_lat": 48.50, "rx_lon": 35.10,
|
|
"rx_gain_dbi": 0, "rx_cable_loss_db": 0, "rx_sensitivity_dbm": -100,
|
|
"rx_height_m": 1.5,
|
|
"frequency_mhz": 1800
|
|
}
|
|
"""
|
|
import math
|
|
from app.services.terrain_service import terrain_service
|
|
|
|
# Extract parameters with defaults
|
|
tx_lat = request.get("tx_lat", 48.46)
|
|
tx_lon = request.get("tx_lon", 35.04)
|
|
tx_power_dbm = request.get("tx_power_dbm", 43)
|
|
tx_gain_dbi = request.get("tx_gain_dbi", 18)
|
|
tx_cable_loss_db = request.get("tx_cable_loss_db", 2)
|
|
tx_height_m = request.get("tx_height_m", 30)
|
|
|
|
rx_lat = request.get("rx_lat", 48.50)
|
|
rx_lon = request.get("rx_lon", 35.10)
|
|
rx_gain_dbi = request.get("rx_gain_dbi", 0)
|
|
rx_cable_loss_db = request.get("rx_cable_loss_db", 0)
|
|
rx_sensitivity_dbm = request.get("rx_sensitivity_dbm", -100)
|
|
rx_height_m = request.get("rx_height_m", 1.5)
|
|
|
|
freq = request.get("frequency_mhz", 1800)
|
|
|
|
# Calculate distance
|
|
distance_m = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
|
|
distance_km = distance_m / 1000
|
|
|
|
# Get elevations
|
|
tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
|
|
rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)
|
|
|
|
# EIRP
|
|
eirp_dbm = tx_power_dbm + tx_gain_dbi - tx_cable_loss_db
|
|
|
|
# Free space path loss
|
|
if distance_km > 0:
|
|
fspl_db = 20 * math.log10(distance_km) + 20 * math.log10(freq) + 32.45
|
|
else:
|
|
fspl_db = 0
|
|
|
|
# Terrain profile for LOS check
|
|
profile = await terrain_service.get_elevation_profile(
|
|
tx_lat, tx_lon, rx_lat, rx_lon, num_points=100
|
|
)
|
|
|
|
# LOS check - does terrain block line of sight?
|
|
tx_total_height = tx_elev + tx_height_m
|
|
rx_total_height = rx_elev + rx_height_m
|
|
|
|
terrain_loss_db = 0.0
|
|
los_clear = True
|
|
obstructions = []
|
|
|
|
for i, point in enumerate(profile):
|
|
if i == 0 or i == len(profile) - 1:
|
|
continue
|
|
# Linear interpolation of LOS line at this point
|
|
fraction = i / (len(profile) - 1)
|
|
los_height = tx_total_height + fraction * (rx_total_height - tx_total_height)
|
|
if point["elevation"] > los_height:
|
|
los_clear = False
|
|
obstruction_height = point["elevation"] - los_height
|
|
obstructions.append({
|
|
"distance_m": point["distance"],
|
|
"height_above_los_m": round(obstruction_height, 1),
|
|
})
|
|
# Knife-edge diffraction estimate: ~6dB per major obstruction
|
|
terrain_loss_db += min(6.0, obstruction_height * 0.3)
|
|
|
|
# Cap terrain loss at reasonable max
|
|
terrain_loss_db = min(terrain_loss_db, 40.0)
|
|
|
|
total_path_loss = fspl_db + terrain_loss_db
|
|
|
|
# Received power
|
|
rx_power_dbm = eirp_dbm - total_path_loss + rx_gain_dbi - rx_cable_loss_db
|
|
|
|
# Link margin
|
|
margin_db = rx_power_dbm - rx_sensitivity_dbm
|
|
|
|
return {
|
|
"distance_km": round(distance_km, 2),
|
|
"distance_m": round(distance_m, 1),
|
|
"tx_elevation_m": round(tx_elev, 1),
|
|
"rx_elevation_m": round(rx_elev, 1),
|
|
"eirp_dbm": round(eirp_dbm, 1),
|
|
"fspl_db": round(fspl_db, 1),
|
|
"terrain_loss_db": round(terrain_loss_db, 1),
|
|
"total_path_loss_db": round(total_path_loss, 1),
|
|
"los_clear": los_clear,
|
|
"obstructions": obstructions,
|
|
"rx_power_dbm": round(rx_power_dbm, 1),
|
|
"margin_db": round(margin_db, 1),
|
|
"status": "OK" if margin_db >= 0 else "FAIL",
|
|
"link_budget": {
|
|
"tx_power_dbm": tx_power_dbm,
|
|
"tx_gain_dbi": tx_gain_dbi,
|
|
"tx_cable_loss_db": tx_cable_loss_db,
|
|
"rx_gain_dbi": rx_gain_dbi,
|
|
"rx_cable_loss_db": rx_cable_loss_db,
|
|
"rx_sensitivity_dbm": rx_sensitivity_dbm,
|
|
},
|
|
}
|
|
|
|
|
|
@router.post("/fresnel-profile")
|
|
async def fresnel_profile(request: dict):
|
|
"""Calculate terrain profile with Fresnel zone boundaries.
|
|
|
|
Body: {
|
|
"tx_lat": 48.46, "tx_lon": 35.04, "tx_height_m": 30,
|
|
"rx_lat": 48.50, "rx_lon": 35.10, "rx_height_m": 1.5,
|
|
"frequency_mhz": 1800,
|
|
"num_points": 100
|
|
}
|
|
"""
|
|
import math
|
|
from app.services.terrain_service import terrain_service
|
|
|
|
tx_lat = request.get("tx_lat", 48.46)
|
|
tx_lon = request.get("tx_lon", 35.04)
|
|
rx_lat = request.get("rx_lat", 48.50)
|
|
rx_lon = request.get("rx_lon", 35.10)
|
|
tx_height = request.get("tx_height_m", 30)
|
|
rx_height = request.get("rx_height_m", 1.5)
|
|
freq = request.get("frequency_mhz", 1800)
|
|
num_points = request.get("num_points", 100)
|
|
|
|
# Get terrain profile
|
|
profile = await terrain_service.get_elevation_profile(
|
|
tx_lat, tx_lon, rx_lat, rx_lon, num_points
|
|
)
|
|
|
|
if not profile:
|
|
return {"error": "Could not generate terrain profile"}
|
|
|
|
total_distance = profile[-1]["distance"] if profile else 0
|
|
|
|
# Get endpoint elevations
|
|
tx_elev = profile[0]["elevation"]
|
|
rx_elev = profile[-1]["elevation"]
|
|
tx_total = tx_elev + tx_height
|
|
rx_total = rx_elev + rx_height
|
|
|
|
wavelength = 300.0 / freq # meters
|
|
|
|
# Calculate Fresnel zone at each profile point
|
|
fresnel_data = []
|
|
los_blocked = False
|
|
fresnel_blocked = False
|
|
worst_clearance = float('inf')
|
|
fresnel_intrusion_count = 0
|
|
|
|
for i, point in enumerate(profile):
|
|
d1 = point["distance"] # distance from tx
|
|
d2 = total_distance - d1 # distance to rx
|
|
|
|
# LOS height at this point (linear interpolation)
|
|
if total_distance > 0:
|
|
fraction = d1 / total_distance
|
|
else:
|
|
fraction = 0
|
|
los_height = tx_total + fraction * (rx_total - tx_total)
|
|
|
|
# First Fresnel zone radius
|
|
if d1 > 0 and d2 > 0 and total_distance > 0:
|
|
f1_radius = math.sqrt((1 * wavelength * d1 * d2) / total_distance)
|
|
else:
|
|
f1_radius = 0
|
|
|
|
# Fresnel zone boundaries (height above sea level)
|
|
fresnel_top = los_height + f1_radius
|
|
fresnel_bottom = los_height - f1_radius
|
|
|
|
# Clearance: how much space between terrain and Fresnel bottom
|
|
clearance = fresnel_bottom - point["elevation"]
|
|
|
|
if clearance < worst_clearance:
|
|
worst_clearance = clearance
|
|
|
|
if point["elevation"] > los_height:
|
|
los_blocked = True
|
|
if point["elevation"] > fresnel_bottom:
|
|
fresnel_blocked = True
|
|
fresnel_intrusion_count += 1
|
|
|
|
fresnel_data.append({
|
|
"distance": round(point["distance"], 1),
|
|
"lat": point["lat"],
|
|
"lon": point["lon"],
|
|
"terrain_elevation": round(point["elevation"], 1),
|
|
"los_height": round(los_height, 1),
|
|
"fresnel_top": round(fresnel_top, 1),
|
|
"fresnel_bottom": round(fresnel_bottom, 1),
|
|
"f1_radius": round(f1_radius, 1),
|
|
"clearance": round(clearance, 1),
|
|
})
|
|
|
|
# Calculate Fresnel clearance percentage
|
|
fresnel_clear_pct = round(100 * (1 - fresnel_intrusion_count / len(profile)), 1) if profile else 100
|
|
|
|
# Estimate additional loss due to Fresnel obstruction
|
|
if los_blocked:
|
|
estimated_loss_db = 10 + abs(worst_clearance) * 0.5 # rough estimate
|
|
elif fresnel_blocked:
|
|
estimated_loss_db = 3 + (100 - fresnel_clear_pct) * 0.06 # 3-6 dB typical
|
|
else:
|
|
estimated_loss_db = 0
|
|
|
|
return {
|
|
"profile": fresnel_data,
|
|
"total_distance_m": round(total_distance, 1),
|
|
"tx_elevation": round(tx_elev, 1),
|
|
"rx_elevation": round(rx_elev, 1),
|
|
"frequency_mhz": freq,
|
|
"wavelength_m": round(wavelength, 4),
|
|
"los_clear": not los_blocked,
|
|
"fresnel_clear": not fresnel_blocked,
|
|
"fresnel_clear_pct": fresnel_clear_pct,
|
|
"worst_clearance_m": round(worst_clearance, 1),
|
|
"estimated_loss_db": round(estimated_loss_db, 1),
|
|
"recommendation": (
|
|
"Clear — excellent link" if not fresnel_blocked
|
|
else "Fresnel zone partially blocked — expect 3-6 dB additional loss"
|
|
if not los_blocked
|
|
else "LOS blocked — significant diffraction loss expected"
|
|
),
|
|
}
|
|
|
|
|
|
@router.post("/interference")
|
|
async def calculate_interference(request: CoverageRequest):
|
|
"""Calculate C/I (carrier-to-interference) ratio for multi-site scenario.
|
|
|
|
Uses the same request format as /calculate but returns interference analysis
|
|
instead of raw coverage. Requires 2+ sites to be meaningful.
|
|
|
|
Returns for each grid point:
|
|
- C/I ratio (carrier to interference) in dB
|
|
- Best server index
|
|
- Best server RSRP
|
|
"""
|
|
import numpy as np
|
|
from app.services.gpu_service import gpu_service
|
|
|
|
if len(request.sites) < 2:
|
|
raise HTTPException(400, "At least 2 sites required for interference analysis")
|
|
|
|
if len(request.sites) > 10:
|
|
raise HTTPException(400, "Maximum 10 sites per request")
|
|
|
|
# First calculate coverage for all sites
|
|
start_time = time.time()
|
|
cancel_token = CancellationToken()
|
|
|
|
try:
|
|
# Calculate coverage for each site individually
|
|
site_results = []
|
|
for site in request.sites:
|
|
points = await asyncio.wait_for(
|
|
coverage_service.calculate_coverage(
|
|
site,
|
|
request.settings,
|
|
cancel_token,
|
|
),
|
|
timeout=120.0, # 2 min per site
|
|
)
|
|
site_results.append(points)
|
|
|
|
except asyncio.TimeoutError:
|
|
cancel_token.cancel()
|
|
raise HTTPException(408, "Calculation timeout")
|
|
|
|
computation_time = time.time() - start_time
|
|
|
|
# Build coordinate -> RSRP mapping for each site
|
|
# We need to align the grids (same points for all sites)
|
|
coord_set = set()
|
|
for points in site_results:
|
|
for p in points:
|
|
coord_set.add((round(p.lat, 6), round(p.lon, 6)))
|
|
|
|
coord_list = sorted(coord_set)
|
|
|
|
# Build RSRP arrays aligned to coord_list
|
|
rsrp_grids = []
|
|
frequencies = []
|
|
for idx, (site, points) in enumerate(zip(request.sites, site_results)):
|
|
# Map coordinates to RSRP
|
|
point_map = {(round(p.lat, 6), round(p.lon, 6)): p.rsrp for p in points}
|
|
rsrp_array = np.array([
|
|
point_map.get(coord, -150) # -150 dBm = no coverage
|
|
for coord in coord_list
|
|
], dtype=np.float64)
|
|
rsrp_grids.append(rsrp_array)
|
|
frequencies.append(site.frequency)
|
|
|
|
# Calculate C/I using GPU service
|
|
ci_ratio, best_server_idx, best_rsrp = gpu_service.calculate_interference_vectorized(
|
|
rsrp_grids, frequencies
|
|
)
|
|
|
|
# Build result points with C/I data
|
|
ci_points = []
|
|
for i, (lat, lon) in enumerate(coord_list):
|
|
ci_points.append({
|
|
"lat": lat,
|
|
"lon": lon,
|
|
"ci_ratio_db": round(float(ci_ratio[i]), 1),
|
|
"best_server_idx": int(best_server_idx[i]),
|
|
"best_server_rsrp": round(float(best_rsrp[i]), 1),
|
|
})
|
|
|
|
# Calculate statistics
|
|
ci_values = [p["ci_ratio_db"] for p in ci_points]
|
|
stats = {
|
|
"min_ci_db": round(min(ci_values), 1) if ci_values else 0,
|
|
"max_ci_db": round(max(ci_values), 1) if ci_values else 0,
|
|
"avg_ci_db": round(sum(ci_values) / len(ci_values), 1) if ci_values else 0,
|
|
"good_coverage_pct": round(100 * sum(1 for c in ci_values if c >= 10) / len(ci_values), 1) if ci_values else 0,
|
|
"marginal_coverage_pct": round(100 * sum(1 for c in ci_values if 0 <= c < 10) / len(ci_values), 1) if ci_values else 0,
|
|
"interference_dominant_pct": round(100 * sum(1 for c in ci_values if c < 0) / len(ci_values), 1) if ci_values else 0,
|
|
}
|
|
|
|
# Check for frequency groups
|
|
unique_freqs = set(frequencies)
|
|
freq_groups = {}
|
|
for freq in unique_freqs:
|
|
freq_groups[freq] = sum(1 for f in frequencies if f == freq)
|
|
|
|
return {
|
|
"points": ci_points,
|
|
"count": len(ci_points),
|
|
"stats": stats,
|
|
"computation_time": round(computation_time, 2),
|
|
"sites": [{"name": s.name, "frequency_mhz": s.frequency} for s in request.sites],
|
|
"frequency_groups": freq_groups,
|
|
"warning": None if any(c > 1 for c in freq_groups.values()) else "All sites on different frequencies - no co-channel interference",
|
|
}
|
|
|
|
|
|
def _get_active_models(settings: CoverageSettings) -> List[str]:
|
|
"""Determine which propagation models are active"""
|
|
models = [] # Base propagation model added by caller via select_propagation_model()
|
|
|
|
if settings.use_terrain:
|
|
models.append("terrain_los")
|
|
if settings.use_buildings:
|
|
models.append("buildings")
|
|
if settings.use_materials:
|
|
models.append("materials")
|
|
if settings.use_dominant_path:
|
|
models.append("dominant_path")
|
|
if settings.use_street_canyon:
|
|
models.append("street_canyon")
|
|
if settings.use_reflections:
|
|
models.append("reflections")
|
|
if settings.use_water_reflection:
|
|
models.append("water_reflection")
|
|
if settings.use_vegetation:
|
|
models.append("vegetation")
|
|
if settings.rain_rate > 0:
|
|
models.append("rain_attenuation")
|
|
if settings.indoor_loss_type != "none":
|
|
models.append("indoor_penetration")
|
|
if settings.use_atmospheric:
|
|
models.append("atmospheric")
|
|
|
|
return models
|