Files
rfcp/backend/app/services/coverage_service.py

1552 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import gc
import math
import os
import sys
import time
import threading
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple, Callable
_coverage_log_file = None  # Lazily-opened append handle for the debug log file


def _clog(msg: str):
    """Coverage debug log — always flushed, with timestamp and thread name.

    Writes to stdout, stderr, AND a file so output is always available.
    All failures of the backup channels are deliberately swallowed: logging
    must never break the coverage calculation itself.
    """
    global _coverage_log_file
    ts = time.strftime('%H:%M:%S')
    thr = threading.current_thread().name
    line = f"[COVERAGE {ts}] [{thr}] {msg}"
    print(line, flush=True)
    # Backup: also write to stderr in case stdout is broken
    try:
        sys.stderr.write(line + '\n')
        sys.stderr.flush()
    except Exception:
        pass
    # Backup: also write to a file
    try:
        if _coverage_log_file is None:
            log_dir = os.environ.get('RFCP_DATA_PATH', './data')
            os.makedirs(log_dir, exist_ok=True)
            log_path = os.path.join(log_dir, 'coverage-debug.log')
            # Force UTF-8: log lines contain box-drawing characters (━) that
            # raise UnicodeEncodeError under non-UTF-8 locale encodings, which
            # the except below would silently swallow — killing file logging.
            _coverage_log_file = open(log_path, 'a', encoding='utf-8')
            _coverage_log_file.write(f"\n{'='*60}\n")
            _coverage_log_file.write(f"[COVERAGE {ts}] Log started\n")
            _coverage_log_file.flush()
        _coverage_log_file.write(line + '\n')
        _coverage_log_file.flush()
    except Exception:
        pass
from pydantic import BaseModel
from app.services.terrain_service import terrain_service, TerrainService
from app.services.los_service import los_service
from app.services.buildings_service import buildings_service, Building
from app.services.materials_service import materials_service
from app.services.dominant_path_service import (
dominant_path_service, find_dominant_paths_vectorized,
get_lod_level, LODLevel, SIMPLIFIED_MAX_BUILDINGS,
_filter_buildings_by_distance,
)
from app.services.street_canyon_service import street_canyon_service, Street
from app.services.reflection_service import reflection_service
from app.services.spatial_index import get_spatial_index, SpatialIndex
from app.services.water_service import water_service, WaterBody
from app.services.vegetation_service import vegetation_service, VegetationArea
from app.services.weather_service import weather_service
from app.services.indoor_service import indoor_service
from app.services.atmospheric_service import atmospheric_service
from app.services.parallel_coverage_service import (
calculate_coverage_parallel, get_cpu_count, get_parallel_backend,
CancellationToken,
)
# NOTE: gpu_manager and gpu_service are imported INSIDE functions that need them,
# NOT at module level. This prevents worker processes from initializing CuPy/CUDA
# which causes cudaErrorInsufficientDriver errors in child processes.
# ── New propagation models (Phase 3.0) ──
from app.propagation.base import PropagationModel, PropagationInput, PropagationOutput
from app.propagation.free_space import FreeSpaceModel
from app.propagation.okumura_hata import OkumuraHataModel
from app.propagation.cost231_hata import Cost231HataModel
from app.propagation.cost231_wi import Cost231WIModel
from app.propagation.itu_r_p1546 import ITUR_P1546Model
from app.propagation.longley_rice import LongleyRiceModel
from app.propagation.itu_r_p526 import KnifeEdgeDiffractionModel
# Pre-instantiate models (stateless, thread-safe).
# Keys match the band-selection logic in select_propagation_model() below.
_PROPAGATION_MODELS = {
    'free_space': FreeSpaceModel(),
    'okumura_hata': OkumuraHataModel(),
    'cost231_hata': Cost231HataModel(),
    'cost231_wi': Cost231WIModel(),
    'itu_r_p1546': ITUR_P1546Model(),
    'longley_rice': LongleyRiceModel(),
}
# Knife-edge diffraction (ITU-R P.526) — kept separate from the dispatch
# table above; presumably applied in addition to a base model. TODO confirm
# usage site (not visible in this chunk).
_DIFFRACTION_MODEL = KnifeEdgeDiffractionModel()
def select_propagation_model(frequency_mhz: float, environment: str = "urban") -> PropagationModel:
    """Pick the best pre-instantiated propagation model for a frequency band.

    Band boundaries (MHz):
      < 150        -> Longley-Rice (ITM, designed for VHF)
      150 - 520    -> ITU-R P.1546, except rural/open -> Longley-Rice
      520 - 1500   -> Okumura-Hata
      1500 - 2000  -> COST-231 Hata
      > 2000       -> Free-Space Path Loss
    """
    if frequency_mhz < 150:
        model_key = 'longley_rice'
    elif frequency_mhz <= 520:
        # Hata-family models are urban-calibrated; fall back to ITM outside towns.
        model_key = 'longley_rice' if environment in ('rural', 'open') else 'itu_r_p1546'
    elif frequency_mhz <= 1500:
        model_key = 'okumura_hata'
    elif frequency_mhz <= 2000:
        model_key = 'cost231_hata'
    else:
        model_key = 'free_space'
    return _PROPAGATION_MODELS[model_key]
# ── OSM data filtering ──
# OSM fetches use 1-degree grid cells — much larger than the coverage radius.
# Passing all buildings to ProcessPool workers causes MemoryError (pickle copy
# per worker). Filter to coverage bbox and cap count for safety.
MAX_BUILDINGS_FOR_WORKERS = 15000


def _filter_buildings_to_bbox(
    buildings: list,
    min_lat: float, min_lon: float,
    max_lat: float, max_lon: float,
    site_lat: float, site_lon: float,
    log_fn=None,
    max_buildings: int = MAX_BUILDINGS_FOR_WORKERS,
) -> list:
    """Filter buildings to the coverage bbox and cap at max_buildings.

    Returns the input unchanged when it is already within the cap. When the
    bbox-filtered list still exceeds the cap, buildings are ranked by
    centroid distance to the site (nearest first) before truncating, so the
    cap preserves the buildings most likely to affect coverage.
    """
    if not buildings or len(buildings) <= max_buildings:
        return buildings
    original = len(buildings)
    # Fast bbox filter: keep buildings with any vertex inside the bbox.
    # Use a small buffer (~500m ≈ 0.005°) for LOS checks near edges.
    buf = 0.005
    lat_lo, lat_hi = min_lat - buf, max_lat + buf
    lon_lo, lon_hi = min_lon - buf, max_lon + buf
    filtered = []
    for b in buildings:
        for lon_pt, lat_pt in b.geometry:
            if lat_lo <= lat_pt <= lat_hi and lon_lo <= lon_pt <= lon_hi:
                filtered.append(b)
                break
    if log_fn:
        log_fn(f"Building bbox filter: {original} -> {len(filtered)}")
    # If still too many, sort by centroid distance and cap
    if len(filtered) > max_buildings:
        # A degree of longitude shrinks by cos(lat); weight the east-west
        # delta accordingly so the ranking approximates true ground distance
        # (the unscaled version mis-ranked nearest buildings at high latitude).
        cos_lat = max(math.cos(math.radians(site_lat)), 0.01)

        def _centroid_dist(b):
            lats = [p[1] for p in b.geometry]
            lons = [p[0] for p in b.geometry]
            clat = sum(lats) / len(lats)
            clon = sum(lons) / len(lons)
            return (clat - site_lat) ** 2 + ((clon - site_lon) * cos_lat) ** 2

        filtered.sort(key=_centroid_dist)
        filtered = filtered[:max_buildings]
        if log_fn:
            log_fn(f"Building distance cap: -> {len(filtered)} (nearest to site)")
    return filtered
