"""
Module to represent a request/response Content class.
"""
import gzip
import zlib
import fnmatch
from typing import Tuple
from duck.exceptions.all import ContentError
from duck.http.mimes import (
guess_data_mimetype,
guess_file_mimetype,
)
from duck.settings import SETTINGS
# Compression configuration, read once at import time from the project settings.
CONTENT_COMPRESSION = SETTINGS["CONTENT_COMPRESSION"]

# Content encoding to apply; falls back to "identity" when not configured.
COMPRESSION_ENCODING = CONTENT_COMPRESSION.get("encoding", "identity")

# Smallest payload considered for compression (default: 1024, i.e. 1KB).
COMPRESSION_MIN_SIZE = CONTENT_COMPRESSION.get("min_size", 1024)

# Largest payload considered for compression (default: 512KB).
COMPRESSION_MAX_SIZE = CONTENT_COMPRESSION.get("max_size", 512 * 1024)

# Compression level handed to the codec (default: 5).
COMPRESSION_LEVEL = CONTENT_COMPRESSION.get("level", 5)

# Whether streaming responses should be compressed as well (default: True).
COMPRESS_STREAMING_RESPONSES = CONTENT_COMPRESSION.get("compress_streaming_responses", True)

# Mimetype patterns eligible for compression. Formats that are already
# compressed (images, for example) are intentionally absent from the default.
COMPRESSION_MIMETYPES = CONTENT_COMPRESSION.get(
    "mimetypes",
    [
        "text/*",
        "application/javascript",
        "application/json",
        "application/xml",
        "application/xhtml+xml",
        "application/rss+xml",
        "application/atom+xml",
    ],
)
[docs]
class Content:
    """
    Content class to represent the data to be sent in the response.

    The instance stores the payload as bytes together with its optional
    source file path, its content type and its content encoding, and it can
    compress or decompress the payload in place (gzip, deflate, brotli).
    """

    __slots__ = (
        "_Content__data",
        "_Content__filepath",
        "_Content__encoding",
        "_Content__content_type",
        "_force_size",
        "compression_min_size",
        "compression_max_size",
        "compression_level",
        "compression_mimetypes",
        "supported_encodings",
        "suppress_errors",
        "auto_read_file",
        "disable_datatype_check",
    )

    def __init__(
        self,
        data: bytes = b"",
        filepath: str = None,
        content_type: str = None,
        encoding: str = "identity",
        compression_min_size: int = COMPRESSION_MIN_SIZE,
        compression_max_size: int = COMPRESSION_MAX_SIZE,
        compression_level: int = COMPRESSION_LEVEL,
        compression_mimetypes: list = COMPRESSION_MIMETYPES,
        suppress_errors: bool = False,
        auto_read_file: bool = True,
    ):
        """
        Initialize the content object.

        Args:
            data (bytes): The data to be set.
            filepath (str): The file path to read data from.
            content_type (str): The content type of the data.
            encoding (str): The content encoding of the data.
            compression_min_size (int): The minimum size of data to compress.
            compression_max_size (int): The maximum size of data to compress.
            compression_level (int): The compression level to use.
            compression_mimetypes (list): The list of mimetype patterns to compress.
            suppress_errors (bool): Whether to suppress errors by trying to fix any issues.
            auto_read_file (bool): Automatically read file and set as data if data not provided.

        Raises:
            ContentError: If the file cannot be read and ``suppress_errors`` is False.
        """
        self.__data = None
        self.__filepath = None
        self.__encoding = None
        self.__content_type = None
        self._force_size = None
        self.compression_min_size = compression_min_size
        self.compression_max_size = compression_max_size
        self.compression_level = compression_level
        self.compression_mimetypes = compression_mimetypes
        self.supported_encodings = ["gzip", "deflate", "identity", "br"]
        self.suppress_errors = suppress_errors
        self.auto_read_file = auto_read_file
        # Must exist before any data assignment: the ``data`` setter reads this
        # slot, and reading an unset slot raises AttributeError.
        self.disable_datatype_check = False
        self.set_content(data or b"", filepath, content_type)
        self.encoding = encoding

    def _compress(self, data: bytes, encoding: str, **kwargs) -> Tuple[bytes, bool]:
        """
        Compress data for the provided encoding.

        Args:
            data (bytes): The data to compress.
            encoding (str): The encoding to use (matched case-insensitively).
            **kwargs: Additional arguments forwarded to ``zlib.compress``
                (deflate) or ``brotli.compress`` (br). They are not used for gzip.

        Returns:
            (data, success) (Tuple[bytes, bool]): The (possibly compressed) data and
            whether compression was successful. The data is returned unchanged
            whenever success is False.

        Raises:
            ContentError: If the data is eligible but no encoding was given, or
                if brotli is requested and the ``brotli`` library is missing.

        Conditions:
            - `data` is non-empty bytes.
            - `size` <= compression_max_size and `size` >= compression_min_size
            - `content_type` matches one of `compression_mimetypes`.
            - `encoding` is recognized.
        """
        eligible = (
            isinstance(data, bytes)
            and self.compression_min_size <= len(data) <= self.compression_max_size
            and self.mimetype_supported(self.content_type)
        )
        if not eligible:
            return data, False

        if not encoding:
            raise ContentError(
                "Please set encoding first to compress data")

        codec = str(encoding).lower()

        if codec == "gzip":
            return gzip.compress(data, compresslevel=self.compression_level), True

        if codec == "deflate":
            return zlib.compress(data, level=self.compression_level, **kwargs), True

        if codec == "br":
            try:
                import brotli
            except ImportError as e:
                raise ContentError(
                    "Brotli compression requires brotli library, please install it first using `pip install brotli`"
                ) from e
            return brotli.compress(data, quality=self.compression_level, **kwargs), True

        # "identity" means nothing to do, which still counts as success;
        # any other (unknown) encoding is reported as a failure.
        return data, codec == "identity"

    def _decompress(self, data: bytes) -> Tuple[bytes, bool]:
        """
        Decompress data according to the current ``encoding``.

        Args:
            data (bytes): The data to decompress.

        Returns:
            (data, success) (Tuple[bytes, bool]): The (possibly decompressed) data and
            whether decompression was successful. The data is returned unchanged
            whenever success is False.

        Raises:
            ContentError: If the data is eligible but no encoding is set, or
                if brotli is required and the ``brotli`` library is missing.

        Conditions:
            - `data` is non-empty bytes.
            - `size` <= compression_max_size
            - `content_type` matches one of `compression_mimetypes`.
            - `encoding` is recognized.
        """
        eligible = (
            isinstance(data, bytes)
            and 0 < len(data) <= self.compression_max_size
            and self.mimetype_supported(self.content_type)
        )
        if not eligible:
            return data, False

        if not self.encoding:
            raise ContentError(
                "Please set encoding first to decompress data")

        # Compare case-insensitively for every codec, not only for deflate.
        codec = str(self.encoding).lower()

        if codec == "gzip":
            return gzip.decompress(data), True

        if codec == "deflate":
            return zlib.decompress(data), True

        if codec == "br":
            try:
                import brotli
            except ImportError as e:
                raise ContentError(
                    "Brotli decompression requires brotli library, please install it first using `pip install brotli`"
                ) from e
            return brotli.decompress(data), True

        return data, codec == "identity"

    def mimetype_supported(self, mimetype: str) -> bool:
        """
        Checks whether the given mimetype is supported for compression or decompression.

        The value is matched against every pattern in ``compression_mimetypes``
        using ``fnmatch``. If the value carries parameters (for example
        ``application/json; charset=utf-8``), the bare type before the ``;`` is
        matched as well.

        Args:
            mimetype (str): The mimetype (content type) to check.

        Returns:
            bool: True if any pattern matches; False otherwise, including when
            the mimetype is empty or None.
        """
        if not mimetype:
            # No content type known (None or ""): nothing can match.
            return False

        full = str(mimetype)
        base = full.split(";", 1)[0].strip()
        candidates = (full,) if base == full else (full, base)

        return any(
            fnmatch.fnmatch(candidate, pattern)
            for pattern in self.compression_mimetypes
            for candidate in candidates
        )

    def correct_encoding(self):
        """
        Returns the calculated current correct encoding depending on the current data.

        The detection is a heuristic based on the leading bytes of the data.

        Returns:
            str: One of "gzip", "deflate", "br" or "identity".
        """
        payload = self.data
        if not payload:
            return "identity"

        # gzip: magic bytes 1f 8b followed by compression method 08 (deflate).
        if payload.startswith(b"\x1f\x8b\x08"):
            return "gzip"

        # zlib stream headers (CMF 0x78 plus the FLG byte); the FLG byte depends
        # on the compression level: 01 (level 0-1), 5e (levels 2-5),
        # 9c (level 6) and da (levels 7-9).
        if payload.startswith((b"\x78\x01", b"\x78\x5e", b"\x78\x9c", b"\x78\xda")):
            return "deflate"

        # NOTE(review): the Brotli format does not appear to define a magic
        # number, so this prefix check is a best-effort heuristic kept as-is.
        if payload.startswith(b"\x8b\x8b\x8b"):
            return "br"

        return "identity"

    @property
    def compressed(self):
        """
        Check if the content data is compressed.

        Returns:
            bool: True if ``correct_encoding()`` detects anything but "identity".
        """
        return self.correct_encoding() != "identity"

    def compress(self, encoding: str, **kwargs) -> bool:
        """
        Compress the content data in place.

        Args:
            encoding (str): The encoding to use.
            **kwargs: Additional arguments for deflate (zlib) or brotli compression.

        Returns:
            bool: Whether compression has been successful. Always False for
            empty data.
        """
        if not self.data:
            # Return a plain bool: a (data, False) tuple would be truthy.
            return False

        self.data, success = self._compress(self.data, encoding, **kwargs)
        if success:
            self.__encoding = encoding
        return success

    def decompress(self):
        """
        Decompress the content data in place.

        On success, and also when there is no data at all, the encoding is
        reset to "identity".

        Returns:
            bool: Whether decompression has been successful. Always False for
            empty data.
        """
        if not self.data:
            self.__encoding = "identity"
            # Return a plain bool: a (data, False) tuple would be truthy.
            return False

        self.data, success = self._decompress(self.data)
        if success:
            self.__encoding = "identity"
        return success

    @property
    def raw(self):
        """
        Alias of ``data``.
        """
        return self.data

    @property
    def data(self):
        """
        Get the content data as bytes.
        """
        return bytes(self.__data)

    @data.setter
    def data(self, data: bytes):
        """
        Set the content data.

        Non-bytes values are rejected unless ``disable_datatype_check`` is set.
        When rejected and ``suppress_errors`` is True, the data is replaced by
        empty bytes instead of raising.

        Raises:
            ContentError: If data is not bytes, the datatype check is enabled
                and errors are not suppressed.
        """
        if not isinstance(data, bytes) and not self.disable_datatype_check:
            if not self.suppress_errors:
                raise ContentError("Bytes required as data.")
            data = b""
        self.__data = data

    @property
    def filepath(self) -> str:
        """
        Get the file path.

        Returns:
            str: The current file path.
        """
        return self.__filepath

    @filepath.setter
    def filepath(self, filepath: str) -> None:
        """
        Set the file path.

        Args:
            filepath (str): The new file path to set.
        """
        self.__filepath = filepath

    @property
    def encoding(self) -> str:
        """
        Get the content encoding.

        Returns:
            str: The current encoding of the content.
        """
        return self.__encoding

    @encoding.setter
    def encoding(self, encoding: str) -> None:
        """
        Set the content encoding. The data itself is left untouched.

        Args:
            encoding (str): The new encoding to set.
        """
        self.__encoding = encoding

    @property
    def content_type(self) -> str:
        """
        Get the content type.

        Returns:
            str: The current content type.
        """
        return self.__content_type

    @content_type.setter
    def content_type(self, content_type: str) -> None:
        """
        Set the content type.

        Args:
            content_type (str): The new content type to set.
        """
        self.__content_type = content_type

    @property
    def size(self) -> int:
        """
        Returns the size of the data.

        If a fake size is set, it returns the forced size.
        Otherwise, it calculates the size based on the current data.
        """
        if self._force_size is not None:
            return self._force_size
        if self.__data is not None:
            return len(self.__data)
        return 0

    def set_fake_size(self, size: int) -> None:
        """
        Forcefully set a fake size for the data.

        Args:
            size (int): The fake size to set.
        """
        self._force_size = size

    def remove_fake_size(self) -> None:
        """
        Remove the forced fake size, restoring size calculation
        to be based on the actual data.
        """
        self._force_size = None

    def parse_type(self, content_type=None):
        """
        Set the content type, guessing it when ``content_type`` is not given.

        Guessing order: the file path (if any), then the data itself, and
        finally "application/octet-stream" as the fallback.

        Args:
            content_type (str): The content type to set; guessed if falsy.
        """
        if not content_type:
            if self.filepath:
                content_type = guess_file_mimetype(self.filepath)
            # Guess from the data if guessing from the file path fails.
            content_type = content_type or guess_data_mimetype(
                data=self.data or b"")
        if not content_type:
            # Both guesses failed, this is likely binary content.
            content_type = "application/octet-stream"
        self.content_type = content_type

    def force_set_data(self, data):
        """
        Forcibly set data regardless of the type, bypassing the ``data`` setter checks.
        """
        self.__data = data

    def set_content(
        self,
        data: bytes = b"",
        filepath: str = None,
        content_type=None,
    ):
        """
        Set the content; data should already be encoded to bytes.

        When no data is given but ``filepath`` is, and ``auto_read_file`` is
        enabled, the data is read from that file. Non-bytes data is converted
        with ``bytes(data, "utf-8")``.

        Args:
            data (bytes): The data to set.
            filepath (str): The file the data belongs to or is read from.
            content_type (str): The content type; guessed when not given.

        Raises:
            ContentError: If the file cannot be read and ``suppress_errors`` is
                False. With ``suppress_errors`` the data becomes empty bytes and
                the file path and content type are left unchanged.
        """
        if not data and filepath and self.auto_read_file:
            try:
                with open(filepath, "rb") as fd:
                    data = fd.read()
            except Exception as e:
                if self.suppress_errors:
                    self.__data = b""
                    return
                raise ContentError(
                    f"Could not set content from file {filepath}: {e}"
                ) from e

        self.filepath = filepath
        self.data = data if isinstance(data, bytes) else bytes(data, "utf-8")
        self.parse_type(content_type=content_type)

    def __repr__(self):
        return f"<{self.__class__.__name__} encoding={self.encoding}, size={self.size}, content_type={self.content_type}, compressed={self.compressed}>"