""" Shared Memory Manager for parallel processing. Instead of copying building/terrain data to each worker, store data in shared memory that all workers can read. """ import multiprocessing.shared_memory as shm import numpy as np from dataclasses import dataclass from typing import List, Optional @dataclass class SharedTerrainData: """Reference to terrain data in shared memory.""" shm_name: str shape: tuple bounds: tuple # (min_lat, min_lon, max_lat, max_lon) resolution: float def get_array(self) -> np.ndarray: existing_shm = shm.SharedMemory(name=self.shm_name) return np.ndarray(self.shape, dtype=np.int16, buffer=existing_shm.buf) @dataclass class SharedBuildingData: """Reference to building data in shared memory.""" shm_centroids_name: str # (N, 2) float64 shm_heights_name: str # (N,) float32 shm_vertices_name: str # (total_verts, 2) float64 shm_offsets_name: str # (N+1,) int32 count: int total_vertices: int def get_centroids(self) -> np.ndarray: existing = shm.SharedMemory(name=self.shm_centroids_name) return np.ndarray((self.count, 2), dtype=np.float64, buffer=existing.buf) def get_heights(self) -> np.ndarray: existing = shm.SharedMemory(name=self.shm_heights_name) return np.ndarray((self.count,), dtype=np.float32, buffer=existing.buf) def get_offsets(self) -> np.ndarray: existing = shm.SharedMemory(name=self.shm_offsets_name) return np.ndarray((self.count + 1,), dtype=np.int32, buffer=existing.buf) def get_vertices(self) -> np.ndarray: existing = shm.SharedMemory(name=self.shm_vertices_name) return np.ndarray((self.total_vertices, 2), dtype=np.float64, buffer=existing.buf) def get_polygon(self, idx: int) -> np.ndarray: offsets = self.get_offsets() vertices = self.get_vertices() start, end = offsets[idx], offsets[idx + 1] return vertices[start:end] class SharedMemoryManager: """ Manages shared memory blocks for parallel processing. Usage: manager = SharedMemoryManager() terrain_ref = manager.store_terrain(heights, bounds, resolution) buildings_ref = manager.store_buildings(buildings) # Pass references (small dataclasses) to workers pool.map(worker_func, points, terrain_ref, buildings_ref) # Workers attach to shared memory — no copy! terrain = terrain_ref.get_array() # Cleanup when done manager.cleanup() """ def __init__(self): self._shm_blocks: list = [] def store_terrain( self, heights: np.ndarray, bounds: tuple, resolution: float, ) -> SharedTerrainData: """Store terrain heights in shared memory.""" shm_block = shm.SharedMemory(create=True, size=heights.nbytes) self._shm_blocks.append(shm_block) shm_array = np.ndarray(heights.shape, dtype=heights.dtype, buffer=shm_block.buf) shm_array[:] = heights[:] return SharedTerrainData( shm_name=shm_block.name, shape=heights.shape, bounds=bounds, resolution=resolution, ) def store_buildings(self, buildings: list) -> Optional[SharedBuildingData]: """Store building data in shared memory. Args: buildings: List of Building objects or dicts with geometry. Returns: SharedBuildingData reference, or None if no buildings. """ n = len(buildings) if n == 0: return None # Extract centroids centroids = np.zeros((n, 2), dtype=np.float64) heights = np.zeros(n, dtype=np.float32) all_vertices = [] offsets = [0] for i, b in enumerate(buildings): # Support both dict and object forms if hasattr(b, 'geometry'): geom = b.geometry h = getattr(b, 'height', 10.0) else: geom = b.get('geometry', []) h = b.get('height', 10.0) if geom: lats = [p[1] for p in geom] lons = [p[0] for p in geom] centroids[i] = [sum(lats) / len(lats), sum(lons) / len(lons)] for lon, lat in geom: all_vertices.append([lat, lon]) heights[i] = h or 10.0 offsets.append(len(all_vertices)) vertices = np.array(all_vertices, dtype=np.float64) if all_vertices else np.zeros((0, 2), dtype=np.float64) offsets = np.array(offsets, dtype=np.int32) # Create shared memory shm_centroids = shm.SharedMemory(create=True, size=max(centroids.nbytes, 1)) shm_heights = shm.SharedMemory(create=True, size=max(heights.nbytes, 1)) shm_vertices = shm.SharedMemory(create=True, size=max(vertices.nbytes, 1)) shm_offsets = shm.SharedMemory(create=True, size=max(offsets.nbytes, 1)) self._shm_blocks.extend([shm_centroids, shm_heights, shm_vertices, shm_offsets]) # Copy data if centroids.nbytes > 0: np.ndarray(centroids.shape, dtype=centroids.dtype, buffer=shm_centroids.buf)[:] = centroids if heights.nbytes > 0: np.ndarray(heights.shape, dtype=heights.dtype, buffer=shm_heights.buf)[:] = heights if vertices.nbytes > 0: np.ndarray(vertices.shape, dtype=vertices.dtype, buffer=shm_vertices.buf)[:] = vertices if offsets.nbytes > 0: np.ndarray(offsets.shape, dtype=offsets.dtype, buffer=shm_offsets.buf)[:] = offsets return SharedBuildingData( shm_centroids_name=shm_centroids.name, shm_heights_name=shm_heights.name, shm_vertices_name=shm_vertices.name, shm_offsets_name=shm_offsets.name, count=n, total_vertices=len(all_vertices), ) def cleanup(self): """Release all shared memory blocks.""" for block in self._shm_blocks: try: block.close() block.unlink() except Exception: pass self._shm_blocks.clear()