Files
rfcp/backend/app/services/dominant_path_service.py
2026-02-01 23:51:21 +02:00

973 lines
34 KiB
Python

import time
import numpy as np
from enum import Enum
from typing import List, Tuple, Optional, Dict, Any, TYPE_CHECKING
from dataclasses import dataclass
from app.services.terrain_service import terrain_service
from app.services.buildings_service import buildings_service, Building
from app.services.materials_service import materials_service, BuildingMaterial
from app.services.geometry_vectorized import (
points_to_local_coords,
line_intersects_polygons_batch,
find_best_reflection_path_vectorized,
)
if TYPE_CHECKING:
from app.services.spatial_index import SpatialIndex
# ── Level of Detail (LOD) for dominant path calculations ──
class LODLevel(Enum):
    """Level of detail applied to dominant-path analysis, chosen by distance.

    Over long links, building-level multipath adds little to the total path
    loss, so the macro propagation models alone are sufficient there.
    """

    # Dominant-path analysis is skipped entirely.
    NONE = "none"
    # Only the nearest few buildings are examined.
    SIMPLIFIED = "simplified"
    # Complete calculation (the current behavior).
    FULL = "full"
# Distance cut-offs, in meters, used to choose the LOD level.
LOD_THRESHOLD_NONE = 3_000  # beyond 3 km: dominant-path analysis is skipped
LOD_THRESHOLD_SIMPLIFIED = 1_500  # 1.5 km to 3 km: simplified mode

# Caps for the simplified mode.
# NOTE(review): neither cap is referenced by the code visible in this file.
SIMPLIFIED_MAX_BUILDINGS = 5
SIMPLIFIED_MAX_WALLS = 50
def get_lod_level(distance_m: float) -> LODLevel:
    """Pick the LOD level for a TX-RX separation given in meters.

    Distances above ``LOD_THRESHOLD_NONE`` map to ``LODLevel.NONE``, distances
    above ``LOD_THRESHOLD_SIMPLIFIED`` map to ``LODLevel.SIMPLIFIED`` and
    everything else (both thresholds inclusive on the lower side) maps to
    ``LODLevel.FULL``.
    """
    # Guard clauses, checked from the farthest band inwards.
    if distance_m > LOD_THRESHOLD_NONE:
        return LODLevel.NONE
    if distance_m > LOD_THRESHOLD_SIMPLIFIED:
        return LODLevel.SIMPLIFIED
    return LODLevel.FULL
@dataclass
class RayPath:
    """One candidate propagation path between transmitter and receiver."""

    # Kind of path: "direct", "reflected", "diffracted" or "street".
    path_type: str
    # Path length in meters.
    total_distance: float
    # Loss along the path in dB.
    path_loss: float
    # Intermediate points as (lat, lon) pairs.
    reflection_points: List[Tuple[float, float]]
    # Materials of the buildings the path passes through.
    materials_crossed: List[BuildingMaterial]
    # Whether this path exists / is considered usable.
    is_valid: bool
# Caps handed to _filter_buildings_by_distance for a single TX-RX evaluation.
# Maximum number of buildings kept for the direct-line check.
MAX_BUILDINGS_FOR_LINE = 30
# Maximum number of buildings kept for the reflection search.
MAX_BUILDINGS_FOR_REFLECTION = 20
# Passed as max_distance to the building filter.
MAX_DISTANCE_FROM_PATH = 200 # meters
def _filter_buildings_by_distance(buildings, tx_point, rx_point, max_count=100, max_distance=500):
    """Keep at most ``max_count`` buildings closest to the TX-RX midpoint.

    Distances are measured from each building's vertex-average centroid to the
    midpoint of the TX-RX segment (not to the segment itself), using a flat
    approximation and squared values so no square root is needed.

    Args:
        buildings: sequence of objects exposing ``geometry`` as a list of
            ``[lon, lat]`` points (may be empty or None).
        tx_point: ``(lat, lon)`` of the transmitter.
        rx_point: ``(lat, lon)`` of the receiver.
        max_count: maximum number of buildings to return.
        max_distance: maximum centroid-to-midpoint distance in meters.

    Returns:
        ``buildings`` itself, unfiltered, when it already holds no more than
        ``max_count`` entries; otherwise a new list of the buildings within
        ``max_distance``, nearest first, truncated to ``max_count``.
    """
    # Short lists are passed through as-is: the distance limit only applies
    # once the count cap is exceeded.
    if len(buildings) <= max_count:
        return buildings

    mid_lat = (tx_point[0] + rx_point[0]) / 2
    mid_lon = (tx_point[1] + rx_point[1]) / 2
    max_dist_sq = max_distance * max_distance

    meters_per_deg_lat = 111000
    # A degree of longitude shrinks with cos(latitude). The previous fixed
    # factor of 0.7 was only right near 45 degrees and mis-scaled east-west
    # distances everywhere else.
    meters_per_deg_lon = 111000 * np.cos(np.radians(mid_lat))

    def dist_sq_to_midpoint(building):
        geom = building.geometry
        if not geom:
            # No footprint: treated as sitting on the midpoint (distance 0).
            return 0.0
        blat = sum(p[1] for p in geom) / len(geom)
        blon = sum(p[0] for p in geom) / len(geom)
        dlat = (blat - mid_lat) * meters_per_deg_lat
        dlon = (blon - mid_lon) * meters_per_deg_lon
        return dlat * dlat + dlon * dlon

    # Drop out-of-range buildings before sorting so only survivors are sorted;
    # list.sort is stable, so ties keep their input order.
    scored = ((dist_sq_to_midpoint(b), b) for b in buildings)
    nearby = [(d, b) for d, b in scored if d <= max_dist_sq]
    nearby.sort(key=lambda pair: pair[0])
    return [b for _, b in nearby[:max_count]]