def _filter_osm_list_to_bbox(items: list, min_lat: float, min_lon: float,
                             max_lat: float, max_lon: float,
                             max_count: int = 20000) -> list:
    """Filter OSM items (streets/water/vegetation) to the coverage bbox.

    Each item is expected to carry a .geometry (or .points) attribute — a
    list of [lon, lat] pairs or point objects with .lat/.lon. Items without
    any geometry are kept unconditionally. Returns at most max_count items;
    a list already within the cap is returned unchanged.
    """
    if not items or len(items) <= max_count:
        return items
    buf = 0.005  # ~500m buffer so edge features still count
    lat_lo, lat_hi = min_lat - buf, max_lat + buf
    lon_lo, lon_hi = min_lon - buf, max_lon + buf

    def _touches_bbox(geom) -> bool:
        # True when any vertex of the geometry lies inside the buffered bbox.
        for vertex in geom:
            if isinstance(vertex, (list, tuple)) and len(vertex) >= 2:
                v_lon, v_lat = vertex[0], vertex[1]
            elif hasattr(vertex, 'lat'):
                v_lat, v_lon = vertex.lat, vertex.lon
            else:
                continue
            if lat_lo <= v_lat <= lat_hi and lon_lo <= v_lon <= lon_hi:
                return True
        return False

    kept = []
    for item in items:
        geom = getattr(item, 'geometry', None) or getattr(item, 'points', None)
        if not geom or _touches_bbox(geom):
            kept.append(item)
    return kept[:max_count]
class CoveragePoint(BaseModel):
    """One grid point of a coverage result: position, final RSRP, and the
    per-effect loss/gain breakdown (all in dB) that produced it."""
    lat: float
    lon: float
    rsrp: float  # dBm — received signal after all losses/gains
    distance: float  # meters from site
    has_los: bool  # line-of-sight to the transmitter
    terrain_loss: float  # dB
    building_loss: float  # dB
    reflection_gain: float = 0.0  # dB (positive = signal improvement)
    vegetation_loss: float = 0.0  # dB
    rain_loss: float = 0.0  # dB
    indoor_loss: float = 0.0  # dB
    atmospheric_loss: float = 0.0  # dB
class CoverageSettings(BaseModel):
    """User-tunable options for one coverage run: grid geometry, propagation
    layer toggles, and environmental conditions. The `preset` field, when
    set, overwrites the layer toggles via apply_preset()."""
    radius: float = 10000  # meters
    resolution: float = 200  # meters (inner-zone grid spacing)
    min_signal: float = -120  # dBm threshold
    # Environment type for propagation model selection
    environment: str = "urban"  # urban, suburban, rural, open
    # Layer toggles (each enables an extra physical effect, at extra cost)
    use_terrain: bool = True
    use_buildings: bool = True
    use_materials: bool = True
    use_dominant_path: bool = False
    use_street_canyon: bool = False
    use_reflections: bool = False
    use_water_reflection: bool = False
    use_vegetation: bool = False
    # Vegetation season (affects foliage attenuation)
    season: str = "summer"
    # Weather
    rain_rate: float = 0.0  # mm/h (0=none, 5=light, 25=heavy)
    # Indoor penetration loss class
    indoor_loss_type: str = "none"  # none, light, medium, heavy, vehicle
    # Atmospheric absorption
    use_atmospheric: bool = False
    temperature_c: float = 15.0
    humidity_percent: float = 50.0
    # Fading margin (dB) — additional safety loss subtracted from RSRP
    fading_margin: float = 0.0
    # Preset name; None = use the individual toggles as set above
    preset: Optional[str] = None  # fast, standard, detailed, full
# Propagation model presets.
# Each entry overwrites the layer-toggle fields of CoverageSettings (see
# apply_preset). Fidelity — and cost — increases from "fast" to "full".
PRESETS = {
    # Terrain only: quickest possible estimate.
    "fast": {
        "use_terrain": True,
        "use_buildings": False,
        "use_materials": False,
        "use_dominant_path": False,
        "use_street_canyon": False,
        "use_reflections": False,
        "use_water_reflection": False,
        "use_vegetation": False,
    },
    # Terrain + buildings + materials: the usual default trade-off.
    "standard": {
        "use_terrain": True,
        "use_buildings": True,
        "use_materials": True,
        "use_dominant_path": False,
        "use_street_canyon": False,
        "use_reflections": False,
        "use_water_reflection": False,
        "use_vegetation": False,
    },
    # Adds dominant-path and vegetation modelling.
    "detailed": {
        "use_terrain": True,
        "use_buildings": True,
        "use_materials": True,
        "use_dominant_path": True,
        "use_street_canyon": False,
        "use_reflections": False,
        "use_water_reflection": False,
        "use_vegetation": True,
    },
    # Every supported effect enabled.
    "full": {
        "use_terrain": True,
        "use_buildings": True,
        "use_materials": True,
        "use_dominant_path": True,
        "use_street_canyon": True,
        "use_reflections": True,
        "use_water_reflection": True,
        "use_vegetation": True,
    },
}
def apply_preset(settings: CoverageSettings) -> CoverageSettings:
    """Overwrite layer toggles on *settings* from its named preset, if any.

    Unknown or absent preset names leave the settings untouched.
    Mutates and returns the same settings object.
    """
    preset_toggles = PRESETS.get(settings.preset) if settings.preset else None
    if preset_toggles:
        for field_name, field_value in preset_toggles.items():
            setattr(settings, field_name, field_value)
    return settings
class SiteParams(BaseModel):
    """Transmitter site: position, antenna geometry and RF parameters."""
    lat: float
    lon: float
    height: float = 30  # antenna height meters (above ground)
    power: float = 43  # dBm (20W)
    gain: float = 15  # dBi
    frequency: float = 1800  # MHz
    azimuth: Optional[float] = None  # degrees, None = omni
    beamwidth: Optional[float] = 65  # degrees (used when azimuth is set)
class CoverageService:
    """
    RF Coverage calculation with terrain, buildings, materials,
    dominant path, street canyon, reflections, water, and vegetation.

    Orchestrates OSM/terrain data loading, vectorized pre-computation and
    per-point calculation (parallel or sequential); large radii are handled
    tile-by-tile to bound peak memory.
    """

    # Mean Earth radius in meters (spherical approximation).
    EARTH_RADIUS = 6371000
def __init__(self):
    """Bind the shared service singletons this service delegates to."""
    self.los = los_service
    self.terrain = terrain_service
    self.buildings = buildings_service
async def _fetch_osm_grid_aligned(
    self,
    min_lat: float, min_lon: float,
    max_lat: float, max_lon: float,
    settings: CoverageSettings
) -> dict:
    """
    Fetch OSM data using 1-degree grid-aligned cells.
    This ensures cache keys match the region download grid,
    so pre-cached data is actually used.

    Returns a dict with keys "buildings", "streets", "water_bodies" and
    "vegetation_areas"; a list stays empty when its layer toggle is off.
    """
    t0 = time.time()
    # Enumerate every whole-degree cell overlapping the bbox.
    lat_start = int(math.floor(min_lat))
    lat_end = int(math.floor(max_lat))
    lon_start = int(math.floor(min_lon))
    lon_end = int(math.floor(max_lon))
    cells = []
    for lat_int in range(lat_start, lat_end + 1):
        for lon_int in range(lon_start, lon_end + 1):
            cells.append((float(lat_int), float(lon_int),
                          float(lat_int + 1), float(lon_int + 1)))
    buildings: List[Building] = []
    streets: List[Street] = []
    water_bodies: List[WaterBody] = []
    vegetation_areas: List[VegetationArea] = []
    cache_stats = {"buildings": "skip", "streets": "skip",
                   "water": "skip", "vegetation": "skip"}
    # One (stat_key, enabled, fetch coroutine, accumulator, log label) row per
    # layer — replaces four copies of the identical fetch/time/log sequence.
    layer_specs = [
        ("buildings", settings.use_buildings,
         self.buildings.fetch_buildings, buildings, "Buildings"),
        ("streets", settings.use_street_canyon,
         street_canyon_service.fetch_streets, streets, "Streets"),
        ("water", settings.use_water_reflection,
         water_service.fetch_water_bodies, water_bodies, "Water"),
        ("vegetation", settings.use_vegetation,
         vegetation_service.fetch_vegetation, vegetation_areas, "Vegetation"),
    ]
    for cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon in cells:
        cell_label = f"[{cell_min_lat:.0f},{cell_min_lon:.0f}]"
        for stat_key, enabled, fetch, accumulator, label in layer_specs:
            if not enabled:
                continue
            t1 = time.time()
            chunk = await fetch(
                cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon
            )
            dt = time.time() - t1
            # Heuristic: sub-500ms responses came from the local cache.
            src = "CACHE" if dt < 0.5 else "API"
            accumulator.extend(chunk)
            # NOTE: records the status of the LAST cell only (as before).
            cache_stats[stat_key] = src
            _clog(f"{label} {cell_label}: {len(chunk)} items ({src}, {dt:.1f}s)")
    total_fetch = time.time() - t0
    _clog(f"OSM fetch total: {total_fetch:.1f}s "
          f"({len(cells)} cells, "
          f"{len(buildings)} bldgs, {len(streets)} streets, "
          f"{len(water_bodies)} water, {len(vegetation_areas)} veg)")
    _clog(f"Cache status: {cache_stats}")
    return {
        "buildings": buildings,
        "streets": streets,
        "water_bodies": water_bodies,
        "vegetation_areas": vegetation_areas,
    }
