# Major refactoring of RFCP backend:
# - Modular propagation models (8 models)
# - SharedMemoryManager for terrain data
# - ProcessPoolExecutor parallel processing
# - WebSocket progress streaming
# - Building filtering pipeline (351k -> 15k)
# - 82 unit tests
# Performance: Standard preset 38s -> 5s (7.6x speedup)
# Known issue: Detailed preset timeout (fix in 3.1.0)
"""
|
|
Vectorized reflection point calculations using mirror-image method.
|
|
"""
import numpy as np

from typing import Tuple, Optional

from app.geometry.intersection import line_intersects_polygons_batch


|
def calculate_reflection_points_batch(
    tx: np.ndarray, rx: np.ndarray,
    wall_starts: np.ndarray, wall_ends: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate reflection points on N walls via the mirror-image method.

    The transmitter is mirrored across each wall's supporting line; a valid
    single-bounce reflection point is where the segment from the receiver to
    that mirror image crosses the wall segment itself.

    Args:
        tx, rx: Transmitter / receiver positions, shape (2,).
        wall_starts, wall_ends: Wall segment endpoints, shape (N, 2).

    Returns:
        reflection_points: (N, 2) candidate reflection points; content is
            meaningless where ``valid`` is False.
        valid: (N,) bool mask, True where a geometrically valid reflection
            exists (non-parallel geometry, crossing inside the segment, and
            both tx and rx on the wall's front side).
    """
    wall_vec = wall_ends - wall_starts
    wall_length = np.linalg.norm(wall_vec, axis=1, keepdims=True)
    # Guard against zero-length (degenerate) walls.
    wall_unit = wall_vec / np.maximum(wall_length, 1e-10)

    # Left-hand unit normal of each wall; defines the "front" side.
    normals = np.stack([-wall_unit[:, 1], wall_unit[:, 0]], axis=1)

    # Signed distance of tx to each wall line, and tx mirrored across it.
    tx_to_wall = tx - wall_starts
    tx_dist_to_wall = np.sum(tx_to_wall * normals, axis=1, keepdims=True)
    tx_mirror = tx - 2 * tx_dist_to_wall * normals

    # BUG FIX: rx must also lie on the wall's front side, otherwise the
    # rx -> mirror segment can cross the wall with t in [0, 1] while the
    # "reflection" passes through the wall from behind.
    rx_dist_to_wall = np.sum((rx - wall_starts) * normals, axis=1)

    rx_to_mirror = tx_mirror - rx

    # 2D cross product of (rx -> mirror) with the wall direction; zero means
    # the sight line to the mirror image is parallel to the wall.
    cross_denom = (rx_to_mirror[:, 0] * wall_vec[:, 1] -
                   rx_to_mirror[:, 1] * wall_vec[:, 0])

    valid_denom = np.abs(cross_denom) > 1e-10
    # Substitute a harmless denominator where degenerate to avoid div-by-zero.
    cross_denom_safe = np.where(valid_denom, cross_denom, 1.0)

    # Parameter t along the wall (0 at wall_start, 1 at wall_end) where the
    # rx -> mirror line crosses the wall's supporting line.
    rx_to_start = wall_starts - rx
    t = (rx_to_start[:, 0] * rx_to_mirror[:, 1] -
         rx_to_start[:, 1] * rx_to_mirror[:, 0]) / cross_denom_safe

    reflection_points = wall_starts + t[:, np.newaxis] * wall_vec

    valid = (valid_denom & (t >= 0) & (t <= 1)
             & (tx_dist_to_wall[:, 0] > 0) & (rx_dist_to_wall > 0))

    return reflection_points, valid
def find_best_reflection_path(
    tx: np.ndarray, rx: np.ndarray,
    building_walls_start: np.ndarray,
    building_walls_end: np.ndarray,
    wall_to_building: np.ndarray,
    obstacle_polygons_x: np.ndarray,
    obstacle_polygons_y: np.ndarray,
    obstacle_lengths: np.ndarray,
    max_candidates: int = 50,
    max_walls: int = 100,
    max_los_checks: int = 10,
) -> Tuple[Optional[np.ndarray], float, float]:
    """Find the best single-reflection path using vectorized ops.

    Candidate reflection points are computed for (a subset of) building
    walls, filtered by total path length, sorted, and then line-of-sight
    verified against the obstacle polygons until the first clear path is
    found — which, because candidates are sorted, is also the shortest.

    Args:
        tx, rx: Transmitter / receiver positions, shape (2,).
        building_walls_start, building_walls_end: Wall endpoints, (N, 2).
        wall_to_building: Per-wall building index, (N,). Currently unused;
            kept for interface compatibility with callers.
        obstacle_polygons_x, obstacle_polygons_y, obstacle_lengths: Padded
            obstacle polygon arrays for line_intersects_polygons_batch.
        max_candidates: Keep at most this many shortest candidate paths.
        max_walls: Only consider the closest N walls for reflection.
        max_los_checks: Only verify LOS for the top N shortest paths.

    Returns:
        best_reflection_point: (2,) or None if no valid path exists.
        best_path_length: meters (np.inf when no path).
        best_reflection_loss: dB (0.0 when no path).
    """
    num_walls = len(building_walls_start)
    if num_walls == 0:
        return None, np.inf, 0.0

    # Pre-filter: keep only the max_walls walls closest to the midpoint of
    # the direct tx-rx path; distant walls cannot yield competitive paths.
    if num_walls > max_walls:
        midpoint = (tx + rx) / 2
        wall_midpoints = (building_walls_start + building_walls_end) / 2
        wall_distances = np.linalg.norm(wall_midpoints - midpoint, axis=1)
        closest = np.argpartition(wall_distances, max_walls)[:max_walls]
        building_walls_start = building_walls_start[closest]
        building_walls_end = building_walls_end[closest]

    refl_points, valid = calculate_reflection_points_batch(
        tx, rx, building_walls_start, building_walls_end,
    )
    if not np.any(valid):
        return None, np.inf, 0.0

    valid_refl = refl_points[valid]

    # Total geometric path length tx -> reflection point -> rx.
    tx_to_refl = np.linalg.norm(valid_refl - tx, axis=1)
    refl_to_rx = np.linalg.norm(rx - valid_refl, axis=1)
    path_lengths = tx_to_refl + refl_to_rx

    # Discard detours longer than twice the direct distance.
    direct_dist = np.linalg.norm(rx - tx)
    within_range = path_lengths <= direct_dist * 2.0
    if not np.any(within_range):
        return None, np.inf, 0.0

    valid_refl = valid_refl[within_range]
    path_lengths = path_lengths[within_range]

    # Keep only the max_candidates shortest candidate paths.
    if len(path_lengths) > max_candidates:
        top_idx = np.argpartition(path_lengths, max_candidates)[:max_candidates]
        valid_refl = valid_refl[top_idx]
        path_lengths = path_lengths[top_idx]

    # Sort ascending so the first LOS-clear candidate is also the shortest.
    sort_order = np.argsort(path_lengths)
    valid_refl = valid_refl[sort_order]
    path_lengths = path_lengths[sort_order]

    # Verify LOS for at most max_los_checks candidates; both legs of the
    # path must be free of obstacle intersections.
    best_idx = -1
    for i in range(min(len(valid_refl), max_los_checks)):
        refl_pt = valid_refl[i]

        blocked, _ = line_intersects_polygons_batch(
            tx, refl_pt, obstacle_polygons_x, obstacle_polygons_y, obstacle_lengths,
        )
        if np.any(blocked):
            continue

        blocked, _ = line_intersects_polygons_batch(
            refl_pt, rx, obstacle_polygons_x, obstacle_polygons_y, obstacle_lengths,
        )
        if np.any(blocked):
            continue

        best_idx = i
        break  # candidates are sorted by length; first clear one is best

    if best_idx < 0:
        return None, np.inf, 0.0

    best_point = valid_refl[best_idx]
    best_length = path_lengths[best_idx]

    # Empirical reflection loss: 3 dB near-direct, saturating at 10 dB as
    # the reflected path grows relative to the direct path.
    # NOTE(review): if direct_dist < 1 m the ratio can dip below 1, giving
    # a loss below 3 dB — presumably acceptable at such short range.
    path_ratio = best_length / max(direct_dist, 1.0)
    reflection_loss = 3.0 + 7.0 * min(1.0, (path_ratio - 1.0) * 2)

    return best_point, best_length, reflection_loss
|