"""SQLite-based database storage module.
This module provides a persistent storage implementation using SQLite,
offering ACID-compliant transactions and efficient data storage.
Classes:
SQLiteStorage: Main class implementing SQLite-based storage functionality.
Functions:
None (all functionality is encapsulated in classes)
Types:
None
Exceptions:
StorageError: See :exc:`true_storage.exceptions.StorageError`
KeyError: Raised when accessing non-existent keys
Key Features:
- ACID-compliant transactions
- Thread-safe operations
- In-memory or file-based storage
- Binary data support
- Automatic connection management
- Connection pooling
- Resource cleanup
- SQL-based querying capabilities
"""
import pickle
import sqlite3
import threading
from typing import Any, Optional, List
from ..exceptions import StorageError
from true_storage.base import BaseStorage
__all__ = [
'SQLiteStorage',
]
def __dir__() -> List[str]:
return sorted(__all__)
"""SQLite-based storage implementation."""
[docs]
class SQLiteStorage(BaseStorage):
"""SQLite-based storage implementation."""
[docs]
def __init__(self, db_path: Optional[str] = None):
"""Initialize SQLite storage.
Args:
db_path: Path to SQLite database file. If None, uses in-memory database.
"""
self.db_path = db_path or ":memory:"
self._lock = threading.RLock()
self._conn = None
self._init_db()
[docs]
def _get_connection(self):
"""Get a SQLite connection, creating it if needed."""
if self._conn is None:
self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
return self._conn
[docs]
def _init_db(self):
"""Initialize the database schema."""
with self._lock:
try:
conn = self._get_connection()
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoints (
key TEXT PRIMARY KEY,
value BLOB
)
""")
conn.commit()
except Exception as e:
raise StorageError(f"Failed to initialize database: {e}")
[docs]
def store(self, key: str, value: Any) -> None:
"""Store a value in the database."""
with self._lock:
try:
conn = self._get_connection()
value_bytes = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
conn.execute(
"INSERT OR REPLACE INTO checkpoints (key, value) VALUES (?, ?)",
(key, value_bytes)
)
conn.commit()
except Exception as e:
raise StorageError(f"Failed to store value: {e}")
[docs]
def retrieve(self, key: str) -> Any:
"""Retrieve a value from the database."""
with self._lock:
try:
conn = self._get_connection()
cursor = conn.execute(
"SELECT value FROM checkpoints WHERE key = ?",
(key,)
)
row = cursor.fetchone()
if row is None:
raise KeyError(f"No value found for key: {key}")
return pickle.loads(row[0])
except KeyError:
raise
except Exception as e:
raise StorageError(f"Failed to retrieve value: {e}")
[docs]
def delete(self, key: str) -> None:
"""Delete a value from the database."""
with self._lock:
try:
conn = self._get_connection()
conn.execute("DELETE FROM checkpoints WHERE key = ?", (key,))
conn.commit()
except Exception as e:
raise StorageError(f"Failed to delete value: {e}")
[docs]
def clear(self) -> None:
"""Clear all values from the database."""
with self._lock:
try:
conn = self._get_connection()
conn.execute("DELETE FROM checkpoints")
conn.commit()
except Exception as e:
raise StorageError(f"Failed to clear storage: {e}")
[docs]
def close(self):
"""Close the database connection."""
with self._lock:
if self._conn is not None:
self._conn.close()
self._conn = None
[docs]
def clone(self) -> 'BaseStorage':
...
[docs]
def __del__(self):
"""Ensure connection is closed on deletion."""
self.close()