import json
import math
import os
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

import numpy as np

try:
    import httpx
except ImportError:
    # httpx is only needed for the Overpass download; the propagation math
    # and the disk cache remain usable without it.
    httpx = None


@dataclass
class Street:
    """Street segment from OSM"""

    id: int
    name: Optional[str]
    geometry: List[Tuple[float, float]]  # [(lat, lon), ...]
    width: float  # meters
    highway_type: str  # residential, primary, secondary, etc.


class StreetCanyonService:
    """
    Street canyon propagation model (ITU-R P.1411)

    Signal propagates along streets with reflections from building walls.
    Loss increases at corners/turns.

    Streets are downloaded from the Overpass API and cached both in memory
    and as JSON files under ``$RFCP_DATA_PATH/osm/streets``.
    """

    OVERPASS_URL = "https://overpass-api.de/api/interpreter"

    # Default street widths by type
    STREET_WIDTHS = {
        "motorway": 25.0,
        "trunk": 20.0,
        "primary": 15.0,
        "secondary": 12.0,
        "tertiary": 10.0,
        "residential": 8.0,
        "unclassified": 6.0,
        "service": 5.0,
        "footway": 2.0,
        "path": 1.5,
    }

    # Corner/turn loss
    CORNER_LOSS_90 = 10.0  # dB for 90-degree turn
    CORNER_LOSS_45 = 4.0  # dB for 45-degree turn

    # Path-search tuning
    MAX_PATH_STEPS = 50  # upper bound on greedy hops
    ARRIVAL_RADIUS_M = 50.0  # stop searching once this close to the goal
    MAX_HOP_M = 100.0  # a hop may not jump further than this
    MAX_SNAP_M = 200.0  # TX/RX must be within this distance of a street

    # "highway" values that are tagged on ways but are not roads
    _NON_ROAD_TYPES = frozenset({"bus_stop", "crossing", "traffic_signals"})

    def __init__(self):
        """Create the on-disk cache directory and an empty memory cache."""
        self.data_path = Path(os.environ.get('RFCP_DATA_PATH', './data'))
        self.cache_dir = self.data_path / 'osm' / 'streets'
        self.cache_dir.mkdir(exist_ok=True, parents=True)
        self._cache: dict[str, List[Street]] = {}

    async def fetch_streets(
        self,
        min_lat: float,
        min_lon: float,
        max_lat: float,
        max_lon: float
    ) -> List[Street]:
        """Fetch street network from OSM

        Lookup order is memory cache, disk cache, then the Overpass API.
        The cache key rounds the bounding box to two decimals, so boxes
        that round to the same key share one cache entry.

        Returns:
            The streets inside the bounding box, or ``[]`` when the download
            fails (failures are logged, never raised).
        """
        cache_key = f"{min_lat:.2f}_{min_lon:.2f}_{max_lat:.2f}_{max_lon:.2f}"

        # Check memory cache
        if cache_key in self._cache:
            return self._cache[cache_key]

        # Check disk cache
        cache_file = self.cache_dir / f"{cache_key}.json"
        if cache_file.exists():
            try:
                streets = self._load_cached_streets(cache_file)
            except (OSError, ValueError, TypeError, KeyError, IndexError) as e:
                # A corrupt or stale cache file must not block a refetch,
                # but it should not disappear silently either.
                print(f"[Streets] Ignoring unreadable cache {cache_file}: {e}")
            else:
                self._cache[cache_key] = streets
                print(f"[Streets] Cache hit for {cache_key}")
                return streets

        if httpx is None:
            print("[Streets] Fetch error: httpx is not installed")
            return []

        # Fetch from Overpass
        print(f"[Streets] Fetching from Overpass API for {cache_key}...")

        query = f"""
        [out:json][timeout:30];
        way["highway"]({min_lat},{min_lon},{max_lat},{max_lon});
        out body;
        >;
        out skel qt;
        """

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(self.OVERPASS_URL, data={"data": query})
                response.raise_for_status()
                data = response.json()
        except Exception as e:
            # Deliberate best-effort boundary: any network/HTTP/JSON problem
            # degrades to "no streets" instead of failing the caller.
            print(f"[Streets] Fetch error: {e}")
            return []

        streets = self._parse_streets(data)

        # Cache to disk (best effort: a write failure must not discard the
        # streets that were just downloaded)
        if streets:
            try:
                with open(cache_file, 'w', encoding='utf-8') as f:
                    json.dump([{
                        "id": s.id,
                        "name": s.name,
                        "geometry": s.geometry,
                        "width": s.width,
                        "highway_type": s.highway_type
                    } for s in streets], f)
            except OSError as e:
                print(f"[Streets] Could not write cache {cache_file}: {e}")

        self._cache[cache_key] = streets
        return streets

    def _load_cached_streets(self, cache_file: Path) -> List[Street]:
        """Read a cache file written by fetch_streets.

        JSON has no tuple type, so geometry points come back as lists; they
        are converted to ``(lat, lon)`` tuples to match freshly parsed data.
        """
        with open(cache_file, encoding='utf-8') as f:
            records = json.load(f)
        return [
            Street(**{
                **record,
                "geometry": [(pt[0], pt[1]) for pt in record["geometry"]],
            })
            for record in records
        ]

    def _parse_streets(self, data: dict) -> List[Street]:
        """Parse Overpass response into Street objects

        Ways without a ``highway`` tag, ways tagged with a non-road highway
        value, and ways with fewer than two resolvable nodes are skipped.
        """
        elements = data.get("elements", [])

        # First pass: node id -> (lat, lon)
        nodes = {
            element["id"]: (element["lat"], element["lon"])
            for element in elements
            if element.get("type") == "node"
        }

        # Second pass: ways that reference those nodes
        streets = []
        for element in elements:
            if element.get("type") != "way":
                continue

            tags = element.get("tags", {})
            highway_type = tags.get("highway")
            if highway_type is None or highway_type in self._NON_ROAD_TYPES:
                continue

            # Node ids missing from the response are dropped
            geometry = [
                nodes[node_id]
                for node_id in element.get("nodes", [])
                if node_id in nodes
            ]
            if len(geometry) < 2:
                continue

            streets.append(Street(
                id=element["id"],
                name=tags.get("name"),
                geometry=geometry,
                width=self._get_street_width(tags),
                highway_type=highway_type
            ))

        return streets

    def _get_street_width(self, tags: dict) -> float:
        """Estimate street width from OSM tags

        Tries, in order: the explicit ``width`` tag, ``lanes`` * 3.5 m, and
        the default for the ``highway`` type (8.0 m when unknown).  Values
        that cannot be parsed, or that are not finite and positive, fall
        through to the next estimate.
        """
        # Explicit width
        if "width" in tags:
            try:
                # str() so a numeric tag value does not raise AttributeError
                width = float(str(tags["width"]).replace("m", "").strip())
            except (ValueError, TypeError):
                pass
            else:
                if math.isfinite(width) and width > 0:
                    return width

        # Calculate from lanes
        if "lanes" in tags:
            try:
                lanes = int(tags["lanes"])
            except (ValueError, TypeError):
                pass
            else:
                if lanes > 0:
                    return lanes * 3.5  # ~3.5m per lane

        # Default by highway type
        highway_type = tags.get("highway", "residential")
        return self.STREET_WIDTHS.get(highway_type, 8.0)

    async def calculate_street_canyon_loss(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        rx_lat: float, rx_lon: float, rx_height: float,
        frequency_mhz: float,
        streets: List[Street]
    ) -> Tuple[float, List[Tuple[float, float]]]:
        """
        Calculate path loss through street canyon

        Async wrapper around calculate_street_canyon_loss_sync; the
        computation itself performs no I/O.

        Returns:
            (path_loss_db, street_path as list of points)
        """
        return self.calculate_street_canyon_loss_sync(
            tx_lat, tx_lon, tx_height,
            rx_lat, rx_lon, rx_height,
            frequency_mhz,
            streets
        )

    def _find_street_path(
        self,
        start_lat: float, start_lon: float,
        end_lat: float, end_lon: float,
        streets: List[Street]
    ) -> List[Tuple[float, float]]:
        """
        Find path along streets (simplified greedy search, not full A*)

        Returns list of (lat, lon) waypoints: the start, its nearest street
        point, the greedy hops, the street point nearest the end, and the
        end.  Consecutive duplicate waypoints are never emitted, because a
        zero-length segment would hide the turn angle at that waypoint.
        Returns ``[]`` when either endpoint has no street within
        MAX_SNAP_M.
        """
        # Find nearest street point to start and end
        start_point = self._nearest_street_point(start_lat, start_lon, streets)
        end_point = self._nearest_street_point(end_lat, end_lon, streets)

        if start_point is None or end_point is None:
            return []

        path: List[Tuple[float, float]] = []

        def add_waypoint(point) -> None:
            # Normalise to a tuple so list-shaped points compare equal
            waypoint = (point[0], point[1])
            if not path or path[-1] != waypoint:
                path.append(waypoint)

        add_waypoint((start_lat, start_lon))
        add_waypoint(start_point)

        # Add intermediate points along streets toward destination
        current = start_point
        visited = set()

        for _ in range(self.MAX_PATH_STEPS):
            if self._distance(current, end_point) < self.ARRIVAL_RADIUS_M:
                break

            # Mark the current point BEFORE searching so the search cannot
            # return the point we are already standing on.
            visited.add((round(current[0], 5), round(current[1], 5)))

            # Find next street segment toward destination
            next_point = self._next_street_point(current, end_point, streets, visited)
            if not next_point:
                break

            add_waypoint(next_point)
            current = next_point

        add_waypoint(end_point)
        add_waypoint((end_lat, end_lon))

        return path

    def _nearest_street_point(
        self,
        lat: float, lon: float,
        streets: List[Street]
    ) -> Optional[Tuple[float, float]]:
        """Find nearest street vertex, or None if none is within MAX_SNAP_M"""
        best_point = None
        best_dist = float('inf')
        origin = (lat, lon)

        for street in streets:
            for point in street.geometry:
                dist = self._distance(origin, point)
                if dist < best_dist:
                    best_dist = dist
                    best_point = point

        return best_point if best_dist < self.MAX_SNAP_M else None

    def _next_street_point(
        self,
        current: Tuple[float, float],
        target: Tuple[float, float],
        streets: List[Street],
        visited: set
    ) -> Optional[Tuple[float, float]]:
        """Find next street point toward target

        Candidates are unvisited street vertices within MAX_HOP_M of
        ``current``; the one minimising
        ``distance_to_target + 0.5 * distance_from_current`` wins.
        Returns None when there is no candidate.
        """
        best_point = None
        best_score = float('inf')

        for street in streets:
            for point in street.geometry:
                if (round(point[0], 5), round(point[1], 5)) in visited:
                    continue

                # Must be close to current position
                dist_from_current = self._distance(current, point)
                if dist_from_current > self.MAX_HOP_M:
                    continue

                # Score: prefer points closer to target
                score = self._distance(point, target) + dist_from_current * 0.5
                if score < best_score:
                    best_score = score
                    best_point = point

        return best_point

    def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
        """Quick distance approximation (meters)

        Flat-earth approximation: 111 km per degree, longitude scaled by the
        cosine of p1's latitude.
        """
        lat_diff = (p1[0] - p2[0]) * 111000
        lon_diff = (p1[1] - p2[1]) * 111000 * np.cos(np.radians(p1[0]))
        return np.sqrt(lat_diff**2 + lon_diff**2)

    def _calculate_corner_angle(
        self,
        p1: Tuple[float, float],
        p2: Tuple[float, float],
        p3: Tuple[float, float]
    ) -> float:
        """Calculate angle at corner (degrees)

        Returns the interior angle at ``p2`` between the legs p2->p1 and
        p2->p3: 180 means straight on, 90 a right-angle turn.  Degenerate
        (near zero-length) legs are reported as 180, i.e. no turn.
        """
        # A degree of longitude is shorter than a degree of latitude by
        # cos(latitude); without this scaling the angle is distorted away
        # from the equator.
        lon_scale = np.cos(np.radians(p2[0]))

        v1 = (p1[0] - p2[0], (p1[1] - p2[1]) * lon_scale)
        v2 = (p3[0] - p2[0], (p3[1] - p2[1]) * lon_scale)

        dot = v1[0] * v2[0] + v1[1] * v2[1]
        mag1 = np.sqrt(v1[0]**2 + v1[1]**2)
        mag2 = np.sqrt(v2[0]**2 + v2[1]**2)

        if mag1 * mag2 < 1e-10:
            return 180.0

        # Clamp against rounding error before arccos
        cos_angle = max(-1, min(1, dot / (mag1 * mag2)))
        return np.degrees(np.arccos(cos_angle))

    def _corner_loss(self, angle_degrees: float) -> float:
        """Calculate loss due to corner/turn

        ``angle_degrees`` is the interior angle at the corner
        (straight = 180 deg, right angle = 90 deg).  Loss is 0 dB below a
        15 degree turn, interpolated linearly up to CORNER_LOSS_45 and
        CORNER_LOSS_90, then grows by 0.2 dB per extra degree.
        """
        turn_angle = abs(180 - angle_degrees)

        if turn_angle < 15:
            return 0.0
        if turn_angle < 45:
            return self.CORNER_LOSS_45 * (turn_angle / 45)
        if turn_angle < 90:
            extra = self.CORNER_LOSS_90 - self.CORNER_LOSS_45
            return self.CORNER_LOSS_45 + extra * ((turn_angle - 45) / 45)
        return self.CORNER_LOSS_90 + (turn_angle - 90) * 0.2  # Extra loss for sharp turns

    def calculate_street_canyon_loss_sync(
        self,
        tx_lat: float, tx_lon: float, tx_height: float,
        rx_lat: float, rx_lon: float, rx_height: float,
        frequency_mhz: float,
        streets: List[Street]
    ) -> Tuple[float, List[Tuple[float, float]]]:
        """Calculate path loss through street canyon (synchronous)

        Walks the street path from TX to RX, adding a distance-weighted
        free-space style term per segment plus a corner loss at every
        interior waypoint.

        Returns:
            (path_loss_db, street_path as list of points);
            ``(inf, [])`` when no street path exists.

        NOTE(review): tx_height and rx_height are accepted but not used by
        this simplified model.
        """
        # Find path along streets from TX to RX
        street_path = self._find_street_path(tx_lat, tx_lon, rx_lat, rx_lon, streets)

        if not street_path:
            return float('inf'), []  # No street path found

        # Project-local import kept function-scoped, but resolved once
        # rather than on every loop iteration.
        from app.services.terrain_service import TerrainService

        # Street canyon loss (ITU-R P.1411 simplified)
        # L = 32.4 + 20*log10(f_MHz) + 20*log10(d_km)
        frequency_term = 32.4 + 20 * np.log10(frequency_mhz)

        total_loss = 0.0
        total_distance = 0.0

        for i in range(len(street_path) - 1):
            p1 = street_path[i]
            p2 = street_path[i + 1]

            segment_dist = TerrainService.haversine_distance(p1[0], p1[1], p2[0], p2[1])
            total_distance += segment_dist

            # NOTE(review): each segment is weighted by its share of the
            # distance travelled SO FAR, not of the final path length, so
            # the weights do not sum to one.  Left unchanged pending a
            # decision on the intended model.
            if segment_dist > 0 and total_distance > 0:
                # +0.001 km keeps log10 finite for very short segments
                segment_loss = frequency_term + 20 * np.log10(segment_dist / 1000 + 0.001)
                total_loss += segment_loss * (segment_dist / total_distance)

            # Corner loss at the waypoint joining the previous segment
            if i > 0:
                corner_angle = self._calculate_corner_angle(
                    street_path[i - 1], p1, p2
                )
                total_loss += self._corner_loss(corner_angle)

        return total_loss, street_path


street_canyon_service = StreetCanyonService()