import time import numpy as np from typing import List, Tuple, Optional, Dict, Any, TYPE_CHECKING from dataclasses import dataclass from app.services.terrain_service import terrain_service from app.services.buildings_service import buildings_service, Building from app.services.materials_service import materials_service, BuildingMaterial from app.services.geometry_vectorized import ( points_to_local_coords, line_intersects_polygons_batch, find_best_reflection_path_vectorized, ) if TYPE_CHECKING: from app.services.spatial_index import SpatialIndex @dataclass class RayPath: """Single ray path from TX to RX""" path_type: str # "direct", "reflected", "diffracted", "street" total_distance: float # meters path_loss: float # dB reflection_points: List[Tuple[float, float]] # [(lat, lon), ...] materials_crossed: List[BuildingMaterial] is_valid: bool # Does this path exist? MAX_BUILDINGS_FOR_LINE = 30 MAX_BUILDINGS_FOR_REFLECTION = 20 MAX_DISTANCE_FROM_PATH = 200 # meters def _filter_buildings_by_distance(buildings, tx_point, rx_point, max_count=100, max_distance=500): """Filter buildings to only those close to the TX-RX path. Sort by distance to path midpoint, filter by max_distance, take top max_count. Uses squared Euclidean distance (no sqrt) for speed. """ if len(buildings) <= max_count: return buildings mid_lat = (tx_point[0] + rx_point[0]) / 2 mid_lon = (tx_point[1] + rx_point[1]) / 2 max_dist_sq = max_distance * max_distance def dist_sq_to_midpoint(building): # Building centroid from geometry or fallback to midpoint geom = building.geometry if geom: blat = sum(p[1] for p in geom) / len(geom) blon = sum(p[0] for p in geom) / len(geom) else: blat, blon = mid_lat, mid_lon dlat = (blat - mid_lat) * 111000 dlon = (blon - mid_lon) * 111000 * 0.7 # rough cos correction return dlat * dlat + dlon * dlon scored = [(b, dist_sq_to_midpoint(b)) for b in buildings] scored.sort(key=lambda x: x[1]) # Filter by max distance and take top N filtered = [b for b, d in scored if d <= max_dist_sq] return filtered[:max_count] # ── Vectorized dominant path (NumPy) ── _vec_log_count = 0 def _buildings_to_arrays(buildings: List[Building], ref_lat: float, ref_lon: float): """Convert Building objects to numpy arrays for vectorized geometry. Returns: walls_start: (W, 2) wall start points in local XY meters walls_end: (W, 2) wall end points in local XY meters wall_to_building: (W,) mapping wall index -> building index poly_x: flattened polygon X coords poly_y: flattened polygon Y coords poly_lengths: (num_polygons,) vertices per polygon """ all_walls_start = [] all_walls_end = [] wall_to_building = [] all_poly_x = [] all_poly_y = [] poly_lengths = [] for i, b in enumerate(buildings): geom = b.geometry # [[lon, lat], ...] if not geom or len(geom) < 3: poly_lengths.append(0) continue poly_lats = np.array([p[1] for p in geom]) poly_lons = np.array([p[0] for p in geom]) px, py = points_to_local_coords(ref_lat, ref_lon, poly_lats, poly_lons) all_poly_x.extend(px) all_poly_y.extend(py) poly_lengths.append(len(geom)) # Extract wall segments for j in range(len(geom) - 1): all_walls_start.append([px[j], py[j]]) all_walls_end.append([px[j + 1], py[j + 1]]) wall_to_building.append(i) return ( np.array(all_walls_start) if all_walls_start else np.zeros((0, 2)), np.array(all_walls_end) if all_walls_end else np.zeros((0, 2)), np.array(wall_to_building, dtype=int) if wall_to_building else np.zeros(0, dtype=int), np.array(all_poly_x) if all_poly_x else np.zeros(0), np.array(all_poly_y) if all_poly_y else np.zeros(0), np.array(poly_lengths, dtype=int), ) def find_dominant_paths_vectorized( tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, buildings: List[Building], spatial_idx: 'Optional[SpatialIndex]' = None, ) -> Dict[str, Any]: """Vectorized dominant path finding using NumPy batch operations. Replaces the loop-based find_dominant_paths_sync() with: 1. Batch building-to-array conversion 2. Vectorized LOS polygon intersection check 3. Vectorized reflection point calculation 4. Simplified diffraction estimate Returns dict with: has_los, path_type, total_loss, path_length, reflection_point """ global _vec_log_count # Get nearby buildings via spatial index (same filtering as sync version) if spatial_idx: line_buildings = spatial_idx.query_line(tx_lat, tx_lon, rx_lat, rx_lon) else: line_buildings = buildings line_buildings = _filter_buildings_by_distance( line_buildings, (tx_lat, tx_lon), (rx_lat, rx_lon), max_count=MAX_BUILDINGS_FOR_LINE, max_distance=MAX_DISTANCE_FROM_PATH, ) # Reference point for local coordinate system ref_lat = (tx_lat + rx_lat) / 2 ref_lon = (tx_lon + rx_lon) / 2 # Convert TX/RX to local meters tx_xy = points_to_local_coords(ref_lat, ref_lon, np.array([tx_lat]), np.array([tx_lon])) rx_xy = points_to_local_coords(ref_lat, ref_lon, np.array([rx_lat]), np.array([rx_lon])) tx = np.array([tx_xy[0][0], tx_xy[1][0]]) rx = np.array([rx_xy[0][0], rx_xy[1][0]]) direct_dist = np.linalg.norm(rx - tx) # Convert buildings to arrays walls_start, walls_end, wall_to_bldg, poly_x, poly_y, poly_lengths = ( _buildings_to_arrays(line_buildings, ref_lat, ref_lon) ) # Diagnostic log for first few points _vec_log_count += 1 if _vec_log_count <= 3: print( f"[DOMINANT_PATH_VEC] Point #{_vec_log_count}: " f"buildings={len(line_buildings)}, walls={len(walls_start)}, " f"dist={direct_dist:.0f}m", flush=True, ) # No buildings → direct LOS if len(poly_lengths) == 0 or np.all(poly_lengths < 3): return { 'has_los': True, 'path_type': 'direct', 'total_loss': 0.0, 'path_length': direct_dist, 'reflection_point': None, } # Step 1: Vectorized direct LOS check intersects, _ = line_intersects_polygons_batch(tx, rx, poly_x, poly_y, poly_lengths) if not np.any(intersects): return { 'has_los': True, 'path_type': 'direct', 'total_loss': 0.0, 'path_length': direct_dist, 'reflection_point': None, } # Step 2: Vectorized reflection path finding # Use all line buildings for reflection walls if spatial_idx: mid_lat = (tx_lat + rx_lat) / 2 mid_lon = (tx_lon + rx_lon) / 2 refl_buildings = spatial_idx.query_point(mid_lat, mid_lon, buffer_cells=3) refl_buildings = _filter_buildings_by_distance( refl_buildings, (tx_lat, tx_lon), (rx_lat, rx_lon), max_count=MAX_BUILDINGS_FOR_REFLECTION, max_distance=MAX_DISTANCE_FROM_PATH, ) # Merge line + reflection buildings (deduplicate by id) seen_ids = {b.id for b in line_buildings} merged = list(line_buildings) for b in refl_buildings: if b.id not in seen_ids: merged.append(b) seen_ids.add(b.id) r_walls_start, r_walls_end, r_wall_to_bldg, r_poly_x, r_poly_y, r_poly_lengths = ( _buildings_to_arrays(merged, ref_lat, ref_lon) ) else: r_walls_start, r_walls_end, r_wall_to_bldg = walls_start, walls_end, wall_to_bldg r_poly_x, r_poly_y, r_poly_lengths = poly_x, poly_y, poly_lengths refl_point, refl_length, refl_loss = find_best_reflection_path_vectorized( tx, rx, r_walls_start, r_walls_end, r_wall_to_bldg, r_poly_x, r_poly_y, r_poly_lengths, max_candidates=30, max_walls=100, max_los_checks=10, ) if refl_point is not None: # Convert reflection point back to lat/lon cos_lat = np.cos(np.radians(ref_lat)) refl_lat = ref_lat + refl_point[1] / 110540.0 refl_lon = ref_lon + refl_point[0] / (111320.0 * cos_lat) return { 'has_los': False, 'path_type': 'reflection', 'total_loss': refl_loss, 'path_length': refl_length, 'reflection_point': (refl_lat, refl_lon), } # Step 3: Diffraction fallback num_blocking = int(np.sum(intersects)) diffraction_loss = 10.0 + 5.0 * min(num_blocking, 5) return { 'has_los': False, 'path_type': 'diffraction', 'total_loss': diffraction_loss, 'path_length': direct_dist, 'reflection_point': None, } class DominantPathService: """ Find dominant propagation paths (2-3 strongest) Path types: 1. Direct (LoS if available) 2. Single reflection off building 3. Over-roof diffraction 4. Around-corner diffraction """ MAX_REFLECTIONS = 2 MAX_PATHS = 3 _log_count = 0 # Counter for diagnostic logging async def find_dominant_paths( self, tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, buildings: List[Building] ) -> List[RayPath]: """Find the dominant propagation paths""" paths = [] # 1. Try direct path direct = await self._check_direct_path( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) if direct: paths.append(direct) # 2. Try single-bounce reflections reflections = await self._find_reflection_paths( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) paths.extend(reflections[:2]) # Max 2 reflection paths # 3. Try over-roof diffraction (if direct blocked) if not direct or not direct.is_valid: diffracted = await self._find_diffraction_path( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings ) if diffracted: paths.append(diffracted) # Sort by path loss (best first) and return top N paths.sort(key=lambda p: p.path_loss) return paths[:self.MAX_PATHS] async def _check_direct_path( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> Optional[RayPath]: """Check if direct LoS path exists""" from app.services.los_service import los_service # Check terrain LoS los_result = await los_service.check_line_of_sight( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height ) if not los_result["has_los"]: distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) return RayPath( path_type="direct", total_distance=distance, path_loss=float('inf'), reflection_points=[], materials_crossed=[], is_valid=False ) # Check building intersections materials_crossed = [] for building in buildings: intersection = self._line_intersects_building_3d( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, building ) if intersection: material = materials_service.detect_material(building.tags) materials_crossed.append(material) # Calculate path loss distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) # Add material penetration losses for material in materials_crossed: path_loss += materials_service.get_penetration_loss(material, frequency_mhz) return RayPath( path_type="direct", total_distance=distance, path_loss=path_loss, reflection_points=[], materials_crossed=materials_crossed, is_valid=len(materials_crossed) < 3 # Too many walls = not viable ) async def _find_reflection_paths( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> List[RayPath]: """Find viable single-bounce reflection paths""" reflection_paths = [] for building in buildings: # Find potential reflection points on building walls reflection_point = self._find_reflection_point( tx_lat, tx_lon, rx_lat, rx_lon, building ) if not reflection_point: continue ref_lat, ref_lon = reflection_point # Check if both segments are clear # TX -> Reflection point dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) # Reflection point -> RX dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) total_distance = dist1 + dist2 # Don't consider if much longer than direct path direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) if total_distance > direct_distance * 2: continue # Calculate path loss path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height) # Add reflection loss material = materials_service.detect_material(building.tags) path_loss += materials_service.get_reflection_loss(material) reflection_paths.append(RayPath( path_type="reflected", total_distance=total_distance, path_loss=path_loss, reflection_points=[(ref_lat, ref_lon)], materials_crossed=[], is_valid=True )) return reflection_paths async def _find_diffraction_path( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> Optional[RayPath]: """Find over-roof diffraction path""" # Find highest obstacle between TX and RX max_height = 0 obstacle_lat, obstacle_lon = None, None # Sample points along direct path num_samples = 20 for i in range(1, num_samples - 1): t = i / num_samples lat = tx_lat + t * (rx_lat - tx_lat) lon = tx_lon + t * (rx_lon - tx_lon) # Check terrain terrain_elev = await terrain_service.get_elevation(lat, lon) if terrain_elev > max_height: max_height = terrain_elev obstacle_lat, obstacle_lon = lat, lon # Check buildings at this point for building in buildings: if buildings_service.point_in_building(lat, lon, building): if building.height > max_height: max_height = building.height obstacle_lat, obstacle_lon = lat, lon if not obstacle_lat: return None # Calculate diffraction loss (simplified knife-edge) distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) # Fresnel parameter tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon) rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon) tx_total = tx_elev + tx_height rx_total = rx_elev + rx_height # Height of LoS at obstacle point d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon) los_height = tx_total + (rx_total - tx_total) * (d1 / distance) if distance > 0 else tx_total clearance = los_height - max_height # Knife-edge diffraction loss diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) path_loss += diffraction_loss return RayPath( path_type="diffracted", total_distance=distance, path_loss=path_loss, reflection_points=[(obstacle_lat, obstacle_lon)], materials_crossed=[], is_valid=True ) def _find_reflection_point( self, tx_lat: float, tx_lon: float, rx_lat: float, rx_lon: float, building: Building ) -> Optional[Tuple[float, float]]: """Find specular reflection point on building wall""" # Simplified: find closest wall segment and calculate reflection geometry = building.geometry best_point = None best_score = float('inf') for i in range(len(geometry) - 1): wall_start = geometry[i] wall_end = geometry[i + 1] # Find reflection point on this wall segment ref_point = self._specular_reflection( tx_lon, tx_lat, rx_lon, rx_lat, wall_start[0], wall_start[1], wall_end[0], wall_end[1] ) if ref_point: # Score by total path length d1 = np.sqrt((ref_point[0] - tx_lon)**2 + (ref_point[1] - tx_lat)**2) d2 = np.sqrt((ref_point[0] - rx_lon)**2 + (ref_point[1] - rx_lat)**2) score = d1 + d2 if score < best_score: best_score = score best_point = (ref_point[1], ref_point[0]) # Return as (lat, lon) return best_point def _specular_reflection( self, tx_x, tx_y, rx_x, rx_y, wall_x1, wall_y1, wall_x2, wall_y2 ) -> Optional[Tuple[float, float]]: """Calculate specular reflection point on wall segment""" # Wall vector wall_dx = wall_x2 - wall_x1 wall_dy = wall_y2 - wall_y1 wall_len = np.sqrt(wall_dx**2 + wall_dy**2) if wall_len < 1e-10: return None # Wall normal normal_x = -wall_dy / wall_len normal_y = wall_dx / wall_len # Mirror TX across wall # Project TX onto wall tx_rel_x = tx_x - wall_x1 tx_rel_y = tx_y - wall_y1 dot = tx_rel_x * normal_x + tx_rel_y * normal_y mirror_x = tx_x - 2 * dot * normal_x mirror_y = tx_y - 2 * dot * normal_y # Find intersection of (mirror -> RX) with wall # Parametric line: mirror + t * (rx - mirror) dx = rx_x - mirror_x dy = rx_y - mirror_y # Wall parametric: wall1 + s * (wall2 - wall1) denom = dx * wall_dy - dy * wall_dx if abs(denom) < 1e-10: return None # Parallel t = ((wall_x1 - mirror_x) * wall_dy - (wall_y1 - mirror_y) * wall_dx) / denom s = ((wall_x1 - mirror_x) * dy - (wall_y1 - mirror_y) * dx) / (-denom) # Check if intersection is on wall segment and between mirror and RX if 0 <= s <= 1 and 0 <= t <= 1: ref_x = mirror_x + t * dx ref_y = mirror_y + t * dy return (ref_x, ref_y) return None def _line_intersects_building_3d( self, lat1, lon1, height1, lat2, lon2, height2, building: Building ) -> bool: """Check if 3D line intersects building volume""" # Sample along line for t in np.linspace(0, 1, 20): lat = lat1 + t * (lat2 - lat1) lon = lon1 + t * (lon2 - lon1) height = height1 + t * (height2 - height1) if buildings_service.point_in_building(lat, lon, building): if height < building.height: return True return False def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float: """Okumura-Hata path loss""" d_km = max(distance / 1000, 0.1) a_hm = (1.1 * np.log10(frequency_mhz) - 0.7) * rx_height - (1.56 * np.log10(frequency_mhz) - 0.8) L = (69.55 + 26.16 * np.log10(frequency_mhz) - 13.82 * np.log10(tx_height) - a_hm + (44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km)) return L def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float: """Knife-edge diffraction loss""" if clearance >= 0: return 0.0 wavelength = 300 / frequency_mhz d2 = total_distance - d1 if d1 <= 0 or d2 <= 0 or wavelength <= 0: return 0.0 # Fresnel parameter v v = abs(clearance) * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2)) # Lee's approximation if v <= -0.78: return 0 elif v < 0: return 6.02 + 9.11 * v - 1.27 * v**2 elif v < 2.4: return 6.02 + 9.11 * v + 1.27 * v**2 else: return 13 + 20 * np.log10(v) # ── Sync versions (terrain tiles must be pre-loaded) ── def find_dominant_paths_sync( self, tx_lat: float, tx_lon: float, tx_height: float, rx_lat: float, rx_lon: float, rx_height: float, frequency_mhz: float, buildings: List[Building], spatial_idx: 'Optional[SpatialIndex]' = None ) -> List[RayPath]: """Sync version - uses spatial index for O(1) building lookups. Args: buildings: fallback list (only used if spatial_idx is None) spatial_idx: grid-based spatial index for fast local queries """ paths = [] # Use spatial index to get only buildings along the TX→RX line if spatial_idx: line_buildings = spatial_idx.query_line(tx_lat, tx_lon, rx_lat, rx_lon) else: line_buildings = buildings # Filter to limit building count — prevents 600+ buildings per point original_line_count = len(line_buildings) line_buildings = _filter_buildings_by_distance( line_buildings, (tx_lat, tx_lon), (rx_lat, rx_lon), max_count=MAX_BUILDINGS_FOR_LINE, max_distance=MAX_DISTANCE_FROM_PATH, ) direct = self._check_direct_path_sync( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, line_buildings ) if direct: paths.append(direct) # Early termination: if direct path is valid and clear, skip expensive # reflection/diffraction — they won't produce a better path if direct and direct.is_valid and not direct.materials_crossed: return [direct] # For reflections, only check buildings near the midpoint (~500m) if spatial_idx: mid_lat = (tx_lat + rx_lat) / 2 mid_lon = (tx_lon + rx_lon) / 2 # buffer_cells=3 with 0.001° cell ≈ 333m radius reflection_buildings = spatial_idx.query_point(mid_lat, mid_lon, buffer_cells=3) else: reflection_buildings = buildings # Filter reflection buildings to limit count original_refl_count = len(reflection_buildings) reflection_buildings = _filter_buildings_by_distance( reflection_buildings, (tx_lat, tx_lon), (rx_lat, rx_lon), max_count=MAX_BUILDINGS_FOR_REFLECTION, max_distance=MAX_DISTANCE_FROM_PATH, ) # Log building counts for first 3 points so user can verify filtering DominantPathService._log_count += 1 if DominantPathService._log_count <= 3: import sys msg = (f"[DOMINANT_PATH] Point #{DominantPathService._log_count}: " f"line_bldgs={len(line_buildings)} (from {original_line_count}), " f"refl_bldgs={len(reflection_buildings)} (from {original_refl_count}), " f"total_available={len(buildings)}, " f"spatial_idx={'YES' if spatial_idx else 'NO'}, " f"early_exit={'YES' if direct and direct.is_valid and not direct.materials_crossed else 'NO'}") print(msg, flush=True) try: sys.stderr.write(msg + '\n') sys.stderr.flush() except Exception: pass reflections = self._find_reflection_paths_sync( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, reflection_buildings ) paths.extend(reflections[:2]) if not direct or not direct.is_valid: diffracted = self._find_diffraction_path_sync( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, spatial_idx=spatial_idx, buildings_fallback=buildings ) if diffracted: paths.append(diffracted) paths.sort(key=lambda p: p.path_loss) return paths[:self.MAX_PATHS] def _check_direct_path_sync( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> Optional[RayPath]: """Sync direct path check using sync LOS. buildings should already be spatially filtered to the TX→RX line.""" from app.services.los_service import los_service los_result = los_service.check_line_of_sight_sync( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height ) if not los_result["has_los"]: distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) return RayPath( path_type="direct", total_distance=distance, path_loss=float('inf'), reflection_points=[], materials_crossed=[], is_valid=False ) materials_crossed = [] for building in buildings: intersection = self._line_intersects_building_3d( tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, building ) if intersection: material = materials_service.detect_material(building.tags) materials_crossed.append(material) if len(materials_crossed) >= 3: break # Early termination — too many walls distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) for material in materials_crossed: path_loss += materials_service.get_penetration_loss(material, frequency_mhz) return RayPath( path_type="direct", total_distance=distance, path_loss=path_loss, reflection_points=[], materials_crossed=materials_crossed, is_valid=len(materials_crossed) < 3 ) def _find_reflection_paths_sync( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, buildings: List[Building] ) -> List[RayPath]: """Sync reflection paths. buildings should already be spatially filtered to nearby area.""" reflection_paths = [] direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) for building in buildings: reflection_point = self._find_reflection_point( tx_lat, tx_lon, rx_lat, rx_lon, building ) if not reflection_point: continue ref_lat, ref_lon = reflection_point dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon) dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon) total_distance = dist1 + dist2 if total_distance > direct_distance * 2: continue path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height) material = materials_service.detect_material(building.tags) path_loss += materials_service.get_reflection_loss(material) reflection_paths.append(RayPath( path_type="reflected", total_distance=total_distance, path_loss=path_loss, reflection_points=[(ref_lat, ref_lon)], materials_crossed=[], is_valid=True )) return reflection_paths def _find_diffraction_path_sync( self, tx_lat, tx_lon, tx_height, rx_lat, rx_lon, rx_height, frequency_mhz, spatial_idx: 'Optional[SpatialIndex]' = None, buildings_fallback: Optional[List[Building]] = None ) -> Optional[RayPath]: """Sync diffraction path. Uses spatial_idx.query_point at each sample for O(1) building lookup.""" max_height = 0 obstacle_lat, obstacle_lon = None, None num_samples = 20 for i in range(1, num_samples - 1): t = i / num_samples lat = tx_lat + t * (rx_lat - tx_lat) lon = tx_lon + t * (rx_lon - tx_lon) terrain_elev = terrain_service.get_elevation_sync(lat, lon) if terrain_elev > max_height: max_height = terrain_elev obstacle_lat, obstacle_lon = lat, lon # Use spatial index for O(1) lookup at this sample point if spatial_idx: local_buildings = spatial_idx.query_point(lat, lon, buffer_cells=1) else: local_buildings = buildings_fallback or [] for building in local_buildings: if buildings_service.point_in_building(lat, lon, building): if building.height > max_height: max_height = building.height obstacle_lat, obstacle_lon = lat, lon if not obstacle_lat: return None distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon) tx_elev = terrain_service.get_elevation_sync(tx_lat, tx_lon) rx_elev = terrain_service.get_elevation_sync(rx_lat, rx_lon) tx_total = tx_elev + tx_height rx_total = rx_elev + rx_height d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon) los_height = tx_total + (rx_total - tx_total) * (d1 / distance) if distance > 0 else tx_total clearance = los_height - max_height diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1) path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height) path_loss += diffraction_loss return RayPath( path_type="diffracted", total_distance=distance, path_loss=path_loss, reflection_points=[(obstacle_lat, obstacle_lon)], materials_crossed=[], is_valid=True ) dominant_path_service = DominantPathService()