# ── Vectorized dominant path (NumPy) ──
# Module-wide count of find_dominant_paths_vectorized calls that reached the
# diagnostic step; the diagnostic print is emitted only while it is <= 3.
_vec_log_count = 0
def _buildings_to_arrays(buildings: List[Building], ref_lat: float, ref_lon: float):
    """Flatten building footprints into NumPy arrays in local XY coordinates.

    Every footprint is projected with ``points_to_local_coords`` around
    (``ref_lat``, ``ref_lon``). A building whose geometry is missing or has
    fewer than three points contributes no vertices and no walls, but still
    gets a ``0`` entry in the length array so that polygon indices stay aligned
    with ``buildings``.

    Returns:
        A 6-tuple of arrays:
        walls_start: (W, 2) wall start points in local XY meters
        walls_end: (W, 2) wall end points in local XY meters
        wall_to_building: (W,) wall index -> building index
        poly_x: flattened polygon X coords
        poly_y: flattened polygon Y coords
        poly_lengths: (num_polygons,) vertices per polygon
    """
    starts, ends, owners = [], [], []
    xs, ys = [], []
    vertex_counts = []

    for bldg_idx, bldg in enumerate(buildings):
        ring = bldg.geometry  # sequence of [lon, lat] points
        if not ring or len(ring) < 3:
            vertex_counts.append(0)
            continue

        lats = np.array([pt[1] for pt in ring])
        lons = np.array([pt[0] for pt in ring])
        px, py = points_to_local_coords(ref_lat, ref_lon, lats, lons)

        xs.extend(px)
        ys.extend(py)
        vertex_counts.append(len(ring))

        # One wall per consecutive vertex pair. No closing edge is added from
        # the last vertex back to the first.
        n_walls = len(ring) - 1
        starts.extend([px[k], py[k]] for k in range(n_walls))
        ends.extend([px[k + 1], py[k + 1]] for k in range(n_walls))
        owners.extend([bldg_idx] * n_walls)

    # Empty inputs still yield arrays of the right rank.
    walls_start = np.array(starts) if starts else np.zeros((0, 2))
    walls_end = np.array(ends) if ends else np.zeros((0, 2))
    wall_to_building = np.array(owners, dtype=int) if owners else np.zeros(0, dtype=int)
    poly_x = np.array(xs) if xs else np.zeros(0)
    poly_y = np.array(ys) if ys else np.zeros(0)
    poly_lengths = np.array(vertex_counts, dtype=int)
    return walls_start, walls_end, wall_to_building, poly_x, poly_y, poly_lengths
