145 lines
4.7 KiB
Python
145 lines
4.7 KiB
Python
"""
GPU-accelerated computation service using CuPy.
Falls back to NumPy when CuPy/CUDA is not available.

Provides vectorized batch operations for coverage calculation:
- Haversine distance (site -> all grid points)
- Path loss (free-space, COST-231 Hata or Okumura-Hata, chosen by
  frequency; all distances at once)

Usage:
    from app.services.gpu_service import gpu_service, GPU_AVAILABLE
"""
|
|
|
|
import numpy as np
|
|
from typing import Dict, Any
|
|
|
|
from app.services.gpu_backend import gpu_manager
|
|
|
|
# Module-level names kept so existing imports of this module keep working.
GPU_AVAILABLE = gpu_manager.gpu_available

# Description of the active device, or None when no GPU is in use.
GPU_INFO: Dict[str, Any] | None = None
if gpu_manager.gpu_available and gpu_manager._active_device:
    GPU_INFO = {
        "name": gpu_manager._active_device.name,
        "memory_mb": gpu_manager._active_device.memory_mb,
        # Backend-specific fields are merged last, so they may override
        # the two keys above.
        **gpu_manager._active_device.extra,
    }

# Array module selected by the backend manager (cupy on GPU, numpy on CPU).
xp = gpu_manager.get_array_module()
|
|
|
|
|
|
def _to_cpu(arr):
    """Hand ``arr`` to ``gpu_manager.to_cpu`` and return what it gives back.

    Intended to move GPU-resident arrays to host NumPy arrays; the actual
    conversion is delegated entirely to the backend manager.
    """
    host_array = gpu_manager.to_cpu(arr)
    return host_array