async def calculate_coverage(
    self,
    site: SiteParams,
    settings: CoverageSettings,
    cancel_token: Optional[CancellationToken] = None,
    progress_fn: Optional[Callable[[str, float], None]] = None,
    tile_callback: Optional[Callable] = None,
) -> List[CoveragePoint]:
    """
    Calculate coverage grid for a single site.

    Returns list of CoveragePoint with RSRP values.
    progress_fn(phase, pct): optional callback for progress updates (0.0-1.0).
    tile_callback(points, tile_idx, total_tiles): optional callback for per-tile
    partial results when using tiled processing (radius > 10km).
    """
    calc_start = time.time()
    # Apply preset if specified
    settings = apply_preset(settings)
    # ── Tiled processing for large radius ──
    from app.services.tile_processor import TILE_THRESHOLD_M
    if settings.radius > TILE_THRESHOLD_M:
        return await self.calculate_coverage_tiled(
            site, settings, cancel_token, progress_fn, tile_callback
        )
    points = []
    # Generate grid (adaptive resolution rings around the site)
    grid = self._generate_grid(
        site.lat, site.lon,
        settings.radius,
        settings.resolution
    )
    _clog(f"Grid: {len(grid)} points, radius={settings.radius}m, res={settings.resolution}m")
    # Calculate bbox for data fetching (degrees per meter, lon scaled by cos lat)
    lat_delta = settings.radius / 111000
    lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat)))
    min_lat = site.lat - lat_delta
    max_lat = site.lat + lat_delta
    min_lon = site.lon - lon_delta
    max_lon = site.lon + lon_delta
    _clog(f"Bbox: [{min_lat:.4f}, {min_lon:.4f}, {max_lat:.4f}, {max_lon:.4f}]")
    # ━━━ PHASE 1: Fetch OSM data ━━━
    _clog("━━━ PHASE 1: Fetching OSM data ━━━")
    if progress_fn:
        progress_fn("Fetching map data", 0.10)
        await asyncio.sleep(0)  # Yield so progress_sender can flush WS message
    t_osm = time.time()
    osm_data = await self._fetch_osm_grid_aligned(
        min_lat, min_lon, max_lat, max_lon, settings
    )
    osm_time = time.time() - t_osm
    buildings = osm_data["buildings"]
    streets = osm_data["streets"]
    water_bodies = osm_data["water_bodies"]
    vegetation_areas = osm_data["vegetation_areas"]
    _clog(f"━━━ PHASE 1 done: {osm_time:.1f}s ━━━")
    # ── Filter OSM data to coverage area ──
    # OSM cells are 1-degree wide, often far larger than the coverage radius.
    # Passing 350k buildings to ProcessPool workers causes MemoryError (pickle).
    buildings = _filter_buildings_to_bbox(
        buildings, min_lat, min_lon, max_lat, max_lon,
        site.lat, site.lon, _clog,
    )
    streets = _filter_osm_list_to_bbox(streets, min_lat, min_lon, max_lat, max_lon)
    water_bodies = _filter_osm_list_to_bbox(water_bodies, min_lat, min_lon, max_lat, max_lon)
    # Cap vegetation at 5000 — each area requires O(samples × areas)
    # point-in-polygon checks per grid point. 20k+ areas with dominant
    # path enabled causes OOM via worker memory explosion.
    vegetation_areas = _filter_osm_list_to_bbox(
        vegetation_areas, min_lat, min_lon, max_lat, max_lon,
        max_count=5000,
    )
    _clog(f"Filtered OSM data: {len(buildings)} bldgs, {len(streets)} streets, "
          f"{len(water_bodies)} water, {len(vegetation_areas)} veg")
    # Build spatial index for buildings (used by the sequential point loop)
    spatial_idx: Optional[SpatialIndex] = None
    if buildings:
        cache_key = f"{min_lat:.3f},{min_lon:.3f},{max_lat:.3f},{max_lon:.3f}"
        spatial_idx = get_spatial_index(cache_key, buildings)
    # ━━━ PHASE 2: Pre-load terrain ━━━
    _clog("━━━ PHASE 2: Pre-loading terrain ━━━")
    if progress_fn:
        progress_fn("Loading terrain", 0.25)
        await asyncio.sleep(0)
    t_terrain = time.time()
    tile_names = await self.terrain.ensure_tiles_for_bbox(
        min_lat, min_lon, max_lat, max_lon
    )
    for tn in tile_names:
        self.terrain._load_tile(tn)
    site_elevation = self.terrain.get_elevation_sync(site.lat, site.lon)
    # Pre-compute every grid point elevation once, so workers never touch disk
    point_elevations = {}
    for lat, lon in grid:
        point_elevations[(lat, lon)] = self.terrain.get_elevation_sync(lat, lon)
    terrain_time = time.time() - t_terrain
    _clog(f"Tiles: {len(tile_names)}, site elev: {site_elevation:.0f}m, "
          f"pre-computed {len(grid)} elevations")
    _clog(f"━━━ PHASE 2 done: {terrain_time:.1f}s ━━━")
    # ━━━ PHASE 2.5: Vectorized pre-computation (GPU/NumPy) ━━━
    if progress_fn:
        progress_fn("Pre-computing propagation", 0.35)
        await asyncio.sleep(0)
    from app.services.gpu_service import gpu_service
    t_gpu = time.time()
    # Import GPU modules here (main process only) to avoid CUDA context issues in workers
    from app.services.gpu_backend import gpu_manager
    xp = gpu_manager.get_array_module()
    grid_lats = xp.array([lat for lat, lon in grid], dtype=xp.float64)
    grid_lons = xp.array([lon for lat, lon in grid], dtype=xp.float64)
    pre_distances = gpu_service.precompute_distances(
        grid_lats, grid_lons, site.lat, site.lon
    )
    pre_path_loss = gpu_service.precompute_path_loss(
        pre_distances, site.frequency, site.height,
        environment=getattr(settings, 'environment', 'urban'),
    )
    gpu_time = time.time() - t_gpu
    backend_name = "GPU (CUDA)" if gpu_manager.gpu_available else "CPU (NumPy)"
    _clog(f"Precomputed {len(grid)} distances+path_loss on {backend_name} in {gpu_time:.2f}s")
    # Build lookup dict for point loop
    precomputed = {}
    for i, (lat, lon) in enumerate(grid):
        precomputed[(lat, lon)] = {
            'distance': float(pre_distances[i]),
            'path_loss': float(pre_path_loss[i]),
        }
    # Re-measure: second timestamp also covers the lookup-dict build above
    gpu_time = time.time() - t_gpu
    env = getattr(settings, 'environment', 'urban')
    selected_model = select_propagation_model(site.frequency, env)
    _clog(f"━━━ PHASE 2.5: Vectorized pre-computation done: {gpu_time:.3f}s "
          f"({len(grid)} points, model={selected_model.name}, freq={site.frequency}MHz, "
          f"env={env}, backend={'GPU' if gpu_service.available else 'CPU/NumPy'}) ━━━")
    # ━━━ PHASE 3: Point calculation ━━━
    dominant_path_service._log_count = 0  # Reset diagnostic counter
    t_points = time.time()
    use_parallel = len(grid) > 100 and get_cpu_count() > 1
    num_workers = get_cpu_count()
    if progress_fn:
        progress_fn("Calculating coverage", 0.40)
        await asyncio.sleep(0)
    if use_parallel:
        backend = get_parallel_backend()
        _clog(f"━━━ PHASE 3: Calculating {len(grid)} points "
              f"(PARALLEL/{backend}, {num_workers} workers) ━━━")
        try:
            # get_running_loop(): we are inside a coroutine, and
            # get_event_loop() from a coroutine is deprecated (3.10+).
            loop = asyncio.get_running_loop()
            result_dicts, timing = await loop.run_in_executor(
                None,
                lambda: calculate_coverage_parallel(
                    grid, point_elevations,
                    site.model_dump(), settings.model_dump(),
                    self.terrain._tile_cache,
                    buildings, streets, water_bodies, vegetation_areas,
                    site_elevation, num_workers, _clog,
                    cancel_token=cancel_token,
                    precomputed=precomputed,
                    progress_fn=progress_fn,
                ),
            )
            # Convert dicts back to CoveragePoint objects
            points = [CoveragePoint(**d) for d in result_dicts]
        except Exception as e:
            _clog(f"Parallel failed ({e}), falling back to sequential")
            use_parallel = False
    if not use_parallel:
        _clog(f"━━━ PHASE 3: Calculating {len(grid)} points (sequential) ━━━")
        loop = asyncio.get_running_loop()
        points, timing = await loop.run_in_executor(
            None,
            lambda: self._run_point_loop(
                grid, site, settings, buildings, streets,
                spatial_idx, water_bodies, vegetation_areas,
                site_elevation, point_elevations,
                cancel_token=cancel_token,
                precomputed=precomputed,
                progress_fn=progress_fn,
            ),
        )
    if progress_fn:
        progress_fn("Finalizing", 0.95)
        await asyncio.sleep(0)
    points_time = time.time() - t_points
    total_time = time.time() - calc_start
    _clog(f"━━━ PHASE 3 done: {points_time:.1f}s ━━━")
    _clog("=== RESULTS ===")
    _clog(f"  Grid points:     {len(grid)}")
    _clog(f"  Result points:   {len(points)}")
    _clog(f"  OSM fetch:       {osm_time:.1f}s")
    _clog(f"  Terrain pre-load:{terrain_time:.1f}s")
    _clog(f"  Point calc:      {points_time:.1f}s "
          f"({points_time/max(1,len(grid))*1000:.1f}ms/point)")
    _clog(f"  TOTAL:           {total_time:.1f}s")
    _clog(f"  Mode: {'parallel (' + str(num_workers) + ' workers)' if use_parallel else 'sequential'}")
    _clog(f"  Tiles in memory: {len(self.terrain._tile_cache)}")
    if any(isinstance(v, (int, float)) and v > 0.001 for v in timing.values()):
        _clog("=== PER-STEP BREAKDOWN ===")
        lod_keys = {"lod_none", "lod_simplified", "lod_full"}
        for step, dt in timing.items():
            if step in lod_keys:
                continue  # Print LOD stats separately
            if isinstance(dt, (int, float)) and dt > 0.001:
                _clog(f"  {step:20s} {dt:.3f}s "
                      f"({dt/max(1,len(grid))*1000:.2f}ms/point)")
            elif not isinstance(dt, (int, float)):
                _clog(f"  {step:20s} {dt}")
    # LOD stats (counters only present when dominant path ran)
    lod_none = timing.get("lod_none", 0)
    lod_simp = timing.get("lod_simplified", 0)
    lod_full = timing.get("lod_full", 0)
    lod_total = lod_none + lod_simp + lod_full
    if lod_total > 0:
        _clog(f"=== LOD BREAKDOWN ({lod_total} points with dominant_path) ===")
        _clog(f"  LOD_NONE (>3km)    {lod_none:5d} points ({lod_none*100//lod_total}%) — skipped")
        _clog(f"  LOD_SIMPLIFIED     {lod_simp:5d} points ({lod_simp*100//lod_total}%) — {SIMPLIFIED_MAX_BUILDINGS} buildings max")
        _clog(f"  LOD_FULL (<1.5km)  {lod_full:5d} points ({lod_full*100//lod_total}%) — full calculation")
    return points
