From b21fa9b9cbea487b13f0be22f5568120a9fd2b50 Mon Sep 17 00:00:00 2001 From: mytec Date: Sat, 31 Jan 2026 00:14:57 +0200 Subject: [PATCH] @mytec: iter1.3 ready for test --- backend/app/api/routes/coverage.py | 104 +++++++ backend/app/main.py | 7 +- backend/app/services/buildings_service.py | 255 +++++++++++++++++ backend/app/services/coverage_service.py | 331 ++++++++++++++++++++++ 4 files changed, 694 insertions(+), 3 deletions(-) create mode 100644 backend/app/api/routes/coverage.py create mode 100644 backend/app/services/buildings_service.py create mode 100644 backend/app/services/coverage_service.py diff --git a/backend/app/api/routes/coverage.py b/backend/app/api/routes/coverage.py new file mode 100644 index 0000000..5383221 --- /dev/null +++ b/backend/app/api/routes/coverage.py @@ -0,0 +1,104 @@ +from fastapi import APIRouter, HTTPException, BackgroundTasks +from typing import List, Optional +from pydantic import BaseModel +from app.services.coverage_service import ( + coverage_service, + CoverageSettings, + SiteParams, + CoveragePoint +) + +router = APIRouter() + + +class CoverageRequest(BaseModel): + """Request body for coverage calculation""" + sites: List[SiteParams] + settings: CoverageSettings = CoverageSettings() + + +class CoverageResponse(BaseModel): + """Coverage calculation response""" + points: List[CoveragePoint] + count: int + settings: CoverageSettings + stats: dict + + +@router.post("/calculate") +async def calculate_coverage(request: CoverageRequest) -> CoverageResponse: + """ + Calculate RF coverage for one or more sites + + Returns grid of RSRP values with terrain and building effects + """ + if not request.sites: + raise HTTPException(400, "At least one site required") + + if len(request.sites) > 10: + raise HTTPException(400, "Maximum 10 sites per request") + + # Validate settings + if request.settings.radius > 50000: + raise HTTPException(400, "Maximum radius 50km") + + if request.settings.resolution < 50: + raise HTTPException(400, "Minimum resolution 50m") + + # Calculate + if len(request.sites) == 1: + points = await coverage_service.calculate_coverage( + request.sites[0], + request.settings + ) + else: + points = await coverage_service.calculate_multi_site_coverage( + request.sites, + request.settings + ) + + # Calculate stats + rsrp_values = [p.rsrp for p in points] + los_count = sum(1 for p in points if p.has_los) + + stats = { + "min_rsrp": min(rsrp_values) if rsrp_values else 0, + "max_rsrp": max(rsrp_values) if rsrp_values else 0, + "avg_rsrp": sum(rsrp_values) / len(rsrp_values) if rsrp_values else 0, + "los_percentage": (los_count / len(points) * 100) if points else 0, + "points_with_buildings": sum(1 for p in points if p.building_loss > 0), + "points_with_terrain_loss": sum(1 for p in points if p.terrain_loss > 0), + } + + return CoverageResponse( + points=points, + count=len(points), + settings=request.settings, + stats=stats + ) + + +@router.get("/buildings") +async def get_buildings( + min_lat: float, + min_lon: float, + max_lat: float, + max_lon: float +): + """ + Get buildings in bounding box (for debugging/visualization) + """ + from app.services.buildings_service import buildings_service + + # Limit bbox size + if (max_lat - min_lat) > 0.1 or (max_lon - min_lon) > 0.1: + raise HTTPException(400, "Bbox too large (max 0.1 degrees)") + + buildings = await buildings_service.fetch_buildings( + min_lat, min_lon, max_lat, max_lon + ) + + return { + "count": len(buildings), + "buildings": [b.model_dump() for b in buildings] + } diff --git a/backend/app/main.py b/backend/app/main.py index 90a5776..09c9265 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.core.database import connect_to_mongo, close_mongo_connection -from app.api.routes import health, projects, terrain +from app.api.routes import health, projects, terrain, coverage @asynccontextmanager @@ -17,7 +17,7 @@ async def lifespan(app: FastAPI): app = FastAPI( title="RFCP Backend API", description="RF Coverage Planning Backend", - version="1.2.0", + version="1.3.0", lifespan=lifespan, ) @@ -34,11 +34,12 @@ app.add_middleware( app.include_router(health.router, prefix="/api/health", tags=["health"]) app.include_router(projects.router, prefix="/api/projects", tags=["projects"]) app.include_router(terrain.router, prefix="/api/terrain", tags=["terrain"]) +app.include_router(coverage.router, prefix="/api/coverage", tags=["coverage"]) @app.get("/") async def root(): - return {"message": "RFCP Backend API", "version": "1.2.0"} + return {"message": "RFCP Backend API", "version": "1.3.0"} if __name__ == "__main__": diff --git a/backend/app/services/buildings_service.py b/backend/app/services/buildings_service.py new file mode 100644 index 0000000..b533e9f --- /dev/null +++ b/backend/app/services/buildings_service.py @@ -0,0 +1,255 @@ +import httpx +import asyncio +from typing import List, Optional +from pydantic import BaseModel +from functools import lru_cache +import hashlib +import json +from pathlib import Path + + +class Building(BaseModel): + """Single building footprint""" + id: int + geometry: List[List[float]] # [[lon, lat], ...] + height: float # meters + levels: Optional[int] = None + building_type: Optional[str] = None + + +class BuildingsService: + """ + OpenStreetMap buildings via Overpass API + """ + + OVERPASS_URL = "https://overpass-api.de/api/interpreter" + DEFAULT_LEVEL_HEIGHT = 3.0 # meters per floor + DEFAULT_BUILDING_HEIGHT = 9.0 # 3 floors if unknown + + def __init__(self, cache_dir: str = "/opt/rfcp/backend/data/buildings"): + self.cache_dir = Path(cache_dir) + self.cache_dir.mkdir(exist_ok=True, parents=True) + self._memory_cache: dict[str, List[Building]] = {} + self._max_cache_size = 50 # bbox regions + + def _bbox_key(self, min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str: + """Generate cache key for bbox""" + # Round to 0.01 degree (~1km) grid for cache efficiency + key = f"{min_lat:.2f},{min_lon:.2f},{max_lat:.2f},{max_lon:.2f}" + return hashlib.md5(key.encode()).hexdigest()[:12] + + async def fetch_buildings( + self, + min_lat: float, min_lon: float, + max_lat: float, max_lon: float, + use_cache: bool = True + ) -> List[Building]: + """ + Fetch buildings in bounding box from OSM + + Args: + min_lat, min_lon, max_lat, max_lon: Bounding box + use_cache: Whether to use cached results + + Returns: + List of Building objects with height estimates + """ + cache_key = self._bbox_key(min_lat, min_lon, max_lat, max_lon) + + # Check memory cache + if use_cache and cache_key in self._memory_cache: + return self._memory_cache[cache_key] + + # Check disk cache + cache_file = self.cache_dir / f"{cache_key}.json" + if use_cache and cache_file.exists(): + try: + with open(cache_file, 'r') as f: + data = json.load(f) + buildings = [Building(**b) for b in data] + self._memory_cache[cache_key] = buildings + return buildings + except Exception: + pass # Fetch fresh if cache corrupted + + # Fetch from Overpass API + query = f""" + [out:json][timeout:30]; + ( + way["building"]({min_lat},{min_lon},{max_lat},{max_lon}); + relation["building"]({min_lat},{min_lon},{max_lat},{max_lon}); + ); + out body; + >; + out skel qt; + """ + + try: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.post( + self.OVERPASS_URL, + data={"data": query} + ) + response.raise_for_status() + data = response.json() + except Exception as e: + print(f"Overpass API error: {e}") + return [] + + # Parse response + buildings = self._parse_overpass_response(data) + + # Cache results + if buildings: + # Disk cache + with open(cache_file, 'w') as f: + json.dump([b.model_dump() for b in buildings], f) + + # Memory cache (with size limit) + if len(self._memory_cache) >= self._max_cache_size: + oldest = next(iter(self._memory_cache)) + del self._memory_cache[oldest] + self._memory_cache[cache_key] = buildings + + return buildings + + def _parse_overpass_response(self, data: dict) -> List[Building]: + """Parse Overpass JSON response into Building objects""" + buildings = [] + + # Build node lookup + nodes = {} + for element in data.get("elements", []): + if element["type"] == "node": + nodes[element["id"]] = (element["lon"], element["lat"]) + + # Process ways (building footprints) + for element in data.get("elements", []): + if element["type"] != "way": + continue + + tags = element.get("tags", {}) + if "building" not in tags: + continue + + # Get geometry + geometry = [] + for node_id in element.get("nodes", []): + if node_id in nodes: + geometry.append(list(nodes[node_id])) + + if len(geometry) < 3: + continue # Invalid polygon + + # Estimate height + height = self._estimate_height(tags) + + buildings.append(Building( + id=element["id"], + geometry=geometry, + height=height, + levels=int(tags.get("building:levels", 0)) or None, + building_type=tags.get("building") + )) + + return buildings + + def _estimate_height(self, tags: dict) -> float: + """Estimate building height from OSM tags""" + # Explicit height tag + if "height" in tags: + try: + h = tags["height"] + # Handle "10 m" or "10m" format + if isinstance(h, str): + h = h.replace("m", "").replace(" ", "") + return float(h) + except (ValueError, TypeError): + pass + + # Calculate from levels + if "building:levels" in tags: + try: + levels = int(tags["building:levels"]) + return levels * self.DEFAULT_LEVEL_HEIGHT + except (ValueError, TypeError): + pass + + # Default based on building type + building_type = tags.get("building", "yes") + type_heights = { + "house": 6.0, + "residential": 12.0, + "apartments": 18.0, + "commercial": 12.0, + "industrial": 8.0, + "warehouse": 6.0, + "garage": 3.0, + "shed": 2.5, + "roof": 3.0, + "church": 15.0, + "cathedral": 30.0, + "hospital": 15.0, + "school": 12.0, + "university": 15.0, + "office": 20.0, + "retail": 6.0, + } + + return type_heights.get(building_type, self.DEFAULT_BUILDING_HEIGHT) + + def point_in_building(self, lat: float, lon: float, building: Building) -> bool: + """Check if point is inside building footprint (ray casting)""" + x, y = lon, lat + polygon = building.geometry + n = len(polygon) + inside = False + + j = n - 1 + for i in range(n): + xi, yi = polygon[i] + xj, yj = polygon[j] + + if ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi): + inside = not inside + j = i + + return inside + + def line_intersects_building( + self, + lat1: float, lon1: float, height1: float, + lat2: float, lon2: float, height2: float, + building: Building + ) -> Optional[float]: + """ + Check if line segment intersects building + + Returns: + Distance along path where intersection occurs, or None + """ + # Simplified 2D check + height comparison + # For accurate 3D intersection, would need proper ray-polygon intersection + + from app.services.terrain_service import TerrainService + + # Sample points along line + num_samples = 20 + for i in range(num_samples): + t = i / num_samples + lat = lat1 + t * (lat2 - lat1) + lon = lon1 + t * (lon2 - lon1) + height = height1 + t * (height2 - height1) + + if self.point_in_building(lat, lon, building): + # Check if signal height is below building + if height < building.height: + # Calculate distance + dist = t * TerrainService.haversine_distance(lat1, lon1, lat2, lon2) + return dist + + return None + + +# Singleton instance +buildings_service = BuildingsService() diff --git a/backend/app/services/coverage_service.py b/backend/app/services/coverage_service.py new file mode 100644 index 0000000..8c88674 --- /dev/null +++ b/backend/app/services/coverage_service.py @@ -0,0 +1,331 @@ +import numpy as np +import asyncio +from typing import List, Optional, Tuple +from pydantic import BaseModel +from app.services.terrain_service import terrain_service, TerrainService +from app.services.los_service import los_service +from app.services.buildings_service import buildings_service, Building + + +class CoveragePoint(BaseModel): + lat: float + lon: float + rsrp: float # dBm + distance: float # meters from site + has_los: bool + terrain_loss: float # dB + building_loss: float # dB + + +class CoverageSettings(BaseModel): + radius: float = 10000 # meters + resolution: float = 200 # meters + min_signal: float = -120 # dBm threshold + use_terrain: bool = True + use_buildings: bool = True + + +class SiteParams(BaseModel): + lat: float + lon: float + height: float = 30 # antenna height meters + power: float = 43 # dBm (20W) + gain: float = 15 # dBi + frequency: float = 1800 # MHz + azimuth: Optional[float] = None # degrees, None = omni + beamwidth: Optional[float] = 65 # degrees + + +class CoverageService: + """ + RF Coverage calculation with terrain and buildings + """ + + EARTH_RADIUS = 6371000 + + def __init__(self): + self.terrain = terrain_service + self.buildings = buildings_service + self.los = los_service + + async def calculate_coverage( + self, + site: SiteParams, + settings: CoverageSettings + ) -> List[CoveragePoint]: + """ + Calculate coverage grid for a single site + + Returns list of CoveragePoint with RSRP values + """ + points = [] + + # Generate grid + grid = self._generate_grid( + site.lat, site.lon, + settings.radius, + settings.resolution + ) + + # Fetch buildings for coverage area (if enabled) + buildings = [] + if settings.use_buildings: + # Calculate bbox with margin + lat_delta = settings.radius / 111000 # ~111km per degree + lon_delta = settings.radius / (111000 * np.cos(np.radians(site.lat))) + + buildings = await self.buildings.fetch_buildings( + site.lat - lat_delta, site.lon - lon_delta, + site.lat + lat_delta, site.lon + lon_delta + ) + + # Calculate coverage for each point + for lat, lon in grid: + point = await self._calculate_point( + site, lat, lon, + settings, buildings + ) + + if point.rsrp >= settings.min_signal: + points.append(point) + + return points + + async def calculate_multi_site_coverage( + self, + sites: List[SiteParams], + settings: CoverageSettings + ) -> List[CoveragePoint]: + """ + Calculate combined coverage from multiple sites + Best server (strongest signal) wins at each point + """ + if not sites: + return [] + + # Get all individual coverages + all_coverages = await asyncio.gather(*[ + self.calculate_coverage(site, settings) + for site in sites + ]) + + # Combine by best signal + point_map: dict[Tuple[float, float], CoveragePoint] = {} + + for coverage in all_coverages: + for point in coverage: + key = (round(point.lat, 6), round(point.lon, 6)) + + if key not in point_map or point.rsrp > point_map[key].rsrp: + point_map[key] = point + + return list(point_map.values()) + + def _generate_grid( + self, + center_lat: float, center_lon: float, + radius: float, resolution: float + ) -> List[Tuple[float, float]]: + """Generate coverage grid points""" + points = [] + + # Convert resolution to degrees + lat_step = resolution / 111000 + lon_step = resolution / (111000 * np.cos(np.radians(center_lat))) + + # Calculate grid bounds + lat_delta = radius / 111000 + lon_delta = radius / (111000 * np.cos(np.radians(center_lat))) + + lat = center_lat - lat_delta + while lat <= center_lat + lat_delta: + lon = center_lon - lon_delta + while lon <= center_lon + lon_delta: + # Check if within radius (circular, not square) + dist = TerrainService.haversine_distance(center_lat, center_lon, lat, lon) + if dist <= radius: + points.append((lat, lon)) + lon += lon_step + lat += lat_step + + return points + + async def _calculate_point( + self, + site: SiteParams, + lat: float, lon: float, + settings: CoverageSettings, + buildings: List[Building] + ) -> CoveragePoint: + """Calculate RSRP at a single point""" + + # Distance + distance = TerrainService.haversine_distance(site.lat, site.lon, lat, lon) + + if distance < 1: + distance = 1 # Avoid division by zero + + # Base path loss (Okumura-Hata for urban) + path_loss = self._okumura_hata( + distance, site.frequency, site.height, 1.5 # 1.5m receiver height + ) + + # Antenna pattern loss (if directional) + antenna_loss = 0.0 + if site.azimuth is not None and site.beamwidth: + antenna_loss = self._antenna_pattern_loss( + site.lat, site.lon, lat, lon, + site.azimuth, site.beamwidth + ) + + # Terrain loss (LoS check) + terrain_loss = 0.0 + has_los = True + + if settings.use_terrain: + los_result = await self.los.check_line_of_sight( + site.lat, site.lon, site.height, + lat, lon, 1.5 # receiver at 1.5m + ) + has_los = los_result["has_los"] + + if not has_los: + # Add diffraction loss based on clearance + clearance = los_result["clearance"] + terrain_loss = self._diffraction_loss(clearance, site.frequency) + + # Building loss + building_loss = 0.0 + + if settings.use_buildings and buildings: + for building in buildings: + intersection = self.buildings.line_intersects_building( + site.lat, site.lon, site.height + await self.terrain.get_elevation(site.lat, site.lon), + lat, lon, 1.5 + await self.terrain.get_elevation(lat, lon), + building + ) + if intersection is not None: + # Building penetration loss (~20dB for concrete) + building_loss += 20.0 + has_los = False + break # One building is enough + + # Calculate RSRP + # RSRP = Tx Power + Tx Gain - Path Loss - Antenna Loss - Terrain Loss - Building Loss + rsrp = site.power + site.gain - path_loss - antenna_loss - terrain_loss - building_loss + + return CoveragePoint( + lat=lat, + lon=lon, + rsrp=rsrp, + distance=distance, + has_los=has_los, + terrain_loss=terrain_loss, + building_loss=building_loss + ) + + def _okumura_hata( + self, + distance: float, # meters + frequency: float, # MHz + tx_height: float, # meters + rx_height: float # meters + ) -> float: + """ + Okumura-Hata path loss model (urban) + + Returns path loss in dB + """ + d_km = distance / 1000 + + if d_km < 0.1: + d_km = 0.1 # Minimum distance + + # Mobile antenna height correction (urban) + a_hm = (1.1 * np.log10(frequency) - 0.7) * rx_height - (1.56 * np.log10(frequency) - 0.8) + + # Path loss + L = (69.55 + 26.16 * np.log10(frequency) - 13.82 * np.log10(tx_height) - a_hm + + (44.9 - 6.55 * np.log10(tx_height)) * np.log10(d_km)) + + return L + + def _antenna_pattern_loss( + self, + site_lat: float, site_lon: float, + point_lat: float, point_lon: float, + azimuth: float, beamwidth: float + ) -> float: + """Calculate antenna pattern attenuation""" + # Calculate bearing from site to point + bearing = self._calculate_bearing(site_lat, site_lon, point_lat, point_lon) + + # Angle difference from main lobe + angle_diff = abs(bearing - azimuth) + if angle_diff > 180: + angle_diff = 360 - angle_diff + + # Simple cosine pattern approximation + # 3dB beamwidth = angle where power drops to half + half_beamwidth = beamwidth / 2 + + if angle_diff <= half_beamwidth: + # Within main lobe - minimal loss + loss = 3 * (angle_diff / half_beamwidth) ** 2 + else: + # Outside main lobe - significant loss + loss = 3 + 12 * ((angle_diff - half_beamwidth) / half_beamwidth) ** 2 + loss = min(loss, 25) # Cap at 25dB (back lobe) + + return loss + + def _calculate_bearing( + self, + lat1: float, lon1: float, + lat2: float, lon2: float + ) -> float: + """Calculate bearing from point 1 to point 2 (degrees)""" + lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2]) + + dlon = lon2 - lon1 + + x = np.sin(dlon) * np.cos(lat2) + y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon) + + bearing = np.degrees(np.arctan2(x, y)) + + return (bearing + 360) % 360 + + def _diffraction_loss(self, clearance: float, frequency: float) -> float: + """ + Knife-edge diffraction loss + + Args: + clearance: Clearance in meters (negative = obstructed) + frequency: Frequency in MHz + + Returns: + Additional loss in dB + """ + if clearance >= 0: + return 0.0 # No obstruction + + # Fresnel parameter approximation + # v ~ clearance * sqrt(2 / (lambda * d)) + # Simplified: use clearance directly + + v = abs(clearance) / 10 # Normalize + + # Knife-edge loss approximation + if v <= 0: + loss = 0 + elif v < 2.4: + loss = 6.02 + 9.11 * v - 1.27 * v**2 + else: + loss = 13.0 + 20 * np.log10(v) + + return min(loss, 40) # Cap at 40dB + + +# Singleton +coverage_service = CoverageService()