@mytec: 1.4iter ready for testing

This commit is contained in:
2026-01-31 00:59:30 +02:00
parent 1ffac9f510
commit 61e113965c
8 changed files with 1398 additions and 36 deletions

View File

@@ -1,3 +1,5 @@
import time
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import List, Optional
from pydantic import BaseModel
@@ -5,7 +7,9 @@ from app.services.coverage_service import (
coverage_service,
CoverageSettings,
SiteParams,
CoveragePoint
CoveragePoint,
apply_preset,
PRESETS,
)
router = APIRouter()
@@ -23,6 +27,8 @@ class CoverageResponse(BaseModel):
count: int
settings: CoverageSettings
stats: dict
computation_time: float # seconds
models_used: List[str] # which models were active
@router.post("/calculate")
@@ -30,7 +36,8 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
"""
Calculate RF coverage for one or more sites
Returns grid of RSRP values with terrain and building effects
Returns grid of RSRP values with terrain and building effects.
Supports propagation model presets: fast, standard, detailed, full.
"""
if not request.sites:
raise HTTPException(400, "At least one site required")
@@ -45,6 +52,13 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
if request.settings.resolution < 50:
raise HTTPException(400, "Minimum resolution 50m")
# Apply preset and determine active models
effective_settings = apply_preset(request.settings.model_copy())
models_used = _get_active_models(effective_settings)
# Time the calculation
start_time = time.time()
# Calculate
if len(request.sites) == 1:
points = await coverage_service.calculate_coverage(
@@ -57,6 +71,8 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
request.settings
)
computation_time = time.time() - start_time
# Calculate stats
rsrp_values = [p.rsrp for p in points]
los_count = sum(1 for p in points if p.has_los)
@@ -68,16 +84,48 @@ async def calculate_coverage(request: CoverageRequest) -> CoverageResponse:
"los_percentage": (los_count / len(points) * 100) if points else 0,
"points_with_buildings": sum(1 for p in points if p.building_loss > 0),
"points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0),
"points_with_reflection_gain": sum(1 for p in points if p.reflection_gain > 0),
}
return CoverageResponse(
points=points,
count=len(points),
settings=request.settings,
stats=stats
settings=effective_settings,
stats=stats,
computation_time=round(computation_time, 2),
models_used=models_used
)
@router.get("/presets")
async def get_presets():
"""Get available propagation model presets"""
return {
"presets": {
"fast": {
"description": "Quick calculation - terrain only",
**PRESETS["fast"],
"estimated_speed": "~5 seconds for 5km radius"
},
"standard": {
"description": "Balanced - terrain + buildings with materials",
**PRESETS["standard"],
"estimated_speed": "~30 seconds for 5km radius"
},
"detailed": {
"description": "Accurate - adds dominant path analysis",
**PRESETS["detailed"],
"estimated_speed": "~2 minutes for 5km radius"
},
"full": {
"description": "Maximum realism - all models enabled",
**PRESETS["full"],
"estimated_speed": "~5 minutes for 5km radius"
}
}
}
@router.get("/buildings")
async def get_buildings(
min_lat: float,
@@ -102,3 +150,23 @@ async def get_buildings(
"count": len(buildings),
"buildings": [b.model_dump() for b in buildings]
}
def _get_active_models(settings: CoverageSettings) -> List[str]:
"""Determine which propagation models are active"""
models = ["okumura_hata"] # Always active as base model
if settings.use_terrain:
models.append("terrain_los")
if settings.use_buildings:
models.append("buildings")
if settings.use_materials:
models.append("materials")
if settings.use_dominant_path:
models.append("dominant_path")
if settings.use_street_canyon:
models.append("street_canyon")
if settings.use_reflections:
models.append("reflections")
return models

View File

@@ -17,7 +17,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(
title="RFCP Backend API",
description="RF Coverage Planning Backend",
version="1.3.0",
version="1.4.0",
lifespan=lifespan,
)
@@ -39,7 +39,7 @@ app.include_router(coverage.router, prefix="/api/coverage", tags=["coverage"])
@app.get("/")
async def root():
return {"message": "RFCP Backend API", "version": "1.3.0"}
return {"message": "RFCP Backend API", "version": "1.4.0"}
if __name__ == "__main__":

View File

@@ -15,6 +15,8 @@ class Building(BaseModel):
height: float # meters
levels: Optional[int] = None
building_type: Optional[str] = None
material: Optional[str] = None # Detected material type
tags: dict = {} # Store all OSM tags for material detection
class BuildingsService:
@@ -144,12 +146,21 @@ class BuildingsService:
# Estimate height
height = self._estimate_height(tags)
# Detect material from tags
material_str = None
if "building:material" in tags:
material_str = tags["building:material"]
elif "building:facade:material" in tags:
material_str = tags["building:facade:material"]
buildings.append(Building(
id=element["id"],
geometry=geometry,
height=height,
levels=int(tags.get("building:levels", 0)) or None,
building_type=tags.get("building")
building_type=tags.get("building"),
material=material_str,
tags=tags
))
return buildings

View File

@@ -5,6 +5,10 @@ from pydantic import BaseModel
from app.services.terrain_service import terrain_service, TerrainService
from app.services.los_service import los_service
from app.services.buildings_service import buildings_service, Building
from app.services.materials_service import materials_service
from app.services.dominant_path_service import dominant_path_service
from app.services.street_canyon_service import street_canyon_service, Street
from app.services.reflection_service import reflection_service
class CoveragePoint(BaseModel):
@@ -15,14 +19,69 @@ class CoveragePoint(BaseModel):
has_los: bool
terrain_loss: float # dB
building_loss: float # dB
reflection_gain: float = 0.0 # dB (NEW)
class CoverageSettings(BaseModel):
radius: float = 10000 # meters
resolution: float = 200 # meters
min_signal: float = -120 # dBm threshold
# Layer toggles
use_terrain: bool = True
use_buildings: bool = True
use_materials: bool = True
use_dominant_path: bool = False
use_street_canyon: bool = False
use_reflections: bool = False
# Preset
preset: Optional[str] = None # fast, standard, detailed, full
# Propagation model presets
PRESETS = {
"fast": {
"use_terrain": True,
"use_buildings": False,
"use_materials": False,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
},
"standard": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": False,
"use_street_canyon": False,
"use_reflections": False,
},
"detailed": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": False,
"use_reflections": False,
},
"full": {
"use_terrain": True,
"use_buildings": True,
"use_materials": True,
"use_dominant_path": True,
"use_street_canyon": True,
"use_reflections": True,
},
}
def apply_preset(settings: CoverageSettings) -> CoverageSettings:
"""Apply preset configuration to settings"""
if settings.preset and settings.preset in PRESETS:
for key, value in PRESETS[settings.preset].items():
setattr(settings, key, value)
return settings
class SiteParams(BaseModel):
@@ -38,7 +97,8 @@ class SiteParams(BaseModel):
class CoverageService:
"""
RF Coverage calculation with terrain and buildings
RF Coverage calculation with terrain, buildings, materials,
dominant path, street canyon, and reflections
"""
EARTH_RADIUS = 6371000
@@ -58,6 +118,9 @@ class CoverageService:
Returns list of CoveragePoint with RSRP values
"""
# Apply preset if specified
settings = apply_preset(settings)
points = []
# Generate grid
@@ -67,23 +130,31 @@ class CoverageService:
settings.resolution
)
# Fetch buildings for coverage area (if enabled)
buildings = []
if settings.use_buildings:
# Calculate bbox with margin
lat_delta = settings.radius / 111000 # ~111km per degree
lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat)))
# Calculate bbox for data fetching
lat_delta = settings.radius / 111000
lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat)))
# Fetch buildings for coverage area (if enabled)
buildings: List[Building] = []
if settings.use_buildings:
buildings = await self.buildings.fetch_buildings(
site.lat - lat_delta, site.lon - lon_delta,
site.lat + lat_delta, site.lon + lon_delta
)
# Fetch streets (if street canyon enabled)
streets: List[Street] = []
if settings.use_street_canyon:
streets = await street_canyon_service.fetch_streets(
site.lat - lat_delta, site.lon - lon_delta,
site.lat + lat_delta, site.lon + lon_delta
)
# Calculate coverage for each point
for lat, lon in grid:
point = await self._calculate_point(
site, lat, lon,
settings, buildings
settings, buildings, streets
)
if point.rsrp >= settings.min_signal:
@@ -103,6 +174,9 @@ class CoverageService:
if not sites:
return []
# Apply preset once
settings = apply_preset(settings)
# Get all individual coverages
all_coverages = await asyncio.gather(*[
self.calculate_coverage(site, settings)
@@ -155,9 +229,10 @@ class CoverageService:
site: SiteParams,
lat: float, lon: float,
settings: CoverageSettings,
buildings: List[Building]
buildings: List[Building],
streets: List[Street]
) -> CoveragePoint:
"""Calculate RSRP at a single point"""
"""Calculate RSRP at a single point with all propagation models"""
# Distance
distance = TerrainService.haversine_distance(site.lat, site.lon, lat, lon)
@@ -194,25 +269,85 @@ class CoverageService:
clearance = los_result["clearance"]
terrain_loss = self._diffraction_loss(clearance, site.frequency)
# Building loss
# Building loss (with optional material awareness)
building_loss = 0.0
if settings.use_buildings and buildings:
for building in buildings:
intersection = self.buildings.line_intersects_building(
site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon),
lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon),
building
)
if intersection is not None:
# Building penetration loss (~20dB for concrete)
building_loss += 20.0
has_los = False
break # One building is enough
if settings.use_materials:
# Material-aware building loss
for building in buildings:
intersection = self.buildings.line_intersects_building(
site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon),
lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon),
building
)
if intersection is not None:
material = materials_service.detect_material(building.tags)
building_loss += materials_service.get_penetration_loss(
material, site.frequency
)
has_los = False
break # One building is enough
else:
# Simple building loss (legacy behavior)
for building in buildings:
intersection = self.buildings.line_intersects_building(
site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon),
lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon),
building
)
if intersection is not None:
building_loss += 20.0 # Default concrete
has_los = False
break
# Calculate RSRP
# RSRP = Tx Power + Tx Gain - Path Loss - Antenna Loss - Terrain Loss - Building Loss
rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss
# Dominant path analysis (find best route)
if settings.use_dominant_path and buildings:
paths = await dominant_path_service.find_dominant_paths(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, buildings
)
if paths:
best_path = paths[0]
# Use best path's loss if it's better
if best_path.is_valid and best_path.path_loss < (path_loss + terrain_loss + building_loss):
path_loss = best_path.path_loss
terrain_loss = 0
building_loss = 0
has_los = best_path.path_type == "direct" and not best_path.materials_crossed
# Street canyon model
if settings.use_street_canyon and streets:
canyon_loss, street_path = await street_canyon_service.calculate_street_canyon_loss(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, streets
)
# Use canyon loss if better than current total
if canyon_loss < (path_loss + terrain_loss + building_loss):
path_loss = canyon_loss
terrain_loss = 0
building_loss = 0
# Reflections
reflection_gain = 0.0
if settings.use_reflections and buildings:
reflection_paths = await reflection_service.find_reflection_paths(
site.lat, site.lon, site.height,
lat, lon, 1.5,
site.frequency, buildings
)
if reflection_paths:
# Combine direct and reflected signals
direct_rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss
combined_rsrp = reflection_service.combine_paths(
direct_rsrp, reflection_paths, site.power + site.gain
)
reflection_gain = max(0, combined_rsrp - direct_rsrp)
# Final RSRP
rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + reflection_gain
return CoveragePoint(
lat=lat,
@@ -221,7 +356,8 @@ class CoverageService:
distance=distance,
has_los=has_los,
terrain_loss=terrain_loss,
building_loss=building_loss
building_loss=building_loss,
reflection_gain=reflection_gain
)
def _okumura_hata(
@@ -311,9 +447,6 @@ class CoverageService:
return 0.0 # No obstruction
# Fresnel parameter approximation
# v ~ clearance * sqrt(2 / (lambda * d))
# Simplified: use clearance directly
v = abs(clearance) / 10 # Normalize
# Knife-edge loss approximation

View File

@@ -0,0 +1,394 @@
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from app.services.terrain_service import terrain_service
from app.services.buildings_service import buildings_service, Building
from app.services.materials_service import materials_service, BuildingMaterial
@dataclass
class RayPath:
"""Single ray path from TX to RX"""
path_type: str # "direct", "reflected", "diffracted", "street"
total_distance: float # meters
path_loss: float # dB
reflection_points: List[Tuple[float, float]] # [(lat, lon), ...]
materials_crossed: List[BuildingMaterial]
is_valid: bool # Does this path exist?
class DominantPathService:
"""
Find dominant propagation paths (2-3 strongest)
Path types:
1. Direct (LoS if available)
2. Single reflection off building
3. Over-roof diffraction
4. Around-corner diffraction
"""
MAX_REFLECTIONS = 2
MAX_PATHS = 3
async def find_dominant_paths(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
buildings: List[Building]
) -> List[RayPath]:
"""Find the dominant propagation paths"""
paths = []
# 1. Try direct path
direct = await self._check_direct_path(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
if direct:
paths.append(direct)
# 2. Try single-bounce reflections
reflections = await self._find_reflection_paths(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
paths.extend(reflections[:2]) # Max 2 reflection paths
# 3. Try over-roof diffraction (if direct blocked)
if not direct or not direct.is_valid:
diffracted = await self._find_diffraction_path(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, buildings
)
if diffracted:
paths.append(diffracted)
# Sort by path loss (best first) and return top N
paths.sort(key=lambda p: p.path_loss)
return paths[:self.MAX_PATHS]
async def _check_direct_path(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> Optional[RayPath]:
"""Check if direct LoS path exists"""
from app.services.los_service import los_service
# Check terrain LoS
los_result = await los_service.check_line_of_sight(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height
)
if not los_result["has_los"]:
distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
return RayPath(
path_type="direct",
total_distance=distance,
path_loss=float('inf'),
reflection_points=[],
materials_crossed=[],
is_valid=False
)
# Check building intersections
materials_crossed = []
for building in buildings:
intersection = self._line_intersects_building_3d(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
building
)
if intersection:
material = materials_service.detect_material(building.tags)
materials_crossed.append(material)
# Calculate path loss
distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
# Add material penetration losses
for material in materials_crossed:
path_loss += materials_service.get_penetration_loss(material, frequency_mhz)
return RayPath(
path_type="direct",
total_distance=distance,
path_loss=path_loss,
reflection_points=[],
materials_crossed=materials_crossed,
is_valid=len(materials_crossed) < 3 # Too many walls = not viable
)
async def _find_reflection_paths(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> List[RayPath]:
"""Find viable single-bounce reflection paths"""
reflection_paths = []
for building in buildings:
# Find potential reflection points on building walls
reflection_point = self._find_reflection_point(
tx_lat, tx_lon, rx_lat, rx_lon, building
)
if not reflection_point:
continue
ref_lat, ref_lon = reflection_point
# Check if both segments are clear
# TX -> Reflection point
dist1 = terrain_service.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
# Reflection point -> RX
dist2 = terrain_service.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
total_distance = dist1 + dist2
# Don't consider if much longer than direct path
direct_distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
if total_distance > direct_distance * 2:
continue
# Calculate path loss
path_loss = self._calculate_path_loss(total_distance, frequency_mhz, tx_height, rx_height)
# Add reflection loss
material = materials_service.detect_material(building.tags)
path_loss += materials_service.get_reflection_loss(material)
reflection_paths.append(RayPath(
path_type="reflected",
total_distance=total_distance,
path_loss=path_loss,
reflection_points=[(ref_lat, ref_lon)],
materials_crossed=[],
is_valid=True
))
return reflection_paths
async def _find_diffraction_path(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
buildings: List[Building]
) -> Optional[RayPath]:
"""Find over-roof diffraction path"""
# Find highest obstacle between TX and RX
max_height = 0
obstacle_lat, obstacle_lon = None, None
# Sample points along direct path
num_samples = 20
for i in range(1, num_samples - 1):
t = i / num_samples
lat = tx_lat + t * (rx_lat - tx_lat)
lon = tx_lon + t * (rx_lon - tx_lon)
# Check terrain
terrain_elev = await terrain_service.get_elevation(lat, lon)
if terrain_elev > max_height:
max_height = terrain_elev
obstacle_lat, obstacle_lon = lat, lon
# Check buildings at this point
for building in buildings:
if buildings_service.point_in_building(lat, lon, building):
if building.height > max_height:
max_height = building.height
obstacle_lat, obstacle_lon = lat, lon
if not obstacle_lat:
return None
# Calculate diffraction loss (simplified knife-edge)
distance = terrain_service.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
# Fresnel parameter
tx_elev = await terrain_service.get_elevation(tx_lat, tx_lon)
rx_elev = await terrain_service.get_elevation(rx_lat, rx_lon)
tx_total = tx_elev + tx_height
rx_total = rx_elev + rx_height
# Height of LoS at obstacle point
d1 = terrain_service.haversine_distance(tx_lat, tx_lon, obstacle_lat, obstacle_lon)
los_height = tx_total + (rx_total - tx_total) * (d1 / distance) if distance > 0 else tx_total
clearance = los_height - max_height
# Knife-edge diffraction loss
diffraction_loss = self._knife_edge_loss(clearance, frequency_mhz, distance, d1)
path_loss = self._calculate_path_loss(distance, frequency_mhz, tx_height, rx_height)
path_loss += diffraction_loss
return RayPath(
path_type="diffracted",
total_distance=distance,
path_loss=path_loss,
reflection_points=[(obstacle_lat, obstacle_lon)],
materials_crossed=[],
is_valid=True
)
def _find_reflection_point(
self,
tx_lat: float, tx_lon: float,
rx_lat: float, rx_lon: float,
building: Building
) -> Optional[Tuple[float, float]]:
"""Find specular reflection point on building wall"""
# Simplified: find closest wall segment and calculate reflection
geometry = building.geometry
best_point = None
best_score = float('inf')
for i in range(len(geometry) - 1):
wall_start = geometry[i]
wall_end = geometry[i + 1]
# Find reflection point on this wall segment
ref_point = self._specular_reflection(
tx_lon, tx_lat, rx_lon, rx_lat,
wall_start[0], wall_start[1],
wall_end[0], wall_end[1]
)
if ref_point:
# Score by total path length
d1 = np.sqrt((ref_point[0] - tx_lon)**2 + (ref_point[1] - tx_lat)**2)
d2 = np.sqrt((ref_point[0] - rx_lon)**2 + (ref_point[1] - rx_lat)**2)
score = d1 + d2
if score < best_score:
best_score = score
best_point = (ref_point[1], ref_point[0]) # Return as (lat, lon)
return best_point
def _specular_reflection(
self,
tx_x, tx_y, rx_x, rx_y,
wall_x1, wall_y1, wall_x2, wall_y2
) -> Optional[Tuple[float, float]]:
"""Calculate specular reflection point on wall segment"""
# Wall vector
wall_dx = wall_x2 - wall_x1
wall_dy = wall_y2 - wall_y1
wall_len = np.sqrt(wall_dx**2 + wall_dy**2)
if wall_len < 1e-10:
return None
# Wall normal
normal_x = -wall_dy / wall_len
normal_y = wall_dx / wall_len
# Mirror TX across wall
# Project TX onto wall
tx_rel_x = tx_x - wall_x1
tx_rel_y = tx_y - wall_y1
dot = tx_rel_x * normal_x + tx_rel_y * normal_y
mirror_x = tx_x - 2 * dot * normal_x
mirror_y = tx_y - 2 * dot * normal_y
# Find intersection of (mirror -> RX) with wall
# Parametric line: mirror + t * (rx - mirror)
dx = rx_x - mirror_x
dy = rx_y - mirror_y
# Wall parametric: wall1 + s * (wall2 - wall1)
denom = dx * wall_dy - dy * wall_dx
if abs(denom) < 1e-10:
return None # Parallel
t = ((wall_x1 - mirror_x) * wall_dy - (wall_y1 - mirror_y) * wall_dx) / denom
s = ((wall_x1 - mirror_x) * dy - (wall_y1 - mirror_y) * dx) / (-denom)
# Check if intersection is on wall segment and between mirror and RX
if 0 <= s <= 1 and 0 <= t <= 1:
ref_x = mirror_x + t * dx
ref_y = mirror_y + t * dy
return (ref_x, ref_y)
return None
def _line_intersects_building_3d(
self,
lat1, lon1, height1,
lat2, lon2, height2,
building: Building
) -> bool:
"""Check if 3D line intersects building volume"""
# Sample along line
for t in np.linspace(0, 1, 20):
lat = lat1 + t * (lat2 - lat1)
lon = lon1 + t * (lon2 - lon1)
height = height1 + t * (height2 - height1)
if buildings_service.point_in_building(lat, lon, building):
if height < building.height:
return True
return False
def _calculate_path_loss(self, distance, frequency_mhz, tx_height, rx_height) -> float:
"""Okumura-Hata path loss"""
d_km = max(distance / 1000, 0.1)
a_hm = (1.1 * np.log10(frequency_mhz) - 0.7) * rx_height - (1.56 * np.log10(frequency_mhz) - 0.8)
L = (69.55 + 26.16 * np.log10(frequency_mhz) - 13.82 * np.log10(tx_height) - a_hm +
(44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km))
return L
def _knife_edge_loss(self, clearance, frequency_mhz, total_distance, d1) -> float:
"""Knife-edge diffraction loss"""
if clearance >= 0:
return 0.0
wavelength = 300 / frequency_mhz
d2 = total_distance - d1
if d1 <= 0 or d2 <= 0 or wavelength <= 0:
return 0.0
# Fresnel parameter v
v = abs(clearance) * np.sqrt(2 * (d1 + d2) / (wavelength * d1 * d2))
# Lee's approximation
if v <= -0.78:
return 0
elif v < 0:
return 6.02 + 9.11 * v - 1.27 * v**2
elif v < 2.4:
return 6.02 + 9.11 * v + 1.27 * v**2
else:
return 13 + 20 * np.log10(v)
dominant_path_service = DominantPathService()

View File

@@ -0,0 +1,128 @@
import math
from enum import Enum
from typing import Optional
class BuildingMaterial(Enum):
"""Building materials with RF properties"""
CONCRETE = "concrete"
BRICK = "brick"
GLASS = "glass"
WOOD = "wood"
METAL = "metal"
STONE = "stone"
PLASTER = "plaster"
UNKNOWN = "unknown"
# ITU-R P.2040 based attenuation (dB per wall at 1-3 GHz)
MATERIAL_LOSS = {
BuildingMaterial.CONCRETE: 15.0,
BuildingMaterial.BRICK: 10.0,
BuildingMaterial.GLASS: 3.0,
BuildingMaterial.WOOD: 5.0,
BuildingMaterial.METAL: 25.0, # Or full reflection
BuildingMaterial.STONE: 12.0,
BuildingMaterial.PLASTER: 4.0,
BuildingMaterial.UNKNOWN: 10.0, # Default assumption
}
# Reflection coefficient (0-1, portion of signal reflected)
MATERIAL_REFLECTION = {
BuildingMaterial.CONCRETE: 0.6,
BuildingMaterial.BRICK: 0.5,
BuildingMaterial.GLASS: 0.3,
BuildingMaterial.WOOD: 0.2,
BuildingMaterial.METAL: 0.9,
BuildingMaterial.STONE: 0.55,
BuildingMaterial.PLASTER: 0.3,
BuildingMaterial.UNKNOWN: 0.4,
}
class MaterialsService:
"""Building material detection and RF properties"""
# OSM building:material tag mapping
OSM_MATERIAL_MAP = {
"concrete": BuildingMaterial.CONCRETE,
"brick": BuildingMaterial.BRICK,
"glass": BuildingMaterial.GLASS,
"wood": BuildingMaterial.WOOD,
"metal": BuildingMaterial.METAL,
"steel": BuildingMaterial.METAL,
"stone": BuildingMaterial.STONE,
"plaster": BuildingMaterial.PLASTER,
"cement_block": BuildingMaterial.CONCRETE,
"timber": BuildingMaterial.WOOD,
}
# Fallback by building type
BUILDING_TYPE_MATERIAL = {
"industrial": BuildingMaterial.METAL,
"warehouse": BuildingMaterial.METAL,
"garage": BuildingMaterial.METAL,
"shed": BuildingMaterial.WOOD,
"house": BuildingMaterial.BRICK,
"residential": BuildingMaterial.CONCRETE,
"apartments": BuildingMaterial.CONCRETE,
"commercial": BuildingMaterial.GLASS, # Often glass facades
"office": BuildingMaterial.GLASS,
"retail": BuildingMaterial.GLASS,
"church": BuildingMaterial.STONE,
"cathedral": BuildingMaterial.STONE,
"school": BuildingMaterial.BRICK,
"hospital": BuildingMaterial.CONCRETE,
"university": BuildingMaterial.CONCRETE,
}
def detect_material(self, building_tags: dict) -> BuildingMaterial:
"""Detect building material from OSM tags"""
# Direct material tag
if "building:material" in building_tags:
material_str = building_tags["building:material"].lower()
if material_str in self.OSM_MATERIAL_MAP:
return self.OSM_MATERIAL_MAP[material_str]
# Facade material (often more relevant for RF)
if "building:facade:material" in building_tags:
material_str = building_tags["building:facade:material"].lower()
if material_str in self.OSM_MATERIAL_MAP:
return self.OSM_MATERIAL_MAP[material_str]
# Fallback by building type
building_type = building_tags.get("building", "yes").lower()
if building_type in self.BUILDING_TYPE_MATERIAL:
return self.BUILDING_TYPE_MATERIAL[building_type]
return BuildingMaterial.UNKNOWN
def get_penetration_loss(self, material: BuildingMaterial, frequency_mhz: float = 1800) -> float:
"""
Get RF penetration loss through wall
Frequency correction: +2dB per octave above 1GHz
"""
base_loss = MATERIAL_LOSS[material]
# Frequency correction (simplified)
freq_factor = max(0, (frequency_mhz - 1000) / 1000) * 2
return base_loss + freq_factor
def get_reflection_coefficient(self, material: BuildingMaterial) -> float:
"""Get reflection coefficient (0-1)"""
return MATERIAL_REFLECTION[material]
def get_reflection_loss(self, material: BuildingMaterial) -> float:
"""Get loss due to reflection (dB)"""
coeff = MATERIAL_REFLECTION[material]
if coeff <= 0:
return 30.0 # Effectively no reflection
# Reflection loss in dB = -10 * log10(coefficient)
return -10 * math.log10(coeff)
materials_service = MaterialsService()

View File

@@ -0,0 +1,265 @@
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
from app.services.buildings_service import Building
from app.services.materials_service import materials_service
@dataclass
class ReflectionPath:
"""A reflection path with one or more bounces"""
points: List[Tuple[float, float]] # [TX, reflection1, reflection2, ..., RX]
total_distance: float
total_loss: float
reflection_count: int
materials: List[str]
class ReflectionService:
"""
Calculate reflection paths for RF propagation
- Single bounce (most common)
- Double bounce (around corners)
- Ground reflection
"""
MAX_BOUNCES = 2
GROUND_REFLECTION_COEFF = 0.3 # Depends on surface
async def find_reflection_paths(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
buildings: List[Building],
include_ground: bool = True
) -> List[ReflectionPath]:
"""Find all viable reflection paths"""
paths = []
# Single-bounce building reflections
for building in buildings:
path = self._find_single_bounce(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz, building
)
if path:
paths.append(path)
# Ground reflection
if include_ground:
ground_path = self._calculate_ground_reflection(
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz
)
if ground_path:
paths.append(ground_path)
# Sort by loss (best first)
paths.sort(key=lambda p: p.total_loss)
return paths[:5] # Return top 5
def _find_single_bounce(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz,
building: Building
) -> Optional[ReflectionPath]:
"""Find single-bounce reflection off building"""
# Find reflection point on building walls
geometry = building.geometry
for i in range(len(geometry) - 1):
wall_start = geometry[i]
wall_end = geometry[i + 1]
ref_point = self._specular_reflection_point(
(tx_lon, tx_lat), (rx_lon, rx_lat),
wall_start, wall_end
)
if not ref_point:
continue
ref_lat, ref_lon = ref_point[1], ref_point[0]
# Calculate distances
from app.services.terrain_service import TerrainService
d1 = TerrainService.haversine_distance(tx_lat, tx_lon, ref_lat, ref_lon)
d2 = TerrainService.haversine_distance(ref_lat, ref_lon, rx_lat, rx_lon)
total_dist = d1 + d2
# Direct distance check - reflection shouldn't be much longer
direct_dist = TerrainService.haversine_distance(tx_lat, tx_lon, rx_lat, rx_lon)
if total_dist > direct_dist * 1.5:
continue
# Path loss
path_loss = self._free_space_loss(total_dist, frequency_mhz)
# Reflection loss
material = materials_service.detect_material(building.tags)
reflection_loss = materials_service.get_reflection_loss(material)
total_loss = path_loss + reflection_loss
return ReflectionPath(
points=[(tx_lat, tx_lon), (ref_lat, ref_lon), (rx_lat, rx_lon)],
total_distance=total_dist,
total_loss=total_loss,
reflection_count=1,
materials=[material.value]
)
return None
def _calculate_ground_reflection(
self,
tx_lat, tx_lon, tx_height,
rx_lat, rx_lon, rx_height,
frequency_mhz
) -> Optional[ReflectionPath]:
"""Calculate ground reflection path"""
from app.services.terrain_service import TerrainService
# Reflection point (simplified - midpoint for flat ground)
mid_lat = (tx_lat + rx_lat) / 2
mid_lon = (tx_lon + rx_lon) / 2
# Path lengths
d1 = TerrainService.haversine_distance(tx_lat, tx_lon, mid_lat, mid_lon)
d2 = TerrainService.haversine_distance(mid_lat, mid_lon, rx_lat, rx_lon)
# Actual path length considering heights
path1 = np.sqrt(d1**2 + tx_height**2)
path2 = np.sqrt(d2**2 + rx_height**2)
total_dist = path1 + path2
# Path loss
path_loss = self._free_space_loss(total_dist, frequency_mhz)
# Ground reflection loss (~5-10 dB typically)
ground_reflection_loss = -10 * np.log10(self.GROUND_REFLECTION_COEFF)
# Phase difference can cause constructive or destructive interference
# Simplified: assume average case
total_loss = path_loss + ground_reflection_loss
return ReflectionPath(
points=[(tx_lat, tx_lon), (mid_lat, mid_lon), (rx_lat, rx_lon)],
total_distance=total_dist,
total_loss=total_loss,
reflection_count=1,
materials=["ground"]
)
def _specular_reflection_point(
self,
tx: Tuple[float, float], # (lon, lat)
rx: Tuple[float, float],
wall_start: List[float], # [lon, lat]
wall_end: List[float]
) -> Optional[Tuple[float, float]]:
"""Calculate specular reflection point on wall"""
# Wall vector
wx = wall_end[0] - wall_start[0]
wy = wall_end[1] - wall_start[1]
wall_len = np.sqrt(wx**2 + wy**2)
if wall_len < 1e-10:
return None
# Normalize
wx /= wall_len
wy /= wall_len
# Wall normal (perpendicular)
nx = -wy
ny = wx
# Vector from wall start to TX
tx_rel_x = tx[0] - wall_start[0]
tx_rel_y = tx[1] - wall_start[1]
# Distance from TX to wall line
dist_to_wall = tx_rel_x * nx + tx_rel_y * ny
# Mirror TX across wall
mirror_x = tx[0] - 2 * dist_to_wall * nx
mirror_y = tx[1] - 2 * dist_to_wall * ny
# Line from mirror to RX
dx = rx[0] - mirror_x
dy = rx[1] - mirror_y
# Find intersection with wall
# Parametric: wall_start + t * wall_dir
# Parametric: mirror + s * (rx - mirror)
denom = dx * wy - dy * wx
if abs(denom) < 1e-10:
return None
t = ((wall_start[0] - mirror_x) * wy - (wall_start[1] - mirror_y) * wx) / denom
s = ((wall_start[0] - mirror_x) * dy - (wall_start[1] - mirror_y) * dx) / (-denom) if denom != 0 else 0
# Check if on wall segment and between mirror and RX
if 0 <= s <= 1 and 0 <= t <= 1:
ref_x = mirror_x + t * dx
ref_y = mirror_y + t * dy
return (ref_x, ref_y)
return None
def _free_space_loss(self, distance: float, frequency_mhz: float) -> float:
"""Free space path loss (dB)"""
if distance <= 0:
distance = 1
# FSPL = 20*log10(d) + 20*log10(f) + 20*log10(4*pi/c)
# Simplified: FSPL = 32.45 + 20*log10(f_MHz) + 20*log10(d_km)
d_km = distance / 1000
return 32.45 + 20 * np.log10(frequency_mhz) + 20 * np.log10(d_km + 0.001)
def combine_paths(
self,
direct_power_dbm: float,
reflection_paths: List[ReflectionPath],
tx_power_dbm: float
) -> float:
"""
Combine direct and reflected signals (power sum)
Returns total received power in dBm
"""
# Convert to linear power
powers = []
if direct_power_dbm > -150: # Valid direct signal
powers.append(10 ** (direct_power_dbm / 10))
for path in reflection_paths:
reflected_power_dbm = tx_power_dbm - path.total_loss
if reflected_power_dbm > -150:
powers.append(10 ** (reflected_power_dbm / 10))
if not powers:
return -150.0 # No signal
# Sum powers (incoherent addition - conservative estimate)
total_power = sum(powers)
return 10 * np.log10(total_power)
reflection_service = ReflectionService()

View File

@@ -0,0 +1,363 @@
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
import httpx
from pathlib import Path
import json
@dataclass
class Street:
"""Street segment from OSM"""
id: int
name: Optional[str]
geometry: List[Tuple[float, float]] # [(lat, lon), ...]
width: float # meters
highway_type: str # residential, primary, secondary, etc.
class StreetCanyonService:
"""
Street canyon propagation model (ITU-R P.1411)
Signal propagates along streets with reflections from building walls.
Loss increases at corners/turns.
"""
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
# Default street widths by type
STREET_WIDTHS = {
"motorway": 25.0,
"trunk": 20.0,
"primary": 15.0,
"secondary": 12.0,
"tertiary": 10.0,
"residential": 8.0,
"unclassified": 6.0,
"service": 5.0,
"footway": 2.0,
"path": 1.5,
}
# Corner/turn loss
CORNER_LOSS_90 = 10.0 # dB for 90-degree turn
CORNER_LOSS_45 = 4.0 # dB for 45-degree turn
def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/streets"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True, parents=True)
self._cache: dict[str, List[Street]] = {}
async def fetch_streets(
self,
min_lat: float, min_lon: float,
max_lat: float, max_lon: float
) -> List[Street]:
"""Fetch street network from OSM"""
cache_key = f"{min_lat:.3f}_{min_lon:.3f}_{max_lat:.3f}_{max_lon:.3f}"
# Check cache
if cache_key in self._cache:
return self._cache[cache_key]
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
with open(cache_file) as f:
data = json.load(f)
streets = [Street(**s) for s in data]
self._cache[cache_key] = streets
return streets
# Fetch from Overpass
query = f"""
[out:json][timeout:30];
way["highway"]({min_lat},{min_lon},{max_lat},{max_lon});
out body;
>;
out skel qt;
"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(self.OVERPASS_URL, data={"data": query})
response.raise_for_status()
data = response.json()
except Exception as e:
print(f"Street fetch error: {e}")
return []
streets = self._parse_streets(data)
# Cache
with open(cache_file, 'w') as f:
json.dump([{
"id": s.id,
"name": s.name,
"geometry": s.geometry,
"width": s.width,
"highway_type": s.highway_type
} for s in streets], f)
self._cache[cache_key] = streets
return streets
def _parse_streets(self, data: dict) -> List[Street]:
"""Parse Overpass response into Street objects"""
nodes = {}
for element in data.get("elements", []):
if element["type"] == "node":
nodes[element["id"]] = (element["lat"], element["lon"])
streets = []
for element in data.get("elements", []):
if element["type"] != "way":
continue
tags = element.get("tags", {})
if "highway" not in tags:
continue
highway_type = tags["highway"]
# Skip non-road types
if highway_type in ["bus_stop", "crossing", "traffic_signals"]:
continue
geometry = []
for node_id in element.get("nodes", []):
if node_id in nodes:
geometry.append(nodes[node_id])
if len(geometry) < 2:
continue
# Get width
width = self._get_street_width(tags)
streets.append(Street(
id=element["id"],
name=tags.get("name"),
geometry=geometry,
width=width,
highway_type=highway_type
))
return streets
def _get_street_width(self, tags: dict) -> float:
"""Estimate street width from OSM tags"""
# Explicit width
if "width" in tags:
try:
return float(tags["width"].replace("m", "").strip())
except (ValueError, TypeError):
pass
# Calculate from lanes
if "lanes" in tags:
try:
lanes = int(tags["lanes"])
return lanes * 3.5 # ~3.5m per lane
except (ValueError, TypeError):
pass
# Default by highway type
highway_type = tags.get("highway", "residential")
return self.STREET_WIDTHS.get(highway_type, 8.0)
async def calculate_street_canyon_loss(
self,
tx_lat: float, tx_lon: float, tx_height: float,
rx_lat: float, rx_lon: float, rx_height: float,
frequency_mhz: float,
streets: List[Street]
) -> Tuple[float, List[Tuple[float, float]]]:
"""
Calculate path loss through street canyon
Returns:
(path_loss_db, street_path as list of points)
"""
# Find path along streets from TX to RX
street_path = self._find_street_path(tx_lat, tx_lon, rx_lat, rx_lon, streets)
if not street_path:
return float('inf'), [] # No street path found
# Calculate loss along path
total_loss = 0.0
total_distance = 0.0
for i in range(len(street_path) - 1):
p1 = street_path[i]
p2 = street_path[i + 1]
# Segment distance
from app.services.terrain_service import TerrainService
segment_dist = TerrainService.haversine_distance(p1[0], p1[1], p2[0], p2[1])
total_distance += segment_dist
# Street canyon loss (ITU-R P.1411 simplified)
# L = 32.4 + 20*log10(f_MHz) + 20*log10(d_km)
if segment_dist > 0:
segment_loss = 32.4 + 20 * np.log10(frequency_mhz) + 20 * np.log10(segment_dist / 1000 + 0.001)
total_loss += segment_loss * (segment_dist / total_distance) if total_distance > 0 else 0
# Corner loss
if i > 0:
corner_angle = self._calculate_corner_angle(
street_path[i - 1], p1, p2
)
corner_loss = self._corner_loss(corner_angle)
total_loss += corner_loss
return total_loss, street_path
def _find_street_path(
self,
start_lat: float, start_lon: float,
end_lat: float, end_lon: float,
streets: List[Street]
) -> List[Tuple[float, float]]:
"""
Find path along streets (simplified A* / greedy)
Returns list of (lat, lon) waypoints
"""
# Find nearest street point to start and end
start_point = self._nearest_street_point(start_lat, start_lon, streets)
end_point = self._nearest_street_point(end_lat, end_lon, streets)
if not start_point or not end_point:
return []
# Simplified: just return direct street segments
# Full implementation would use A* pathfinding
path = [(start_lat, start_lon), start_point]
# Add intermediate points along streets toward destination
current = start_point
visited = set()
for _ in range(50): # Max iterations
if self._distance(current, end_point) < 50: # Within 50m
break
# Find next street segment toward destination
next_point = self._next_street_point(current, end_point, streets, visited)
if not next_point:
break
path.append(next_point)
visited.add((round(current[0], 5), round(current[1], 5)))
current = next_point
path.append(end_point)
path.append((end_lat, end_lon))
return path
def _nearest_street_point(
self,
lat: float, lon: float,
streets: List[Street]
) -> Optional[Tuple[float, float]]:
"""Find nearest point on any street"""
best_point = None
best_dist = float('inf')
for street in streets:
for point in street.geometry:
dist = self._distance((lat, lon), point)
if dist < best_dist:
best_dist = dist
best_point = point
return best_point if best_dist < 200 else None # Max 200m to street
def _next_street_point(
self,
current: Tuple[float, float],
target: Tuple[float, float],
streets: List[Street],
visited: set
) -> Optional[Tuple[float, float]]:
"""Find next street point toward target"""
best_point = None
best_score = float('inf')
for street in streets:
for i, point in enumerate(street.geometry):
if (round(point[0], 5), round(point[1], 5)) in visited:
continue
dist_from_current = self._distance(current, point)
dist_to_target = self._distance(point, target)
# Must be close to current position
if dist_from_current > 100:
continue
# Score: prefer points closer to target
score = dist_to_target + dist_from_current * 0.5
if score < best_score:
best_score = score
best_point = point
return best_point
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""Quick distance approximation (meters)"""
lat_diff = (p1[0] - p2[0]) * 111000
lon_diff = (p1[1] - p2[1]) * 111000 * np.cos(np.radians(p1[0]))
return np.sqrt(lat_diff**2 + lon_diff**2)
def _calculate_corner_angle(
self,
p1: Tuple[float, float],
p2: Tuple[float, float],
p3: Tuple[float, float]
) -> float:
"""Calculate angle at corner (degrees)"""
v1 = (p1[0] - p2[0], p1[1] - p2[1])
v2 = (p3[0] - p2[0], p3[1] - p2[1])
dot = v1[0] * v2[0] + v1[1] * v2[1]
mag1 = np.sqrt(v1[0]**2 + v1[1]**2)
mag2 = np.sqrt(v2[0]**2 + v2[1]**2)
if mag1 * mag2 < 1e-10:
return 180.0
cos_angle = dot / (mag1 * mag2)
cos_angle = max(-1, min(1, cos_angle))
return np.degrees(np.arccos(cos_angle))
def _corner_loss(self, angle_degrees: float) -> float:
"""Calculate loss due to corner/turn"""
# Straight = 180 deg, right angle = 90 deg
turn_angle = abs(180 - angle_degrees)
if turn_angle < 15:
return 0.0
elif turn_angle < 45:
return self.CORNER_LOSS_45 * (turn_angle / 45)
elif turn_angle < 90:
return self.CORNER_LOSS_45 + (self.CORNER_LOSS_90 - self.CORNER_LOSS_45) * ((turn_angle - 45) / 45)
else:
return self.CORNER_LOSS_90 + (turn_angle - 90) * 0.2 # Extra loss for sharp turns
street_canyon_service = StreetCanyonService()