async def calculate_multi_site_coverage(
    self,
    sites: List[SiteParams],
    settings: CoverageSettings,
    cancel_token: Optional[CancellationToken] = None,
    progress_fn: Optional[Callable[[str, float], None]] = None,
    tile_callback: Optional[Callable] = None,
) -> List[CoveragePoint]:
    """
    Calculate combined coverage from multiple sites.

    Best server (strongest signal) wins at each point.
    progress_fn(phase, pct): optional callback for progress updates (0.0-1.0).
    tile_callback: forwarded to calculate_coverage for progressive results.
    """
    if not sites:
        return []
    # Apply preset once
    settings = apply_preset(settings)
    # Per-site progress tracking: overall progress is the mean of all sites.
    num_sites = len(sites)
    _site_progress = [0.0] * num_sites

    def _make_site_progress(idx: int):
        """Create a progress_fn for one site that reports scaled overall progress."""
        def _site_fn(phase: str, pct: float, _eta=None):
            _site_progress[idx] = pct
            if progress_fn:
                overall = sum(_site_progress) / num_sites
                progress_fn(f"Site {idx + 1}/{num_sites}: {phase}", overall)
        return _site_fn

    # Run every site's coverage calculation concurrently.
    per_site_tasks = [
        self.calculate_coverage(
            one_site, settings, cancel_token,
            progress_fn=_make_site_progress(i) if progress_fn else None,
            tile_callback=tile_callback,
        )
        for i, one_site in enumerate(sites)
    ]
    all_coverages = await asyncio.gather(*per_site_tasks)
    # Best-server combine: strongest RSRP wins at each rounded (lat, lon) key.
    point_map: dict[Tuple[float, float], CoveragePoint] = {}
    for coverage in all_coverages:
        for point in coverage:
            key = (round(point.lat, 6), round(point.lon, 6))
            incumbent = point_map.get(key)
            if incumbent is None or point.rsrp > incumbent.rsrp:
                point_map[key] = point
    return list(point_map.values())
