Source code for duck.http.session.connector
"""
Module containing SessionConnector class which can be used to connect to
session storage to perform operations like get, set, update, delete, clear, etc.
"""
import os
import uuid
from typing import Callable
from duck.settings import SETTINGS
from duck.settings.loaded import SettingsLoaded
from duck.utils.caching import CacheBase, InMemoryCache
[docs]
def get_session_storage_connector():
"""
Returns the session storage connector object.
"""
return SettingsLoaded.SESSION_STORAGE_CONNECTOR.getresult()
[docs]
class SessionStorageConnectorError(Exception):
"""
Raised when errors related to the session storage connector arise.
"""
[docs]
class SessionStorageConnector:
"""
Connects to the configured session storage backend and exposes a uniform
interface for get, set, update, delete, and clear operations.
A write-through in-memory cache sits in front of the real storage backend
to reduce latency on repeated reads. The cache layer is bypassed entirely
when the backend is itself an InMemoryCache.
"""
instance = None
initialized = False
CACHED_SESSIONS = InMemoryCache(maxkeys=1024 * 4)
"""
Write-through in-memory cache that fronts the real session storage backend.
"""
[docs]
def __new__(cls, session_storage_cls: type[CacheBase]):
from duck.meta import Meta
if cls.instance is None:
Meta.set_metadata("SESSION_STORAGE_SET", True)
cls.instance = super().__new__(cls)
return cls.instance
def __init__(self, session_storage_cls: type[CacheBase]):
"""
Initializes the connector with the given storage backend class.
Initialization only runs once — subsequent calls with the same class
are no-ops. Passing a different class after initialization raises an
error to prevent silent misconfiguration.
Args:
session_storage_cls: The storage backend class to instantiate.
Raises:
SessionStorageConnectorError: If already initialized with a different class.
"""
from duck.utils.caching.encrypted import EncryptedCache
if self.__class__.initialized:
if session_storage_cls is not self.session_storage_cls:
raise SessionStorageConnectorError(
"SessionStorageConnector already initialized with a "
"different session storage class."
)
return
self.session_storage_cls = session_storage_cls
self.session_dir = SETTINGS["SESSION_DIR"]
# Ensure the session directory exists if one is configured
if self.session_dir:
os.makedirs(self.session_dir, exist_ok=True)
# Some backends take a directory path, others take no arguments
try:
raw_session_storage = session_storage_cls()
except TypeError:
raw_session_storage = session_storage_cls(self.session_dir)
# Wrap session storage
self._session_storage = EncryptedCache(backend=raw_session_storage, secret_key=SETTINGS["SECRET_KEY"])
self.__class__.initialized = True
# Convenience flag so callers can check without inspecting the class directly
@property
def using_memory_backend(self) -> bool:
"""
Returns True when the backend is itself an InMemoryCache.
When True, the write-through cache layer is bypassed to avoid
double-storing the same data in two separate InMemoryCache instances.
"""
return issubclass(self.session_storage_cls, InMemoryCache)
[docs]
@staticmethod
def generate_session_id() -> str:
"""
Returns a randomly generated session ID.
"""
return str(uuid.uuid4())
[docs]
def set_session(
self,
session_id: str,
data: dict,
expiry: int | float | None = None,
) -> None:
"""
Writes session data to the backend and updates the in-memory cache.
Args:
session_id: The session identifier.
data: The session data to store.
expiry: Optional TTL in seconds. Omitted from the call if not provided.
"""
if expiry is not None:
self._session_storage.set(session_id, data, expiry)
else:
self._session_storage.set(session_id, data)
# Mirror into the cache only when the backend is not already in-memory
if not self.using_memory_backend:
if expiry is not None:
self.CACHED_SESSIONS.set(session_id, data, expiry=expiry)
else:
self.CACHED_SESSIONS.set(session_id, data)
[docs]
def update_session(self, session_id: str, data: dict) -> None:
"""
Merges new data into an existing session, preserving existing keys.
Args:
session_id: The session identifier.
data: Partial data to merge into the current session.
"""
prev_data = self.get_session(session_id) or {}
prev_data.update(data)
self.set_session(session_id, prev_data)
[docs]
def get_session(self, session_id: str) -> dict | None:
"""
Retrieves session data, consulting the in-memory cache first.
Args:
session_id: The session identifier.
Returns:
The session data dict, or None if the session does not exist.
"""
if not self.using_memory_backend:
# Check the fast in-memory cache before hitting real storage
cached = self.CACHED_SESSIONS.get(session_id)
if cached is not None:
return cached
return self._session_storage.get(session_id)
[docs]
def delete_session(self, session_id: str) -> None:
"""
Deletes a session from both the backend and the in-memory cache.
Args:
session_id: The session identifier to remove.
"""
self._session_storage.delete(session_id)
# Keep the cache consistent on deletion
if not self.using_memory_backend:
self.CACHED_SESSIONS.delete(session_id)
[docs]
def clear_all_sessions(self) -> None:
"""
Clears all sessions from the backend and flushes the in-memory cache.
"""
self._session_storage.clear()
# Flush the cache so stale entries don't linger after a full clear
if not self.using_memory_backend:
self.CACHED_SESSIONS.clear()
[docs]
def save(self) -> None:
"""
Persists current session state to the backend storage.
"""
self._session_storage.save()
[docs]
def close(self) -> None:
"""
Closes the session storage backend connection.
"""
self._session_storage.close()