def find_dominant_paths_vectorized(
    tx_lat: float, tx_lon: float, tx_height: float,
    rx_lat: float, rx_lon: float, rx_height: float,
    frequency_mhz: float,
    buildings: List[Building],
    spatial_idx: 'Optional[SpatialIndex]' = None,
) -> Dict[str, Any]:
    """Classify the dominant TX->RX path using batched NumPy geometry.

    NumPy-based counterpart of the loop-based ``find_dominant_paths_sync()``.
    Stages, in order:
        1. gather candidate buildings and convert them to arrays,
        2. batched line-of-sight test against the building polygons,
        3. search for the best single reflection,
        4. fall back to a simplified diffraction estimate.

    ``tx_height``, ``rx_height`` and ``frequency_mhz`` are accepted but not
    read by this function.

    Returns:
        Dict with the keys ``has_los``, ``path_type`` ('direct', 'reflection'
        or 'diffraction'), ``total_loss``, ``path_length`` and
        ``reflection_point`` (a (lat, lon) tuple, or None).
    """
    global _vec_log_count

    def direct_los(length):
        # Result shared by every unobstructed outcome.
        return {
            'has_los': True,
            'path_type': 'direct',
            'total_loss': 0.0,
            'path_length': length,
            'reflection_point': None,
        }

    # Fast path: nothing to collide with, so no geometry work is done at all.
    # NOTE(review): both fast paths report path_length 0.0, whereas the later
    # direct-LOS returns report the real TX-RX distance - confirm callers do
    # not rely on path_length for direct results.
    has_spatial_data = spatial_idx is not None and spatial_idx._grid
    if not buildings and not has_spatial_data:
        return direct_los(0.0)

    # Candidate buildings along the TX->RX line (same source as the sync version).
    line_buildings = (
        spatial_idx.query_line(tx_lat, tx_lon, rx_lat, rx_lon)
        if spatial_idx
        else buildings
    )
    if not line_buildings:
        return direct_los(0.0)

    tx_point = (tx_lat, tx_lon)
    rx_point = (rx_lat, rx_lon)
    line_buildings = _filter_buildings_by_distance(
        line_buildings,
        tx_point, rx_point,
        max_count=MAX_BUILDINGS_FOR_LINE,
        max_distance=MAX_DISTANCE_FROM_PATH,
    )

    # The local XY frame is centred on the TX-RX midpoint.
    ref_lat = (tx_lat + rx_lat) / 2
    ref_lon = (tx_lon + rx_lon) / 2

    tx_xy = points_to_local_coords(ref_lat, ref_lon, np.array([tx_lat]), np.array([tx_lon]))
    rx_xy = points_to_local_coords(ref_lat, ref_lon, np.array([rx_lat]), np.array([rx_lon]))
    tx = np.array([tx_xy[0][0], tx_xy[1][0]])
    rx = np.array([rx_xy[0][0], rx_xy[1][0]])
    direct_dist = np.linalg.norm(rx - tx)

    line_arrays = _buildings_to_arrays(line_buildings, ref_lat, ref_lon)
    walls_start, walls_end, wall_to_bldg, poly_x, poly_y, poly_lengths = line_arrays

    # Diagnostics for the first three evaluated points only.
    _vec_log_count += 1
    if _vec_log_count <= 3:
        print(
            f"[DOMINANT_PATH_VEC] Point #{_vec_log_count}: "
            f"buildings={len(line_buildings)}, walls={len(walls_start)}, "
            f"dist={direct_dist:.0f}m",
            flush=True,
        )

    # No usable polygon survived the filtering.
    if len(poly_lengths) == 0 or np.all(poly_lengths < 3):
        return direct_los(direct_dist)

    # Step 1: batched direct line-of-sight test.
    intersects, _ = line_intersects_polygons_batch(tx, rx, poly_x, poly_y, poly_lengths)
    if not np.any(intersects):
        return direct_los(direct_dist)

    # Step 2: reflection search. With a spatial index, buildings around the
    # midpoint are added to the line buildings; otherwise the line arrays are
    # reused unchanged.
    refl_arrays = line_arrays
    if spatial_idx:
        around_mid = spatial_idx.query_point(ref_lat, ref_lon, buffer_cells=3)
        around_mid = _filter_buildings_by_distance(
            around_mid,
            tx_point, rx_point,
            max_count=MAX_BUILDINGS_FOR_REFLECTION,
            max_distance=MAX_DISTANCE_FROM_PATH,
        )
        # Union of both sets, de-duplicated by building id, line buildings first.
        known_ids = {b.id for b in line_buildings}
        combined = list(line_buildings)
        for candidate in around_mid:
            if candidate.id in known_ids:
                continue
            combined.append(candidate)
            known_ids.add(candidate.id)
        refl_arrays = _buildings_to_arrays(combined, ref_lat, ref_lon)

    r_walls_start, r_walls_end, r_wall_to_bldg, r_poly_x, r_poly_y, r_poly_lengths = refl_arrays
    refl_point, refl_length, refl_loss = find_best_reflection_path_vectorized(
        tx, rx,
        r_walls_start, r_walls_end, r_wall_to_bldg,
        r_poly_x, r_poly_y, r_poly_lengths,
        max_candidates=30,
        max_walls=100,
        max_los_checks=10,
    )

    if refl_point is not None:
        # Local XY back to geographic coordinates.
        cos_lat = np.cos(np.radians(ref_lat))
        refl_lat = ref_lat + refl_point[1] / 110540.0
        refl_lon = ref_lon + refl_point[0] / (111320.0 * cos_lat)
        return {
            'has_los': False,
            'path_type': 'reflection',
            'total_loss': refl_loss,
            'path_length': refl_length,
            'reflection_point': (refl_lat, refl_lon),
        }

    # Step 3: diffraction fallback - 10 dB plus 5 dB per blocking polygon,
    # counting at most five polygons.
    num_blocking = int(np.sum(intersects))
    return {
        'has_los': False,
        'path_type': 'diffraction',
        'total_loss': 10.0 + 5.0 * min(num_blocking, 5),
        'path_length': direct_dist,
        'reflection_point': None,
    }
class DominantPathService:
    """Finds the dominant (2-3 strongest) propagation paths for a TX-RX pair.

    Path types considered:
        1. Direct (LoS if available)
        2. Single reflection off a building
        3. Over-roof diffraction
        4. Around-corner diffraction

    NOTE(review): no method visible in this file implements item 4.
    """

    # NOTE(review): not referenced by the code visible in this file.
    MAX_REFLECTIONS = 2
    # Upper bound on the number of paths returned by the finder methods.
    MAX_PATHS = 3
    # Class-wide call counter used to limit diagnostic logging.
    _log_count = 0