async def calculate_coverage_tiled(
    self,
    site: SiteParams,
    settings: CoverageSettings,
    cancel_token: Optional[CancellationToken] = None,
    progress_fn: Optional[Callable[[str, float], None]] = None,
    tile_callback: Optional[Callable] = None,
) -> List[CoveragePoint]:
    """Tile-based coverage for large radius (>10km).

    Splits the coverage area into sub-tiles. Each tile loads its own OSM
    data and terrain, processes its grid points, then frees memory before
    moving to the next tile. This keeps peak RAM bounded regardless of
    total coverage area.

    tile_callback(points, tile_idx, total_tiles): async callback
    invoked with partial results after each tile completes.
    """
    from app.services.tile_processor import (
        generate_tile_grid, partition_grid_to_tiles, get_adaptive_worker_count,
    )
    calc_start = time.time()
    # NOTE: settings already has preset applied by calculate_coverage()
    # Generate full adaptive grid (lightweight — just coordinate tuples)
    grid = self._generate_grid(
        site.lat, site.lon, settings.radius, settings.resolution,
    )
    _clog(f"Tiled mode: {len(grid)} total grid points, radius={settings.radius}m")
    # Generate tiles and partition grid points among them
    tiles = generate_tile_grid(site.lat, site.lon, settings.radius)
    total_tiles = len(tiles)
    tile_grids = partition_grid_to_tiles(grid, tiles)
    _clog(f"Generated {total_tiles} tiles")
    # Free full grid reference — only the per-tile partitions are needed now
    del grid
    # ── Pre-fetch buildings for inner zone (≤20km) ──
    # This avoids re-reading the disk JSON cache (7-8s) per tile.
    inner_radius_m = min(settings.radius, 20_000)
    needs_osm = (settings.use_buildings
                 or getattr(settings, 'use_street_canyon', False)
                 or getattr(settings, 'use_water_reflection', False)
                 or getattr(settings, 'use_vegetation', False))
    prefetched_buildings: List[Building] = []
    prefetched_streets: list = []
    prefetched_water: list = []
    prefetched_vegetation: list = []
    if needs_osm:
        # Convert meters to degree deltas; clamp cos(lat) to avoid a
        # division blow-up near the poles.
        lat_delta = inner_radius_m / 111_320.0
        lon_delta = inner_radius_m / (111_320.0 * max(math.cos(math.radians(site.lat)), 0.01))
        inner_bbox = (
            site.lat - lat_delta, site.lon - lon_delta,
            site.lat + lat_delta, site.lon + lon_delta,
        )
        if progress_fn:
            progress_fn("Pre-fetching map data", 0.02)
        _clog(f"Pre-fetching OSM for inner zone ({inner_radius_m/1000:.0f}km)")
        osm_prefetch = await self._fetch_osm_grid_aligned(
            inner_bbox[0], inner_bbox[1], inner_bbox[2], inner_bbox[3],
            settings,
        )
        prefetched_buildings = osm_prefetch.get("buildings", [])
        prefetched_streets = osm_prefetch.get("streets", [])
        prefetched_water = osm_prefetch.get("water_bodies", [])
        prefetched_vegetation = osm_prefetch.get("vegetation_areas", [])
        del osm_prefetch
        _clog(f"Pre-fetched: {len(prefetched_buildings)} buildings, "
              f"{len(prefetched_streets)} streets, "
              f"{len(prefetched_water)} water, "
              f"{len(prefetched_vegetation)} veg")
        # Clear singleton memory cache — we hold our own reference
        self.buildings._memory_cache.clear()
        gc.collect()
    site_elevation: Optional[float] = None
    all_points: List[CoveragePoint] = []
    # FSPL pre-check: compute minimum distance to each tile and estimate
    # free-space signal. Skip tiles where even best-case FSPL < min_signal.
    eirp_dbm = site.power + site.gain
    min_signal = getattr(settings, 'min_signal', -130)
    tiles_skipped_fspl = 0
    for tile_idx, tile in enumerate(tiles):
        if cancel_token and cancel_token.is_cancelled:
            _clog("Tiled calculation cancelled")
            break
        tile_grid = tile_grids.get(tile.index, [])
        if not tile_grid:
            continue  # No grid points landed in this tile
        tile_start = time.time()
        min_lat, min_lon, max_lat, max_lon = tile.bbox
        # Quick FSPL check: closest edge of tile to site (site clamped
        # into the tile bbox gives the nearest tile point).
        clamp_lat = max(min_lat, min(site.lat, max_lat))
        clamp_lon = max(min_lon, min(site.lon, max_lon))
        closest_dist = TerrainService.haversine_distance(
            site.lat, site.lon, clamp_lat, clamp_lon,
        )
        if closest_dist > 500:  # Skip check for tiles containing the site
            # Free-space path loss, d in meters and f in Hz
            # (147.55 ≈ 20·log10(4π/c)).
            fspl_db = 20 * math.log10(closest_dist) + 20 * math.log10(site.frequency * 1e6) - 147.55
            best_rsrp = eirp_dbm - fspl_db
            if best_rsrp < min_signal:
                tiles_skipped_fspl += 1
                continue
        _clog(f"━━━ Tile {tile_idx + 1}/{total_tiles}: "
              f"{len(tile_grid)} points ━━━")
        # Per-tile progress mapped to overall progress range
        # (_idx default binds the current tile_idx at definition time).
        def _tile_progress(phase: str, pct: float, _idx=tile_idx):
            if progress_fn:
                overall = (_idx + pct) / total_tiles
                progress_fn(
                    f"Tile {_idx + 1}/{total_tiles}: {phase}", overall,
                )
        # ── 1. Filter pre-fetched OSM data for this tile ──
        tile_center_lat = (min_lat + max_lat) / 2
        tile_center_lon = (min_lon + max_lon) / 2
        tile_dist_m = TerrainService.haversine_distance(
            site.lat, site.lon, tile_center_lat, tile_center_lon,
        )
        # Beyond 20km buildings have negligible effect — skip them entirely
        skip_buildings = tile_dist_m > 20_000
        _tile_progress("Filtering map data", 0.10)
        await asyncio.sleep(0)
        if skip_buildings:
            buildings: list = []
            streets: list = []
            water_bodies: list = []
            vegetation_areas: list = []
        else:
            # Fast in-memory filter from pre-fetched data (no disk I/O)
            buildings = _filter_buildings_to_bbox(
                prefetched_buildings, min_lat, min_lon, max_lat, max_lon,
                site.lat, site.lon, _clog,
                max_buildings=5000,
            )
            streets = _filter_osm_list_to_bbox(
                prefetched_streets, min_lat, min_lon, max_lat, max_lon,
            )
            water_bodies = _filter_osm_list_to_bbox(
                prefetched_water, min_lat, min_lon, max_lat, max_lon,
            )
            vegetation_areas = _filter_osm_list_to_bbox(
                prefetched_vegetation, min_lat, min_lon, max_lat, max_lon,
                max_count=5000,
            )
        spatial_idx: Optional[SpatialIndex] = None
        if buildings:
            cache_key = f"tile_{tile_idx}_{min_lat:.3f},{min_lon:.3f}"
            spatial_idx = get_spatial_index(cache_key, buildings)
        # ── 2. Pre-load terrain for this tile ──
        _tile_progress("Loading terrain", 0.25)
        await asyncio.sleep(0)
        tile_names = await self.terrain.ensure_tiles_for_bbox(
            min_lat, min_lon, max_lat, max_lon,
        )
        for tn in tile_names:
            self.terrain._load_tile(tn)
        # Site elevation only needs to be resolved once across all tiles
        if site_elevation is None:
            site_elevation = self.terrain.get_elevation_sync(
                site.lat, site.lon,
            )
        point_elevations = {}
        for lat, lon in tile_grid:
            point_elevations[(lat, lon)] = self.terrain.get_elevation_sync(
                lat, lon,
            )
        # ── 3. Precompute distances / path loss ──
        _tile_progress("Pre-computing propagation", 0.35)
        await asyncio.sleep(0)
        # GPU modules imported here (main process only) to avoid CUDA
        # context issues in worker processes.
        from app.services.gpu_service import gpu_service
        from app.services.gpu_backend import gpu_manager
        t_gpu = time.time()
        xp = gpu_manager.get_array_module()
        grid_lats = xp.array([lat for lat, _lon in tile_grid], dtype=xp.float64)
        grid_lons = xp.array([_lon for _lat, _lon in tile_grid], dtype=xp.float64)
        pre_distances = gpu_service.precompute_distances(
            grid_lats, grid_lons, site.lat, site.lon,
        )
        pre_path_loss = gpu_service.precompute_path_loss(
            pre_distances, site.frequency, site.height,
            environment=getattr(settings, 'environment', 'urban'),
        )
        gpu_time = time.time() - t_gpu
        backend_name = "GPU (CUDA)" if gpu_manager.gpu_available else "CPU (NumPy)"
        _clog(f"Tile {tile_idx+1}: precomputed {len(tile_grid)} pts on {backend_name} in {gpu_time:.2f}s")
        # Per-point lookup dict consumed by the point loop / workers
        precomputed = {}
        for i, (lat, lon) in enumerate(tile_grid):
            precomputed[(lat, lon)] = {
                'distance': float(pre_distances[i]),
                'path_loss': float(pre_path_loss[i]),
            }
        # ── 4. Calculate points (parallel with adaptive workers) ──
        _tile_progress("Calculating coverage", 0.40)
        await asyncio.sleep(0)
        num_workers = get_adaptive_worker_count(
            settings.radius, get_cpu_count(),
        )
        use_parallel = len(tile_grid) > 100 and num_workers > 1
        if use_parallel:
            loop = asyncio.get_event_loop()
            result_dicts, _timing = await loop.run_in_executor(
                None,
                lambda: calculate_coverage_parallel(
                    tile_grid, point_elevations,
                    site.model_dump(), settings.model_dump(),
                    self.terrain._tile_cache,
                    buildings, streets, water_bodies, vegetation_areas,
                    site_elevation, num_workers, _clog,
                    cancel_token=cancel_token,
                    precomputed=precomputed,
                ),
            )
            tile_points = [CoveragePoint(**d) for d in result_dicts]
        else:
            loop = asyncio.get_event_loop()
            tile_points, _timing = await loop.run_in_executor(
                None,
                lambda: self._run_point_loop(
                    tile_grid, site, settings, buildings, streets,
                    spatial_idx, water_bodies, vegetation_areas,
                    site_elevation, point_elevations,
                    cancel_token=cancel_token,
                    precomputed=precomputed,
                ),
            )
        all_points.extend(tile_points)
        # Send partial results via callback
        if tile_callback and tile_points:
            await tile_callback(tile_points, tile_idx, total_tiles)
        tile_time = time.time() - tile_start
        _clog(f"Tile {tile_idx + 1}/{total_tiles} done: "
              f"{len(tile_points)} points in {tile_time:.1f}s")
        # ── 5. Free per-tile memory ──
        del buildings, streets, water_bodies, vegetation_areas
        del spatial_idx, point_elevations, precomputed
        del pre_distances, pre_path_loss, grid_lats, grid_lons
        gc.collect()
    # Free pre-fetched OSM data
    del prefetched_buildings, prefetched_streets
    del prefetched_water, prefetched_vegetation
    gc.collect()
    total_time = time.time() - calc_start
    _clog(f"━━━ Tiled calculation complete: "
          f"{len(all_points)} points in {total_time:.1f}s "
          f"({tiles_skipped_fspl} tiles skipped by FSPL pre-check) ━━━")
    if progress_fn:
        progress_fn("Finalizing", 0.95)
        await asyncio.sleep(0)
    return all_points
