123 lines
3.9 KiB
Python
123 lines
3.9 KiB
Python
"""
|
|
Coverage boundary calculation service.

Computes a concave hull (alpha shape) from coverage points to generate
a realistic boundary that follows the actual coverage contour.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _largest_polygon(geometry):
    """Return the largest-area Polygon held by *geometry*, or None if it has none."""
    if geometry.geom_type == 'Polygon':
        return geometry
    if geometry.geom_type in ('MultiPolygon', 'GeometryCollection'):
        polygons = [g for g in geometry.geoms if g.geom_type == 'Polygon']
        if polygons:
            return max(polygons, key=lambda g: g.area)
    return None


def calculate_coverage_boundary(
    points: list[dict],
    threshold_dbm: float = -100,
    simplify_tolerance: float = 0.001,
    ratio: float = 0.3,
) -> list[dict]:
    """
    Calculate coverage boundary as concave hull of points above threshold.

    Args:
        points: List of coverage points with 'lat', 'lon', 'rsrp' keys.
            Points whose 'lat' or 'lon' is missing/None are skipped; a
            missing or None 'rsrp' is treated as -999 dBm.
        threshold_dbm: RSRP threshold - points below this are excluded
        simplify_tolerance: Simplification tolerance in degrees (~100m per 0.001).
            Values <= 0 disable simplification.
        ratio: Concavity ratio passed to shapely.concave_hull, in [0, 1].
            1 yields the convex hull, 0 the most concave (tightest) hull;
            0.3-0.5 gives good balance between detail and smoothness.

    Returns:
        List of {'lat': float, 'lon': float} coordinates forming boundary polygon
        (the polygon's exterior ring as produced by Shapely, i.e. a closed ring).
        Empty list if boundary cannot be computed (Shapely unavailable, fewer
        than three usable points, non-polygonal hull, or a Shapely error).
    """
    try:
        from shapely.geometry import MultiPoint
        from shapely import concave_hull
    except ImportError:
        logger.warning("Shapely not installed - boundary calculation disabled")
        return []

    # Filter points at/above threshold, tolerating incomplete records so that
    # one malformed point cannot crash the whole calculation.
    valid_coords = []
    for p in points or []:
        lon, lat, rsrp = p.get('lon'), p.get('lat'), p.get('rsrp')
        if lon is None or lat is None:
            continue
        if rsrp is None:
            rsrp = -999  # same sentinel the missing-key case has always used
        if rsrp >= threshold_dbm:
            valid_coords.append((lon, lat))  # Shapely uses (x, y) = (lon, lat)

    if len(valid_coords) < 3:
        logger.debug(f"Not enough points for boundary: {len(valid_coords)}")
        return []

    try:
        # Compute concave hull (alpha shape) of the surviving points.
        hull = concave_hull(MultiPoint(valid_coords), ratio=ratio)

        if hull.is_empty:
            logger.debug("Concave hull is empty")
            return []

        # Simplify to reduce points (0.001 deg ≈ 100m)
        if simplify_tolerance > 0:
            hull = hull.simplify(simplify_tolerance, preserve_topology=True)

        # Polygon -> itself; MultiPolygon / GeometryCollection -> largest
        # polygon member; anything else (e.g. a LineString from collinear
        # points) has no usable boundary.
        polygon = _largest_polygon(hull)
        if polygon is None:
            logger.debug(f"Unexpected hull geometry type: {hull.geom_type}")
            return []

        return [{'lat': c[1], 'lon': c[0]} for c in polygon.exterior.coords]

    except Exception as e:
        # Best-effort boundary: any Shapely/GEOS failure degrades to "no boundary".
        logger.warning(f"Boundary calculation error: {e}")
        return []
|
|
|
|
|
|
def calculate_multi_site_boundaries(
    points: list[dict],
    threshold_dbm: float = -100,
) -> dict[str, list[dict]]:
    """
    Build one coverage boundary per site.

    Points are bucketed by their 'site_id' value (points without that key
    fall into the 'default' bucket) and each bucket is passed to
    calculate_coverage_boundary with the given threshold.

    Args:
        points: Coverage points with 'lat', 'lon', 'rsrp', 'site_id' keys
        threshold_dbm: RSRP threshold forwarded to the per-site calculation

    Returns:
        Dict mapping site_id to boundary coordinates list. Sites whose
        boundary comes back empty are left out of the result.
    """
    # Bucket the points per site, keeping first-seen site order.
    grouped: dict[str, list[dict]] = {}
    for point in points:
        grouped.setdefault(point.get('site_id', 'default'), []).append(point)

    # Compute each site's outline; skip sites that yield no boundary.
    result = {}
    for key, members in grouped.items():
        outline = calculate_coverage_boundary(members, threshold_dbm)
        if not outline:
            continue
        result[key] = outline

    return result
|