async def find_dominant_paths(
    self,
    tx_lat: float, tx_lon: float, tx_height: float,
    rx_lat: float, rx_lon: float, rx_height: float,
    frequency_mhz: float,
    buildings: List[Building]
) -> List[RayPath]:
    """Find the dominant propagation paths (async variant).

    Candidates are gathered from the direct path, single-bounce reflections
    and, when the direct path is missing or invalid, over-roof diffraction.

    Returns:
        Up to ``MAX_PATHS`` candidates ordered by ascending path loss. The
        direct path is included even when it is flagged ``is_valid=False``.
    """
    paths = []

    # 1. Direct path
    direct = await self._check_direct_path(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height,
        frequency_mhz, buildings
    )
    if direct:
        paths.append(direct)

    # 2. Single-bounce reflections
    reflections = await self._find_reflection_paths(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height,
        frequency_mhz, buildings
    )
    # Reflections arrive in building order, so rank them by loss first.
    # Slicing the raw list would keep the first two found, not the best two.
    reflections = sorted(reflections, key=lambda p: p.path_loss)
    paths.extend(reflections[:2])

    # 3. Over-roof diffraction, only when the direct path is blocked
    if not direct or not direct.is_valid:
        diffracted = await self._find_diffraction_path(
            tx_lat, tx_lon, tx_height,
            rx_lat, rx_lon, rx_height,
            frequency_mhz, buildings
        )
        if diffracted:
            paths.append(diffracted)

    # Best (lowest loss) first, capped at MAX_PATHS.
    paths.sort(key=lambda p: p.path_loss)
    return paths[:self.MAX_PATHS]
async def _check_direct_path(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    buildings: List[Building]
) -> Optional[RayPath]:
    """Evaluate the straight TX->RX path (async variant).

    When the terrain line-of-sight check fails, an invalid path with infinite
    loss is returned. Otherwise the loss is the base path loss plus one
    penetration loss per building the line passes through, and the path is
    valid only while fewer than three buildings are crossed.
    """
    # Imported here rather than at module level, as in the original.
    from app.services.los_service import los_service

    los_result = await los_service.check_line_of_sight(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height
    )
    distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)

    if not los_result["has_los"]:
        # Terrain blocks the line.
        return RayPath(
            path_type="direct",
            total_distance=distance,
            path_loss=float('inf'),
            reflection_points=[],
            materials_crossed=[],
            is_valid=False
        )

    # Material of every building the line passes through, in input order.
    materials_crossed = [
        materials_service.detect_material(candidate.tags)
        for candidate in buildings
        if self._line_intersects_building_3d(
            tx_lat, tx_lon, tx_height,
            rx_lat, rx_lon, rx_height,
            candidate
        )
    ]

    path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
    for crossed in materials_crossed:
        path_loss += materials_service.get_penetration_loss(crossed, frequency_mhz)

    return RayPath(
        path_type="direct",
        total_distance=distance,
        path_loss=path_loss,
        reflection_points=[],
        materials_crossed=materials_crossed,
        is_valid=len(materials_crossed) < 3  # three or more crossings: not viable
    )
async def _find_reflection_paths(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    buildings: List[Building]
) -> List[RayPath]:
    """Collect single-bounce reflection candidates (async variant).

    At most one reflection point is considered per building. A candidate is
    dropped when its total length exceeds twice the direct TX-RX distance.
    The two legs are NOT tested for obstruction here; every returned path is
    flagged ``is_valid=True``.

    Returns:
        Reflected ``RayPath`` objects in the order of ``buildings``.
    """
    reflection_paths = []

    # Loop-invariant, so computed once instead of once per building.
    direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
    max_total_distance = direct_distance * 2

    for building in buildings:
        reflection_point = self._find_reflection_point(
            tx_lat, tx_lon, rx_lat, rx_lon, building
        )
        if not reflection_point:
            continue
        ref_lat, ref_lon = reflection_point

        # Leg lengths: TX -> reflection point -> RX
        dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
        dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
        total_distance = dist1 + dist2

        # Skip detours much longer than the direct path.
        if total_distance > max_total_distance:
            continue

        # Base loss over the full reflected length plus the wall's reflection loss.
        path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height)
        material = materials_service.detect_material(building.tags)
        path_loss += materials_service.get_reflection_loss(material)

        reflection_paths.append(RayPath(
            path_type="reflected",
            total_distance=total_distance,
            path_loss=path_loss,
            reflection_points=[(ref_lat, ref_lon)],
            materials_crossed=[],
            is_valid=True
        ))
    return reflection_paths
