"""
Module containing ResponseFinalizer class focusing on putting on the final touches to the response.
The final touches include:
- Content compression.
- Content length calculation and insertion.
- Content encoding determination and insertion.
- etc.
"""
import re
import io
import fnmatch
from inspect import isasyncgen
from typing import (
Dict,
Optional,
Callable,
)
from duck.http.content import COMPRESS_STREAMING_RESPONSES
from duck.http.request import HttpRequest
from duck.http.response import (
HttpResponse,
FileResponse,
ComponentResponse,
StreamingHttpResponse,
StreamingRangeHttpResponse,
HttpRangeNotSatisfiableResponse,
)
from duck.logging import logger
from duck.logging.logger import (
handle_exception as log_failsafe,
)
from duck.settings import SETTINGS
from duck.utils.dateutils import gmt_date
from duck.utils.asyncio import in_async_context
from duck.shortcuts import (
replace_response,
template_response,
simple_response,
to_response,
)
from duck.meta import Meta
from duck.csp import csp_nonce, csp_nonce_flag
# Custom templates for predefined responses
# This is a mapping of status codes to a response generating callable
CUSTOM_TEMPLATES: Dict[int, Callable] = SETTINGS["CUSTOM_TEMPLATES"] or {}
if SETTINGS["ENABLE_HTTPS"]:
SECURITY_HEADERS = SETTINGS["SSL_SECURITY_HEADERS"]
else:
SECURITY_HEADERS = SETTINGS["SECURITY_HEADERS"]
[docs]
def set_compressable_iter_content(response):
"""
Modifies the response `iter_content` methods with new functions to compress data as were are iterating.
Note:
- Only use this function if response data is compressable.
- This function modifies both sync and async version of iter_content, i.e.
`iter_content` and `async_iter_content`.
"""
from duck.http.content import (
COMPRESSION_ENCODING,
COMPRESSION_LEVEL,
COMPRESSION_MAX_SIZE,
COMPRESSION_MIN_SIZE,
CONTENT_COMPRESSION,
COMPRESSION_MIMETYPES,
)
content_type = response.get_header("content-type", "")
def iter_and_compress():
"""
Compress content as we are iterating.
"""
for chunk in response.super_iter_content():
if not chunk:
continue # Skip empty or None chunks
# Create a fresh compression wrapper or content object per chunk
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = len(chunk) + 1
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressed = content_obj.compress(COMPRESSION_ENCODING)
compressed_data = content_obj.data
yield compressed_data
async def async_iter_and_compress():
"""
Compress content as we iterate towards it, one chunk at a time.
"""
content = response.super_async_iter_content()
if not isasyncgen(content):
# The content is not an async generator so lets await the coroutine
content = await content
if not isasyncgen(content):
for chunk in content:
if not chunk:
continue # Skip empty or None chunks
# Create a fresh compression wrapper or content object per chunk
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = len(chunk) + 1
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressed = content_obj.compress(COMPRESSION_ENCODING)
compressed_data = content_obj.data
yield compressed_data
else:
async for chunk in content:
if not chunk:
continue # Skip empty or None chunks
# Create a fresh compression wrapper or content object per chunk
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = len(chunk) + 1
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressed = content_obj.compress(COMPRESSION_ENCODING)
compressed_data = content_obj.data
yield compressed_data
# Before assigning check if the old methods are not already set to these above functions
old_iter_content_qualname = response.iter_content.__qualname__
old_async_iter_content_qualname = response.async_iter_content.__qualname__
iter_and_compress_qualname = iter_and_compress.__qualname__
async_iter_and_compress_qualname = async_iter_and_compress.__qualname__
if old_iter_content_qualname != iter_and_compress_qualname:
response.super_iter_content = response.iter_content
response.iter_content = iter_and_compress
if old_async_iter_content_qualname != async_iter_and_compress_qualname:
response.super_async_iter_content = response.async_iter_content
response.async_iter_content = async_iter_and_compress
[docs]
class ResponseFinalizer:
"""
ResponseFinalizer class focusing on putting on the final touches to the response.
"""
@log_failsafe
def do_set_fixed_headers(self, response, request) -> None:
"""
Sets fixed headers from settings, i.e. extra headers, cors headers and security headers.
"""
extra = SETTINGS["EXTRA_HEADERS"] or {}
cors = SETTINGS["CORS_HEADERS"] or {}
security = SECURITY_HEADERS or {}
for h, v in {**security, **cors, **extra}.items():
response.headers[h] = v
# Set CSP header
if request and SETTINGS["ENABLE_HEADERS_SECURITY_POLICY"]:
csp_directives = SETTINGS['CSP_TRUSTED_SOURCES']
nonce = csp_nonce(request)
if csp_directives:
csp_parts = []
for directive, sources in csp_directives.items():
if not sources:
continue
# Build each source string
source_parts = [
f"'nonce-{nonce}'" if i == csp_nonce_flag else i
for i in sources
]
csp_parts.append(f"{directive} {' '.join(source_parts)}")
csp_value = "; ".join(csp_parts) + ";"
response.set_header("Content-Security-Policy", csp_value)
@log_failsafe
def do_set_connection_mode(self, response, request) -> None:
"""
Sets the correct response connection mode, i.e. `keep-alive` or `close`.
"""
connection_mode = None
server_mode = SETTINGS["CONNECTION_MODE"].lower()
if not request:
response.set_header("Connection", "close")
return
if request.connection == server_mode:
connection_mode = server_mode
else:
connection_mode = "close"
response.set_header("Connection", connection_mode)
@log_failsafe
def do_set_extra_headers(self, response, request) -> None:
"""
Sets last final extra headers like Date & Cache-Control.
"""
response.set_header("date", gmt_date())
if SETTINGS['DEBUG']:
cache_control = response.get_header("cache-control")
default_cache_control = SECURITY_HEADERS.get("Cache-Control")
if not cache_control or cache_control is default_cache_control:
response.set_header("cache-control", "no-cache")
@log_failsafe
def do_content_compression(self, response, request) -> None:
"""
Compresses the content if the client supports it and
if the content is not a streaming response. (if necessary).
"""
from duck.http.content import (
COMPRESSION_ENCODING,
COMPRESSION_LEVEL,
COMPRESSION_MAX_SIZE,
COMPRESSION_MIN_SIZE,
CONTENT_COMPRESSION,
COMPRESSION_MIMETYPES,
)
accept_encoding = request.get_header("accept-encoding", "").lower() if request else ""
supported_encodings = ["gzip", "deflate", "br", "identity"]
if CONTENT_COMPRESSION.get("vary_on", False):
# Patch vary headers
existing_vary_headers = response.get_header("Vary") or ""
if existing_vary_headers:
existing_vary_headers += ", "
response.set_header(
"Vary",
existing_vary_headers + "Accept-Encoding",
)
if (not request or not SETTINGS["ENABLE_CONTENT_COMPRESSION"]
or COMPRESSION_ENCODING not in accept_encoding
or COMPRESSION_ENCODING not in supported_encodings
or response.content_obj.correct_encoding() != "identity"
):
# No need to compress content if correct_encoding is not identity (might already be compressed)
response.set_header(
"Content-Encoding",
response.content_obj.correct_encoding(),
)
return
if not isinstance(response, StreamingHttpResponse):
# Normal HTTP response here.
response.content_obj.compression_level = COMPRESSION_LEVEL
response.content_obj.compression_min_size = COMPRESSION_MIN_SIZE
response.content_obj.compression_max_size = COMPRESSION_MAX_SIZE
response.content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressed = response.content_obj.compress(COMPRESSION_ENCODING)
if compressed:
response.set_header("Content-Encoding", response.content_obj.encoding)
else:
response.set_header(
"Content-Encoding",
response.content_obj.correct_encoding(),
)
else:
# Streaming HTTP response here.
if not COMPRESS_STREAMING_RESPONSES:
# Compressing streaming responses disallowed
return
# Check if we are dealing with a StreamingRangeHttpResponse
if isinstance(response, StreamingRangeHttpResponse):
start_pos, end_pos = response.start_pos, response.end_pos
content_size = end_pos - start_pos
if not (content_size >= COMPRESSION_MIN_SIZE and content_size <= COMPRESSION_MAX_SIZE):
# Compression not applicable.
return
content_type = response.get_header("content-type", "")
total_stream_size = None
if hasattr(response, "stream") and hasattr(response.stream, "tell") and hasattr(response.stream, "seek"):
response.stream.seek(0, io.SEEK_END) # seek to EOF
total_stream_size = response.stream.tell()
else:
return # Quit with the compression, no stream!
if total_stream_size is not None:
if total_stream_size < COMPRESSION_MIN_SIZE or total_stream_size > COMPRESSION_MAX_SIZE :
# Total stream size if beyond or below compression limits
return
else:
# Don't compress anything with unknown size
return
# Don't compress HttpProxyResponse instances as doing response.iter_content() for checking if data is compressable
# may make content data inconsistent.
compressable = False # Whether the content is compressable by trying to compress the first chunk
# Check if content is compressable.
for initial_chunk in response.iter_content():
if initial_chunk:
# Create a fresh compression wrapper or content object per chunk
chunk = initial_chunk[:8] # Check compression using first 8 bytes to avoid performance degradationt
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = 8
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressable = content_obj.compress(COMPRESSION_ENCODING) # sets if content is compressable
break
if response.get_header("content-encoding", "identity") == "identity" and compressable:
# Assume compression will not fail, this is is a bit dangerous if compression fails as response might include
# unmatching invalid content content encoding
response.set_header("Content-Encoding", COMPRESSION_ENCODING)
# Modify response iter_content & async_iter_content to new funcs which
# compress data as we are iterating over it.
set_compressable_iter_content(response)
@log_failsafe
def do_set_content_headers(self, response, request) -> None:
"""
Sets the appropriate content headers like `Content-Type`, `Content-Encoding` & `Content-Length` if not set.
Notes:
- If response is an instance of `StreamingHttpResponse`, the `Content-Length` header is removed as a safe measure. The size
of the response content can become unpredictable especially when data is compressed as it is being sent.
"""
from duck.http.content import COMPRESSION_ENCODING
from duck.http.core.proxyhandler import HttpProxyResponse
# Set some content headers if not set.
content_length = response.get_header("content-length")
content_encoding = response.get_header("content-encoding")
content_type = response.get_header("content-type")
if not isinstance(response, StreamingHttpResponse):
# Normal HTTP response here
# Set content-length if not set.
if not content_length:
response.set_header("content-length", response.content_length)
# Set content encoding if not set.
if not content_encoding:
response.set_header("content-encoding", response.content_encoding or response.content_obj.correct_encoding())
# Set content-type if not set.
if not content_type:
# Set the predicted content-type from the content object.
response.set_header('content-type', response.content_type)
else:
# Streaming HTTP response here.
# Remove content-length for streaming responses,
# the response content may be unpredictable. This is a safe measure.
if content_length:
# Only ProxyResponse instance is an exception.
if not isinstance(response, HttpProxyResponse):
response.delete_header("content-length")
# Set the content-encoding if not set.
if not content_encoding:
response.set_header("content-encoding", "identity") # default encoding.
# Set the content-type if not set.
if not content_type:
response.set_header("content-type", "application/octet-stream") # default content type for streaming responses.
@log_failsafe
def do_set_streaming_range(self, response, request):
"""
Set streaming range attributes on StreamingRangeHttpResponse.
This method parses the 'Range' header from the request and sets the
start and end positions for partial content streaming.
Args:
response (StreamingRangeHttpResponse): The response object to set streaming range on.
request (HttpRequest): The incoming HTTP request containing the 'Range' header.
Raises:
ValueError: If the 'Range' header is malformed or invalid.
"""
if not request:
return # If no request is provided, exit early.
if not isinstance(response, StreamingRangeHttpResponse):
return # Response is incompatible.
# Set the Range header.
range_header = request.get_header('Range')
if not range_header:
if isinstance(response, StreamingRangeHttpResponse):
if response.status_code == 206:
response.payload_obj.parse_status(200) # modify the response to correct status
response.clear_content_range_headers() # clear range headers
return # If no Range header exists, no need to set content range headers.
# Parse Range header.
if response.status_code == 200:
# Invalid status (200 OK) instead of (206 Partial Content)
response.payload_obj.parse_status(206) # modify the response to correct status
try:
# Extract start and end positions from the Range header
# Note: Use response.start_pos & end_pos rather than start, end as they are the most recent offsets.
start, end = StreamingRangeHttpResponse.extract_range(range_header)
# Set the start and end positions on the response object
response.parse_range(start, end) # set content range headers (if applicable)
except ValueError as e:
# replace response data
new_response = None
if SETTINGS["DEBUG"]:
new_response = template_response(
HttpRangeNotSatisfiableResponse,
body=(
f"<p>Range is not satisfiable, could not resolve: {range_header}</p>"
f"<p>Exception: {e}</p>"
)
)
else:
new_response = simple_response(HttpRangeNotSatisfiableResponse)
# Replace response with new data
replace_response(response, new_response)
# Finalize response again as it has new values
# Set do_set_streaming_range & do_content_compression to False to avoid max recursion error
self.finalize_response(
response,
request,
do_set_streaming_range=False,
do_content_compression=False,
)
@log_failsafe
def do_request_response_transformation(self, response: HttpResponse, request: HttpRequest):
"""
Transforms the response object by applying request- and response-based modifications.
This includes, but is not limited to, header changes and body alterations.
Behavior Examples:
- If the request method is `HEAD`, the response body is replaced with empty bytes.
- If a matching template is found in the `CUSTOM_TEMPLATES` configuration, the entire response may be replaced.
Args:
response (HttpResponse): The original response to be transformed.
request (HttpRequest): The incoming HTTP request associated with the response.
"""
# Check if a custom template is configured for this response
# Return the http response object.
if request and str(request.method).upper() == "HEAD":
# Reset content
request.set_content(b"", auto_add_content_headers=True)
if response:
if response.status_code in CUSTOM_TEMPLATES:
response_callable = CUSTOM_TEMPLATES[response.status_code]
if not callable(response_callable):
raise TypeError(f"Callable required for custom template corresponding to status code of '{response.status_code}' ")
# Parse parameters and obtain the custom template response.
new_response = response_callable(
current_response=response,
request=request,
)
try:
new_response = to_response(new_response) # convert or check the validity of the custom response.
except TypeError:
# The value returned by response_generating_callable is not valid
raise TypeError(f"Invalid data returned by the custom template callable corresponding to status code '{response.status_code}' ")
# Replace response with new data
replace_response(response, new_response)
[docs]
def finalize_response(
self,
response: HttpResponse,
request: HttpRequest,
do_set_streaming_range: bool = True,
do_content_compression: bool = True,
):
"""
Puts the final touches to the response.
"""
# All of the following method calls are failsafe meaning failure of any method
# will not affect the execution of other methods, thus an error encountered will be
# logged appropriately. Decorator responsible: @log_failsafe
self.do_request_response_transformation(response, request)
self.do_set_fixed_headers(response, request)
self.do_set_connection_mode(response, request)
self.do_set_extra_headers(response, request)
if do_set_streaming_range:
self.do_set_streaming_range(response, request)
# Do content compression in the end.
if do_content_compression:
self.do_content_compression(response, request)
# Lastly review content headers.
self.do_set_content_headers(response, request)
[docs]
class AsyncResponseFinalizer(ResponseFinalizer):
"""
Asynchronous ResponseFinalizer class focusing on putting on the final touches to the response.
"""
@log_failsafe
async def do_content_compression(self, response, request) -> None:
"""
Compresses the content if the client supports it and
if the content is not a streaming response. (if necessary).
"""
from duck.http.content import (
COMPRESSION_ENCODING,
COMPRESSION_LEVEL,
COMPRESSION_MAX_SIZE,
COMPRESSION_MIN_SIZE,
CONTENT_COMPRESSION,
COMPRESSION_MIMETYPES,
)
accept_encoding = request.get_header("accept-encoding", "").lower() if request else ""
supported_encodings = ["gzip", "deflate", "br", "identity"]
if CONTENT_COMPRESSION.get("vary_on", False):
# Patch vary headers
existing_vary_headers = response.get_header("Vary") or ""
if existing_vary_headers:
existing_vary_headers += ", "
response.set_header(
"Vary",
existing_vary_headers + "Accept-Encoding",
)
if (not request or not SETTINGS["ENABLE_CONTENT_COMPRESSION"]
or COMPRESSION_ENCODING not in accept_encoding
or COMPRESSION_ENCODING not in supported_encodings
or response.content_obj.correct_encoding() != "identity"
):
# No need to compress content if correct_encoding is not identity (might already be compressed)
response.set_header(
"Content-Encoding",
response.content_obj.correct_encoding(),
)
return
if not isinstance(response, StreamingHttpResponse):
# Normal HTTP response here.
response.content_obj.compression_level = COMPRESSION_LEVEL
response.content_obj.compression_min_size = COMPRESSION_MIN_SIZE
response.content_obj.compression_max_size = COMPRESSION_MAX_SIZE
response.content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressed = response.content_obj.compress(COMPRESSION_ENCODING)
if compressed:
response.set_header("Content-Encoding", response.content_obj.encoding)
else:
response.set_header(
"Content-Encoding",
response.content_obj.correct_encoding(),
)
else:
# Streaming HTTP response here.
if not COMPRESS_STREAMING_RESPONSES:
# Compressing streaming responses disallowed
return
# Check if we are dealing with a StreamingRangeHttpResponse
if isinstance(response, StreamingRangeHttpResponse):
start_pos, end_pos = response.start_pos, response.end_pos
content_size = end_pos - start_pos
if not(content_size >= COMPRESSION_MIN_SIZE and content_size <= COMPRESSION_MAX_SIZE):
# Compression not applicable.
return
content_type = response.get_header("content-type", "")
total_stream_size = None
if hasattr(response, "stream") and hasattr(response.stream, "tell") and hasattr(response.stream, "seek"):
response.stream.seek(0, io.SEEK_END) # seek to EOF
total_stream_size = response.stream.tell()
else:
if not isinstance(response, ComponentResponse):
return # Quit with the compression, no stream!
if total_stream_size is not None:
if total_stream_size < COMPRESSION_MIN_SIZE or total_stream_size > COMPRESSION_MAX_SIZE :
# Total stream size if beyond or below compression limits
return
else:
# Don't compress anything with unknown size
return
# Don't compress HttpProxyResponse instances as doing response.iter_content() for checking if data is compressable
# may make content data inconsistent.
compressable = False # Whether the content is compressable by trying to compress the first chunk
content = response.async_iter_content()
if not isasyncgen(content):
# The content is not an async generator so lets await it.
content = await content
if not isasyncgen(content):
for chunk in content:
if chunk:
# Create a fresh compression wrapper or content object per chunk
chunk = chunk[:8] # Check compression using first 8 bytes to avoid performance degradation
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = 8
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressable = content_obj.compress(COMPRESSION_ENCODING) # sets if content is compressable
break
else:
async for chunk in content:
if chunk:
# Create a fresh compression wrapper or content object per chunk
chunk = chunk[:8] # Check compression using first 8 bytes to avoid performance degradationt
content_obj = response.content_obj.__class__() # Clone a fresh object
content_obj.set_content(chunk, content_type=content_type)
content_obj.compression_level = COMPRESSION_LEVEL
content_obj.compression_min_size = 0
content_obj.compression_max_size = 8
content_obj.compression_mimetypes = COMPRESSION_MIMETYPES
compressable = content_obj.compress(COMPRESSION_ENCODING) # sets if content is compressable
break
if response.get_header("content-encoding", "identity") == "identity" and compressable:
# Assume compression will not fail, this is is a bit dangerous if compression fails as response might include
# unmatching invalid content content encoding
response.set_header("Content-Encoding", COMPRESSION_ENCODING)
# Modify response iter_content & async_iter_content to new funcs which
# compress data as we are iterating over it.
set_compressable_iter_content(response)
@log_failsafe
async def do_set_streaming_range(self, response, request):
"""
Set streaming range attributes on StreamingRangeHttpResponse.
This method parses the 'Range' header from the request and sets the
start and end positions for partial content streaming.
Args:
response (StreamingRangeHttpResponse): The response object to set streaming range on.
request (HttpRequest): The incoming HTTP request containing the 'Range' header.
Raises:
ValueError: If the 'Range' header is malformed or invalid.
"""
if not request:
return # If no request is provided, exit early.
if not isinstance(response, StreamingRangeHttpResponse):
return
# Set the Range header.
range_header = request.get_header('Range')
if not range_header:
if isinstance(response, StreamingRangeHttpResponse):
if response.status_code == 206:
response.payload_obj.parse_status(200) # modify the response to correct status
response.clear_content_range_headers() # clear range headers
return # If no Range header exists, no need to set content range headers.
# Parse Range header.
if response.status_code == 200:
# Invalid status (200 OK) instead of (206 Partial Content)
response.payload_obj.parse_status(206) # modify the response to correct status
try:
# Extract start and end positions from the Range header
# Note: Use response.start_pos & end_pos rather than start, end as they are the most recent offsets.
start, end = StreamingRangeHttpResponse.extract_range(range_header)
# Set the start and end positions on the response object
response.parse_range(start, end) # set content range headers (if applicable)
except ValueError as e:
# replace response data
new_response = None
if SETTINGS["DEBUG"]:
new_response = template_response(
HttpRangeNotSatisfiableResponse,
body=(
f"<p>Range is not satisfiable, could not resolve: {range_header}</p>"
f"<p>Exception: {e}</p>"
)
)
else:
new_response = simple_response(HttpRangeNotSatisfiableResponse)
# Replace response with new data
replace_response(response, new_response)
# Finalize response again as it has new values
# Set do_set_streaming_range & do_content_compression to False to avoid max recursion error
await self.finalize_response(
response,
request,
do_set_streaming_range=False,
do_content_compression=False,
)
[docs]
async def finalize_response(
self,
response: HttpResponse,
request: HttpRequest,
do_set_streaming_range: bool = True,
do_content_compression: bool = True,
):
"""
Puts the final touches to the response.
"""
# All of the following method calls are failsafe meaning failure of any method
# will not affect the execution of other methods, thus an error encountered will be
# logged appropriately. Decorator responsible: @log_failsafe
self.do_request_response_transformation(response, request)
self.do_set_fixed_headers(response, request)
self.do_set_connection_mode(response, request)
self.do_set_extra_headers(response, request)
if do_set_streaming_range:
# This implementation needs to be awaited, it uses some asynchronous implementations.
await self.do_set_streaming_range(response, request)
# Do content compression in the end.
if do_content_compression:
await self.do_content_compression(response, request)
# Lastly review content headers.
self.do_set_content_headers(response, request)
# Set & initialize response finalizers
response_finalizer = ResponseFinalizer()
async_response_finalizer = AsyncResponseFinalizer()