Source code for true_storage.storage.mixed

"""Mixed storage module combining hot and cold storage capabilities.

This module implements a hybrid storage system that leverages both in-memory (hot) and
disk-based (cold) storage for optimal performance and data persistence.

Classes:
    MixedStorage: Main class implementing hybrid storage functionality.
    StoragePolicy: Enumeration of possible storage policies. (imported from base.py)

Functions:
    None (all functionality is encapsulated in classes)

Types:
    None

Exceptions:
    StorageError: Raised when storage operations fail

Key Features:
    - Hybrid storage combining hot and cold storage benefits
    - Automatic data synchronization between storage layers
    - Intelligent data retrieval strategy
    - Performance metrics for both storage layers
    - Storage optimization capabilities
    - Hot storage warm-up functionality
    - Thread-safe operations
    - Event emission system
    - Configurable storage policies
"""

import threading
import time
from typing import Any, Optional, Dict, List

from .base import (
    BaseStorageManager,
    StoragePolicy,
    StorageStrategy
)
from .cold import ColdStorage
from .hot import HotStorage
from ..exceptions import StorageError


__all__ = [
    'MixedStorage',
    'StoragePolicy',
    'StorageError'
]

def __dir__() -> List[str]:
    return sorted(__all__)


class MixedStorageStrategy(StorageStrategy):
    """Strategy for mixed storage operations."""

    def __init__(self, hot_storage: HotStorage, cold_storage: ColdStorage):
        self.hot_storage = hot_storage
        self.cold_storage = cold_storage
        self.lock = threading.Lock()

    def store(self, key: str, value: Any) -> None:
        """Store value in both hot and cold storage."""
        with self.lock:
            self.hot_storage.store(key, value)
            self.cold_storage.store(key, value)

    def retrieve(self, key: str) -> Optional[Any]:
        """Retrieve value from hot storage first, then cold storage."""
        with self.lock:
            # Try hot storage first
            value = self.hot_storage.retrieve(key)
            if value is not None:
                return value

            # If not in hot storage, try cold storage
            value = self.cold_storage.retrieve(key)
            if value is not None:
                # Store in hot storage for future quick access
                self.hot_storage.store(key, value)
            return value

    def delete(self, key: str) -> None:
        """Delete value from both hot and cold storage."""
        with self.lock:
            self.hot_storage.delete(key)
            self.cold_storage.delete(key)


[docs] class MixedStorage(BaseStorageManager): """Mixed storage implementation combining hot and cold storage."""
[docs] def __init__( self, storage_id: str = "mixed_storage", max_size: int = 100, expiration_time: int = 300, storage_directory: str = "cold_storage", compression_level: int = 6, policy: StoragePolicy = StoragePolicy.STRICT ): super().__init__( storage_id=storage_id, policy=policy ) self.hot_storage = HotStorage( storage_id=f"{storage_id}_hot", max_size=max_size, expiration_time=expiration_time, policy=policy ) self.cold_storage = ColdStorage( storage_id=f"{storage_id}_cold", storage_directory=storage_directory, compression_level=compression_level, policy=policy ) self.strategy = MixedStorageStrategy(self.hot_storage, self.cold_storage)
[docs] def store(self, key: str, value: Any) -> None: """Store a value in mixed storage.""" start_time = time.time() success = True try: self.strategy.store(key, value) self.emit_event("item_stored", {"key": key}) self._trigger_callbacks("after_store", key, value) except Exception as e: success = False if self.policy == StoragePolicy.STRICT: raise StorageError(f"Failed to store value: {e}") finally: self.update_metrics("store", success, time.time() - start_time)
[docs] def retrieve(self, key: str) -> Optional[Any]: """Retrieve a value from mixed storage.""" start_time = time.time() success = True try: value = self.strategy.retrieve(key) if value is not None: self.emit_event("item_retrieved", {"key": key}) self._trigger_callbacks("after_retrieve", key, value) return value except Exception as e: success = False if self.policy == StoragePolicy.STRICT: raise StorageError(f"Failed to retrieve value: {e}") return None finally: self.update_metrics("retrieve", success, time.time() - start_time)
[docs] def delete(self, key: str) -> None: """Delete a value from mixed storage.""" start_time = time.time() success = True try: self.strategy.delete(key) self.emit_event("item_deleted", {"key": key}) self._trigger_callbacks("after_delete", key) except Exception as e: success = False if self.policy == StoragePolicy.STRICT: raise StorageError(f"Failed to delete value: {e}") finally: self.update_metrics("delete", success, time.time() - start_time)
[docs] def clear(self) -> None: """Clear all items from mixed storage.""" start_time = time.time() success = True try: self.hot_storage.clear() self.cold_storage.clear() self.emit_event("storage_cleared") self._trigger_callbacks("after_clear") except Exception as e: success = False if self.policy == StoragePolicy.STRICT: raise StorageError(f"Failed to clear storage: {e}") finally: self.update_metrics("clear", success, time.time() - start_time)
[docs] def get_hot_stats(self) -> Dict[str, Any]: """Get hot storage statistics.""" return self.hot_storage.get_stats()
[docs] def get_cold_stats(self) -> Dict[str, Any]: """Get cold storage statistics.""" return self.cold_storage.get_stats()
[docs] def get_stats(self) -> Dict[str, Any]: """Get combined storage statistics.""" hot_stats = self.get_hot_stats() cold_stats = self.get_cold_stats() metrics = self.get_metrics() return { "hot_storage": hot_stats, "cold_storage": cold_stats, "combined": { "total_operations": metrics.total_operations, "total_errors": metrics.total_errors, "avg_response_time": metrics.avg_response_time, "hit_ratio": metrics.hits / (metrics.hits + metrics.misses) if metrics.hits + metrics.misses > 0 else 0 } }
[docs] def get_keys(self) -> list[str]: """Get all keys from both hot and cold storage.""" hot_keys = set(self.hot_storage.get_keys()) cold_keys = set(self.cold_storage.get_keys()) return list(hot_keys | cold_keys)
[docs] def optimize_hot_storage(self) -> None: """Optimize hot storage by removing least accessed items.""" keys = self.hot_storage.get_keys() if not keys: return # Move least accessed items to cold storage only for key in keys: value = self.hot_storage.retrieve(key) if value is not None: self.cold_storage.store(key, value) self.hot_storage.delete(key)
[docs] def warm_up_hot_storage(self, keys: list[str]) -> None: """Pre-load specified keys into hot storage.""" for key in keys: value = self.cold_storage.retrieve(key) if value is not None: self.hot_storage.store(key, value)