async def _find_diffraction_path(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    buildings: List[Building]
) -> Optional[RayPath]:
    """Estimate an over-roof diffraction path (async variant).

    The straight TX-RX line is sampled at evenly spaced interior points. At
    each sample the terrain elevation, and the roof of any building containing
    the sample, is compared with the running maximum; the highest one becomes
    the single knife edge.

    Returns:
        A "diffracted" ``RayPath`` whose only ``reflection_points`` entry is
        the obstacle location, or None when no sample rises above elevation 0.
    """
    # Only obstacles above elevation 0 are recorded (the initial maximum).
    max_height = 0
    obstacle_lat, obstacle_lon = None, None

    # 20 evenly spaced points including both endpoints; only the interior ones
    # are examined. Dividing by (num_samples - 1) keeps the samples symmetric.
    # Dividing by num_samples left the last 10% of the path unsampled.
    num_samples = 20
    for i in range(1, num_samples - 1):
        t = i / (num_samples - 1)
        lat = tx_lat + t * (rx_lat - tx_lat)
        lon = tx_lon + t * (rx_lon - tx_lon)

        terrain_elev = await terrain_service.get_elevation(lat, lon)
        if terrain_elev > max_height:
            max_height = terrain_elev
            obstacle_lat, obstacle_lon = lat, lon

        for building in buildings:
            if buildings_service.point_in_building(lat, lon, building):
                # building.height is a height above ground (it is compared
                # with antenna heights in _line_intersects_building_3d), so the
                # roof must be placed on top of the local terrain before it can
                # be ranked against terrain elevations.
                roof_elev = terrain_elev + building.height
                if roof_elev > max_height:
                    max_height = roof_elev
                    obstacle_lat, obstacle_lon = lat, lon

    # Explicit None test: a latitude of exactly 0.0 is a valid obstacle position.
    if obstacle_lat is None:
        return None

    distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)

    # Absolute antenna elevations: ground elevation plus antenna height.
    tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
    rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)
    tx_total = tx_elev + tx_height
    rx_total = rx_elev + rx_height

    # Height of the straight TX-RX line above the obstacle position.
    d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon)
    los_height = tx_total + (rx_total - tx_total) * (d1 / distance) if distance > 0 else tx_total
    clearance = los_height - max_height  # negative when the obstacle blocks the line

    diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1)
    path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
    path_loss += diffraction_loss

    return RayPath(
        path_type="diffracted",
        total_distance=distance,
        path_loss=path_loss,
        reflection_points=[(obstacle_lat, obstacle_lon)],
        materials_crossed=[],
        is_valid=True
    )
def _find_reflection_point(
    self,
    tx_lat: float, tx_lon: float,
    rx_lat: float, rx_lon: float,
    building: Building
) -> Optional[Tuple[float, float]]:
    """Find the specular reflection point on a building's walls.

    Every wall (consecutive pair of footprint points) is tried with
    ``_specular_reflection``; among the walls that yield a point, the one with
    the shortest TX -> point -> RX length wins.

    Returns:
        ``(lat, lon)`` of the best reflection point, or None when the building
        has no usable footprint or no wall produces a reflection.
    """
    geometry = building.geometry  # sequence of [lon, lat] points
    # A missing or empty footprint has no walls to reflect from; without this
    # guard a None geometry would raise on len().
    if not geometry or len(geometry) < 2:
        return None

    # Mirror geometry needs equal scale on both axes. Raw lon/lat degrees are
    # not isotropic away from the equator, which distorts reflections off
    # oblique walls. Use a local frame centred on TX with longitude scaled by
    # cos(latitude); the lower bound keeps the inverse finite near the poles.
    lon_scale = max(float(np.cos(np.radians((tx_lat + rx_lat) / 2))), 1e-6)

    def to_local(lon, lat):
        return (lon - tx_lon) * lon_scale, lat - tx_lat

    rx_x, rx_y = to_local(rx_lon, rx_lat)

    best_point = None
    best_score = float('inf')
    for wall_start, wall_end in zip(geometry, geometry[1:]):
        x1, y1 = to_local(wall_start[0], wall_start[1])
        x2, y2 = to_local(wall_end[0], wall_end[1])
        # TX is the origin of the local frame.
        ref_point = self._specular_reflection(0.0, 0.0, rx_x, rx_y, x1, y1, x2, y2)
        if ref_point is None:
            continue

        ref_x, ref_y = ref_point
        # Score by total path length (in scaled degrees).
        d1 = np.sqrt(ref_x**2 + ref_y**2)
        d2 = np.sqrt((ref_x - rx_x)**2 + (ref_y - rx_y)**2)
        score = d1 + d2
        if score < best_score:
            best_score = score
            # Back to geographic coordinates, returned as (lat, lon).
            best_point = (tx_lat + ref_y, tx_lon + ref_x / lon_scale)
    return best_point
