""" Coverage boundary calculation service. Computes concave hull (alpha shape) from coverage points to generate a realistic boundary that follows actual coverage contour. """ import logging from typing import Optional logger = logging.getLogger(__name__) def calculate_coverage_boundary( points: list[dict], threshold_dbm: float = -100, simplify_tolerance: float = 0.001, ) -> list[dict]: """ Calculate coverage boundary as concave hull of points above threshold. Args: points: List of coverage points with 'lat', 'lon', 'rsrp' keys threshold_dbm: RSRP threshold - points below this are excluded simplify_tolerance: Simplification tolerance in degrees (~100m per 0.001) Returns: List of {'lat': float, 'lon': float} coordinates forming boundary polygon. Empty list if boundary cannot be computed. """ try: from shapely.geometry import MultiPoint from shapely import concave_hull except ImportError: logger.warning("Shapely not installed - boundary calculation disabled") return [] # Filter points above threshold valid_coords = [ (p['lon'], p['lat']) # Shapely uses (x, y) = (lon, lat) for p in points if p.get('rsrp', -999) >= threshold_dbm ] if len(valid_coords) < 3: logger.debug(f"Not enough points for boundary: {len(valid_coords)}") return [] try: # Create MultiPoint geometry mp = MultiPoint(valid_coords) # Compute concave hull (alpha shape) # ratio: 0 = convex hull, 1 = very tight fit # 0.3-0.5 gives good balance between detail and smoothness hull = concave_hull(mp, ratio=0.3) if hull.is_empty: logger.debug("Concave hull is empty") return [] # Simplify to reduce points (0.001 deg ≈ 100m) if simplify_tolerance > 0: hull = hull.simplify(simplify_tolerance, preserve_topology=True) # Extract coordinates based on geometry type if hull.geom_type == 'Polygon': coords = list(hull.exterior.coords) return [{'lat': c[1], 'lon': c[0]} for c in coords] elif hull.geom_type == 'MultiPolygon': # Return largest polygon's exterior largest = max(hull.geoms, key=lambda g: g.area) coords = list(largest.exterior.coords) return [{'lat': c[1], 'lon': c[0]} for c in coords] elif hull.geom_type == 'GeometryCollection': # Find polygons in collection polygons = [g for g in hull.geoms if g.geom_type == 'Polygon'] if polygons: largest = max(polygons, key=lambda g: g.area) coords = list(largest.exterior.coords) return [{'lat': c[1], 'lon': c[0]} for c in coords] logger.debug(f"Unexpected hull geometry type: {hull.geom_type}") return [] except Exception as e: logger.warning(f"Boundary calculation error: {e}") return [] def calculate_multi_site_boundaries( points: list[dict], threshold_dbm: float = -100, ) -> dict[str, list[dict]]: """ Calculate separate boundaries for each site's coverage area. Args: points: Coverage points with 'lat', 'lon', 'rsrp', 'site_id' keys threshold_dbm: RSRP threshold Returns: Dict mapping site_id to boundary coordinates list. """ # Group points by site_id by_site: dict[str, list[dict]] = {} for p in points: site_id = p.get('site_id', 'default') if site_id not in by_site: by_site[site_id] = [] by_site[site_id].append(p) # Calculate boundary for each site boundaries = {} for site_id, site_points in by_site.items(): boundary = calculate_coverage_boundary(site_points, threshold_dbm) if boundary: boundaries[site_id] = boundary return boundaries