575 lines
21 KiB
Python
575 lines
21 KiB
Python
"""
|
|
GPU-accelerated computation service using CuPy.
|
|
Falls back to NumPy when CuPy/CUDA is not available.
|
|
|
|
Provides vectorized batch operations for coverage calculation:
|
|
- Haversine distance (site -> all grid points)
|
|
- Okumura-Hata path loss (all distances at once)
|
|
|
|
Usage:
|
|
from app.services.gpu_service import gpu_service, GPU_AVAILABLE
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Dict, Any
|
|
|
|
from app.services.gpu_backend import gpu_manager
|
|
|
|
# Backward-compatible exports
|
|
GPU_AVAILABLE = gpu_manager.gpu_available
|
|
GPU_INFO: Dict[str, Any] | None = (
|
|
{
|
|
"name": gpu_manager._active_device.name,
|
|
"memory_mb": gpu_manager._active_device.memory_mb,
|
|
**gpu_manager._active_device.extra,
|
|
}
|
|
if gpu_manager.gpu_available and gpu_manager._active_device
|
|
else None
|
|
)
|
|
|
|
# Array module: cupy on GPU, numpy on CPU
|
|
xp = gpu_manager.get_array_module()
|
|
|
|
|
|
def _to_cpu(arr):
|
|
"""Transfer array to CPU numpy if on GPU."""
|
|
return gpu_manager.to_cpu(arr)
|
|
|
|
|
|
class GPUService:
|
|
"""GPU-accelerated batch operations for coverage calculation."""
|
|
|
|
@property
|
|
def available(self) -> bool:
|
|
return gpu_manager.gpu_available
|
|
|
|
def get_info(self) -> Dict[str, Any]:
|
|
"""Return GPU info dict for system endpoint."""
|
|
if not gpu_manager.gpu_available:
|
|
return {"available": False, "name": None, "memory_mb": None}
|
|
return {"available": True, **(GPU_INFO or {})}
|
|
|
|
def precompute_distances(
|
|
self,
|
|
grid_lats: np.ndarray,
|
|
grid_lons: np.ndarray,
|
|
site_lat: float,
|
|
site_lon: float,
|
|
) -> np.ndarray:
|
|
"""Vectorized haversine distance from site to all grid points.
|
|
|
|
Returns distances in meters as a CPU numpy array.
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
lat1 = _xp.radians(_xp.asarray(grid_lats, dtype=_xp.float64))
|
|
lon1 = _xp.radians(_xp.asarray(grid_lons, dtype=_xp.float64))
|
|
lat2 = _xp.radians(_xp.float64(site_lat))
|
|
lon2 = _xp.radians(_xp.float64(site_lon))
|
|
|
|
dlat = lat2 - lat1
|
|
dlon = lon2 - lon1
|
|
|
|
a = _xp.sin(dlat / 2) ** 2 + _xp.cos(lat1) * _xp.cos(lat2) * _xp.sin(dlon / 2) ** 2
|
|
c = 2 * _xp.arcsin(_xp.sqrt(a))
|
|
|
|
distances = 6371000.0 * c
|
|
return _to_cpu(distances)
|
|
|
|
def precompute_path_loss(
|
|
self,
|
|
distances: np.ndarray,
|
|
frequency_mhz: float,
|
|
tx_height: float,
|
|
rx_height: float = 1.5,
|
|
environment: str = "urban",
|
|
) -> np.ndarray:
|
|
"""Vectorized path loss using the appropriate propagation model.
|
|
|
|
Selects model based on frequency (Phase 3.0 model selection), then
|
|
applies the correct formula in a single vectorized numpy pass.
|
|
|
|
Returns path loss in dB as a CPU numpy array.
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
d_arr = _xp.asarray(distances, dtype=_xp.float64)
|
|
d_km = _xp.maximum(d_arr / 1000.0, 0.1)
|
|
|
|
freq = float(frequency_mhz)
|
|
h_tx = max(float(tx_height), 1.0)
|
|
h_rx = max(float(rx_height), 1.0)
|
|
|
|
log_f = _xp.log10(_xp.float64(freq))
|
|
log_hb = _xp.log10(_xp.float64(max(h_tx, 1.0)))
|
|
|
|
if freq > 2000:
|
|
# Free-Space Path Loss: FSPL = 20*log10(d_km) + 20*log10(f) + 32.45
|
|
L = 20.0 * _xp.log10(d_km) + 20.0 * log_f + 32.45
|
|
|
|
elif freq > 1500:
|
|
# COST-231 Hata: extends Okumura-Hata to 1500-2000 MHz
|
|
a_hm = (1.1 * log_f - 0.7) * h_rx - (1.56 * log_f - 0.8)
|
|
L = (46.3 + 33.9 * log_f - 13.82 * log_hb - a_hm
|
|
+ (44.9 - 6.55 * log_hb) * _xp.log10(d_km))
|
|
if environment == "urban":
|
|
L += 3.0 # Metropolitan center correction
|
|
|
|
elif freq >= 150:
|
|
# Okumura-Hata: 150-1500 MHz
|
|
if environment == "urban" and freq >= 400:
|
|
a_hm = 3.2 * (_xp.log10(11.75 * h_rx) ** 2) - 4.97
|
|
else:
|
|
a_hm = (1.1 * log_f - 0.7) * h_rx - (1.56 * log_f - 0.8)
|
|
|
|
L_urban = (69.55 + 26.16 * log_f - 13.82 * log_hb - a_hm
|
|
+ (44.9 - 6.55 * log_hb) * _xp.log10(d_km))
|
|
|
|
if environment == "suburban":
|
|
L = L_urban - 2 * (_xp.log10(freq / 28) ** 2) - 5.4
|
|
elif environment == "rural":
|
|
L = L_urban - 4.78 * (log_f ** 2) + 18.33 * log_f - 35.94
|
|
elif environment == "open":
|
|
L = L_urban - 4.78 * (log_f ** 2) + 18.33 * log_f - 40.94
|
|
else:
|
|
L = L_urban
|
|
|
|
else:
|
|
# Very low frequency — Longley-Rice simplified (area mode)
|
|
# Use FSPL as baseline with terrain roughness correction
|
|
L = 20.0 * _xp.log10(d_km) + 20.0 * log_f + 32.45 + 10.0
|
|
|
|
return _to_cpu(L)
|
|
|
|
def batch_terrain_los(
|
|
self,
|
|
site_lat: float,
|
|
site_lon: float,
|
|
site_height: float,
|
|
site_elevation: float,
|
|
grid_lats: np.ndarray,
|
|
grid_lons: np.ndarray,
|
|
grid_elevations: np.ndarray,
|
|
distances: np.ndarray,
|
|
frequency_mhz: float,
|
|
terrain_cache: dict,
|
|
num_samples: int = 30,
|
|
) -> tuple[np.ndarray, np.ndarray]:
|
|
"""Batch compute terrain LOS and diffraction loss for all grid points.
|
|
|
|
This is the key GPU optimization — instead of sampling terrain profiles
|
|
one point at a time, we sample ALL profiles in parallel using vectorized
|
|
operations.
|
|
|
|
Args:
|
|
site_lat, site_lon: Site coordinates
|
|
site_height: Antenna height above ground (meters)
|
|
site_elevation: Ground elevation at site (meters)
|
|
grid_lats, grid_lons: All grid point coordinates
|
|
grid_elevations: Ground elevation at each grid point
|
|
distances: Pre-computed distances from site to each point (meters)
|
|
frequency_mhz: Frequency for diffraction calculation
|
|
terrain_cache: Dict[tile_name -> numpy array] from terrain_service
|
|
num_samples: Number of samples per terrain profile
|
|
|
|
Returns:
|
|
(has_los, terrain_loss) - both shape (N,)
|
|
has_los: boolean array, True if clear line of sight
|
|
terrain_loss: diffraction loss in dB (0 if has_los)
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
N = len(grid_lats)
|
|
|
|
if N == 0:
|
|
return np.array([], dtype=bool), np.array([], dtype=np.float64)
|
|
|
|
# Convert inputs to GPU arrays
|
|
g_lats = _xp.asarray(grid_lats, dtype=_xp.float64)
|
|
g_lons = _xp.asarray(grid_lons, dtype=_xp.float64)
|
|
g_elevs = _xp.asarray(grid_elevations, dtype=_xp.float64)
|
|
g_dists = _xp.asarray(distances, dtype=_xp.float64)
|
|
|
|
# Heights
|
|
tx_total = float(site_elevation + site_height)
|
|
rx_height = 1.5 # Receiver height above ground
|
|
|
|
# Earth curvature constants
|
|
EARTH_RADIUS = 6371000.0
|
|
K_FACTOR = 4.0 / 3.0
|
|
effective_radius = K_FACTOR * EARTH_RADIUS
|
|
|
|
# Sample terrain profiles for all points at once
|
|
# Create sample positions: shape (N, num_samples)
|
|
t = _xp.linspace(0, 1, num_samples, dtype=_xp.float64) # (S,)
|
|
t = t.reshape(1, -1) # (1, S)
|
|
|
|
# Interpolate lat/lon for all sample points
|
|
# sample_lats[i, j] = site_lat + t[j] * (grid_lats[i] - site_lat)
|
|
dlat = g_lats.reshape(-1, 1) - site_lat # (N, 1)
|
|
dlon = g_lons.reshape(-1, 1) - site_lon # (N, 1)
|
|
sample_lats = site_lat + t * dlat # (N, S)
|
|
sample_lons = site_lon + t * dlon # (N, S)
|
|
|
|
# Sample distances along path: shape (N, S)
|
|
sample_dists = t * g_dists.reshape(-1, 1) # (N, S)
|
|
|
|
# Get terrain elevations for all samples
|
|
# This is the tricky part - we need to look up from the tile cache
|
|
# For GPU efficiency, we'll do this on CPU then transfer
|
|
sample_lats_cpu = _to_cpu(sample_lats).flatten()
|
|
sample_lons_cpu = _to_cpu(sample_lons).flatten()
|
|
|
|
# Batch elevation lookup from cache
|
|
sample_elevs_cpu = self._batch_elevation_lookup(
|
|
sample_lats_cpu, sample_lons_cpu, terrain_cache
|
|
)
|
|
sample_elevs = _xp.asarray(sample_elevs_cpu, dtype=_xp.float64).reshape(N, num_samples)
|
|
|
|
# Compute LOS line height at each sample point
|
|
# Linear interpolation from tx to rx
|
|
rx_total = g_elevs + rx_height # (N,)
|
|
los_heights = tx_total + t * (rx_total.reshape(-1, 1) - tx_total) # (N, S)
|
|
|
|
# Earth curvature correction at each sample
|
|
total_dist = g_dists.reshape(-1, 1) # (N, 1)
|
|
d = sample_dists # (N, S)
|
|
curvature = (d * (total_dist - d)) / (2 * effective_radius) # (N, S)
|
|
los_heights_corrected = los_heights - curvature # (N, S)
|
|
|
|
# Clearance at each sample point
|
|
clearances = los_heights_corrected - sample_elevs # (N, S)
|
|
|
|
# Minimum clearance per profile
|
|
min_clearances = _xp.min(clearances, axis=1) # (N,)
|
|
|
|
# Has LOS if minimum clearance > 0
|
|
has_los = min_clearances > 0 # (N,)
|
|
|
|
# Diffraction loss for points without LOS
|
|
# Using simplified ITU-R P.526 formula
|
|
terrain_loss = _xp.zeros(N, dtype=_xp.float64)
|
|
|
|
# Only compute diffraction where blocked
|
|
blocked_mask = ~has_los
|
|
blocked_clearances = min_clearances[blocked_mask]
|
|
|
|
if _xp.any(blocked_mask):
|
|
# v = |clearance| / 10 (simplified Fresnel parameter)
|
|
v = _xp.abs(blocked_clearances) / 10.0
|
|
|
|
# Diffraction loss formula from ITU-R P.526
|
|
loss = _xp.where(
|
|
v <= 0,
|
|
_xp.zeros_like(v),
|
|
_xp.where(
|
|
v < 2.4,
|
|
6.02 + 9.11 * v + 1.65 * v ** 2,
|
|
12.95 + 20 * _xp.log10(v)
|
|
)
|
|
)
|
|
# Cap at reasonable max
|
|
loss = _xp.minimum(loss, 40.0)
|
|
terrain_loss[blocked_mask] = loss
|
|
|
|
return _to_cpu(has_los).astype(bool), _to_cpu(terrain_loss)
|
|
|
|
def _batch_elevation_lookup(
|
|
self,
|
|
lats: np.ndarray,
|
|
lons: np.ndarray,
|
|
terrain_cache: dict,
|
|
) -> np.ndarray:
|
|
"""Look up elevations from cached terrain tiles with bilinear interpolation.
|
|
|
|
Vectorized implementation: processes per-tile (1-4 tiles) instead of
|
|
per-point (thousands of points). Uses bilinear interpolation for
|
|
sub-meter accuracy (vs 15m error with nearest-neighbor at 30m resolution).
|
|
|
|
Args:
|
|
lats, lons: Flattened arrays of coordinates
|
|
terrain_cache: Dict mapping tile_name -> numpy array
|
|
|
|
Returns:
|
|
elevations: Same shape as input lats
|
|
"""
|
|
elevations = np.zeros(len(lats), dtype=np.float64)
|
|
|
|
# Vectorized tile identification
|
|
lat_ints = np.floor(lats).astype(int)
|
|
lon_ints = np.floor(lons).astype(int)
|
|
|
|
# Process per tile (usually 1-4 tiles, not per point)
|
|
unique_tiles = set(zip(lat_ints, lon_ints))
|
|
|
|
for lat_int, lon_int in unique_tiles:
|
|
lat_letter = 'N' if lat_int >= 0 else 'S'
|
|
lon_letter = 'E' if lon_int >= 0 else 'W'
|
|
tile_name = f"{lat_letter}{abs(lat_int):02d}{lon_letter}{abs(lon_int):03d}"
|
|
|
|
tile = terrain_cache.get(tile_name)
|
|
if tile is None:
|
|
continue
|
|
|
|
# Mask for points in this tile
|
|
mask = (lat_ints == lat_int) & (lon_ints == lon_int)
|
|
tile_lats = lats[mask]
|
|
tile_lons = lons[mask]
|
|
|
|
size = tile.shape[0]
|
|
|
|
# Vectorized bilinear interpolation
|
|
lat_frac = tile_lats - lat_int
|
|
lon_frac = tile_lons - lon_int
|
|
|
|
row_exact = (1.0 - lat_frac) * (size - 1)
|
|
col_exact = lon_frac * (size - 1)
|
|
|
|
r0 = np.clip(row_exact.astype(int), 0, size - 2)
|
|
c0 = np.clip(col_exact.astype(int), 0, size - 2)
|
|
r1 = r0 + 1
|
|
c1 = c0 + 1
|
|
|
|
dr = row_exact - r0
|
|
dc = col_exact - c0
|
|
|
|
# Get four corner values for all points at once
|
|
z00 = tile[r0, c0].astype(np.float64)
|
|
z01 = tile[r0, c1].astype(np.float64)
|
|
z10 = tile[r1, c0].astype(np.float64)
|
|
z11 = tile[r1, c1].astype(np.float64)
|
|
|
|
# Bilinear interpolation (vectorized)
|
|
result = (z00 * (1 - dr) * (1 - dc) +
|
|
z01 * (1 - dr) * dc +
|
|
z10 * dr * (1 - dc) +
|
|
z11 * dr * dc)
|
|
|
|
# Handle void values (-32768) - set to 0
|
|
void_mask = (z00 == -32768) | (z01 == -32768) | (z10 == -32768) | (z11 == -32768)
|
|
result[void_mask] = 0.0
|
|
|
|
elevations[mask] = result
|
|
|
|
return elevations
|
|
|
|
def batch_antenna_pattern(
|
|
self,
|
|
site_lat: float,
|
|
site_lon: float,
|
|
grid_lats: np.ndarray,
|
|
grid_lons: np.ndarray,
|
|
azimuth: float,
|
|
beamwidth: float,
|
|
) -> np.ndarray:
|
|
"""Batch compute antenna pattern loss for all grid points.
|
|
|
|
Returns antenna_loss in dB, shape (N,)
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
N = len(grid_lats)
|
|
|
|
if N == 0 or azimuth is None or not beamwidth:
|
|
return np.zeros(N, dtype=np.float64)
|
|
|
|
# Convert to radians
|
|
lat1 = _xp.radians(_xp.float64(site_lat))
|
|
lon1 = _xp.radians(_xp.float64(site_lon))
|
|
lat2 = _xp.radians(_xp.asarray(grid_lats, dtype=_xp.float64))
|
|
lon2 = _xp.radians(_xp.asarray(grid_lons, dtype=_xp.float64))
|
|
|
|
# Calculate bearing from site to each point
|
|
dlon = lon2 - lon1
|
|
x = _xp.sin(dlon) * _xp.cos(lat2)
|
|
y = _xp.cos(lat1) * _xp.sin(lat2) - _xp.sin(lat1) * _xp.cos(lat2) * _xp.cos(dlon)
|
|
bearings = (_xp.degrees(_xp.arctan2(x, y)) + 360) % 360
|
|
|
|
# Angle difference from antenna azimuth
|
|
angle_diff = _xp.abs(bearings - azimuth)
|
|
angle_diff = _xp.where(angle_diff > 180, 360 - angle_diff, angle_diff)
|
|
|
|
# Antenna pattern loss (simplified sector pattern)
|
|
half_bw = beamwidth / 2
|
|
in_main = angle_diff <= half_bw
|
|
loss_main = 3 * (angle_diff / half_bw) ** 2
|
|
loss_side = 3 + 12 * ((angle_diff - half_bw) / half_bw) ** 2
|
|
loss_side = _xp.minimum(loss_side, 25.0)
|
|
|
|
antenna_loss = _xp.where(in_main, loss_main, loss_side)
|
|
return _to_cpu(antenna_loss)
|
|
|
|
def batch_final_rsrp(
|
|
self,
|
|
tx_power: float,
|
|
tx_gain: float,
|
|
path_loss: np.ndarray,
|
|
terrain_loss: np.ndarray,
|
|
antenna_loss: np.ndarray,
|
|
building_loss: np.ndarray,
|
|
vegetation_loss: np.ndarray,
|
|
rain_loss: np.ndarray,
|
|
indoor_loss: np.ndarray,
|
|
atmospheric_loss: np.ndarray,
|
|
reflection_gain: np.ndarray,
|
|
fading_margin: float = 0.0,
|
|
) -> np.ndarray:
|
|
"""Vectorized final RSRP calculation.
|
|
|
|
RSRP = tx_power + tx_gain - path_loss - terrain_loss - antenna_loss
|
|
- building_loss - vegetation_loss - rain_loss - indoor_loss
|
|
- atmospheric_loss + reflection_gain - fading_margin
|
|
|
|
Returns RSRP in dBm, shape (N,)
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
|
|
rsrp = (
|
|
float(tx_power) + float(tx_gain)
|
|
- _xp.asarray(path_loss, dtype=_xp.float64)
|
|
- _xp.asarray(terrain_loss, dtype=_xp.float64)
|
|
- _xp.asarray(antenna_loss, dtype=_xp.float64)
|
|
- _xp.asarray(building_loss, dtype=_xp.float64)
|
|
- _xp.asarray(vegetation_loss, dtype=_xp.float64)
|
|
- _xp.asarray(rain_loss, dtype=_xp.float64)
|
|
- _xp.asarray(indoor_loss, dtype=_xp.float64)
|
|
- _xp.asarray(atmospheric_loss, dtype=_xp.float64)
|
|
+ _xp.asarray(reflection_gain, dtype=_xp.float64)
|
|
- float(fading_margin)
|
|
)
|
|
|
|
return _to_cpu(rsrp)
|
|
|
|
def calculate_interference(
|
|
self,
|
|
rsrp_grids: list,
|
|
frequencies: list,
|
|
) -> tuple:
|
|
"""Calculate C/I (carrier-to-interference) ratio for multi-site scenarios.
|
|
|
|
For each grid point:
|
|
- C = signal strength from strongest (serving) cell
|
|
- I = sum of signal strengths from all other co-frequency cells
|
|
- C/I = C(dBm) - 10*log10(sum of linear interference powers)
|
|
|
|
Args:
|
|
rsrp_grids: List of RSRP arrays, one per site, shape (N,) each
|
|
frequencies: List of frequencies (MHz) for each site
|
|
|
|
Returns:
|
|
(ci_ratio, best_server_idx, best_rsrp)
|
|
ci_ratio: C/I in dB, shape (N,)
|
|
best_server_idx: Index of serving cell per point, shape (N,)
|
|
best_rsrp: RSRP of serving cell per point, shape (N,)
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
|
|
if len(rsrp_grids) < 2:
|
|
# Single site - no interference, return infinity C/I
|
|
if rsrp_grids:
|
|
n_points = len(rsrp_grids[0])
|
|
return (
|
|
np.full(n_points, 50.0, dtype=np.float64), # 50 dB = effectively no interference
|
|
np.zeros(n_points, dtype=np.int32),
|
|
np.array(rsrp_grids[0], dtype=np.float64),
|
|
)
|
|
return np.array([]), np.array([]), np.array([])
|
|
|
|
# Stack RSRP grids: shape (num_sites, num_points)
|
|
rsrp_stack = _xp.stack([_xp.asarray(g, dtype=_xp.float64) for g in rsrp_grids], axis=0)
|
|
num_sites, num_points = rsrp_stack.shape
|
|
|
|
# Convert to linear power (mW)
|
|
rsrp_linear = _xp.power(10.0, rsrp_stack / 10.0)
|
|
|
|
# Best server per point
|
|
best_server_idx = _xp.argmax(rsrp_stack, axis=0)
|
|
best_rsrp = _xp.take_along_axis(rsrp_stack, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
|
best_rsrp_linear = _xp.take_along_axis(rsrp_linear, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
|
|
|
# Group sites by frequency for co-channel interference
|
|
freq_array = _xp.asarray(frequencies, dtype=_xp.float64)
|
|
|
|
# Calculate interference only from co-frequency sites
|
|
interference_linear = _xp.zeros(num_points, dtype=_xp.float64)
|
|
|
|
for point_idx in range(num_points):
|
|
serving_site = int(_to_cpu(best_server_idx[point_idx]))
|
|
serving_freq = frequencies[serving_site]
|
|
|
|
# Sum power from all other sites on same frequency
|
|
for site_idx in range(num_sites):
|
|
if site_idx != serving_site and frequencies[site_idx] == serving_freq:
|
|
interference_linear[point_idx] += rsrp_linear[site_idx, point_idx]
|
|
|
|
# C/I ratio in dB
|
|
# Avoid log10(0) with small epsilon
|
|
epsilon = 1e-30
|
|
ci_ratio = 10 * _xp.log10(best_rsrp_linear / (interference_linear + epsilon))
|
|
|
|
# Clip to reasonable range (-20 to 50 dB)
|
|
ci_ratio = _xp.clip(ci_ratio, -20, 50)
|
|
|
|
return (
|
|
_to_cpu(ci_ratio),
|
|
_to_cpu(best_server_idx).astype(np.int32),
|
|
_to_cpu(best_rsrp),
|
|
)
|
|
|
|
def calculate_interference_vectorized(
|
|
self,
|
|
rsrp_grids: list,
|
|
frequencies: list,
|
|
) -> tuple:
|
|
"""Fully vectorized C/I calculation (faster for GPU).
|
|
|
|
Same as calculate_interference but avoids Python loops.
|
|
"""
|
|
_xp = gpu_manager.get_array_module()
|
|
|
|
if len(rsrp_grids) < 2:
|
|
if rsrp_grids:
|
|
n_points = len(rsrp_grids[0])
|
|
return (
|
|
np.full(n_points, 50.0, dtype=np.float64),
|
|
np.zeros(n_points, dtype=np.int32),
|
|
np.array(rsrp_grids[0], dtype=np.float64),
|
|
)
|
|
return np.array([]), np.array([]), np.array([])
|
|
|
|
# Stack RSRP grids: shape (num_sites, num_points)
|
|
rsrp_stack = _xp.stack([_xp.asarray(g, dtype=_xp.float64) for g in rsrp_grids], axis=0)
|
|
num_sites, num_points = rsrp_stack.shape
|
|
|
|
# Convert to linear power (mW)
|
|
rsrp_linear = _xp.power(10.0, rsrp_stack / 10.0)
|
|
|
|
# Best server per point
|
|
best_server_idx = _xp.argmax(rsrp_stack, axis=0)
|
|
best_rsrp = _xp.take_along_axis(rsrp_stack, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
|
best_rsrp_linear = _xp.take_along_axis(rsrp_linear, best_server_idx[_xp.newaxis, :], axis=0)[0]
|
|
|
|
# Create frequency match matrix: (num_sites, num_sites)
|
|
freq_array = _xp.asarray(frequencies, dtype=_xp.float64)
|
|
freq_match = freq_array[:, _xp.newaxis] == freq_array[_xp.newaxis, :]
|
|
|
|
# Total power from all sites
|
|
total_power = _xp.sum(rsrp_linear, axis=0)
|
|
|
|
# For simplified calculation (all sites same frequency):
|
|
# Interference = total - serving
|
|
interference_linear = total_power - best_rsrp_linear
|
|
|
|
# C/I ratio in dB
|
|
epsilon = 1e-30
|
|
ci_ratio = 10 * _xp.log10(best_rsrp_linear / (interference_linear + epsilon))
|
|
|
|
# Clip to reasonable range
|
|
ci_ratio = _xp.clip(ci_ratio, -20, 50)
|
|
|
|
return (
|
|
_to_cpu(ci_ratio),
|
|
_to_cpu(best_server_idx).astype(np.int32),
|
|
_to_cpu(best_rsrp),
|
|
)
|
|
|
|
|
|
# Singleton
|
|
gpu_service = GPUService()
|