def _specular_reflection(
    self,
    tx_x, tx_y, rx_x, rx_y,
    wall_x1, wall_y1, wall_x2, wall_y2
) -> Optional[Tuple[float, float]]:
    """Calculate the specular reflection point on a wall segment.

    Uses the image method: TX is mirrored across the wall's line, and the
    reflection point is where the segment mirror -> RX crosses the wall.

    Args:
        tx_x, tx_y: transmitter position.
        rx_x, rx_y: receiver position.
        wall_x1, wall_y1, wall_x2, wall_y2: wall segment end points.
            All coordinates must share one planar frame.

    Returns:
        ``(x, y)`` of the reflection point, or None when the wall is
        degenerate, the mirror -> RX line is parallel to the wall, or the
        crossing lies outside either segment.
    """
    # Wall direction vector
    wall_dx = wall_x2 - wall_x1
    wall_dy = wall_y2 - wall_y1
    wall_len = np.sqrt(wall_dx**2 + wall_dy**2)
    if wall_len < 1e-10:
        return None  # zero-length wall

    # Unit normal of the wall
    normal_x = -wall_dy / wall_len
    normal_y = wall_dx / wall_len

    # Mirror TX across the wall line: subtract twice its signed distance
    # along the normal.
    dot = (tx_x - wall_x1) * normal_x + (tx_y - wall_y1) * normal_y
    mirror_x = tx_x - 2 * dot * normal_x
    mirror_y = tx_y - 2 * dot * normal_y

    # Solve mirror + t * (rx - mirror) == wall1 + s * (wall2 - wall1)
    dx = rx_x - mirror_x
    dy = rx_y - mirror_y
    denom = dx * wall_dy - dy * wall_dx
    if abs(denom) < 1e-10:
        return None  # parallel

    off_x = wall_x1 - mirror_x
    off_y = wall_y1 - mirror_y
    t = (off_x * wall_dy - off_y * wall_dx) / denom
    # Both parameters share the same denominator. Dividing s by -denom flipped
    # its sign, which rejected points on the wall and accepted points on the
    # wall's extension behind its start.
    s = (off_x * dy - off_y * dx) / denom

    # The crossing must be on the wall (s) and between mirror and RX (t).
    if 0 <= s <= 1 and 0 <= t <= 1:
        return (mirror_x + t * dx, mirror_y + t * dy)
    return None
def _line_intersects_building_3d(
    self,
    lat1, lon1, height1,
    lat2, lon2, height2,
    building: Building
) -> bool:
    """Report whether the 3D segment passes through the building volume.

    The segment is sampled at 20 evenly spaced points, end points included.
    It counts as intersecting as soon as one sample lies inside the footprint
    (per ``buildings_service.point_in_building``) while its interpolated
    height is below ``building.height``.
    """
    d_lat = lat2 - lat1
    d_lon = lon2 - lon1
    d_height = height2 - height1
    return any(
        buildings_service.point_in_building(lat1 + frac * d_lat, lon1 + frac * d_lon, building)
        and height1 + frac * d_height < building.height
        for frac in np.linspace(0, 1, 20)
    )
def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float:
    """Okumura-Hata path loss in dB.

    Args:
        distance: path length in meters; values below 100 m are treated as 100 m.
        frequency_mhz: carrier frequency in MHz.
        tx_height: transmitter antenna height (enters through its log10).
        rx_height: receiver antenna height (enters the correction term linearly).
    """
    log_freq = np.log10(frequency_mhz)
    log_tx_height = np.log10(tx_height)
    log_dist_km = np.log10(max(distance / 1000, 0.1))

    # Receiver antenna height correction term
    rx_correction = (1.1 * log_freq - 0.7) * rx_height - (1.56 * log_freq - 0.8)

    # Distance-independent part, then the distance slope
    fixed_part = 69.55 + 26.16 * log_freq - 13.82 * log_tx_height - rx_correction
    slope = 44.9 - 6.55 * log_tx_height
    return fixed_part + slope * log_dist_km
def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float:
    """Knife-edge diffraction loss in dB for an obstructed path.

    Args:
        clearance: height of the direct TX-RX line above the obstacle top, in
            meters; negative when the obstacle blocks the line.
        frequency_mhz: carrier frequency in MHz.
        total_distance: TX-RX distance in meters.
        d1: distance from TX to the obstacle in meters.

    Returns:
        0.0 when the line clears the obstacle (``clearance >= 0``) or when the
        geometry or frequency is degenerate; otherwise a piecewise
        approximation of the loss as a function of the Fresnel parameter v.
    """
    if clearance >= 0:
        return 0.0
    # Guard before dividing: a zero frequency used to raise ZeroDivisionError
    # before the wavelength check could run.
    if frequency_mhz <= 0:
        return 0.0

    wavelength = 300 / frequency_mhz
    d2 = total_distance - d1
    if d1 <= 0 or d2 <= 0:
        return 0.0

    # Fresnel parameter v. clearance is negative here, so v > 0. The former
    # v <= -0.78 and v < 0 branches could never run and have been removed.
    v = -clearance * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2))

    if v < 2.4:
        # The quadratic term is subtracted: that is what makes this branch
        # meet the logarithmic one at v = 2.4 (about 20.6 dB on both sides).
        # With "+" the loss dropped by roughly 14.6 dB when crossing v = 2.4.
        return 6.02 + 9.11 * v - 1.27 * v**2
    return 13 + 20 * np.log10(v)
