"""Vectorized reflection point calculations using mirror-image method."""

from typing import Optional, Tuple

import numpy as np

from app.geometry.intersection import line_intersects_polygons_batch

# Below this magnitude a wall length / cross product is treated as zero.
_EPS = 1e-10


def calculate_reflection_points_batch(
    tx: np.ndarray,
    rx: np.ndarray,
    wall_starts: np.ndarray,
    wall_ends: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate reflection points on N walls via mirror-image method.

    For every wall the transmitter is mirrored across the wall's supporting
    line; the reflection point is where the line from the receiver to that
    mirror image crosses the wall.

    Args:
        tx: Transmitter position, shape (2,).
        rx: Receiver position, shape (2,).
        wall_starts: Wall segment start points, shape (N, 2).
        wall_ends: Wall segment end points, shape (N, 2).

    Returns:
        reflection_points: (N, 2) candidate points on each wall's supporting
            line. Rows whose ``valid`` flag is False carry no meaning.
        valid: bool (N,). True only when the Rx->mirror line is not parallel
            to the wall, the crossing lies within the wall segment, and both
            ``tx`` and ``rx`` lie strictly on the side of the wall that its
            left-hand normal ``(-uy, ux)`` points to.
            NOTE(review): the sign test presumably encodes which face of a
            wall is exposed via the start->end ordering -- confirm against
            the code that builds the wall arrays.
    """
    tx = np.asarray(tx)
    rx = np.asarray(rx)
    wall_starts = np.asarray(wall_starts).reshape(-1, 2)
    wall_ends = np.asarray(wall_ends).reshape(-1, 2)

    wall_vec = wall_ends - wall_starts
    wall_length = np.linalg.norm(wall_vec, axis=1, keepdims=True)
    # Zero-length walls get a zero unit vector (and zero normal); they are
    # rejected below because their signed distances are 0, not > 0.
    wall_unit = wall_vec / np.maximum(wall_length, _EPS)
    normals = np.stack([-wall_unit[:, 1], wall_unit[:, 0]], axis=1)

    # Signed distances of both endpoints from each wall's supporting line.
    tx_dist_to_wall = np.sum((tx - wall_starts) * normals, axis=1)
    rx_dist_to_wall = np.sum((rx - wall_starts) * normals, axis=1)

    tx_mirror = tx - 2 * tx_dist_to_wall[:, np.newaxis] * normals

    # Intersect the line rx -> tx_mirror with the line start -> end, solving
    # for the parameter t along the wall (t = 0 at start, t = 1 at end).
    rx_to_mirror = tx_mirror - rx
    cross_denom = (
        rx_to_mirror[:, 0] * wall_vec[:, 1]
        - rx_to_mirror[:, 1] * wall_vec[:, 0]
    )
    valid_denom = np.abs(cross_denom) > _EPS
    # Substitute a harmless divisor for parallel cases; they are masked out.
    cross_denom_safe = np.where(valid_denom, cross_denom, 1.0)

    rx_to_start = wall_starts - rx
    t = (
        rx_to_start[:, 0] * rx_to_mirror[:, 1]
        - rx_to_start[:, 1] * rx_to_mirror[:, 0]
    ) / cross_denom_safe

    reflection_points = wall_starts + t[:, np.newaxis] * wall_vec

    # A specular reflection needs Tx and Rx on the same side of the wall.
    # Without the Rx check, a receiver behind the wall can still yield
    # 0 <= t <= 1 because the *infinite* Rx->mirror line crosses the segment.
    valid = (
        valid_denom
        & (t >= 0)
        & (t <= 1)
        & (tx_dist_to_wall > 0)
        & (rx_dist_to_wall > 0)
    )
    return reflection_points, valid


def _reflection_loss_db(path_length: float, direct_dist: float) -> float:
    """Return the reflection loss in dB, always within [3, 10].

    The loss is 3 dB when the reflected path equals the direct distance and
    grows linearly to 10 dB once the reflected path is at least 1.5x the
    direct distance. The direct distance is floored at 1.0 to avoid dividing
    by a near-zero value; the lower clamp keeps the result from falling
    below 3 dB when that floor makes the ratio drop under 1.
    """
    path_ratio = path_length / max(direct_dist, 1.0)
    excess = min(1.0, max(0.0, (path_ratio - 1.0) * 2))
    return float(3.0 + 7.0 * excess)


def find_best_reflection_path(
    tx: np.ndarray,
    rx: np.ndarray,
    building_walls_start: np.ndarray,
    building_walls_end: np.ndarray,
    wall_to_building: np.ndarray,
    obstacle_polygons_x: np.ndarray,
    obstacle_polygons_y: np.ndarray,
    obstacle_lengths: np.ndarray,
    max_candidates: int = 50,
    max_walls: int = 100,
    max_los_checks: int = 10,
) -> Tuple[Optional[np.ndarray], float, float]:
    """Find best single-reflection path using vectorized ops.

    Candidate reflection points are computed for the walls nearest the
    Tx/Rx midpoint, filtered by total path length, and the shortest ones are
    tested leg by leg with ``line_intersects_polygons_batch``. The first
    candidate (in ascending path length) with both legs unobstructed wins.

    Args:
        tx: Transmitter position, shape (2,).
        rx: Receiver position, shape (2,).
        building_walls_start: Wall start points, shape (N, 2).
        building_walls_end: Wall end points, shape (N, 2).
        wall_to_building: Accepted for interface compatibility; it does not
            influence the result.
        obstacle_polygons_x: Forwarded unchanged to
            ``line_intersects_polygons_batch``.
        obstacle_polygons_y: Forwarded unchanged to
            ``line_intersects_polygons_batch``.
        obstacle_lengths: Forwarded unchanged to
            ``line_intersects_polygons_batch``.
        max_candidates: Keep at most this many shortest candidate paths
            before sorting.
        max_walls: Only consider closest N walls for reflection candidates.
        max_los_checks: Only verify LOS for top N shortest reflection paths.

    Returns:
        best_reflection_point: (2,) or None
        best_path_length: meters (``np.inf`` when no path is found)
        best_reflection_loss: dB (``0.0`` when no path is found)
    """
    tx = np.asarray(tx)
    rx = np.asarray(rx)
    building_walls_start = np.asarray(building_walls_start).reshape(-1, 2)
    building_walls_end = np.asarray(building_walls_end).reshape(-1, 2)

    num_walls = len(building_walls_start)
    if num_walls == 0:
        return None, np.inf, 0.0

    # Limit walls by distance to path midpoint.
    if num_walls > max_walls:
        midpoint = (tx + rx) / 2
        wall_midpoints = (building_walls_start + building_walls_end) / 2
        wall_distances = np.linalg.norm(wall_midpoints - midpoint, axis=1)
        closest = np.argpartition(wall_distances, max_walls)[:max_walls]
        building_walls_start = building_walls_start[closest]
        building_walls_end = building_walls_end[closest]

    refl_points, valid = calculate_reflection_points_batch(
        tx, rx, building_walls_start, building_walls_end,
    )
    if not np.any(valid):
        return None, np.inf, 0.0

    candidates = refl_points[valid]
    path_lengths = (
        np.linalg.norm(candidates - tx, axis=1)
        + np.linalg.norm(rx - candidates, axis=1)
    )

    # Discard reflections longer than twice the direct Tx-Rx distance.
    direct_dist = np.linalg.norm(rx - tx)
    within_range = path_lengths <= direct_dist * 2.0
    if not np.any(within_range):
        return None, np.inf, 0.0
    candidates = candidates[within_range]
    path_lengths = path_lengths[within_range]

    # Keep top candidates by shortest path (cheap partial selection first).
    if len(path_lengths) > max_candidates:
        top_idx = np.argpartition(path_lengths, max_candidates)[:max_candidates]
        candidates = candidates[top_idx]
        path_lengths = path_lengths[top_idx]

    # Ascending order so that the first unobstructed candidate is the best.
    sort_order = np.argsort(path_lengths)
    candidates = candidates[sort_order]
    path_lengths = path_lengths[sort_order]

    # Clamp at 0 so a non-positive max_los_checks simply checks nothing.
    check_count = max(0, min(len(candidates), max_los_checks))
    for refl_pt, length in zip(
        candidates[:check_count], path_lengths[:check_count]
    ):
        # An infinite path length can never be a usable result.
        if not np.isfinite(length):
            continue

        # Leg 1: transmitter -> reflection point.
        blocked, _ = line_intersects_polygons_batch(
            tx, refl_pt,
            obstacle_polygons_x, obstacle_polygons_y, obstacle_lengths,
        )
        if np.any(blocked):
            continue

        # Leg 2: reflection point -> receiver.
        blocked, _ = line_intersects_polygons_batch(
            refl_pt, rx,
            obstacle_polygons_x, obstacle_polygons_y, obstacle_lengths,
        )
        if np.any(blocked):
            continue

        return refl_pt, float(length), _reflection_loss_db(length, direct_dist)

    return None, np.inf, 0.0