# rfcp/backend/app/services/cache_db.py
"""
SQLite cache for OSM data — buildings, vegetation, water, streets.
Replaces in-memory caching for large-area calculations. Instead of holding
hundreds of thousands of buildings in RAM, data is stored on disk in SQLite
and queried per-tile using spatial bbox queries.
Location: ~/.rfcp/osm_cache.db
"""
import json
import time
import sqlite3
from pathlib import Path
from typing import List, Dict, Optional
def _default_db_path() -> str:
"""Get default database path at ~/.rfcp/osm_cache.db."""
cache_dir = Path.home() / '.rfcp'
cache_dir.mkdir(parents=True, exist_ok=True)
return str(cache_dir / 'osm_cache.db')
class OSMCacheDB:
    """SQLite-backed cache for OSM feature data with bbox queries.

    Stores buildings and vegetation as JSON blobs with bounding-box
    columns for fast spatial queries. Cache freshness is tracked
    per 1-degree cell (matching the OSM grid fetch pattern).

    A single connection is shared with ``check_same_thread=False``; WAL
    mode permits concurrent readers alongside one writer, but concurrent
    writes from multiple threads are not coordinated here.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Remember *db_path* (default ~/.rfcp/osm_cache.db); connect lazily."""
        if db_path is None:
            db_path = _default_db_path()
        self.db_path = db_path
        self._conn: Optional[sqlite3.Connection] = None

    @property
    def conn(self) -> sqlite3.Connection:
        """Lazy connection with WAL mode for concurrent reads."""
        if self._conn is None:
            self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
            # WAL lets readers proceed during writes; synchronous=NORMAL
            # trades a little crash durability for write speed.
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA synchronous=NORMAL")
            self._init_tables()
        return self._conn

    def _init_tables(self):
        """Create tables and indexes if they do not already exist."""
        assert self._conn is not None
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS buildings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                osm_id INTEGER,
                min_lat REAL NOT NULL,
                min_lon REAL NOT NULL,
                max_lat REAL NOT NULL,
                max_lon REAL NOT NULL,
                height REAL DEFAULT 10.0,
                data TEXT NOT NULL,
                cell_key TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_bld_cell ON buildings(cell_key);
            CREATE INDEX IF NOT EXISTS idx_bld_bbox
                ON buildings(min_lat, max_lat, min_lon, max_lon);
            CREATE TABLE IF NOT EXISTS vegetation (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                osm_id INTEGER,
                min_lat REAL NOT NULL,
                min_lon REAL NOT NULL,
                max_lat REAL NOT NULL,
                max_lon REAL NOT NULL,
                data TEXT NOT NULL,
                cell_key TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_veg_cell ON vegetation(cell_key);
            CREATE INDEX IF NOT EXISTS idx_veg_bbox
                ON vegetation(min_lat, max_lat, min_lon, max_lon);
            CREATE TABLE IF NOT EXISTS cache_meta (
                cell_key TEXT NOT NULL,
                data_type TEXT NOT NULL,
                fetched_at REAL NOT NULL,
                item_count INTEGER DEFAULT 0,
                PRIMARY KEY (cell_key, data_type)
            );
        """)
        self._conn.commit()

    # ── Cell key helpers ──

    @staticmethod
    def cell_key(min_lat: float, min_lon: float, max_lat: float, max_lon: float) -> str:
        """Generate cell key from bbox (matches 1-degree grid alignment).

        NOTE(review): ``:.0f`` rounds to the nearest degree rather than
        flooring; this assumes callers already pass integer-degree-aligned
        bounds — confirm against the grid fetch code.
        """
        return f"{min_lat:.0f},{min_lon:.0f},{max_lat:.0f},{max_lon:.0f}"

    def is_cell_cached(
        self, cell_key: str, data_type: str, max_age_hours: float = 24.0
    ) -> bool:
        """Return True if *cell_key*/*data_type* was fetched within *max_age_hours*."""
        cursor = self.conn.execute(
            "SELECT fetched_at FROM cache_meta "
            "WHERE cell_key = ? AND data_type = ?",
            (cell_key, data_type),
        )
        row = cursor.fetchone()
        if row is None:
            return False
        age_hours = (time.time() - row[0]) / 3600
        return age_hours < max_age_hours

    def mark_cell_cached(self, cell_key: str, data_type: str, item_count: int):
        """Record that a cell has been fetched (upserts the freshness row)."""
        self.conn.execute(
            "INSERT OR REPLACE INTO cache_meta "
            "(cell_key, data_type, fetched_at, item_count) VALUES (?, ?, ?, ?)",
            (cell_key, data_type, time.time(), item_count),
        )
        self.conn.commit()

    # ── Buildings ──

    def insert_buildings_bulk(self, buildings_data: List[Dict], cell_key: str):
        """Bulk insert serialised building dicts for a cell.

        Each dict must have 'geometry' (list of [lon, lat]) and 'id'.
        Rows previously stored for *cell_key* are removed first, so
        refreshing an expired cell does not accumulate duplicates.
        """
        rows = []
        for b in buildings_data:
            geom = b.get('geometry', [])
            if not geom:
                continue  # skip features with no footprint
            lats = [p[1] for p in geom]
            lons = [p[0] for p in geom]
            rows.append((
                b.get('id', 0),
                min(lats), min(lons), max(lats), max(lons),
                b.get('height', 10.0),
                json.dumps(b),
                cell_key,
            ))
        # BUGFIX: clear stale rows for this cell before inserting; a plain
        # INSERT duplicated every building each time an expired cell was
        # re-fetched.
        self.conn.execute("DELETE FROM buildings WHERE cell_key = ?", (cell_key,))
        if rows:
            self.conn.executemany(
                "INSERT INTO buildings "
                "(osm_id, min_lat, min_lon, max_lat, max_lon, height, data, cell_key) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                rows,
            )
        # Commit unconditionally so the cleanup persists even when the
        # refreshed fetch returned no features.
        self.conn.commit()

    def query_buildings_bbox(
        self,
        min_lat: float, max_lat: float,
        min_lon: float, max_lon: float,
        limit: int = 20000,
    ) -> List[Dict]:
        """Query buildings whose bbox overlaps the given bbox.

        Returns deserialised building dicts, at most *limit* of them.
        """
        cursor = self.conn.execute(
            "SELECT data FROM buildings "
            "WHERE max_lat >= ? AND min_lat <= ? "
            "AND max_lon >= ? AND min_lon <= ? "
            "LIMIT ?",
            (min_lat, max_lat, min_lon, max_lon, limit),
        )
        return [json.loads(row[0]) for row in cursor]

    # ── Vegetation ──

    def insert_vegetation_bulk(self, veg_data: List[Dict], cell_key: str):
        """Bulk insert serialised vegetation dicts for a cell.

        Rows previously stored for *cell_key* are removed first, so
        refreshing an expired cell does not accumulate duplicates.
        """
        rows = []
        for v in veg_data:
            geom = v.get('geometry', [])
            if not geom:
                continue  # skip features with no footprint
            lats = [p[1] for p in geom]
            lons = [p[0] for p in geom]
            rows.append((
                v.get('id', 0),
                min(lats), min(lons), max(lats), max(lons),
                json.dumps(v),
                cell_key,
            ))
        # BUGFIX: same duplicate-accumulation fix as insert_buildings_bulk.
        self.conn.execute("DELETE FROM vegetation WHERE cell_key = ?", (cell_key,))
        if rows:
            self.conn.executemany(
                "INSERT INTO vegetation "
                "(osm_id, min_lat, min_lon, max_lat, max_lon, data, cell_key) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                rows,
            )
        self.conn.commit()

    def query_vegetation_bbox(
        self,
        min_lat: float, max_lat: float,
        min_lon: float, max_lon: float,
        limit: int = 10000,
    ) -> List[Dict]:
        """Query vegetation whose bbox overlaps the given bbox.

        Returns deserialised vegetation dicts, at most *limit* of them.
        """
        cursor = self.conn.execute(
            "SELECT data FROM vegetation "
            "WHERE max_lat >= ? AND min_lat <= ? "
            "AND max_lon >= ? AND min_lon <= ? "
            "LIMIT ?",
            (min_lat, max_lat, min_lon, max_lon, limit),
        )
        return [json.loads(row[0]) for row in cursor]

    # ── Housekeeping ──

    def close(self):
        """Close the database connection (reopened lazily on next use)."""
        if self._conn:
            self._conn.close()
            self._conn = None

    def get_stats(self) -> Dict[str, int]:
        """Get cache statistics: row counts per table plus cached cell count."""
        stats: Dict[str, int] = {}
        for table in ('buildings', 'vegetation'):
            # Table names come from a fixed tuple, not user input.
            cursor = self.conn.execute(f"SELECT COUNT(*) FROM {table}")  # noqa: S608
            stats[table] = cursor.fetchone()[0]
        cursor = self.conn.execute("SELECT COUNT(*) FROM cache_meta")
        stats['cached_cells'] = cursor.fetchone()[0]
        return stats
# ── Singleton ──
_cache_db: Optional[OSMCacheDB] = None


def get_osm_cache_db() -> OSMCacheDB:
    """Return the process-wide OSM cache database, instantiating it lazily."""
    global _cache_db
    if _cache_db is not None:
        return _cache_db
    _cache_db = OSMCacheDB()
    return _cache_db