Source code for duck.utils.caching.encrypted

"""
Encrypted cache wrapper using PyNaCl (libsodium).

Provides NaClEncryptor and EncryptedCache — a transparent encryption
layer that wraps any CacheBase backend. The backend never sees plaintext.
Encryption uses XSalsa20-Poly1305 AEAD: authenticated, single-pass,
and ARM-safe.
"""

import os
import json

from typing import Any

import pickle
import nacl.secret
import nacl.utils

from nacl.exceptions import CryptoError

from duck.utils.caching import CacheBase, MISSING


# SecretBox nonce size as defined by libsodium (24 bytes).
NONCE_SIZE: int = nacl.secret.SecretBox.NONCE_SIZE

# Environment variable the user must set to opt in to automatic key derivation
DUCK_NACL_DERIVE_KEY_ENV = "DUCK_NACL_DERIVE_KEY"

# NaCl SecretBox requires exactly this many bytes
NACL_KEY_SIZE = nacl.secret.SecretBox.KEY_SIZE  # 32


[docs] def resolve_nacl_key(secret_key: bytes) -> bytes: """ Ensures a raw secret key meets NaCl's 32-byte requirement. Behaviour by key length: - **Exactly 32 bytes** — returned as-is, no processing. - **Fewer than 32 bytes** — derived to 32 bytes using BLAKE2b with a fixed 32-byte digest. This is stable: the same input always produces the same output, so previously encrypted data remains decryptable. - **More than 32 bytes** — raises ``ValueError`` unless the environment variable ``DUCK_NACL_DERIVE_KEY`` is set to ``"1"``, in which case BLAKE2b derivation is applied to compress the key to 32 bytes. Args: secret_key: The raw key bytes to resolve. Returns: bytes: A exactly 32-byte key safe to pass to ``nacl.secret.SecretBox``. Raises: ValueError: If the key is empty, or longer than 32 bytes without the opt-in environment variable set. """ if not secret_key: raise ValueError( "Secret key is empty. Please provide a non-empty secret key." ) key_len = len(secret_key) if key_len == NACL_KEY_SIZE: # Already the correct size — nothing to do return secret_key if key_len < NACL_KEY_SIZE: # Derive a stable 32-byte key via BLAKE2b so existing ciphertext stays valid return hashlib.blake2b(secret_key, digest_size=NACL_KEY_SIZE).digest() # Key is longer than 32 bytes — require explicit opt-in before truncating if os.environ.get(DUCK_NACL_DERIVE_KEY_ENV) != "1": raise ValueError( f"Secret key is {key_len} bytes but NaCl requires exactly {NACL_KEY_SIZE}. " f"Set the environment variable {DUCK_NACL_DERIVE_KEY_ENV}=1 to allow " f"automatic key derivation via BLAKE2b, or provide a key that is " f"{NACL_KEY_SIZE} bytes or fewer." ) # User opted in — derive down to 32 bytes return hashlib.blake2b(secret_key, digest_size=NACL_KEY_SIZE).digest()
[docs] def generate_secret_key() -> bytes: """ Generate a 32-byte secret key for NaClEncryptor. Run once and persist the result in an environment variable or a secrets manager. Never hardcode or commit the returned value. Returns: 32 cryptographically random bytes. """ return nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
[docs] class NaClEncryptor: """ XSalsa20-Poly1305 AEAD encryptor backed by libsodium via PyNaCl. Authenticated encryption and integrity verification happen in a single pass — they are inseparable. A unique random nonce is generated per message and embedded in the output so decrypt() is fully self-contained with no external nonce management. Builds cleanly on ARM, x86, and MIPS — anywhere libsodium compiles. Args: secret_key: 32 bytes from generate_secret_key(). Raises: ValueError: When secret_key is not exactly 32 bytes. """ def __init__(self, secret_key: bytes): if not secret_key: raise ValueError( f"Secret key seems to be empty, please provide secret key." ) self.secret_key = resolve_nacl_key(secret_key).encode("utf-8") self.box = nacl.secret.SecretBox(self.secret_key)
[docs] def encrypt(self, value: Any) -> bytes: """ Serialise value using Pickle then encrypt it. A fresh random nonce is generated on every call so encrypting the same value twice always produces different ciphertext. Args: value: Any Pickle-serialisable Python object. Returns: Raw bytes containing the embedded nonce, ciphertext, and Poly1305 authentication tag. No base64 encoding applied. Raises: TypeError: When value is not Pickle-serialisable. """ # NOTE: Pickle in general is slow but safer than JSON because it supports many types e.g. datetime objects. value = pickle.dumps(value) nonce = nacl.utils.random(NONCE_SIZE) # SecretBox.encrypt embeds the nonce so decrypt() needs no # separate nonce argument. return bytes(self.box.encrypt(value, nonce))
[docs] def decrypt(self, data: bytes) -> Any: """ Decrypt and deserialise bytes produced by encrypt(). Args: data: Raw bytes from a previous encrypt() call. Returns: The original Python object. Raises: CryptoError: When data has been tampered with or was encrypted with a different key. Never silently returns bad data. """ pickled = self.box.decrypt(data) return pickle.loads(pickled)
[docs] class EncryptedCache(CacheBase): """ Transparent encryption wrapper around any CacheBase backend. Values are encrypted with NaClEncryptor before being passed to the inner backend and decrypted transparently on retrieval. The backend never sees or stores plaintext. Tampering or wrong-key errors cause the offending entry to be evicted immediately rather than returned. All methods delegate locking to the inner backend — no double-locking. The async_* variants are forwarded to the inner backend's async_* methods when available, falling back to the sync methods otherwise. Args: backend: Any CacheBase instance to use as the storage layer. secret_key: 32-byte key from generate_secret_key(). Example: ```py import os from duck.settings import SETTINGS from duck.caching import PersistentFileCache from duck.caching.encrypted import EncryptedCache, generate_secret_key # Unencrypted cache for general use. cache = PersistentFileCache("./var/cache") # Encrypted cache for sessions or any sensitive data. session_cache = EncryptedCache( backend=PersistentFileCache("./var/sessions"), secret_key=SETTINGS["SECRET_KEY"].encode(), ) session_cache.set("sid:abc123", {"user_id": 42}, expiry=3600) data = session_cache.get("sid:abc123") ``` """ def __init__(self, backend: CacheBase, secret_key: bytes): self.backend = backend self.encryptor = NaClEncryptor(secret_key)
[docs] def set(self, key: str, value: Any, expiry: int | float | None = None) -> None: """ Encrypt value and store it in the backend under key. Args: key: Cache key. value: Any JSON-serialisable value to store. expiry: TTL in seconds. None means no expiry. Raises: TypeError: When value is not JSON-serialisable. """ self.backend.set(key, self.encryptor.encrypt(value), expiry)
[docs] def get(self, key: str, default: Any = None) -> Any: """ Retrieve and decrypt a value from the backend. Returns default when the key is absent, expired, or the ciphertext fails authentication — tampered entries are evicted immediately and never returned. Args: key: Cache key to look up. default: Returned on miss, expiry, or tamper detection. Returns: The decrypted value or default. """ raw = self.backend.get(key) if raw is None: return default try: return self.encryptor.decrypt(raw) except CryptoError: # Tampered or wrong key — evict and return default cleanly. self.backend.delete(key) return default
[docs] def delete(self, key: str) -> None: """ Remove key from the backend. Silent if absent. Args: key: Cache key to remove. """ self.backend.delete(key)
[docs] def pop(self, key: str, default: Any = MISSING) -> Any: """ Retrieve, decrypt, and atomically remove a value. The raw value is popped from the backend first so that a tampered entry is always removed regardless of whether decryption succeeds. Args: key: Cache key to retrieve and delete. default: Returned when absent. Raises KeyError if omitted. Returns: The decrypted value or default. Raises: KeyError: When the key is missing and no default was given. """ raw = self.backend.pop(key, default=None) if raw is None: if default is MISSING: raise KeyError(key) return default try: return self.encryptor.decrypt(raw) except CryptoError: # Entry was already popped — nothing left to evict. if default is MISSING: raise KeyError(key) return default
[docs] def clear(self) -> None: """ Evict all entries from the underlying backend. """ self.backend.clear()
# Async variants — delegate to backend async methods where available.
[docs] async def async_set( self, key: str, value: Any, expiry: int | float | None = None ) -> None: """ Async-safe version of set. Args: key: Cache key. value: JSON-serialisable value to store. expiry: TTL in seconds. """ from duck.contrib.sync import ensure_async encrypted = self.encryptor.encrypt(value) if hasattr(self.backend, "async_set"): await self.backend.async_set(key, encrypted, expiry) else: await ensure_async(self.backend.set)(key, encrypted, expiry)
[docs] async def async_get(self, key: str, default: Any = None) -> Any: """ Async-safe version of get. Args: key: Cache key to look up. default: Returned on miss, expiry, or tamper detection. Returns: The decrypted value or default. """ from duck.contrib.sync import ensure_async if hasattr(self.backend, "async_get"): raw = await self.backend.async_get(key) else: raw = await ensure_async(self.backend.get)(key) if raw is None: return default try: return self.encryptor.decrypt(raw) except CryptoError: await self.async_delete(key) return default
[docs] async def async_pop(self, key: str, default: Any = MISSING) -> Any: """ Async-safe version of pop. Args: key: Cache key to retrieve and delete. default: Returned when absent. Raises KeyError if omitted. Returns: The decrypted value or default. Raises: KeyError: When the key is missing and no default was given. """ from duck.contrib.sync import ensure_async if hasattr(self.backend, "async_pop"): raw = await self.backend.async_pop(key, default=None) else: raw = await ensure_async(self.backend.pop)(key, default=None) if raw is None: if default is MISSING: raise KeyError(key) return default try: return self.encryptor.decrypt(raw) except CryptoError: if default is MISSING: raise KeyError(key) return default
[docs] async def async_delete(self, key: str) -> None: """ Async-safe version of delete. Args: key: Cache key to remove. """ from duck.contrib.sync import ensure_async if hasattr(self.backend, "async_delete"): await self.backend.async_delete(key) else: await ensure_async(self.backend.delete)(key)
[docs] async def async_clear(self) -> None: """ Async-safe version of clear. """ from duck.contrib.sync import ensure_async if hasattr(self.backend, "async_clear"): await self.backend.async_clear() else: await ensure_async(self.backend.clear)()
[docs] def close(self) -> None: """ Close the underlying backend. """ self.backend.close()