# Adaptive resolution zone boundaries (meters) — annular rings around the TX,
# consumed by _generate_grid below.
_ADAPTIVE_ZONES = [
    (0, 2000),  # Inner: full user resolution
    (2000, 5000),  # Middle: at least 300m
    (5000, float('inf')),  # Outer: at least 500m
]
# Minimum resolution per zone; None = no floor (use caller's resolution as-is).
_ADAPTIVE_MIN_RES = [None, 300, 500]  # Minimum resolution per zone
def _generate_grid(
    self,
    center_lat: float, center_lon: float,
    radius: float, resolution: float
) -> List[Tuple[float, float]]:
    """Build the coverage sample grid with distance-adaptive resolution.

    Points within 2 km of the TX use the caller's resolution (detail
    matters close in), 2-5 km uses at least 300 m spacing, and beyond
    5 km at least 500 m (see _ADAPTIVE_ZONES / _ADAPTIVE_MIN_RES).
    For a small radius or an already-coarse base resolution every ring
    collapses to the same spacing, i.e. a plain uniform grid.

    Returns a list of (lat, lon) tuples rounded to 7 decimal places.
    """
    lat_cos = np.cos(np.radians(center_lat))
    emitted = set()
    grid_points = []
    for ring_idx, (ring_min, ring_max) in enumerate(self._ADAPTIVE_ZONES):
        if ring_min >= radius:
            # This ring (and every farther one) lies outside the radius.
            break
        ring_max = min(ring_max, radius)
        floor_res = self._ADAPTIVE_MIN_RES[ring_idx]
        if floor_res:
            ring_res = max(resolution, floor_res)
        else:
            ring_res = resolution
        dlat = ring_res / 111000
        dlon = ring_res / (111000 * lat_cos)
        # Scan the bounding box of this annulus; ring membership is
        # decided by the haversine distance test below.
        lat_span = ring_max / 111000
        lon_span = ring_max / (111000 * lat_cos)
        lat = center_lat - lat_span
        while lat <= center_lat + lat_span:
            lon = center_lon - lon_span
            while lon <= center_lon + lon_span:
                dist = TerrainService.haversine_distance(
                    center_lat, center_lon, lat, lon
                )
                if ring_min <= dist <= ring_max:
                    # Rounding avoids float-noise duplicates where rings meet
                    key = (round(lat, 7), round(lon, 7))
                    if key not in emitted:
                        emitted.add(key)
                        grid_points.append(key)
                lon += dlon
            lat += dlat
    _clog(f"Adaptive grid: {len(grid_points)} points "
          f"(radius={radius:.0f}m, base_res={resolution:.0f}m)")
    return grid_points