# ── Sync versions (terrain tiles must be pre-loaded) ──
def find_dominant_paths_sync(
    self,
    tx_lat: float, tx_lon: float, tx_height: float,
    rx_lat: float, rx_lon: float, rx_height: float,
    frequency_mhz: float,
    buildings: List[Building],
    spatial_idx: 'Optional[SpatialIndex]' = None
) -> List[RayPath]:
    """Sync version - uses the spatial index for local building lookups.

    Args:
        buildings: fallback list (only used if spatial_idx is None); may be
            empty or None when a spatial index is supplied.
        spatial_idx: grid-based spatial index for fast local queries.

    Returns:
        Up to ``MAX_PATHS`` candidate paths ordered by ascending path loss.
        A single-element list holding the direct path is returned when there
        are no buildings at all, or when the direct path is valid and crosses
        no building.
    """
    # Fast path: no buildings at all -> direct LOS only
    has_spatial_data = spatial_idx is not None and spatial_idx._grid
    if not buildings and not has_spatial_data:
        distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
        return [RayPath(
            path_type="direct",
            total_distance=distance,
            path_loss=self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height),
            reflection_points=[],
            materials_crossed=[],
            is_valid=True,
        )]

    paths = []

    # Buildings along the TX->RX line only
    if spatial_idx:
        line_buildings = spatial_idx.query_line(tx_lat, tx_lon, rx_lat, rx_lon)
    else:
        line_buildings = buildings

    # Cap the building count so one point never processes hundreds of buildings.
    original_line_count = len(line_buildings)
    line_buildings = _filter_buildings_by_distance(
        line_buildings,
        (tx_lat, tx_lon), (rx_lat, rx_lon),
        max_count=MAX_BUILDINGS_FOR_LINE,
        max_distance=MAX_DISTANCE_FROM_PATH,
    )

    direct = self._check_direct_path_sync(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height,
        frequency_mhz, line_buildings
    )
    if direct:
        paths.append(direct)

    # Early termination: a valid, unobstructed direct path makes the expensive
    # reflection/diffraction searches unnecessary.
    direct_is_clear = bool(direct and direct.is_valid and not direct.materials_crossed)
    if direct_is_clear:
        return [direct]

    # For reflections, only buildings around the midpoint are considered.
    if spatial_idx:
        mid_lat = (tx_lat + rx_lat) / 2
        mid_lon = (tx_lon + rx_lon) / 2
        # buffer_cells=3 with 0.001 degree cells is roughly a 333 m radius
        reflection_buildings = spatial_idx.query_point(mid_lat, mid_lon, buffer_cells=3)
    else:
        reflection_buildings = buildings

    original_refl_count = len(reflection_buildings)
    reflection_buildings = _filter_buildings_by_distance(
        reflection_buildings,
        (tx_lat, tx_lon), (rx_lat, rx_lon),
        max_count=MAX_BUILDINGS_FOR_REFLECTION,
        max_distance=MAX_DISTANCE_FROM_PATH,
    )

    # Log building counts for the first 3 points so filtering can be verified.
    DominantPathService._log_count += 1
    if DominantPathService._log_count <= 3:
        import sys
        # The fallback list may be None when a spatial index is in use;
        # len(None) must not break the diagnostics.
        total_available = len(buildings) if buildings else 0
        msg = (f"[DOMINANT_PATH] Point #{DominantPathService._log_count}: "
               f"line_bldgs={len(line_buildings)} (from {original_line_count}), "
               f"refl_bldgs={len(reflection_buildings)} (from {original_refl_count}), "
               f"total_available={total_available}, "
               f"spatial_idx={'YES' if spatial_idx else 'NO'}, "
               f"early_exit={'YES' if direct_is_clear else 'NO'}")
        print(msg, flush=True)
        try:
            sys.stderr.write(msg + '\n')
            sys.stderr.flush()
        except Exception:
            # Deliberate best effort: a missing or closed stderr must never
            # abort the path calculation.
            pass

    reflections = self._find_reflection_paths_sync(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height,
        frequency_mhz, reflection_buildings
    )
    # Reflections arrive in building order, so rank them by loss first.
    # Slicing the raw list would keep the first two found, not the best two.
    reflections = sorted(reflections, key=lambda p: p.path_loss)
    paths.extend(reflections[:2])

    if not direct or not direct.is_valid:
        diffracted = self._find_diffraction_path_sync(
            tx_lat, tx_lon, tx_height,
            rx_lat, rx_lon, rx_height,
            frequency_mhz, spatial_idx=spatial_idx, buildings_fallback=buildings
        )
        if diffracted:
            paths.append(diffracted)

    paths.sort(key=lambda p: p.path_loss)
    return paths[:self.MAX_PATHS]
def _check_direct_path_sync(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    buildings: List[Building]
) -> Optional[RayPath]:
    """Evaluate the straight TX->RX path using the sync LOS check.

    ``buildings`` is expected to be pre-filtered to the TX->RX line. When the
    LOS check fails, an invalid path with infinite loss is returned. Otherwise
    the scan over buildings stops at the third crossing, and the path is valid
    only while fewer than three buildings are crossed.
    """
    # Imported here rather than at module level, as in the original.
    from app.services.los_service import los_service

    los_result = los_service.check_line_of_sight_sync(
        tx_lat, tx_lon, tx_height,
        rx_lat, rx_lon, rx_height
    )
    distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)

    if not los_result["has_los"]:
        return RayPath(
            path_type="direct",
            total_distance=distance,
            path_loss=float('inf'),
            reflection_points=[],
            materials_crossed=[],
            is_valid=False
        )

    materials_crossed = []
    for candidate in buildings:
        crosses = self._line_intersects_building_3d(
            tx_lat, tx_lon, tx_height,
            rx_lat, rx_lon, rx_height,
            candidate
        )
        if not crosses:
            continue
        materials_crossed.append(materials_service.detect_material(candidate.tags))
        if len(materials_crossed) >= 3:
            break  # too many walls already; no need to scan further

    path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
    for crossed in materials_crossed:
        path_loss += materials_service.get_penetration_loss(crossed, frequency_mhz)

    return RayPath(
        path_type="direct",
        total_distance=distance,
        path_loss=path_loss,
        reflection_points=[],
        materials_crossed=materials_crossed,
        is_valid=len(materials_crossed) < 3
    )