|
|
|
|
|
|
class GPUService:
    """GPU-accelerated batch operations for coverage calculation.

    Each method asks ``gpu_manager`` for the active array module at call
    time and routes its result through ``_to_cpu`` before returning.
    """

    # Sphere radius used by the haversine formula, in meters.
    _EARTH_RADIUS_M = 6371000.0

    @property
    def available(self) -> bool:
        """Whether ``gpu_manager`` currently reports a GPU as available."""
        return gpu_manager.gpu_available

    def get_info(self) -> Dict[str, Any]:
        """Return GPU info dict for system endpoint.

        The result always contains the ``available``, ``name`` and
        ``memory_mb`` keys. When a GPU is available, the entries of the
        module-level ``GPU_INFO`` dict are merged on top of them.
        """
        info: Dict[str, Any] = {"available": False, "name": None, "memory_mb": None}
        if not gpu_manager.gpu_available:
            return info

        # GPU_INFO is captured once at import time and can be None even
        # though the manager reports a GPU now; the placeholder keys above
        # keep the response shape stable in that case.
        info["available"] = True
        info.update(GPU_INFO or {})
        return info

    def precompute_distances(
        self,
        grid_lats: np.ndarray,
        grid_lons: np.ndarray,
        site_lat: float,
        site_lon: float,
    ) -> np.ndarray:
        """Vectorized haversine distance from site to all grid points.

        Args:
            grid_lats: Grid point latitudes in degrees.
            grid_lons: Grid point longitudes in degrees.
            site_lat: Site latitude in degrees.
            site_lon: Site longitude in degrees.

        Returns:
            Distances in meters, passed through ``_to_cpu`` (intended to be
            a CPU numpy array).
        """
        _xp = gpu_manager.get_array_module()
        lat1 = _xp.radians(_xp.asarray(grid_lats, dtype=_xp.float64))
        lon1 = _xp.radians(_xp.asarray(grid_lons, dtype=_xp.float64))
        lat2 = _xp.radians(_xp.float64(site_lat))
        lon2 = _xp.radians(_xp.float64(site_lon))

        dlat = lat2 - lat1
        dlon = lon2 - lon1

        a = _xp.sin(dlat / 2) ** 2 + _xp.cos(lat1) * _xp.cos(lat2) * _xp.sin(dlon / 2) ** 2
        # Floating-point rounding can push `a` marginally above 1 for
        # (near-)antipodal points, which would make arcsin return NaN.
        a = _xp.clip(a, 0.0, 1.0)
        c = 2 * _xp.arcsin(_xp.sqrt(a))

        return _to_cpu(self._EARTH_RADIUS_M * c)

    def precompute_path_loss(
        self,
        distances: np.ndarray,
        frequency_mhz: float,
        tx_height: float,
        rx_height: float = 1.5,
        environment: str = "urban",
    ) -> np.ndarray:
        """Vectorized path loss using the appropriate propagation model.

        Selects model based on frequency (Phase 3.0 model selection), then
        applies the formula to all distances at once on the active array
        module:

        - above 2000 MHz: free-space path loss
        - above 1500 MHz up to 2000 MHz: COST-231 Hata
        - 150 MHz up to 1500 MHz: Okumura-Hata
        - below 150 MHz: free-space path loss plus a fixed 10 dB

        Args:
            distances: Distances in meters; values below 100 m are treated
                as 100 m.
            frequency_mhz: Carrier frequency in MHz. Its log10 is taken, so
                non-positive values produce non-finite results.
            tx_height: Transmitter height; values below 1.0 are raised to 1.0.
            rx_height: Receiver height; values below 1.0 are raised to 1.0.
            environment: ``"urban"``, ``"suburban"``, ``"rural"`` or
                ``"open"``. Any other value is handled like the plain urban
                Okumura-Hata case (and gets no +3 dB in COST-231).

        Returns:
            Path loss in dB, passed through ``_to_cpu`` (intended to be a
            CPU numpy array).
        """
        _xp = gpu_manager.get_array_module()
        d_arr = _xp.asarray(distances, dtype=_xp.float64)
        # Floor at 0.1 km so log10 never sees zero distance.
        d_km = _xp.maximum(d_arr / 1000.0, 0.1)
        log_d = _xp.log10(d_km)

        freq = float(frequency_mhz)
        h_tx = max(float(tx_height), 1.0)
        h_rx = max(float(rx_height), 1.0)

        log_f = _xp.log10(_xp.float64(freq))
        log_hb = _xp.log10(_xp.float64(h_tx))

        if freq > 2000:
            # Free-Space Path Loss: FSPL = 20*log10(d_km) + 20*log10(f) + 32.45
            loss = 20.0 * log_d + 20.0 * log_f + 32.45

        elif freq > 1500:
            # COST-231 Hata: extends Okumura-Hata to 1500-2000 MHz
            a_hm = (1.1 * log_f - 0.7) * h_rx - (1.56 * log_f - 0.8)
            loss = (46.3 + 33.9 * log_f - 13.82 * log_hb - a_hm
                    + (44.9 - 6.55 * log_hb) * log_d)
            if environment == "urban":
                loss = loss + 3.0  # Metropolitan center correction

        elif freq >= 150:
            # Okumura-Hata: 150-1500 MHz
            if environment == "urban" and freq >= 400:
                # Mobile antenna correction used for urban at >= 400 MHz
                a_hm = 3.2 * (_xp.log10(11.75 * h_rx) ** 2) - 4.97
            else:
                a_hm = (1.1 * log_f - 0.7) * h_rx - (1.56 * log_f - 0.8)

            loss_urban = (69.55 + 26.16 * log_f - 13.82 * log_hb - a_hm
                          + (44.9 - 6.55 * log_hb) * log_d)

            if environment == "suburban":
                loss = loss_urban - 2 * (_xp.log10(freq / 28) ** 2) - 5.4
            elif environment == "rural":
                loss = loss_urban - 4.78 * (log_f ** 2) + 18.33 * log_f - 35.94
            elif environment == "open":
                loss = loss_urban - 4.78 * (log_f ** 2) + 18.33 * log_f - 40.94
            else:
                loss = loss_urban

        else:
            # Below 150 MHz: FSPL as baseline plus a fixed 10 dB terrain
            # roughness correction (simplified Longley-Rice area mode).
            loss = 20.0 * log_d + 20.0 * log_f + 32.45 + 10.0

        return _to_cpu(loss)
|
|
|
|
|
|
# Singleton: the shared GPUService instance that callers import from this
# module (see the usage example in the module docstring).
gpu_service = GPUService()
|