def _run_point_loop(
    self, grid, site, settings, buildings, streets,
    spatial_idx, water_bodies, vegetation_areas,
    site_elevation, point_elevations,
    cancel_token=None, precomputed=None,
    progress_fn=None,
):
    """Synchronous per-point loop — runs in a ThreadPoolExecutor.

    Bypasses the event loop entirely; every terrain tile must already
    be loaded. Returns (points, timing) where `points` contains only
    the CoveragePoints whose RSRP clears settings.min_signal and
    `timing` accumulates per-stage seconds/counters.
    """
    kept = []
    timing = {"los": 0.0, "buildings": 0.0, "antenna": 0.0,
              "dominant_path": 0.0, "street_canyon": 0.0,
              "reflection": 0.0, "vegetation": 0.0,
              "lod_none": 0, "lod_simplified": 0, "lod_full": 0}
    total = len(grid)
    # Log/report progress roughly 20 times over the whole run.
    log_every = max(1, total // 20)
    for idx, (lat, lon) in enumerate(grid):
        if cancel_token and cancel_token.is_cancelled:
            _clog(f"Cancelled at {idx}/{total}")
            break
        if idx % log_every == 0:
            _clog(f"Progress: {idx}/{total} ({idx*100//total}%)")
            if progress_fn:
                progress_fn("Calculating coverage", 0.40 + 0.55 * (idx / total))
        hints = precomputed.get((lat, lon)) if precomputed else None
        pt = self._calculate_point_sync(
            site, lat, lon, settings, buildings, streets,
            spatial_idx, water_bodies, vegetation_areas,
            site_elevation, point_elevations.get((lat, lon), 0.0),
            timing,
            precomputed_distance=hints.get('distance') if hints else None,
            precomputed_path_loss=hints.get('path_loss') if hints else None,
        )
        if pt.rsrp >= settings.min_signal:
            kept.append(pt)
    _clog(f"Progress: {total}/{total} (100%)")
    return kept, timing
def _calculate_point_sync(
    self,
    site: SiteParams,
    lat: float, lon: float,
    settings: CoverageSettings,
    buildings: List[Building],
    streets: List[Street],
    spatial_idx: Optional[SpatialIndex],
    water_bodies: List[WaterBody],
    vegetation_areas: List[VegetationArea],
    site_elevation: float,
    point_elevation: float,
    timing: dict,
    precomputed_distance: Optional[float] = None,
    precomputed_path_loss: Optional[float] = None,
) -> CoveragePoint:
    """Fully synchronous point calculation. All terrain tiles must be pre-loaded.

    Stacks every enabled propagation effect on top of the base path-loss
    model: antenna pattern, terrain diffraction, building penetration,
    dominant path (LOD-gated), street canyon, vegetation, reflections,
    rain, indoor and atmospheric losses, minus the fading margin.

    Args:
        site: Transmitter parameters (lat/lon, height, power, gain, ...).
        lat, lon: Receiver position.
        settings: Feature toggles and environment configuration.
        buildings/streets/water_bodies/vegetation_areas: Pre-fetched OSM data.
        spatial_idx: Optional spatial index over buildings for fast line
            queries; the raw `buildings` list is used when None.
        site_elevation / point_elevation: Ground elevations (m) used to
            turn antenna heights into absolute heights.
        timing: Mutable per-stage seconds/counters dict, updated in place.
        precomputed_distance / precomputed_path_loss: Optional values from a
            vectorized pre-pass; when given, the corresponding computation
            is skipped.

    Returns:
        CoveragePoint with the final RSRP and per-effect loss breakdown.
    """
    # Distance (use precomputed if available)
    if precomputed_distance is not None:
        distance = precomputed_distance
    else:
        distance = TerrainService.haversine_distance(site.lat, site.lon, lat, lon)
    # Clamp to 1 m so log-distance path-loss models stay finite at the TX
    if distance < 1:
        distance = 1
    # Base path loss (use precomputed if available, else use new model)
    if precomputed_path_loss is not None:
        path_loss = precomputed_path_loss
    else:
        env = getattr(settings, 'environment', 'urban')
        model = select_propagation_model(site.frequency, env)
        prop_input = PropagationInput(
            frequency_mhz=site.frequency,
            distance_m=distance,
            tx_height_m=site.height,
            rx_height_m=1.5,  # standard handset receive height
            environment=env,
        )
        path_loss = model.calculate(prop_input).path_loss_db
    # Antenna pattern
    antenna_loss = 0.0
    if site.azimuth is not None and site.beamwidth:
        t0 = time.time()
        antenna_loss = self._antenna_pattern_loss(
            site.lat, site.lon, lat, lon, site.azimuth, site.beamwidth
        )
        timing["antenna"] += time.time() - t0
    # Terrain LOS (sync)
    terrain_loss = 0.0
    has_los = True
    if settings.use_terrain:
        t0 = time.time()
        los_result = self.los.check_line_of_sight_sync(
            site.lat, site.lon, site.height, lat, lon, 1.5
        )
        has_los = los_result["has_los"]
        if not has_los:
            # Terrain blocks the direct ray: knife-edge diffraction loss
            terrain_loss = self._diffraction_loss(
                los_result["clearance"], site.frequency
            )
        timing["los"] += time.time() - t0
    # Building loss (spatial index)
    building_loss = 0.0
    t0 = time.time()
    nearby_buildings = (
        spatial_idx.query_line(site.lat, site.lon, lat, lon)
        if spatial_idx else buildings
    )
    # Cap building count for the intersection loop — query_line can
    # return hundreds of buildings; sort by proximity so the first
    # intersecting building is found faster (loop breaks early).
    if len(nearby_buildings) > 50:
        nearby_buildings = _filter_buildings_by_distance(
            nearby_buildings,
            (site.lat, site.lon), (lat, lon),
            max_count=50, max_distance=500,
        )
    if settings.use_buildings and nearby_buildings:
        # Absolute heights: antenna height above ground + ground elevation
        site_total_h = site.height + site_elevation
        point_total_h = 1.5 + point_elevation
        if settings.use_materials:
            # Material-aware penetration loss for the first blocking building
            for building in nearby_buildings:
                intersection = self.buildings.line_intersects_building(
                    site.lat, site.lon, site_total_h,
                    lat, lon, point_total_h, building
                )
                if intersection is not None:
                    material = materials_service.detect_material(building.tags)
                    building_loss += materials_service.get_penetration_loss(
                        material, site.frequency
                    )
                    has_los = False
                    break
        else:
            # Flat 20 dB penalty for the first blocking building
            for building in nearby_buildings:
                intersection = self.buildings.line_intersects_building(
                    site.lat, site.lon, site_total_h,
                    lat, lon, point_total_h, building
                )
                if intersection is not None:
                    building_loss += 20.0
                    has_los = False
                    break
    timing["buildings"] += time.time() - t0
    # Dominant path (vectorized NumPy) with LOD optimization
    # Only enter when there are actual buildings (spatial_idx with data OR non-empty list)
    has_building_data = nearby_buildings or (spatial_idx is not None and spatial_idx._grid)
    if settings.use_dominant_path and has_building_data:
        lod = get_lod_level(distance)
        # LOD_NONE: skip dominant path entirely for distant points (>3km)
        if lod == LODLevel.NONE:
            timing["lod_none"] += 1
        else:
            t0 = time.time()
            try:
                # LOD_SIMPLIFIED: limit buildings for mid-range points (1.5-3km)
                dp_buildings = nearby_buildings
                if lod == LODLevel.SIMPLIFIED:
                    timing["lod_simplified"] += 1
                    if len(nearby_buildings) > SIMPLIFIED_MAX_BUILDINGS:
                        dp_buildings = nearby_buildings[:SIMPLIFIED_MAX_BUILDINGS]
                else:
                    timing["lod_full"] += 1
                # nearby_buildings already filtered via spatial index —
                # don't pass spatial_idx to avoid redundant query_line()
                # and query_point() inside the vectorized function.
                dominant = find_dominant_paths_vectorized(
                    site.lat, site.lon, site.height,
                    lat, lon, 1.5,
                    site.frequency, dp_buildings,
                )
                if dominant['path_type'] == 'direct':
                    # Direct path exists after all: cancel the building penalty
                    has_los = True
                    building_loss = 0.0
                elif dominant['path_type'] == 'reflection':
                    # Reflected path partially recovers the building loss
                    building_loss = max(0.0, building_loss - (10.0 - dominant['total_loss']))
                    has_los = False
                elif dominant['path_type'] == 'diffraction':
                    # Take the diffraction loss when it dominates building loss
                    if dominant['total_loss'] > building_loss:
                        building_loss = dominant['total_loss']
                        has_los = False
            except Exception:
                pass  # Skip dominant path on error — use base model
            timing["dominant_path"] += time.time() - t0
    # Street canyon (sync)
    if settings.use_street_canyon and streets:
        t0 = time.time()
        try:
            canyon_loss, _street_path = street_canyon_service.calculate_street_canyon_loss_sync(
                site.lat, site.lon, site.height,
                lat, lon, 1.5,
                site.frequency, streets
            )
            # Only use street canyon if it's a finite improvement
            if math.isfinite(canyon_loss) and canyon_loss < (path_loss + terrain_loss + building_loss):
                # Canyon propagation replaces the whole path+terrain+building stack
                path_loss = canyon_loss
                terrain_loss = 0
                building_loss = 0
        except Exception:
            pass  # Skip street canyon on error
        timing["street_canyon"] += time.time() - t0
    # Vegetation (already sync)
    veg_loss = 0.0
    if settings.use_vegetation and vegetation_areas:
        t0 = time.time()
        try:
            veg_loss = vegetation_service.calculate_vegetation_loss(
                site.lat, site.lon, lat, lon, vegetation_areas, settings.season
            )
        except Exception:
            veg_loss = 0.0
        timing["vegetation"] += time.time() - t0
    # Reflections (sync)
    reflection_gain = 0.0
    if settings.use_reflections and nearby_buildings:
        t0 = time.time()
        try:
            is_over_water = False
            if settings.use_water_reflection and water_bodies:
                is_over_water = water_service.point_over_water(lat, lon, water_bodies) is not None
            refl_paths = reflection_service.find_reflection_paths_sync(
                site.lat, site.lon, site.height,
                lat, lon, 1.5,
                site.frequency, nearby_buildings,
                include_ground=True
            )
            if is_over_water and refl_paths:
                # Replace the generic ground bounce with a water reflection
                water_path = reflection_service._calculate_ground_reflection(
                    site.lat, site.lon, site.height,
                    lat, lon, 1.5,
                    site.frequency, is_water=True
                )
                if water_path:
                    refl_paths = [p for p in refl_paths if "ground" not in p.materials]
                    refl_paths.append(water_path)
                    refl_paths.sort(key=lambda p: p.total_loss)
            if refl_paths:
                # Combine direct + reflected paths; credit only a net gain
                direct_rsrp = (site.power + site.gain - path_loss - antenna_loss
                               - terrain_loss - building_loss - veg_loss)
                combined_rsrp = reflection_service.combine_paths(
                    direct_rsrp, refl_paths, site.power + site.gain
                )
                reflection_gain = max(0, combined_rsrp - direct_rsrp)
        except Exception:
            reflection_gain = 0.0
        timing["reflection"] += time.time() - t0
    elif settings.use_water_reflection and water_bodies and not settings.use_reflections:
        # Water-only mode: flat 3 dB bonus for points over water
        try:
            is_over_water = water_service.point_over_water(lat, lon, water_bodies) is not None
            if is_over_water:
                reflection_gain = 3.0
        except Exception:
            pass
    # Rain
    rain_loss = 0.0
    if settings.rain_rate > 0:
        rain_loss = weather_service.calculate_rain_attenuation(
            site.frequency, distance / 1000, settings.rain_rate
        )
    # Indoor
    indoor_loss = 0.0
    if settings.indoor_loss_type != "none":
        indoor_loss = indoor_service.calculate_indoor_loss(
            site.frequency, settings.indoor_loss_type
        )
    # Atmospheric
    atmo_loss = 0.0
    if settings.use_atmospheric:
        atmo_loss = atmospheric_service.calculate_atmospheric_loss(
            site.frequency, distance / 1000,
            settings.temperature_c, settings.humidity_percent
        )
    # Final RSRP — link budget: gains minus every loss, plus reflection gain
    rsrp = (site.power + site.gain - path_loss - antenna_loss
            - terrain_loss - building_loss - veg_loss
            - rain_loss - indoor_loss - atmo_loss
            + reflection_gain
            - settings.fading_margin)
    return CoveragePoint(
        lat=lat, lon=lon, rsrp=rsrp, distance=distance,
        has_los=has_los, terrain_loss=terrain_loss,
        building_loss=building_loss, reflection_gain=reflection_gain,
        vegetation_loss=veg_loss, rain_loss=rain_loss,
        indoor_loss=indoor_loss, atmospheric_loss=atmo_loss,
    )
def _antenna_pattern_loss(
    self,
    site_lat: float, site_lon: float,
    point_lat: float, point_lon: float,
    azimuth: float, beamwidth: float
) -> float:
    """Directional antenna attenuation (dB) toward the given point.

    Quadratic roll-off reaching 3 dB at the half-power beam edge, a
    steeper quadratic roll-off beyond it, capped at 25 dB.
    """
    heading = self._calculate_bearing(site_lat, site_lon, point_lat, point_lon)
    # Smallest angular offset between the beam axis and the point bearing
    off_axis = abs(heading - azimuth)
    if off_axis > 180:
        off_axis = 360 - off_axis
    half_bw = beamwidth / 2
    if off_axis <= half_bw:
        atten = 3 * (off_axis / half_bw) ** 2
    else:
        atten = 3 + 12 * ((off_axis - half_bw) / half_bw) ** 2
    return min(atten, 25)
def _calculate_bearing(
self,
lat1: float, lon1: float,
lat2: float, lon2: float
) -> float:
"""Calculate bearing from point 1 to point 2 (degrees)"""
# Use math for scalar operations (faster than numpy/cupy for single values)
lat1_r = math.radians(lat1)
lon1_r = math.radians(lon1)
lat2_r = math.radians(lat2)
lon2_r = math.radians(lon2)
dlon = lon2_r - lon1_r
x = math.sin(dlon) * math.cos(lat2_r)
y = math.cos(lat1_r) * math.sin(lat2_r) - math.sin(lat1_r) * math.cos(lat2_r) * math.cos(dlon)
bearing = math.degrees(math.atan2(x, y))
return (bearing + 360) % 360
def _diffraction_loss(self, clearance: float, frequency: float) -> float:
    """Knife-edge diffraction loss (dB) using the ITU-R P.526 model.

    Thin delegation to the module-level _DIFFRACTION_MODEL. `clearance`
    comes from the LOS check result; `frequency` matches site.frequency,
    which callers elsewhere pass as MHz (presumably the model expects
    MHz too — confirm against _DIFFRACTION_MODEL).
    """
    return _DIFFRACTION_MODEL.calculate_clearance_loss(clearance, frequency)
async def calculate_radial_preview(
    self,
    site: SiteParams,
    settings: CoverageSettings,
    num_spokes: int = 360,
    points_per_spoke: int = 50,
) -> List[CoveragePoint]:
    """Fast radial preview using terrain-only along evenly-spaced spokes.

    Much faster than full grid because:
    - No OSM data fetch (no buildings/vegetation/water)
    - Terrain profile reused per spoke
    - Fewer total points at long range

    Args:
        site: Transmitter parameters.
        settings: Coverage settings; only terrain, antenna and fading
            margin affect the preview.
        num_spokes: Number of radial spokes, spread evenly over 360°.
        points_per_spoke: Samples per spoke out to settings.radius.

    Returns:
        CoveragePoints whose RSRP clears settings.min_signal
        (building_loss is always 0 in the preview).
    """
    calc_start = time.time()
    settings = apply_preset(settings)
    # Pre-load terrain tiles for bbox of the coverage circle
    lat_delta = settings.radius / 111000
    cos_lat = np.cos(np.radians(site.lat))
    lon_delta = settings.radius / (111000 * cos_lat)
    min_lat = site.lat - lat_delta
    max_lat = site.lat + lat_delta
    min_lon = site.lon - lon_delta
    max_lon = site.lon + lon_delta
    tile_names = await self.terrain.ensure_tiles_for_bbox(
        min_lat, min_lon, max_lat, max_lon
    )
    for tn in tile_names:
        self.terrain._load_tile(tn)
    # NOTE(review): return value is unused below; kept because the call
    # presumably warms the elevation cache for the site tile — confirm
    # before removing.
    site_elevation = self.terrain.get_elevation_sync(site.lat, site.lon)
    # Select propagation model for the configured environment
    env = getattr(settings, 'environment', 'urban')
    model = select_propagation_model(site.frequency, env)
    points: List[CoveragePoint] = []
    # FIX: spread spokes evenly over the full circle. The previous
    # `for angle_deg in range(num_spokes)` used fixed 1° steps, so any
    # num_spokes != 360 covered only a partial arc instead of thinning
    # or densifying the sweep. Identical values for the default 360.
    angle_step = 360.0 / num_spokes
    for spoke in range(num_spokes):
        angle_deg = spoke * angle_step
        angle_rad = math.radians(angle_deg)
        cos_a = math.cos(angle_rad)
        sin_a = math.sin(angle_rad)
        # Antenna pattern loss is constant along a spoke (same bearing)
        antenna_loss = 0.0
        if site.azimuth is not None and site.beamwidth:
            angle_diff = abs(angle_deg - site.azimuth)
            if angle_diff > 180:
                angle_diff = 360 - angle_diff
            half_bw = site.beamwidth / 2
            if angle_diff <= half_bw:
                antenna_loss = 3 * (angle_diff / half_bw) ** 2
            else:
                antenna_loss = 3 + 12 * ((angle_diff - half_bw) / half_bw) ** 2
            antenna_loss = min(antenna_loss, 25)
        for i in range(1, points_per_spoke + 1):
            distance = i * (settings.radius / points_per_spoke)
            # Move point along bearing (0° = north, 90° = east)
            lat_offset = (distance / 111000) * cos_a
            lon_offset = (distance / (111000 * cos_lat)) * sin_a
            rx_lat = site.lat + lat_offset
            rx_lon = site.lon + lon_offset
            # Path loss from the selected model
            prop_input = PropagationInput(
                frequency_mhz=site.frequency,
                distance_m=distance,
                tx_height_m=site.height,
                rx_height_m=1.5,
                environment=env,
            )
            path_loss = model.calculate(prop_input).path_loss_db
            # Terrain LOS check with knife-edge diffraction when blocked
            terrain_loss = 0.0
            has_los = True
            if settings.use_terrain:
                los_result = self.los.check_line_of_sight_sync(
                    site.lat, site.lon, site.height,
                    rx_lat, rx_lon, 1.5,
                )
                has_los = los_result['has_los']
                if not has_los:
                    terrain_loss = self._diffraction_loss(
                        los_result['clearance'], site.frequency
                    )
            rsrp = (site.power + site.gain - path_loss
                    - antenna_loss - terrain_loss
                    - settings.fading_margin)
            if rsrp >= settings.min_signal:
                points.append(CoveragePoint(
                    lat=rx_lat, lon=rx_lon, rsrp=rsrp,
                    distance=distance, has_los=has_los,
                    terrain_loss=terrain_loss, building_loss=0.0,
                ))
    total_time = time.time() - calc_start
    _clog(f"Radial preview: {len(points)} points, {num_spokes} spokes × "
          f"{points_per_spoke} pts/spoke, {total_time:.1f}s")
    return points
# Module-level singleton shared across the app; import `coverage_service`
# instead of constructing new CoverageService instances.
coverage_service = CoverageService()