""" Worker functions for parallel coverage calculation. These run in separate processes and access shared memory data. """ from typing import List, Dict, Optional from app.parallel.manager import SharedTerrainData, SharedBuildingData def process_chunk( chunk: List[tuple], terrain_cache: dict, buildings: list, osm_data: dict, config: dict, ) -> List[dict]: """ Process a chunk of grid points. This is the standard worker function used by both Ray and ProcessPoolExecutor. It re-uses the existing coverage calculation logic. """ # Inject terrain cache into the module-level singleton from app.services.terrain_service import terrain_service terrain_service._tile_cache = terrain_cache # Build spatial index from app.services.spatial_index import SpatialIndex spatial_idx = SpatialIndex() if buildings: spatial_idx.build(buildings) # Process points using existing calculator from app.services.coverage_service import CoverageService, SiteParams, CoverageSettings site = SiteParams(**config['site_dict']) settings = CoverageSettings(**config['settings_dict']) svc = CoverageService() timing = { "los": 0.0, "buildings": 0.0, "antenna": 0.0, "dominant_path": 0.0, "street_canyon": 0.0, "reflection": 0.0, "vegetation": 0.0, } precomputed = config.get('precomputed') results = [] for lat, lon, point_elev in chunk: pre = precomputed.get((lat, lon)) if precomputed else None point = svc._calculate_point_sync( site, lat, lon, settings, buildings, osm_data.get('streets', []), spatial_idx, osm_data.get('water_bodies', []), osm_data.get('vegetation_areas', []), config['site_elevation'], point_elev, timing, precomputed_distance=pre.get('distance') if pre else None, precomputed_path_loss=pre.get('path_loss') if pre else None, ) if point.rsrp >= settings.min_signal: results.append(point.model_dump()) return results