@mytec: feat: Phase 3.0 Architecture Refactor

Major refactoring of RFCP backend:
- Modular propagation models (8 models)
- SharedMemoryManager for terrain data
- ProcessPoolExecutor parallel processing
- WebSocket progress streaming
- Building filtering pipeline (351k → 15k)
- 82 unit tests

Performance: Standard preset 38s → 5s (7.6x speedup)

Known issue: Detailed preset timeout (fix in 3.1.0)
This commit is contained in:
2026-02-01 23:12:26 +02:00
parent 1dde56705a
commit defa3ad440
71 changed files with 7134 additions and 256 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,127 @@
"""
Integration tests for the PointCalculator.
Verifies end-to-end point calculation with various
propagation models and environmental conditions.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from app.core.calculator import PointCalculator
from app.propagation.free_space import FreeSpaceModel
from app.propagation.okumura_hata import OkumuraHataModel
from app.propagation.cost231_hata import Cost231HataModel
class TestPointCalculatorFSPL:
def test_basic_calculation(self):
calc = PointCalculator(FreeSpaceModel())
result = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.001, point_lon=30.0,
distance=111,
)
assert result.rsrp > -50 # Strong signal at short range
assert result.has_los is True
assert result.model_used == "Free-Space"
assert result.path_loss > 0
assert result.terrain_loss == 0
assert result.building_loss == 0
def test_signal_decreases_with_distance(self):
calc = PointCalculator(FreeSpaceModel())
near = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.001, point_lon=30.0, distance=100,
)
far = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.01, point_lon=30.0, distance=1000,
)
assert near.rsrp > far.rsrp
def test_terrain_obstruction(self):
calc = PointCalculator(FreeSpaceModel())
los = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.01, point_lon=30.0, distance=1000,
)
nlos = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.01, point_lon=30.0, distance=1000,
terrain_clearance=-10,
)
assert nlos.rsrp < los.rsrp
assert nlos.has_los is False
assert nlos.terrain_loss > 0
def test_building_loss_applied(self):
calc = PointCalculator(FreeSpaceModel())
no_building = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.01, point_lon=30.0, distance=1000,
)
with_building = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.01, point_lon=30.0, distance=1000,
building_loss=20,
)
assert abs(no_building.rsrp - with_building.rsrp - 20) < 0.1
class TestPointCalculatorAntenna:
def test_off_axis_reduces_signal(self):
calc = PointCalculator(FreeSpaceModel())
omni = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.001, point_lon=30.0, distance=111,
)
directional = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=1800,
point_lat=50.001, point_lon=30.0, distance=111,
azimuth=90, beamwidth=65, # Pointing East, point is North
)
assert directional.rsrp < omni.rsrp
class TestPointCalculatorModelFallback:
def test_out_of_range_uses_fspl(self):
"""When Okumura-Hata is out of valid range, should fall back to FSPL."""
calc = PointCalculator(OkumuraHataModel())
# 50m distance is below Okumura-Hata minimum (1km)
result = calc.calculate_point(
site_lat=50.0, site_lon=30.0, site_height=30,
site_power=43, site_gain=18, site_frequency=900,
point_lat=50.0, point_lon=30.0001, distance=50,
)
# Should still return a valid result (via FSPL fallback)
assert result.rsrp != 0
assert result.path_loss > 0
if __name__ == "__main__":
for cls_name, cls in [
("FSPL", TestPointCalculatorFSPL),
("Antenna", TestPointCalculatorAntenna),
("Fallback", TestPointCalculatorModelFallback),
]:
instance = cls()
for method_name in [m for m in dir(instance) if m.startswith("test_")]:
try:
getattr(instance, method_name)()
print(f" PASS {cls_name}.{method_name}")
except Exception as e:
print(f" FAIL {cls_name}.{method_name}: {e}")
print("\nAll tests completed.")

View File

@@ -0,0 +1,115 @@
"""
Integration tests for the CoverageEngine orchestrator.
Tests model selection, available models API, and the
engine's coordination logic (without running actual
coverage calculations, which require terrain data).
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from app.core.engine import CoverageEngine, BandType, PresetType, CoverageSettings
class TestEngineModelSelection:
def test_lte_urban_uses_cost231(self):
engine = CoverageEngine()
model = engine.select_model(BandType.LTE, "urban")
assert model.name == "COST-231-Hata"
def test_lte_suburban_uses_okumura(self):
engine = CoverageEngine()
model = engine.select_model(BandType.LTE, "suburban")
assert model.name == "Okumura-Hata"
def test_lte_open_uses_fspl(self):
engine = CoverageEngine()
model = engine.select_model(BandType.LTE, "open")
assert model.name == "Free-Space"
def test_uhf_urban_uses_okumura(self):
engine = CoverageEngine()
model = engine.select_model(BandType.UHF, "urban")
assert model.name == "Okumura-Hata"
def test_uhf_rural_uses_longley_rice(self):
engine = CoverageEngine()
model = engine.select_model(BandType.UHF, "rural")
assert model.name == "Longley-Rice"
def test_vhf_urban_uses_p1546(self):
engine = CoverageEngine()
model = engine.select_model(BandType.VHF, "urban")
assert model.name == "ITU-R-P.1546"
def test_vhf_rural_uses_longley_rice(self):
engine = CoverageEngine()
model = engine.select_model(BandType.VHF, "rural")
assert model.name == "Longley-Rice"
def test_unknown_band_falls_back(self):
engine = CoverageEngine()
model = engine.select_model(BandType.CUSTOM, "desert")
assert model is not None # Should not crash
class TestEngineModelsAPI:
def test_returns_dict(self):
engine = CoverageEngine()
models = engine.get_available_models()
assert isinstance(models, dict)
assert len(models) >= 5
def test_model_info_structure(self):
engine = CoverageEngine()
models = engine.get_available_models()
for name, info in models.items():
assert "frequency_range" in info
assert "distance_range" in info
assert "bands" in info
assert len(info["bands"]) > 0
def test_all_expected_models_present(self):
engine = CoverageEngine()
models = engine.get_available_models()
expected = {"COST-231-Hata", "Okumura-Hata", "Free-Space", "Longley-Rice", "ITU-R-P.1546"}
assert expected.issubset(set(models.keys()))
class TestCoverageSettings:
def test_default_settings(self):
s = CoverageSettings()
assert s.radius == 10000
assert s.resolution == 200
assert s.preset == PresetType.STANDARD
assert s.band_type == BandType.LTE
def test_preset_values(self):
assert PresetType.FAST.value == "fast"
assert PresetType.STANDARD.value == "standard"
assert PresetType.DETAILED.value == "detailed"
assert PresetType.FULL.value == "full"
def test_band_type_values(self):
assert BandType.LTE.value == "lte"
assert BandType.UHF.value == "uhf"
assert BandType.VHF.value == "vhf"
if __name__ == "__main__":
for cls_name, cls in [
("ModelSelection", TestEngineModelSelection),
("ModelsAPI", TestEngineModelsAPI),
("CoverageSettings", TestCoverageSettings),
]:
instance = cls()
for method_name in [m for m in dir(instance) if m.startswith("test_")]:
try:
getattr(instance, method_name)()
print(f" PASS {cls_name}.{method_name}")
except Exception as e:
print(f" FAIL {cls_name}.{method_name}: {e}")
print("\nAll tests completed.")