def _find_reflection_paths_sync(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    buildings: List[Building]
) -> List[RayPath]:
    """Collect single-bounce reflection candidates (sync variant).

    ``buildings`` is expected to be pre-filtered to the nearby area. At most
    one reflection point is considered per building, and candidates longer
    than twice the direct TX-RX distance are dropped. Results keep the order
    of ``buildings``.
    """
    direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
    found = []

    for candidate in buildings:
        bounce = self._find_reflection_point(
            tx_lat, tx_lon, rx_lat, rx_lon, candidate
        )
        if not bounce:
            continue
        ref_lat, ref_lon = bounce

        # Length of both legs: TX -> bounce -> RX
        leg_in = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
        leg_out = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
        total_distance = leg_in + leg_out
        if total_distance > direct_distance * 2:
            continue

        # Base loss over the reflected length plus the wall's reflection loss.
        path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height)
        wall_material = materials_service.detect_material(candidate.tags)
        path_loss += materials_service.get_reflection_loss(wall_material)

        found.append(RayPath(
            path_type="reflected",
            total_distance=total_distance,
            path_loss=path_loss,
            reflection_points=[(ref_lat, ref_lon)],
            materials_crossed=[],
            is_valid=True
        ))
    return found
def _find_diffraction_path_sync(
    self,
    tx_lat, tx_lon, tx_height,
    rx_lat, rx_lon, rx_height,
    frequency_mhz,
    spatial_idx: 'Optional[SpatialIndex]' = None,
    buildings_fallback: Optional[List[Building]] = None
) -> Optional[RayPath]:
    """Estimate an over-roof diffraction path (sync variant).

    The straight TX-RX line is sampled at evenly spaced interior points. At
    each sample the terrain elevation, and the roof of any building containing
    the sample, is compared with the running maximum; the highest one becomes
    the single knife edge. Buildings come from ``spatial_idx.query_point`` at
    each sample when an index is given, otherwise from ``buildings_fallback``.

    Returns:
        A "diffracted" ``RayPath`` whose only ``reflection_points`` entry is
        the obstacle location, or None when no sample rises above elevation 0.
    """
    # Only obstacles above elevation 0 are recorded (the initial maximum).
    max_height = 0
    obstacle_lat, obstacle_lon = None, None

    # 20 evenly spaced points including both endpoints; only the interior ones
    # are examined. Dividing by (num_samples - 1) keeps the samples symmetric.
    # Dividing by num_samples left the last 10% of the path unsampled.
    num_samples = 20
    for i in range(1, num_samples - 1):
        t = i / (num_samples - 1)
        lat = tx_lat + t * (rx_lat - tx_lat)
        lon = tx_lon + t * (rx_lon - tx_lon)

        terrain_elev = terrain_service.get_elevation_sync(lat, lon)
        if terrain_elev > max_height:
            max_height = terrain_elev
            obstacle_lat, obstacle_lon = lat, lon

        # Local lookup through the spatial index when available
        if spatial_idx:
            local_buildings = spatial_idx.query_point(lat, lon, buffer_cells=1)
        else:
            local_buildings = buildings_fallback or []

        for building in local_buildings:
            if buildings_service.point_in_building(lat, lon, building):
                # building.height is a height above ground (it is compared
                # with antenna heights in _line_intersects_building_3d), so the
                # roof must be placed on top of the local terrain before it can
                # be ranked against terrain elevations.
                roof_elev = terrain_elev + building.height
                if roof_elev > max_height:
                    max_height = roof_elev
                    obstacle_lat, obstacle_lon = lat, lon

    # Explicit None test: a latitude of exactly 0.0 is a valid obstacle position.
    if obstacle_lat is None:
        return None

    distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)

    # Absolute antenna elevations: ground elevation plus antenna height.
    tx_elev = terrain_service.get_elevation_sync(tx_lat, tx_lon)
    rx_elev = terrain_service.get_elevation_sync(rx_lat, rx_lon)
    tx_total = tx_elev + tx_height
    rx_total = rx_elev + rx_height

    # Height of the straight TX-RX line above the obstacle position.
    d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon)
    los_height = tx_total + (rx_total - tx_total) * (d1 / distance) if distance > 0 else tx_total
    clearance = los_height - max_height  # negative when the obstacle blocks the line

    diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1)
    path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
    path_loss += diffraction_loss

    return RayPath(
        path_type="diffracted",
        total_distance=distance,
        path_loss=path_loss,
        reflection_points=[(obstacle_lat, obstacle_lon)],
        materials_crossed=[],
        is_valid=True
    )
# Module-level instance, created at import time.
dominant_path_service = DominantPathService()