@mytec: WebGL works
This commit is contained in:
@@ -268,6 +268,358 @@ async def get_buildings(
|
||||
}
|
||||
|
||||
|
||||
@router.post("/link-budget")
|
||||
async def calculate_link_budget(request: dict):
|
||||
"""Calculate point-to-point link budget.
|
||||
|
||||
Body: {
|
||||
"tx_lat": 48.46, "tx_lon": 35.04,
|
||||
"tx_power_dbm": 43, "tx_gain_dbi": 18, "tx_cable_loss_db": 2,
|
||||
"tx_height_m": 30,
|
||||
"rx_lat": 48.50, "rx_lon": 35.10,
|
||||
"rx_gain_dbi": 0, "rx_cable_loss_db": 0, "rx_sensitivity_dbm": -100,
|
||||
"rx_height_m": 1.5,
|
||||
"frequency_mhz": 1800
|
||||
}
|
||||
"""
|
||||
import math
|
||||
from app.services.terrain_service import terrain_service
|
||||
|
||||
# Extract parameters with defaults
|
||||
tx_lat = request.get("tx_lat", 48.46)
|
||||
tx_lon = request.get("tx_lon", 35.04)
|
||||
tx_power_dbm = request.get("tx_power_dbm", 43)
|
||||
tx_gain_dbi = request.get("tx_gain_dbi", 18)
|
||||
tx_cable_loss_db = request.get("tx_cable_loss_db", 2)
|
||||
tx_height_m = request.get("tx_height_m", 30)
|
||||
|
||||
rx_lat = request.get("rx_lat", 48.50)
|
||||
rx_lon = request.get("rx_lon", 35.10)
|
||||
rx_gain_dbi = request.get("rx_gain_dbi", 0)
|
||||
rx_cable_loss_db = request.get("rx_cable_loss_db", 0)
|
||||
rx_sensitivity_dbm = request.get("rx_sensitivity_dbm", -100)
|
||||
rx_height_m = request.get("rx_height_m", 1.5)
|
||||
|
||||
freq = request.get("frequency_mhz", 1800)
|
||||
|
||||
# Calculate distance
|
||||
distance_m = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
|
||||
distance_km = distance_m / 1000
|
||||
|
||||
# Get elevations
|
||||
tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
|
||||
rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)
|
||||
|
||||
# EIRP
|
||||
eirp_dbm = tx_power_dbm + tx_gain_dbi - tx_cable_loss_db
|
||||
|
||||
# Free space path loss
|
||||
if distance_km > 0:
|
||||
fspl_db = 20 * math.log10(distance_km) + 20 * math.log10(freq) + 32.45
|
||||
else:
|
||||
fspl_db = 0
|
||||
|
||||
# Terrain profile for LOS check
|
||||
profile = await terrain_service.get_elevation_profile(
|
||||
tx_lat, tx_lon, rx_lat, rx_lon, num_points=100
|
||||
)
|
||||
|
||||
# LOS check - does terrain block line of sight?
|
||||
tx_total_height = tx_elev + tx_height_m
|
||||
rx_total_height = rx_elev + rx_height_m
|
||||
|
||||
terrain_loss_db = 0.0
|
||||
los_clear = True
|
||||
obstructions = []
|
||||
|
||||
for i, point in enumerate(profile):
|
||||
if i == 0 or i == len(profile) - 1:
|
||||
continue
|
||||
# Linear interpolation of LOS line at this point
|
||||
fraction = i / (len(profile) - 1)
|
||||
los_height = tx_total_height + fraction * (rx_total_height - tx_total_height)
|
||||
if point["elevation"] > los_height:
|
||||
los_clear = False
|
||||
obstruction_height = point["elevation"] - los_height
|
||||
obstructions.append({
|
||||
"distance_m": point["distance"],
|
||||
"height_above_los_m": round(obstruction_height, 1),
|
||||
})
|
||||
# Knife-edge diffraction estimate: ~6dB per major obstruction
|
||||
terrain_loss_db += min(6.0, obstruction_height * 0.3)
|
||||
|
||||
# Cap terrain loss at reasonable max
|
||||
terrain_loss_db = min(terrain_loss_db, 40.0)
|
||||
|
||||
total_path_loss = fspl_db + terrain_loss_db
|
||||
|
||||
# Received power
|
||||
rx_power_dbm = eirp_dbm - total_path_loss + rx_gain_dbi - rx_cable_loss_db
|
||||
|
||||
# Link margin
|
||||
margin_db = rx_power_dbm - rx_sensitivity_dbm
|
||||
|
||||
return {
|
||||
"distance_km": round(distance_km, 2),
|
||||
"distance_m": round(distance_m, 1),
|
||||
"tx_elevation_m": round(tx_elev, 1),
|
||||
"rx_elevation_m": round(rx_elev, 1),
|
||||
"eirp_dbm": round(eirp_dbm, 1),
|
||||
"fspl_db": round(fspl_db, 1),
|
||||
"terrain_loss_db": round(terrain_loss_db, 1),
|
||||
"total_path_loss_db": round(total_path_loss, 1),
|
||||
"los_clear": los_clear,
|
||||
"obstructions": obstructions,
|
||||
"rx_power_dbm": round(rx_power_dbm, 1),
|
||||
"margin_db": round(margin_db, 1),
|
||||
"status": "OK" if margin_db >= 0 else "FAIL",
|
||||
"link_budget": {
|
||||
"tx_power_dbm": tx_power_dbm,
|
||||
"tx_gain_dbi": tx_gain_dbi,
|
||||
"tx_cable_loss_db": tx_cable_loss_db,
|
||||
"rx_gain_dbi": rx_gain_dbi,
|
||||
"rx_cable_loss_db": rx_cable_loss_db,
|
||||
"rx_sensitivity_dbm": rx_sensitivity_dbm,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@router.post("/fresnel-profile")
|
||||
async def fresnel_profile(request: dict):
|
||||
"""Calculate terrain profile with Fresnel zone boundaries.
|
||||
|
||||
Body: {
|
||||
"tx_lat": 48.46, "tx_lon": 35.04, "tx_height_m": 30,
|
||||
"rx_lat": 48.50, "rx_lon": 35.10, "rx_height_m": 1.5,
|
||||
"frequency_mhz": 1800,
|
||||
"num_points": 100
|
||||
}
|
||||
"""
|
||||
import math
|
||||
from app.services.terrain_service import terrain_service
|
||||
|
||||
tx_lat = request.get("tx_lat", 48.46)
|
||||
tx_lon = request.get("tx_lon", 35.04)
|
||||
rx_lat = request.get("rx_lat", 48.50)
|
||||
rx_lon = request.get("rx_lon", 35.10)
|
||||
tx_height = request.get("tx_height_m", 30)
|
||||
rx_height = request.get("rx_height_m", 1.5)
|
||||
freq = request.get("frequency_mhz", 1800)
|
||||
num_points = request.get("num_points", 100)
|
||||
|
||||
# Get terrain profile
|
||||
profile = await terrain_service.get_elevation_profile(
|
||||
tx_lat, tx_lon, rx_lat, rx_lon, num_points
|
||||
)
|
||||
|
||||
if not profile:
|
||||
return {"error": "Could not generate terrain profile"}
|
||||
|
||||
total_distance = profile[-1]["distance"] if profile else 0
|
||||
|
||||
# Get endpoint elevations
|
||||
tx_elev = profile[0]["elevation"]
|
||||
rx_elev = profile[-1]["elevation"]
|
||||
tx_total = tx_elev + tx_height
|
||||
rx_total = rx_elev + rx_height
|
||||
|
||||
wavelength = 300.0 / freq # meters
|
||||
|
||||
# Calculate Fresnel zone at each profile point
|
||||
fresnel_data = []
|
||||
los_blocked = False
|
||||
fresnel_blocked = False
|
||||
worst_clearance = float('inf')
|
||||
fresnel_intrusion_count = 0
|
||||
|
||||
for i, point in enumerate(profile):
|
||||
d1 = point["distance"] # distance from tx
|
||||
d2 = total_distance - d1 # distance to rx
|
||||
|
||||
# LOS height at this point (linear interpolation)
|
||||
if total_distance > 0:
|
||||
fraction = d1 / total_distance
|
||||
else:
|
||||
fraction = 0
|
||||
los_height = tx_total + fraction * (rx_total - tx_total)
|
||||
|
||||
# First Fresnel zone radius
|
||||
if d1 > 0 and d2 > 0 and total_distance > 0:
|
||||
f1_radius = math.sqrt((1 * wavelength * d1 * d2) / total_distance)
|
||||
else:
|
||||
f1_radius = 0
|
||||
|
||||
# Fresnel zone boundaries (height above sea level)
|
||||
fresnel_top = los_height + f1_radius
|
||||
fresnel_bottom = los_height - f1_radius
|
||||
|
||||
# Clearance: how much space between terrain and Fresnel bottom
|
||||
clearance = fresnel_bottom - point["elevation"]
|
||||
|
||||
if clearance < worst_clearance:
|
||||
worst_clearance = clearance
|
||||
|
||||
if point["elevation"] > los_height:
|
||||
los_blocked = True
|
||||
if point["elevation"] > fresnel_bottom:
|
||||
fresnel_blocked = True
|
||||
fresnel_intrusion_count += 1
|
||||
|
||||
fresnel_data.append({
|
||||
"distance": round(point["distance"], 1),
|
||||
"lat": point["lat"],
|
||||
"lon": point["lon"],
|
||||
"terrain_elevation": round(point["elevation"], 1),
|
||||
"los_height": round(los_height, 1),
|
||||
"fresnel_top": round(fresnel_top, 1),
|
||||
"fresnel_bottom": round(fresnel_bottom, 1),
|
||||
"f1_radius": round(f1_radius, 1),
|
||||
"clearance": round(clearance, 1),
|
||||
})
|
||||
|
||||
# Calculate Fresnel clearance percentage
|
||||
fresnel_clear_pct = round(100 * (1 - fresnel_intrusion_count / len(profile)), 1) if profile else 100
|
||||
|
||||
# Estimate additional loss due to Fresnel obstruction
|
||||
if los_blocked:
|
||||
estimated_loss_db = 10 + abs(worst_clearance) * 0.5 # rough estimate
|
||||
elif fresnel_blocked:
|
||||
estimated_loss_db = 3 + (100 - fresnel_clear_pct) * 0.06 # 3-6 dB typical
|
||||
else:
|
||||
estimated_loss_db = 0
|
||||
|
||||
return {
|
||||
"profile": fresnel_data,
|
||||
"total_distance_m": round(total_distance, 1),
|
||||
"tx_elevation": round(tx_elev, 1),
|
||||
"rx_elevation": round(rx_elev, 1),
|
||||
"frequency_mhz": freq,
|
||||
"wavelength_m": round(wavelength, 4),
|
||||
"los_clear": not los_blocked,
|
||||
"fresnel_clear": not fresnel_blocked,
|
||||
"fresnel_clear_pct": fresnel_clear_pct,
|
||||
"worst_clearance_m": round(worst_clearance, 1),
|
||||
"estimated_loss_db": round(estimated_loss_db, 1),
|
||||
"recommendation": (
|
||||
"Clear — excellent link" if not fresnel_blocked
|
||||
else "Fresnel zone partially blocked — expect 3-6 dB additional loss"
|
||||
if not los_blocked
|
||||
else "LOS blocked — significant diffraction loss expected"
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/interference")
|
||||
async def calculate_interference(request: CoverageRequest):
|
||||
"""Calculate C/I (carrier-to-interference) ratio for multi-site scenario.
|
||||
|
||||
Uses the same request format as /calculate but returns interference analysis
|
||||
instead of raw coverage. Requires 2+ sites to be meaningful.
|
||||
|
||||
Returns for each grid point:
|
||||
- C/I ratio (carrier to interference) in dB
|
||||
- Best server index
|
||||
- Best server RSRP
|
||||
"""
|
||||
import numpy as np
|
||||
from app.services.gpu_service import gpu_service
|
||||
|
||||
if len(request.sites) < 2:
|
||||
raise HTTPException(400, "At least 2 sites required for interference analysis")
|
||||
|
||||
if len(request.sites) > 10:
|
||||
raise HTTPException(400, "Maximum 10 sites per request")
|
||||
|
||||
# First calculate coverage for all sites
|
||||
start_time = time.time()
|
||||
cancel_token = CancellationToken()
|
||||
|
||||
try:
|
||||
# Calculate coverage for each site individually
|
||||
site_results = []
|
||||
for site in request.sites:
|
||||
points = await asyncio.wait_for(
|
||||
coverage_service.calculate_coverage(
|
||||
site,
|
||||
request.settings,
|
||||
cancel_token,
|
||||
),
|
||||
timeout=120.0, # 2 min per site
|
||||
)
|
||||
site_results.append(points)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
cancel_token.cancel()
|
||||
raise HTTPException(408, "Calculation timeout")
|
||||
|
||||
computation_time = time.time() - start_time
|
||||
|
||||
# Build coordinate -> RSRP mapping for each site
|
||||
# We need to align the grids (same points for all sites)
|
||||
coord_set = set()
|
||||
for points in site_results:
|
||||
for p in points:
|
||||
coord_set.add((round(p.lat, 6), round(p.lon, 6)))
|
||||
|
||||
coord_list = sorted(coord_set)
|
||||
|
||||
# Build RSRP arrays aligned to coord_list
|
||||
rsrp_grids = []
|
||||
frequencies = []
|
||||
for idx, (site, points) in enumerate(zip(request.sites, site_results)):
|
||||
# Map coordinates to RSRP
|
||||
point_map = {(round(p.lat, 6), round(p.lon, 6)): p.rsrp for p in points}
|
||||
rsrp_array = np.array([
|
||||
point_map.get(coord, -150) # -150 dBm = no coverage
|
||||
for coord in coord_list
|
||||
], dtype=np.float64)
|
||||
rsrp_grids.append(rsrp_array)
|
||||
frequencies.append(site.frequency)
|
||||
|
||||
# Calculate C/I using GPU service
|
||||
ci_ratio, best_server_idx, best_rsrp = gpu_service.calculate_interference_vectorized(
|
||||
rsrp_grids, frequencies
|
||||
)
|
||||
|
||||
# Build result points with C/I data
|
||||
ci_points = []
|
||||
for i, (lat, lon) in enumerate(coord_list):
|
||||
ci_points.append({
|
||||
"lat": lat,
|
||||
"lon": lon,
|
||||
"ci_ratio_db": round(float(ci_ratio[i]), 1),
|
||||
"best_server_idx": int(best_server_idx[i]),
|
||||
"best_server_rsrp": round(float(best_rsrp[i]), 1),
|
||||
})
|
||||
|
||||
# Calculate statistics
|
||||
ci_values = [p["ci_ratio_db"] for p in ci_points]
|
||||
stats = {
|
||||
"min_ci_db": round(min(ci_values), 1) if ci_values else 0,
|
||||
"max_ci_db": round(max(ci_values), 1) if ci_values else 0,
|
||||
"avg_ci_db": round(sum(ci_values) / len(ci_values), 1) if ci_values else 0,
|
||||
"good_coverage_pct": round(100 * sum(1 for c in ci_values if c >= 10) / len(ci_values), 1) if ci_values else 0,
|
||||
"marginal_coverage_pct": round(100 * sum(1 for c in ci_values if 0 <= c < 10) / len(ci_values), 1) if ci_values else 0,
|
||||
"interference_dominant_pct": round(100 * sum(1 for c in ci_values if c < 0) / len(ci_values), 1) if ci_values else 0,
|
||||
}
|
||||
|
||||
# Check for frequency groups
|
||||
unique_freqs = set(frequencies)
|
||||
freq_groups = {}
|
||||
for freq in unique_freqs:
|
||||
freq_groups[freq] = sum(1 for f in frequencies if f == freq)
|
||||
|
||||
return {
|
||||
"points": ci_points,
|
||||
"count": len(ci_points),
|
||||
"stats": stats,
|
||||
"computation_time": round(computation_time, 2),
|
||||
"sites": [{"name": s.name, "frequency_mhz": s.frequency} for s in request.sites],
|
||||
"frequency_groups": freq_groups,
|
||||
"warning": None if any(c > 1 for c in freq_groups.values()) else "All sites on different frequencies - no co-channel interference",
|
||||
}
|
||||
|
||||
|
||||
def _get_active_models(settings: CoverageSettings) -> List[str]:
|
||||
"""Determine which propagation models are active"""
|
||||
models = [] # Base propagation model added by caller via select_propagation_model()
|
||||
|
||||
@@ -436,6 +436,139 @@ class GPUService:
|
||||
|
||||
return _to_cpu(rsrp)
|
||||
|
||||
def calculate_interference(
|
||||
self,
|
||||
rsrp_grids: list,
|
||||
frequencies: list,
|
||||
) -> tuple:
|
||||
"""Calculate C/I (carrier-to-interference) ratio for multi-site scenarios.
|
||||
|
||||
For each grid point:
|
||||
- C = signal strength from strongest (serving) cell
|
||||
- I = sum of signal strengths from all other co-frequency cells
|
||||
- C/I = C(dBm) - 10*log10(sum of linear interference powers)
|
||||
|
||||
Args:
|
||||
rsrp_grids: List of RSRP arrays, one per site, shape (N,) each
|
||||
frequencies: List of frequencies (MHz) for each site
|
||||
|
||||
Returns:
|
||||
(ci_ratio, best_server_idx, best_rsrp)
|
||||
ci_ratio: C/I in dB, shape (N,)
|
||||
best_server_idx: Index of serving cell per point, shape (N,)
|
||||
best_rsrp: RSRP of serving cell per point, shape (N,)
|
||||
"""
|
||||
_xp = gpu_manager.get_array_module()
|
||||
|
||||
if len(rsrp_grids) < 2:
|
||||
# Single site - no interference, return infinity C/I
|
||||
if rsrp_grids:
|
||||
n_points = len(rsrp_grids[0])
|
||||
return (
|
||||
np.full(n_points, 50.0, dtype=np.float64), # 50 dB = effectively no interference
|
||||
np.zeros(n_points, dtype=np.int32),
|
||||
np.array(rsrp_grids[0], dtype=np.float64),
|
||||
)
|
||||
return np.array([]), np.array([]), np.array([])
|
||||
|
||||
# Stack RSRP grids: shape (num_sites, num_points)
|
||||
rsrp_stack = _xp.stack([_xp.asarray(g, dtype=_xp.float64) for g in rsrp_grids], axis=0)
|
||||
num_sites, num_points = rsrp_stack.shape
|
||||
|
||||
# Convert to linear power (mW)
|
||||
rsrp_linear = _xp.power(10.0, rsrp_stack / 10.0)
|
||||
|
||||
# Best server per point
|
||||
best_server_idx = _xp.argmax(rsrp_stack, axis=0)
|
||||
best_rsrp = _xp.take_along_axis(rsrp_stack, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
||||
best_rsrp_linear = _xp.take_along_axis(rsrp_linear, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
||||
|
||||
# Group sites by frequency for co-channel interference
|
||||
freq_array = _xp.asarray(frequencies, dtype=_xp.float64)
|
||||
|
||||
# Calculate interference only from co-frequency sites
|
||||
interference_linear = _xp.zeros(num_points, dtype=_xp.float64)
|
||||
|
||||
for point_idx in range(num_points):
|
||||
serving_site = int(_to_cpu(best_server_idx[point_idx]))
|
||||
serving_freq = frequencies[serving_site]
|
||||
|
||||
# Sum power from all other sites on same frequency
|
||||
for site_idx in range(num_sites):
|
||||
if site_idx != serving_site and frequencies[site_idx] == serving_freq:
|
||||
interference_linear[point_idx] += rsrp_linear[site_idx, point_idx]
|
||||
|
||||
# C/I ratio in dB
|
||||
# Avoid log10(0) with small epsilon
|
||||
epsilon = 1e-30
|
||||
ci_ratio = 10 * _xp.log10(best_rsrp_linear / (interference_linear + epsilon))
|
||||
|
||||
# Clip to reasonable range (-20 to 50 dB)
|
||||
ci_ratio = _xp.clip(ci_ratio, -20, 50)
|
||||
|
||||
return (
|
||||
_to_cpu(ci_ratio),
|
||||
_to_cpu(best_server_idx).astype(np.int32),
|
||||
_to_cpu(best_rsrp),
|
||||
)
|
||||
|
||||
def calculate_interference_vectorized(
|
||||
self,
|
||||
rsrp_grids: list,
|
||||
frequencies: list,
|
||||
) -> tuple:
|
||||
"""Fully vectorized C/I calculation (faster for GPU).
|
||||
|
||||
Same as calculate_interference but avoids Python loops.
|
||||
"""
|
||||
_xp = gpu_manager.get_array_module()
|
||||
|
||||
if len(rsrp_grids) < 2:
|
||||
if rsrp_grids:
|
||||
n_points = len(rsrp_grids[0])
|
||||
return (
|
||||
np.full(n_points, 50.0, dtype=np.float64),
|
||||
np.zeros(n_points, dtype=np.int32),
|
||||
np.array(rsrp_grids[0], dtype=np.float64),
|
||||
)
|
||||
return np.array([]), np.array([]), np.array([])
|
||||
|
||||
# Stack RSRP grids: shape (num_sites, num_points)
|
||||
rsrp_stack = _xp.stack([_xp.asarray(g, dtype=_xp.float64) for g in rsrp_grids], axis=0)
|
||||
num_sites, num_points = rsrp_stack.shape
|
||||
|
||||
# Convert to linear power (mW)
|
||||
rsrp_linear = _xp.power(10.0, rsrp_stack / 10.0)
|
||||
|
||||
# Best server per point
|
||||
best_server_idx = _xp.argmax(rsrp_stack, axis=0)
|
||||
best_rsrp = _xp.take_along_axis(rsrp_stack, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
||||
best_rsrp_linear = _xp.take_along_axis(rsrp_linear, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
||||
|
||||
# Create frequency match matrix: (num_sites, num_sites)
|
||||
freq_array = _xp.asarray(frequencies, dtype=_xp.float64)
|
||||
freq_match = freq_array[:, _xp.newaxis] == freq_array[_xp.newaxis, :]
|
||||
|
||||
# Total power from all sites
|
||||
total_power = _xp.sum(rsrp_linear, axis=0)
|
||||
|
||||
# For simplified calculation (all sites same frequency):
|
||||
# Interference = total - serving
|
||||
interference_linear = total_power - best_rsrp_linear
|
||||
|
||||
# C/I ratio in dB
|
||||
epsilon = 1e-30
|
||||
ci_ratio = 10 * _xp.log10(best_rsrp_linear / (interference_linear + epsilon))
|
||||
|
||||
# Clip to reasonable range
|
||||
ci_ratio = _xp.clip(ci_ratio, -20, 50)
|
||||
|
||||
return (
|
||||
_to_cpu(ci_ratio),
|
||||
_to_cpu(best_server_idx).astype(np.int32),
|
||||
_to_cpu(best_rsrp),
|
||||
)
|
||||
|
||||
|
||||
# Singleton
|
||||
gpu_service = GPUService()
|
||||
|
||||
Reference in New Issue
Block a user