Files
rfcp/RFCP-Iteration-1.4-Advanced-Propagation.md

1575 lines
51 KiB
Markdown

# RFCP Backend - Iteration 1.4: Advanced Propagation Models
**Date:** January 31, 2025
**Type:** Backend Development
**Estimated:** 14-18 hours
**Location:** `/opt/rfcp/backend/`
---
## 🎯 Goal
Implement realistic RF propagation with building materials, dominant path analysis, street canyon model, and reflections. Add UI toggles for model selection.
---
## 📋 Pre-reading
1. `RFCP-Iteration-1.3-Coverage-OSM-Buildings.md` — current state
2. ITU-R P.1411 — Propagation for short-range outdoor
3. ITU-R P.2040 — Building material properties
---
## 📊 Current State
```bash
# Backend 1.3 complete
/opt/rfcp/backend/app/services/
├── terrain_service.py # SRTM elevation ✅
├── los_service.py # Line-of-sight + Fresnel ✅
├── buildings_service.py # OSM buildings fetch ✅
└── coverage_service.py # Basic Okumura-Hata ✅
# Issues:
# - points_with_buildings: 0 (intersection bug)
# - No material consideration
# - No reflections
# - Slow (2 min for 1km radius)
```
---
## 🏗️ Architecture
### Propagation Model Layers
```
┌─────────────────────────────────────────┐
│ RSRP Calculation │
├─────────────────────────────────────────┤
│ Base: Okumura-Hata / COST-231 │
├─────────────────────────────────────────┤
│ [1] Dominant Path - find best 2-3 rays │
├─────────────────────────────────────────┤
│ [2] Materials - wall penetration loss │
├─────────────────────────────────────────┤
│ [3] Street Canyon - signal along roads │
├─────────────────────────────────────────┤
│ [4] Reflections - bounced signals │
├─────────────────────────────────────────┤
│ Terrain (SRTM) + Buildings (OSM) │
└─────────────────────────────────────────┘
```
### Settings Model
```python
class PropagationSettings(BaseModel):
# Existing
use_terrain: bool = True
use_buildings: bool = True
# New toggles
use_materials: bool = True
use_dominant_path: bool = False # Slower but more accurate
use_street_canyon: bool = False # Requires road network
use_reflections: bool = False # Adds reflection paths
# Presets
preset: str = "standard" # fast, standard, detailed, full
# Presets mapping
PRESETS = {
"fast": {
"use_terrain": True,
"use_buildings": False,
"use_materials": False,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
},
"standard": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
},
"detailed": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": False,
"use_reflections": False,
},
"full": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": True,
"use_reflections": True,
},
}
```
---
## ✅ Tasks
### Task 1: Building Materials Service (2-3 hours)
**app/services/materials_service.py:**
```python
from enum import Enum
from typing import Optional
class BuildingMaterial(Enum):
"""Building materials with RF properties"""
CONCRETE = "concrete"
BRICK = "brick"
GLASS = "glass"
WOOD = "wood"
METAL = "metal"
STONE = "stone"
PLASTER = "plaster"
UNKNOWN = "unknown"
# ITU-R P.2040 based attenuation (dB per wall at 1-3 GHz)
MATERIAL_LOSS = {
BuildingMaterial.CONCRETE: 15.0,
BuildingMaterial.BRICK: 10.0,
BuildingMaterial.GLASS: 3.0,
BuildingMaterial.WOOD: 5.0,
BuildingMaterial.METAL: 25.0, # Or full reflection
BuildingMaterial.STONE: 12.0,
BuildingMaterial.PLASTER: 4.0,
BuildingMaterial.UNKNOWN: 10.0, # Default assumption
}
# Reflection coefficient (0-1, portion of signal reflected)
MATERIAL_REFLECTION = {
BuildingMaterial.CONCRETE: 0.6,
BuildingMaterial.BRICK: 0.5,
BuildingMaterial.GLASS: 0.3,
BuildingMaterial.WOOD: 0.2,
BuildingMaterial.METAL: 0.9,
BuildingMaterial.STONE: 0.55,
BuildingMaterial.PLASTER: 0.3,
BuildingMaterial.UNKNOWN: 0.4,
}
class MaterialsService:
"""Building material detection and RF properties"""
# OSM building:material tag mapping
OSM_MATERIAL_MAP = {
"concrete": BuildingMaterial.CONCRETE,
"brick": BuildingMaterial.BRICK,
"glass": BuildingMaterial.GLASS,
"wood": BuildingMaterial.WOOD,
"metal": BuildingMaterial.METAL,
"steel": BuildingMaterial.METAL,
"stone": BuildingMaterial.STONE,
"plaster": BuildingMaterial.PLASTER,
"cement_block": BuildingMaterial.CONCRETE,
"timber": BuildingMaterial.WOOD,
}
# Fallback by building type
BUILDING_TYPE_MATERIAL = {
"industrial": BuildingMaterial.METAL,
"warehouse": BuildingMaterial.METAL,
"garage": BuildingMaterial.METAL,
"shed": BuildingMaterial.WOOD,
"house": BuildingMaterial.BRICK,
"residential": BuildingMaterial.CONCRETE,
"apartments": BuildingMaterial.CONCRETE,
"commercial": BuildingMaterial.GLASS, # Often glass facades
"office": BuildingMaterial.GLASS,
"retail": BuildingMaterial.GLASS,
"church": BuildingMaterial.STONE,
"cathedral": BuildingMaterial.STONE,
"school": BuildingMaterial.BRICK,
"hospital": BuildingMaterial.CONCRETE,
"university": BuildingMaterial.CONCRETE,
}
def detect_material(self, building_tags: dict) -> BuildingMaterial:
"""Detect building material from OSM tags"""
# Direct material tag
if "building:material" in building_tags:
material_str = building_tags["building:material"].lower()
if material_str in self.OSM_MATERIAL_MAP:
return self.OSM_MATERIAL_MAP[material_str]
# Facade material (often more relevant for RF)
if "building:facade:material" in building_tags:
material_str = building_tags["building:facade:material"].lower()
if material_str in self.OSM_MATERIAL_MAP:
return self.OSM_MATERIAL_MAP[material_str]
# Fallback by building type
building_type = building_tags.get("building", "yes").lower()
if building_type in self.BUILDING_TYPE_MATERIAL:
return self.BUILDING_TYPE_MATERIAL[building_type]
return BuildingMaterial.UNKNOWN
def get_penetration_loss(self, material: BuildingMaterial, frequency_mhz: float = 1800) -> float:
"""
Get RF penetration loss through wall
Frequency correction: +2dB per octave above 1GHz
"""
base_loss = MATERIAL_LOSS[material]
# Frequency correction (simplified)
freq_factor = max(0, (frequency_mhz - 1000) / 1000) * 2
return base_loss + freq_factor
def get_reflection_coefficient(self, material: BuildingMaterial) -> float:
"""Get reflection coefficient (0-1)"""
return MATERIAL_REFLECTION[material]
def get_reflection_loss(self, material: BuildingMaterial) -> float:
"""Get loss due to reflection (dB)"""
coeff = MATERIAL_REFLECTION[material]
if coeff <= 0:
return 30.0 # Effectively no reflection
# Reflection loss in dB = -10 * log10(coefficient)
import math
return -10 * math.log10(coeff)
materials_service = MaterialsService()
```
**Update buildings_service.py:**
```python
# Add to Building model
class Building(BaseModel):
id: int
geometry: List[List[float]]
height: float
levels: Optional[int] = None
building_type: Optional[str] = None
material: Optional[str] = None # NEW
tags: dict = {} # NEW - store all tags for material detection
```
---
### Task 2: Dominant Path Service (4-5 hours)
**app/services/dominant_path_service.py:**
```python
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from app.services.terrain_service import terrain_service
from app.services.buildings_service import buildings_service, Building
from app.services.materials_service import materials_service, BuildingMaterial
@dataclass
class RayPath:
"""Single ray path from TX to RX"""
path_type: str # "direct", "reflected", "diffracted", "street"
total_distance: float # meters
path_loss: float # dB
reflection_points: List[Tuple[float, float]] # [(lat, lon), ...]
materials_crossed: List[BuildingMaterial]
is_valid: bool # Does this path exist?
class DominantPathService:
"""
Find dominant propagation paths (2-3 strongest)
Path types:
1. Direct (LoS if available)
2. Single reflection off building
3. Over-roof diffraction
4. Around-corner diffraction
"""
MAX_REFLECTIONS = 2
MAX_PATHS = 3
async def find_dominant_paths(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
buildings: List[Building]
) -> List[RayPath]:
"""Find the dominant propagation paths"""
paths = []
# 1. Try direct path
direct = await self._check_direct_path(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
if direct:
paths.append(direct)
# 2. Try single-bounce reflections
reflections = await self._find_reflection_paths(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
paths.extend(reflections[:2]) # Max 2 reflection paths
# 3. Try over-roof diffraction (if direct blocked)
if not direct or not direct.is_valid:
diffracted = await self._find_diffraction_path(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
if diffracted:
paths.append(diffracted)
# Sort by path loss (best first) and return top N
paths.sort(key=lambda p: p.path_loss)
return paths[:self.MAX_PATHS]
async def _check_direct_path(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> Optional[RayPath]:
"""Check if direct LoS path exists"""
from app.services.los_service import los_service
# Check terrain LoS
los_result = await los_service.check_line_of_sight(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height
)
if not los_result["has_los"]:
return RayPath(
path_type="direct",
total_distance=los_result.get("distance", 0),
path_loss=float('inf'),
reflection_points=[],
materials_crossed=[],
is_valid=False
)
# Check building intersections
materials_crossed = []
for building in buildings:
intersection = self._line_intersects_building_3d(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
building
)
if intersection:
material = materials_service.detect_material(building.tags)
materials_crossed.append(material)
# Calculate path loss
distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
# Add material penetration losses
for material in materials_crossed:
path_loss += materials_service.get_penetration_loss(material, frequency_mhz)
return RayPath(
path_type="direct",
total_distance=distance,
path_loss=path_loss,
reflection_points=[],
materials_crossed=materials_crossed,
is_valid=len(materials_crossed) < 3 # Too many walls = not viable
)
async def _find_reflection_paths(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> List[RayPath]:
"""Find viable single-bounce reflection paths"""
reflection_paths = []
for building in buildings:
# Find potential reflection points on building walls
reflection_point = self._find_reflection_point(
tx_lat, tx_lon, rx_lat, rx_lon, building
)
if not reflection_point:
continue
ref_lat, ref_lon = reflection_point
# Check if both segments are clear
# TX -> Reflection point
dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
# Reflection point -> RX
dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
total_distance = dist1 + dist2
# Don't consider if much longer than direct path
direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
if total_distance > direct_distance * 2:
continue
# Calculate path loss
path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height)
# Add reflection loss
material = materials_service.detect_material(building.tags)
path_loss += materials_service.get_reflection_loss(material)
reflection_paths.append(RayPath(
path_type="reflected",
total_distance=total_distance,
path_loss=path_loss,
reflection_points=[(ref_lat, ref_lon)],
materials_crossed=[],
is_valid=True
))
return reflection_paths
async def _find_diffraction_path(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> Optional[RayPath]:
"""Find over-roof diffraction path"""
# Find highest obstacle between TX and RX
max_height = 0
obstacle_lat, obstacle_lon = None, None
# Sample points along direct path
num_samples = 20
for i in range(1, num_samples - 1):
t = i / num_samples
lat = tx_lat + t * (rx_lat - tx_lat)
lon = tx_lon + t * (rx_lon - tx_lon)
# Check terrain
terrain_elev = await terrain_service.get_elevation(lat, lon)
if terrain_elev > max_height:
max_height = terrain_elev
obstacle_lat, obstacle_lon = lat, lon
# Check buildings at this point
for building in buildings:
if buildings_service.point_in_building(lat, lon, building):
if building.height > max_height:
max_height = building.height
obstacle_lat, obstacle_lon = lat, lon
if not obstacle_lat:
return None
# Calculate diffraction loss (simplified knife-edge)
distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
# Fresnel parameter
tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)
tx_total = tx_elev + tx_height
rx_total = rx_elev + rx_height
# Height of LoS at obstacle point
d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon)
los_height = tx_total + (rx_total - tx_total) * (d1 / distance)
clearance = los_height - max_height
# Knife-edge diffraction loss
diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1)
path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
path_loss += diffraction_loss
return RayPath(
path_type="diffracted",
total_distance=distance,
path_loss=path_loss,
reflection_points=[(obstacle_lat, obstacle_lon)],
materials_crossed=[],
is_valid=True
)
def _find_reflection_point(
self,
tx_lat: float, tx_lon: float,
rx_lat: float, rx_lon: float,
building: Building
) -> Optional[Tuple[float, float]]:
"""Find specular reflection point on building wall"""
# Simplified: find closest wall segment and calculate reflection
geometry = building.geometry
best_point = None
best_score = float('inf')
for i in range(len(geometry) - 1):
wall_start = geometry[i]
wall_end = geometry[i + 1]
# Find reflection point on this wall segment
ref_point = self._specular_reflection(
tx_lon, tx_lat, rx_lon, rx_lat,
wall_start[0], wall_start[1],
wall_end[0], wall_end[1]
)
if ref_point:
# Score by total path length
d1 = np.sqrt((ref_point[0] - tx_lon)**2 + (ref_point[1] - tx_lat)**2)
d2 = np.sqrt((ref_point[0] - rx_lon)**2 + (ref_point[1] - rx_lat)**2)
score = d1 + d2
if score < best_score:
best_score = score
best_point = (ref_point[1], ref_point[0]) # Return as (lat, lon)
return best_point
def _specular_reflection(
self,
tx_x, tx_y, rx_x, rx_y,
wall_x1, wall_y1, wall_x2, wall_y2
) -> Optional[Tuple[float, float]]:
"""Calculate specular reflection point on wall segment"""
# Wall vector
wall_dx = wall_x2 - wall_x1
wall_dy = wall_y2 - wall_y1
wall_len = np.sqrt(wall_dx**2 + wall_dy**2)
if wall_len < 1e-10:
return None
# Wall normal
normal_x = -wall_dy / wall_len
normal_y = wall_dx / wall_len
# Mirror TX across wall
# Project TX onto wall
tx_rel_x = tx_x - wall_x1
tx_rel_y = tx_y - wall_y1
dot = tx_rel_x * normal_x + tx_rel_y * normal_y
mirror_x = tx_x - 2 * dot * normal_x
mirror_y = tx_y - 2 * dot * normal_y
# Find intersection of (mirror -> RX) with wall
# Parametric line: mirror + t * (rx - mirror)
dx = rx_x - mirror_x
dy = rx_y - mirror_y
# Wall parametric: wall1 + s * (wall2 - wall1)
denom = dx * wall_dy - dy * wall_dx
if abs(denom) < 1e-10:
return None # Parallel
t = ((wall_x1 - mirror_x) * wall_dy - (wall_y1 - mirror_y) * wall_dx) / denom
s = ((wall_x1 - mirror_x) * dy - (wall_y1 - mirror_y) * dx) / (-denom)
# Check if intersection is on wall segment and between mirror and RX
if 0 <= s <= 1 and 0 <= t <= 1:
ref_x = mirror_x + t * dx
ref_y = mirror_y + t * dy
return (ref_x, ref_y)
return None
def _line_intersects_building_3d(
self,
lat1, lon1, height1,
lat2, lon2, height2,
building: Building
) -> bool:
"""Check if 3D line intersects building volume"""
# Sample along line
for t in np.linspace(0, 1, 20):
lat = lat1 + t * (lat2 - lat1)
lon = lon1 + t * (lon2 - lon1)
height = height1 + t * (height2 - height1)
if buildings_service.point_in_building(lat, lon, building):
if height < building.height:
return True
return False
def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float:
"""Okumura-Hata path loss"""
d_km = max(distance / 1000, 0.1)
a_hm = (1.1 * np.log10(frequency_mhz) - 0.7) * rx_height - (1.56 * np.log10(frequency_mhz) - 0.8)
L = (69.55 + 26.16 * np.log10(frequency_mhz) - 13.82 * np.log10(tx_height) - a_hm +
(44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km))
return L
def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float:
"""Knife-edge diffraction loss"""
if clearance >= 0:
return 0.0
wavelength = 300 / frequency_mhz
d2 = total_distance - d1
# Fresnel parameter v
v = abs(clearance) * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2))
# Lee's approximation
if v <= -0.78:
return 0
elif v < 0:
return 6.02 + 9.11 * v - 1.27 * v**2
elif v < 2.4:
return 6.02 + 9.11 * v + 1.27 * v**2
else:
return 13 + 20 * np.log10(v)
dominant_path_service = DominantPathService()
```
---
### Task 3: Street Canyon Service (4-5 hours)
**app/services/street_canyon_service.py:**
```python
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
import httpx
from pathlib import Path
import json
@dataclass
class Street:
"""Street segment from OSM"""
id: int
name: Optional[str]
geometry: List[Tuple[float, float]] # [(lat, lon), ...]
width: float # meters
highway_type: str # residential, primary, secondary, etc.
class StreetCanyonService:
"""
Street canyon propagation model (ITU-R P.1411)
Signal propagates along streets with reflections from building walls.
Loss increases at corners/turns.
"""
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
# Default street widths by type
STREET_WIDTHS = {
"motorway": 25.0,
"trunk": 20.0,
"primary": 15.0,
"secondary": 12.0,
"tertiary": 10.0,
"residential": 8.0,
"unclassified": 6.0,
"service": 5.0,
"footway": 2.0,
"path": 1.5,
}
# Corner/turn loss
CORNER_LOSS_90 = 10.0 # dB for 90-degree turn
CORNER_LOSS_45 = 4.0 # dB for 45-degree turn
def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/streets"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True, parents=True)
self._cache: dict[str, List[Street]] = {}
async def fetch_streets(
self,
min_lat: float, min_lon: float,
max_lat: float, max_lon: float
) -> List[Street]:
"""Fetch street network from OSM"""
cache_key = f"{min_lat:.3f}_{min_lon:.3f}_{max_lat:.3f}_{max_lon:.3f}"
# Check cache
if cache_key in self._cache:
return self._cache[cache_key]
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
with open(cache_file) as f:
data = json.load(f)
streets = [Street(**s) for s in data]
self._cache[cache_key] = streets
return streets
# Fetch from Overpass
query = f"""
[out:json][timeout:30];
way["highway"]({min_lat},{min_lon},{max_lat},{max_lon});
out body;
>;
out skel qt;
"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(self.OVERPASS_URL, data={"data": query})
response.raise_for_status()
data = response.json()
except Exception as e:
print(f"Street fetch error: {e}")
return []
streets = self._parse_streets(data)
# Cache
with open(cache_file, 'w') as f:
json.dump([{
"id": s.id,
"name": s.name,
"geometry": s.geometry,
"width": s.width,
"highway_type": s.highway_type
} for s in streets], f)
self._cache[cache_key] = streets
return streets
def _parse_streets(self, data: dict) -> List[Street]:
"""Parse Overpass response into Street objects"""
nodes = {}
for element in data.get("elements", []):
if element["type"] == "node":
nodes[element["id"]] = (element["lat"], element["lon"])
streets = []
for element in data.get("elements", []):
if element["type"] != "way":
continue
tags = element.get("tags", {})
if "highway" not in tags:
continue
highway_type = tags["highway"]
# Skip non-road types
if highway_type in ["bus_stop", "crossing", "traffic_signals"]:
continue
geometry = []
for node_id in element.get("nodes", []):
if node_id in nodes:
geometry.append(nodes[node_id])
if len(geometry) < 2:
continue
# Get width
width = self._get_street_width(tags)
streets.append(Street(
id=element["id"],
name=tags.get("name"),
geometry=geometry,
width=width,
highway_type=highway_type
))
return streets
def _get_street_width(self, tags: dict) -> float:
"""Estimate street width from OSM tags"""
# Explicit width
if "width" in tags:
try:
return float(tags["width"].replace("m", "").strip())
except:
pass
# Calculate from lanes
if "lanes" in tags:
try:
lanes = int(tags["lanes"])
return lanes * 3.5 # ~3.5m per lane
except:
pass
# Default by highway type
highway_type = tags.get("highway", "residential")
return self.STREET_WIDTHS.get(highway_type, 8.0)
async def calculate_street_canyon_loss(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
streets: List[Street]
) -> Tuple[float, List[Tuple[float, float]]]:
"""
Calculate path loss through street canyon
Returns:
(path_loss_db, street_path as list of points)
"""
# Find path along streets from TX to RX
street_path = self._find_street_path(tx_lat, tx_lon, rx_lat, rx_lon, streets)
if not street_path:
return float('inf'), [] # No street path found
# Calculate loss along path
total_loss = 0.0
total_distance = 0.0
for i in range(len(street_path) - 1):
p1 = street_path[i]
p2 = street_path[i + 1]
# Segment distance
from app.services.terrain_service import TerrainService
segment_dist = TerrainService.haversine_distance(p1[0], p1[1], p2[0], p2[1])
total_distance += segment_dist
# Street canyon loss (ITU-R P.1411 simplified)
# L = 32.4 + 20*log10(f_MHz) + 20*log10(d_km)
if segment_dist > 0:
segment_loss = 32.4 + 20 * np.log10(frequency_mhz) + 20 * np.log10(segment_dist / 1000 + 0.001)
total_loss += segment_loss * (segment_dist / total_distance) if total_distance > 0 else 0
# Corner loss
if i > 0:
corner_angle = self._calculate_corner_angle(
street_path[i-1], p1, p2
)
corner_loss = self._corner_loss(corner_angle)
total_loss += corner_loss
return total_loss, street_path
def _find_street_path(
self,
start_lat: float, start_lon: float,
end_lat: float, end_lon: float,
streets: List[Street]
) -> List[Tuple[float, float]]:
"""
Find path along streets (simplified A* / greedy)
Returns list of (lat, lon) waypoints
"""
# Find nearest street point to start and end
start_point = self._nearest_street_point(start_lat, start_lon, streets)
end_point = self._nearest_street_point(end_lat, end_lon, streets)
if not start_point or not end_point:
return []
# Simplified: just return direct street segments
# Full implementation would use A* pathfinding
path = [(start_lat, start_lon), start_point]
# Add intermediate points along streets toward destination
current = start_point
visited = set()
for _ in range(50): # Max iterations
if self._distance(current, end_point) < 50: # Within 50m
break
# Find next street segment toward destination
next_point = self._next_street_point(current, end_point, streets, visited)
if not next_point:
break
path.append(next_point)
visited.add((round(current[0], 5), round(current[1], 5)))
current = next_point
path.append(end_point)
path.append((end_lat, end_lon))
return path
def _nearest_street_point(
self,
lat: float, lon: float,
streets: List[Street]
) -> Optional[Tuple[float, float]]:
"""Find nearest point on any street"""
best_point = None
best_dist = float('inf')
for street in streets:
for point in street.geometry:
dist = self._distance((lat, lon), point)
if dist < best_dist:
best_dist = dist
best_point = point
return best_point if best_dist < 200 else None # Max 200m to street
def _next_street_point(
self,
current: Tuple[float, float],
target: Tuple[float, float],
streets: List[Street],
visited: set
) -> Optional[Tuple[float, float]]:
"""Find next street point toward target"""
best_point = None
best_score = float('inf')
for street in streets:
for i, point in enumerate(street.geometry):
if (round(point[0], 5), round(point[1], 5)) in visited:
continue
dist_from_current = self._distance(current, point)
dist_to_target = self._distance(point, target)
# Must be close to current position
if dist_from_current > 100:
continue
# Score: prefer points closer to target
score = dist_to_target + dist_from_current * 0.5
if score < best_score:
best_score = score
best_point = point
return best_point
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""Quick distance approximation (meters)"""
lat_diff = (p1[0] - p2[0]) * 111000
lon_diff = (p1[1] - p2[1]) * 111000 * np.cos(np.radians(p1[0]))
return np.sqrt(lat_diff**2 + lon_diff**2)
def _calculate_corner_angle(
self,
p1: Tuple[float, float],
p2: Tuple[float, float],
p3: Tuple[float, float]
) -> float:
"""Calculate angle at corner (degrees)"""
v1 = (p1[0] - p2[0], p1[1] - p2[1])
v2 = (p3[0] - p2[0], p3[1] - p2[1])
dot = v1[0] * v2[0] + v1[1] * v2[1]
mag1 = np.sqrt(v1[0]**2 + v1[1]**2)
mag2 = np.sqrt(v2[0]**2 + v2[1]**2)
if mag1 * mag2 < 1e-10:
return 180.0
cos_angle = dot / (mag1 * mag2)
cos_angle = max(-1, min(1, cos_angle))
return np.degrees(np.arccos(cos_angle))
def _corner_loss(self, angle_degrees: float) -> float:
"""Calculate loss due to corner/turn"""
# Straight = 180°, right angle = 90°
turn_angle = abs(180 - angle_degrees)
if turn_angle < 15:
return 0.0
elif turn_angle < 45:
return self.CORNER_LOSS_45 * (turn_angle / 45)
elif turn_angle < 90:
return self.CORNER_LOSS_45 + (self.CORNER_LOSS_90 - self.CORNER_LOSS_45) * ((turn_angle - 45) / 45)
else:
return self.CORNER_LOSS_90 + (turn_angle - 90) * 0.2 # Extra loss for sharp turns
street_canyon_service = StreetCanyonService()
```
---
### Task 4: Reflection Service (3-4 hours)
**app/services/reflection_service.py:**
```python
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from app.services.buildings_service import Building
from app.services.materials_service import materials_service
@dataclass
class ReflectionPath:
"""A reflection path with one or more bounces"""
points: List[Tuple[float, float]] # [TX, reflection1, reflection2, ..., RX]
total_distance: float
total_loss: float
reflection_count: int
materials: List[str]
class ReflectionService:
"""
Calculate reflection paths for RF propagation
- Single bounce (most common)
- Double bounce (around corners)
- Ground reflection
"""
MAX_BOUNCES = 2
GROUND_REFLECTION_COEFF = 0.3 # Depends on surface
async def find_reflection_paths(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
buildings: List[Building],
include_ground: bool = True
) -> List[ReflectionPath]:
"""Find all viable reflection paths"""
paths = []
# Single-bounce building reflections
for building in buildings:
path = self._find_single_bounce(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, building
)
if path:
paths.append(path)
# Ground reflection
if include_ground:
ground_path = self._calculate_ground_reflection(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz
)
if ground_path:
paths.append(ground_path)
# Sort by loss (best first)
paths.sort(key=lambda p: p.total_loss)
return paths[:5] # Return top 5
def _find_single_bounce(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
building: Building
) -> Optional[ReflectionPath]:
"""Find single-bounce reflection off building"""
# Find reflection point on building walls
geometry = building.geometry
for i in range(len(geometry) - 1):
wall_start = geometry[i]
wall_end = geometry[i + 1]
ref_point = self._specular_reflection_point(
(tx_lon, tx_lat), (rx_lon, rx_lat),
wall_start, wall_end
)
if not ref_point:
continue
ref_lat, ref_lon = ref_point[1], ref_point[0]
# Calculate distances
from app.services.terrain_service import TerrainService
d1 = TerrainService.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
d2 = TerrainService.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
total_dist = d1 + d2
# Direct distance check - reflection shouldn't be much longer
direct_dist = TerrainService.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
if total_dist > direct_dist * 1.5:
continue
# Path loss
path_loss = self._free_space_loss(total_dist, frequency_mhz)
# Reflection loss
material = materials_service.detect_material(building.tags)
reflection_loss = materials_service.get_reflection_loss(material)
total_loss = path_loss + reflection_loss
return ReflectionPath(
points=[(tx_lat, tx_lon), (ref_lat, ref_lon), (rx_lat, rx_lon)],
total_distance=total_dist,
total_loss=total_loss,
reflection_count=1,
materials=[material.value]
)
return None
def _calculate_ground_reflection(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz
) -> Optional[ReflectionPath]:
"""Calculate ground reflection path"""
from app.services.terrain_service import TerrainService
# Reflection point (simplified - midpoint for flat ground)
mid_lat = (tx_lat + rx_lat) / 2
mid_lon = (tx_lon + rx_lon) / 2
# Path lengths
d1 = TerrainService.haversine_distance(tx_lat, tx_lon, mid_lat, mid_lon)
d2 = TerrainService.haversine_distance(mid_lat, mid_lon, rx_lat, rx_lon)
# Actual path length considering heights
path1 = np.sqrt(d1**2 + tx_height**2)
path2 = np.sqrt(d2**2 + rx_height**2)
total_dist = path1 + path2
# Path loss
path_loss = self._free_space_loss(total_dist, frequency_mhz)
# Ground reflection loss (~5-10 dB typically)
ground_reflection_loss = -10 * np.log10(self.GROUND_REFLECTION_COEFF)
# Phase difference can cause constructive or destructive interference
# Simplified: assume average case
total_loss = path_loss + ground_reflection_loss
return ReflectionPath(
points=[(tx_lat, tx_lon), (mid_lat, mid_lon), (rx_lat, rx_lon)],
total_distance=total_dist,
total_loss=total_loss,
reflection_count=1,
materials=["ground"]
)
def _specular_reflection_point(
self,
tx: Tuple[float, float], # (lon, lat)
rx: Tuple[float, float],
wall_start: List[float], # [lon, lat]
wall_end: List[float]
) -> Optional[Tuple[float, float]]:
"""Calculate specular reflection point on wall"""
# Wall vector
wx = wall_end[0] - wall_start[0]
wy = wall_end[1] - wall_start[1]
wall_len = np.sqrt(wx**2 + wy**2)
if wall_len < 1e-10:
return None
# Normalize
wx /= wall_len
wy /= wall_len
# Wall normal (perpendicular)
nx = -wy
ny = wx
# Vector from wall start to TX
tx_rel_x = tx[0] - wall_start[0]
tx_rel_y = tx[1] - wall_start[1]
# Distance from TX to wall line
dist_to_wall = tx_rel_x * nx + tx_rel_y * ny
# Mirror TX across wall
mirror_x = tx[0] - 2 * dist_to_wall * nx
mirror_y = tx[1] - 2 * dist_to_wall * ny
# Line from mirror to RX
dx = rx[0] - mirror_x
dy = rx[1] - mirror_y
# Find intersection with wall
# Parametric: wall_start + t * wall_dir
# Parametric: mirror + s * (rx - mirror)
denom = dx * wy - dy * wx
if abs(denom) < 1e-10:
return None
t = ((wall_start[0] - mirror_x) * wy - (wall_start[1] - mirror_y) * wx) / denom
s = ((wall_start[0] - mirror_x) * dy - (wall_start[1] - mirror_y) * dx) / (-denom) if denom != 0 else 0
# Check if on wall segment and between mirror and RX
if 0 <= s <= 1 and 0 <= t <= 1:
ref_x = mirror_x + t * dx
ref_y = mirror_y + t * dy
return (ref_x, ref_y)
return None
def _free_space_loss(self, distance: float, frequency_mhz: float) -> float:
"""Free space path loss (dB)"""
if distance <= 0:
distance = 1
# FSPL = 20*log10(d) + 20*log10(f) + 20*log10(4*pi/c)
# Simplified: FSPL = 32.45 + 20*log10(f_MHz) + 20*log10(d_km)
d_km = distance / 1000
return 32.45 + 20 * np.log10(frequency_mhz) + 20 * np.log10(d_km + 0.001)
def combine_paths(
self,
direct_power_dbm: float,
reflection_paths: List[ReflectionPath],
tx_power_dbm: float
) -> float:
"""
Combine direct and reflected signals (power sum)
Returns total received power in dBm
"""
# Convert to linear power
powers = []
if direct_power_dbm > -150: # Valid direct signal
powers.append(10 ** (direct_power_dbm / 10))
for path in reflection_paths:
reflected_power_dbm = tx_power_dbm - path.total_loss
if reflected_power_dbm > -150:
powers.append(10 ** (reflected_power_dbm / 10))
if not powers:
return -150.0 # No signal
# Sum powers (incoherent addition - conservative estimate)
total_power = sum(powers)
return 10 * np.log10(total_power)
reflection_service = ReflectionService()
```
---
### Task 5: Update Coverage Service (2-3 hours)
**Update app/services/coverage_service.py:**
```python
# Add imports
from app.services.materials_service import materials_service
from app.services.dominant_path_service import dominant_path_service
from app.services.street_canyon_service import street_canyon_service
from app.services.reflection_service import reflection_service
# Update CoverageSettings
class CoverageSettings(BaseModel):
radius: float = 10000
resolution: float = 200
min_signal: float = -120
# Layer toggles
use_terrain: bool = True
use_buildings: bool = True
use_materials: bool = True
use_dominant_path: bool = False
use_street_canyon: bool = False
use_reflections: bool = False
# Preset
preset: Optional[str] = None # fast, standard, detailed, full
# Add preset application
PRESETS = {
"fast": {...},
"standard": {...},
"detailed": {...},
"full": {...},
}
def apply_preset(settings: CoverageSettings) -> CoverageSettings:
if settings.preset and settings.preset in PRESETS:
for key, value in PRESETS[settings.preset].items():
setattr(settings, key, value)
return settings
# Update _calculate_point method
async def _calculate_point(self, site, lat, lon, settings, buildings, streets):
"""Enhanced point calculation with all propagation models"""
# ... existing code ...
# Material-aware building loss
if settings.use_buildings and settings.use_materials:
for building in buildings:
if self._intersects_building(site, lat, lon, building):
material = materials_service.detect_material(building.tags)
building_loss += materials_service.get_penetration_loss(
material, site.frequency
)
# Dominant path (find best route)
if settings.use_dominant_path:
paths = await dominant_path_service.find_dominant_paths(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, buildings
)
if paths:
best_path = paths[0]
# Use best path's loss instead of simple calculation
path_loss = best_path.path_loss
# Street canyon
if settings.use_street_canyon and streets:
canyon_loss, street_path = await street_canyon_service.calculate_street_canyon_loss(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, streets
)
# Use canyon loss if better than direct
if canyon_loss < path_loss + terrain_loss + building_loss:
path_loss = canyon_loss
terrain_loss = 0
building_loss = 0
# Reflections
reflection_gain = 0.0
if settings.use_reflections:
reflection_paths = await reflection_service.find_reflection_paths(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, buildings
)
if reflection_paths:
# Combine direct and reflected
direct_rsrp = site.power + site.gain - path_loss - terrain_loss - building_loss
combined_rsrp = reflection_service.combine_paths(
direct_rsrp, reflection_paths, site.power + site.gain
)
reflection_gain = combined_rsrp - direct_rsrp
# Final RSRP
rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + reflection_gain
return CoveragePoint(
lat=lat, lon=lon, rsrp=rsrp,
distance=distance, has_los=has_los,
terrain_loss=terrain_loss,
building_loss=building_loss,
reflection_gain=reflection_gain # NEW
)
```
---
### Task 6: Update API & Add Presets Endpoint (1-2 hours)
**Update app/api/routes/coverage.py:**
```python
@router.get("/presets")
async def get_presets():
"""Get available propagation model presets"""
return {
"presets": {
"fast": {
"description": "Quick calculation - terrain only",
"use_terrain": True,
"use_buildings": False,
"use_materials": False,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
"estimated_speed": "~5 seconds for 5km radius"
},
"standard": {
"description": "Balanced - terrain + buildings with materials",
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
"estimated_speed": "~30 seconds for 5km radius"
},
"detailed": {
"description": "Accurate - adds dominant path analysis",
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": False,
"use_reflections": False,
"estimated_speed": "~2 minutes for 5km radius"
},
"full": {
"description": "Maximum realism - all models enabled",
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": True,
"use_reflections": True,
"estimated_speed": "~5 minutes for 5km radius"
}
}
}
# Update CoverageResponse
class CoverageResponse(BaseModel):
points: List[CoveragePoint]
count: int
settings: CoverageSettings
stats: dict
computation_time: float # NEW - seconds
models_used: List[str] # NEW - which models were active
```
---
## 📁 Files Summary
### New Files:
```
app/services/
├── materials_service.py # Building material RF properties
├── dominant_path_service.py # Multi-path ray analysis
├── street_canyon_service.py # Street network propagation
└── reflection_service.py # Reflection calculations
data/streets/ # Cached street network
```
### Modified Files:
```
app/services/
├── buildings_service.py # Add material & tags fields
└── coverage_service.py # Integrate all models
app/api/routes/
└── coverage.py # Add presets endpoint, timing
app/models/
└── coverage.py # Extended settings model
```
---
## ✅ Success Criteria
- [ ] `/api/coverage/presets` returns all 4 presets with descriptions
- [ ] `preset: "full"` enables all toggles
- [ ] Material detection working (check logs for detected materials)
- [ ] Reflection paths found (check `reflection_gain` in response)
- [ ] Street canyon reduces loss along roads
- [ ] `computation_time` reported in response
- [ ] `models_used` shows active models
- [ ] Performance: "fast" < 10s, "standard" < 1min, "detailed" < 3min, "full" < 10min for 5km
---
## 🧪 Test Commands
```bash
# Test presets endpoint
curl https://api.rfcp.eliah.one/api/coverage/presets | jq
# Fast preset
curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \
-H "Content-Type: application/json" \
-d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":2000,"resolution":100,"preset":"fast"}}' | jq '.stats, .computation_time, .models_used'
# Full preset (will be slow!)
curl -X POST "https://api.rfcp.eliah.one/api/coverage/calculate" \
-H "Content-Type: application/json" \
-d '{"sites":[{"lat":48.46,"lon":35.05,"height":30,"power":43,"gain":15,"frequency":1800}],"settings":{"radius":1000,"resolution":100,"preset":"full"}}' | jq '.stats, .computation_time'
# Check material detection
curl "https://api.rfcp.eliah.one/api/coverage/buildings?min_lat=48.455&min_lon=35.045&max_lat=48.465&max_lon=35.055" | jq '.buildings[:3] | .[].material'
```
---
## 📝 Notes
- Street canyon requires road network fetch (additional Overpass query)
- Reflection calculations are CPU-intensive — consider caching
- Full model on large areas may timeout — implement background tasks later (1.4.1)
- Material detection fallback chain: explicit tag → facade tag → building type → default
---
## 🔜 Next Iterations
**1.4.1 — Enhanced Environment (Should have):**
- Spatial indexing (R-tree) for buildings
- Ground reflection improvements
- Water body reflection (OSM natural=water)
- Vegetation loss (OSM landuse=forest)
**1.4.2 — Extra Factors (Could have):**
- Weather/rain attenuation (ITU-R P.838)
- Indoor penetration layer
- Seasonal vegetation toggle
---
**Ready for Claude Code** 🚀