"""
Module for representing Duck HttpResponses
"""
import io
import re
import os
import json
import asyncio
from http.cookies import SimpleCookie, Morsel
from inspect import isasyncgen
from datetime import datetime, timedelta
from collections.abc import (
Iterable,
Generator,
AsyncGenerator,
)
from typing import (
Dict,
Optional,
Union,
List,
Callable,
Tuple,
Any,
Awaitable,
)
from duck.settings import SETTINGS
from duck.http.content import Content
from duck.http.headers import Headers
from duck.http.request import HttpRequest
from duck.http.response_payload import (
BaseResponsePayload,
HttpResponsePayload,
)
from duck.template.environment import (
Engine,
Template,
)
from duck.html.components import Component
from duck.contrib.sync import iscoroutinefunction, convert_to_async_if_needed
from duck.exceptions.all import AsyncViolationError, FileNotFoundResponseError
from duck.etc.statuscodes import responses
from duck.utils.string import smart_truncate
from duck.utils.asyncio import in_async_context
from duck.utils.threading.threadpool import get_or_create_thread_manager
from duck.utils.fileio import (
FileIOStream,
AsyncFileIOStream,
to_async_fileio_stream,
)
# Type alias for the `stream` argument of `StreamingHttpResponse`: a zero-argument
# callable returning bytes/str, a file-like IO object, or an iterable of bytes/str chunks.
StreamingType = Union[
Callable[[], Union[bytes, str]],
io.IOBase,
Iterable[Union[bytes, str]]
]
# Matches `bytes=<start>-` optionally followed by `<end>`; group 1 is the start,
# group 2 the optional end. Presumably applied to the HTTP `Range` request header;
# it is not referenced elsewhere in the visible part of this file -- confirm at call sites.
RANGE_HEADER_PATTERN = re.compile(r"bytes=(\d+)-(\d+)?")
# Matches `bytes=-<n>`; group 1 captures the number including its minus sign (e.g. `-500`).
NEGATIVE_RANGE_PATTERN = re.compile(r"bytes=(-\d+)")
[docs]
class BaseResponse:
    """
    Response object to represent a raw response.

    Pairs a response payload (status, headers, cookies) with a content object
    and exposes both through convenience properties and cookie helpers.
    """

    def __init__(
        self,
        payload_obj: "HttpResponsePayload",
        content_obj: "Optional[Content]" = None,
    ):
        """
        Initialize the response object.

        Args:
            payload_obj (HttpResponsePayload): Payload holding status, headers and cookies.
                Must be an instance of `BaseResponsePayload`.
            content_obj (Optional[Content]): Content object. `Content(b"")` is used when
                the argument is omitted (or falsy).

        Raises:
            AssertionError: If `payload_obj` is not a `BaseResponsePayload` instance.
        """
        assert isinstance(payload_obj, BaseResponsePayload), (
            f"Expected payload type 'BaseResponsePayload', but got '{type(payload_obj).__name__}'"
        )
        self.payload_obj: BaseResponsePayload = payload_obj
        self.content_obj: Content = content_obj or Content(b"")
        # NOTE(review): `set_content_type_header` is not defined on this class in the
        # visible source; presumably it is provided elsewhere -- confirm.
        self.set_content_type_header()

    @property
    def cookies(self) -> SimpleCookie:
        """
        Return the cookies stored on the payload.

        Returns:
            SimpleCookie: The cookies for the response.
        """
        return self.payload_obj.cookies

    @cookies.setter
    def cookies(self, cookies_obj: SimpleCookie) -> None:
        """
        Replace the payload's cookies.

        Args:
            cookies_obj (SimpleCookie): The SimpleCookie object to set.
        """
        self.payload_obj.cookies = cookies_obj

    @property
    def raw(self) -> bytes:
        """
        Return the full response (payload followed by content) as bytes.

        The payload's raw bytes and the content are separated by a blank line:
        one CRLF is appended when the payload already ends with CRLF, two otherwise.

        Returns:
            bytes: The raw byte representation of the response.
        """
        head = self.payload_obj.raw
        separator = b"\r\n" if head.endswith(b"\r\n") else b"\r\n\r\n"
        # Falsy content (e.g. None) is emitted as an empty body.
        return b"".join((head, separator, self.content or b""))

    @property
    def content(self) -> bytes:
        """
        Returns the content for the response.
        """
        return self.content_obj.data

    @property
    def content_type(self) -> str:
        """
        Returns the content type for the response.

        Notes:
            - This is not retrieved from headers but directly from the content object.
        """
        return self.content_obj.content_type

    @property
    def content_length(self) -> int:
        """
        Returns the content length for the response.

        Notes:
            - This is not retrieved from headers but directly from the content object.
        """
        return self.content_obj.size

    @property
    def content_encoding(self) -> str:
        """
        Returns the content encoding for the response.

        Notes:
            - This is not retrieved from headers but directly from the content object.
        """
        return self.content_obj.encoding

    @property
    def headers(self) -> "Headers":
        """
        Return the current response headers.
        """
        return self.payload_obj.headers

    @property
    def title_headers(self) -> dict:
        """
        Response headers in title format rather than small cased,
        e.g. `{'Connection': 'close'}` rather than `{'connection': 'close'}`.
        """
        return self.payload_obj.headers.titled_headers()

    @property
    def status(self) -> tuple[int, str]:
        """
        The status of the response.

        Returns:
            tuple: Status code and status message for the response.
        """
        return self.payload_obj.status_code, self.payload_obj.status_message

    @property
    def status_code(self) -> int:
        """
        Return status code for the response.
        """
        return self.payload_obj.status_code

    @status_code.setter
    def status_code(self, value: int) -> None:
        """
        Set status code for the response.
        """
        self.payload_obj.status_code = value

    @property
    def status_message(self) -> str:
        """
        Return the status message for the response.
        """
        return self.payload_obj.status_message

    @status_message.setter
    def status_message(self, value: str) -> None:
        """
        Set status message for the response.
        """
        self.payload_obj.status_message = value

    @property
    def status_explanation(self) -> str:
        """
        Return the status message explanation for the response.
        """
        return self.payload_obj.explanation

    @status_explanation.setter
    def status_explanation(self, value: str) -> None:
        """
        Set status explanation for the response.
        """
        self.payload_obj.explanation = value

    def set_multiple_cookies(self, cookies: Dict[str, Dict[str, Any]]) -> None:
        """
        Sets multiple cookies at once. Each cookie is specified by a dictionary of attributes.

        Args:
            cookies (Dict[str, Dict[str, Any]]): A dictionary where the key is the cookie name
                and the value is another dictionary of cookie attributes.
        """
        # Delegate to the payload's setter; the previous code called the getter-named
        # `get_multiple_cookies`, which cannot be what a setter is meant to do.
        self.payload_obj.set_multiple_cookies(cookies)

    def get_cookie_obj(self, name: str) -> Optional[Morsel]:
        """
        Retrieves the cookie object/morsel of a specific cookie by name.

        Args:
            name (str): The name of the cookie to retrieve.

        Returns:
            Optional[Morsel]: The cookie object, or None if the cookie does not exist.
        """
        return self.payload_obj.get_cookie_obj(name)

    def get_cookie(self, name: str) -> str:
        """
        Retrieves the value of a specific cookie by name.

        Args:
            name (str): The name of the cookie to retrieve.

        Returns:
            str: The cookie value, or an empty string if the cookie does not exist.
        """
        return self.payload_obj.get_cookie(name)

    def get_cookie_str(self, name: str, include_cookie_name: bool = True) -> str:
        """
        Returns the cookie string for the provided name with all fields including max-age, domain, path, etc.

        Args:
            name (str): The name of the cookie.
            include_cookie_name (bool): Whether to include the cookie name,
                e.g. `cookie=something;`. Defaults to True.
        """
        return self.payload_obj.get_cookie_str(name, include_cookie_name=include_cookie_name)

    def get_all_cookies(self) -> Dict[str, str]:
        """
        Retrieves all cookies as a dictionary.

        Returns:
            Dict[str, str]: A dictionary of all cookies, where the key is the cookie name
                and the value is the cookie value.
        """
        return self.payload_obj.get_all_cookies()

    def set_cookie(
        self,
        key: str,
        value: str = "",
        domain: Optional[str] = None,
        path: str = "/",
        max_age: Optional[Union[int, timedelta]] = None,
        expires: Optional[Union[datetime, str]] = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: Optional[str] = "Lax",
    ) -> None:
        """
        Set a custom cookie on the response (delegates to the payload's `set_cookie`).

        Args:
            key (str): The name of the cookie (e.g., 'csrftoken').
            value (str): The value of the cookie (e.g., 'some_random_token').
            domain (Optional[str]): The domain to set for the cookie. Defaults to None.
            path (str): The path for the cookie. Defaults to '/' indicating the whole site.
            max_age (Optional[Union[int, timedelta]]): The maximum age of the cookie in seconds or as a timedelta object.
            expires (Optional[Union[datetime, str]]): The expiration date of the cookie. Defaults to None.
            secure (bool): Whether the cookie should only be sent over HTTPS connections. Defaults to False.
            httponly (bool): Whether the cookie should be inaccessible to JavaScript. Defaults to False.
            samesite (Optional[str]): The SameSite attribute for the cookie. Default is 'Lax'.
                Other possible values are 'Strict' or 'None'.

        Raises:
            ValueError: If an invalid value is provided for `samesite`
                (raised by the payload -- not verifiable from this class alone).
        """
        self.payload_obj.set_cookie(
            key=key,
            value=value,
            domain=domain,
            path=path,
            max_age=max_age,
            expires=expires,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

    def delete_cookie(self, key: str, path: str = "/", domain: Optional[str] = None) -> None:
        """
        Delete a cookie from the response (delegates to the payload's `delete_cookie`).

        Args:
            key (str): The name of the cookie to delete.
            path (str): The path for which the cookie was set. Defaults to "/".
            domain (Optional[str]): The domain for which the cookie was set. Defaults to None.
        """
        self.payload_obj.delete_cookie(key, path, domain)

    def __repr__(self):
        return f"<{self.__class__.__name__} ('{self.status_code}')>"
[docs]
class HttpResponse(BaseResponse):
    """
    Class representing an http response.
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        status_code: int = 200,
        headers: Optional[dict] = None,
        content_type: Optional[str] = None,
    ):
        """
        Initialize the response.

        Args:
            content (Optional[Union[str, bytes]]): Response body. Strings are encoded as UTF-8.
            status_code (int): HTTP status code. Defaults to 200.
            headers (Optional[dict]): Headers merged into the payload headers. Defaults to none.
            content_type (Optional[str]): Content type of the body. When omitted, the
                `content-type` header (if any) is used instead.

        Raises:
            ValueError: If the content type is given both as an argument and in the headers.
        """
        payload_obj = HttpResponsePayload()
        payload_obj.parse_status(status_code)
        # `None` default instead of a shared mutable `{}`; an empty update is a no-op.
        payload_obj.headers.update(headers or {})

        content_type_header = payload_obj.get_header("content-type")
        if content_type and content_type_header:
            raise ValueError(
                "Content type cannot be specified both as an argument and in the headers. "
                "Please provide it in one place only: either as an argument or in the headers."
            )

        # Encode every string, including the empty one, so the content object
        # always receives bytes (or None) rather than a stray `str`.
        if isinstance(content, str):
            content = content.encode("utf-8")

        content_obj = Content(
            data=content,
            content_type=content_type or content_type_header,
        )
        super().__init__(payload_obj, content_obj)
[docs]
class StreamingHttpResponse(HttpResponse):
    """
    Class representing an HTTP streaming response.

    This class allows for streaming large content, such as files or dynamically
    generated data, in chunks instead of loading the entire content into memory
    at once. This is particularly useful for handling large files like videos,
    audio, or data streams.

    Notes:
        - If a stream provided has a method `close`, it is expected to be called after
          the response has been sent (this class itself never closes the stream).
    """

    def __init__(
        self,
        stream: StreamingType,
        status_code: int = 200,
        headers: Optional[Dict] = None,
        content_type: Optional[str] = 'application/octet-stream',
        chunk_size: int = 2 * 1024 * 1024,
    ):
        """
        Initialize a streaming response object.

        Args:
            stream (StreamingType):
                The content to stream. This can either be:
                - A file-like object (an `io.IOBase` instance) that supports reading;
                  it is read in chunks of `chunk_size`.
                - A callable; it is stored as the content provider and must return an
                  iterable of chunks when called (chunking is up to the callable).
                - An iterable object which has chunks of data as bytes or a string.
            status_code (int):
                The HTTP status code (default is 200).
            headers (Optional[Dict]):
                Additional response headers (default is no extra headers).
            content_type (Optional[str]):
                The MIME type of the content (e.g., 'application/octet-stream').
            chunk_size (int):
                The size of chunks in bytes used for reading from the file-like object.
                Only relevant when `stream` is an IO object. The default value is
                2 MiB (2 097 152 bytes).

        Raises:
            ValueError: If `stream` is not an IO object, callable or iterable, or if a
                non-default `chunk_size` is given for a stream that is not an IO object.
        """
        super().__init__(
            content=b"",
            status_code=status_code,
            headers=headers or {},
            content_type=content_type,
        )
        default_chunk_size = 2 * 1024 * 1024
        self.chunk_size = chunk_size
        self.stream = stream

        if isinstance(stream, io.IOBase):
            # Resolve `self.stream` at call time rather than closing over the local
            # `stream`: `async_iter_content` may swap in an async-compatible stream,
            # and the provider must read from that replacement.
            self.content_provider = lambda: (
                self._async_read_from_file(self.stream, self.chunk_size)
                if SETTINGS['ASYNC_HANDLING']
                else self._read_from_file(self.stream, self.chunk_size)
            )
        else:
            if callable(stream):
                self.content_provider = stream
            elif isinstance(stream, Iterable):
                self.content_provider = lambda: stream
            else:
                raise ValueError("Stream must be either a callable, iterable or a file-like object.")

            # A custom chunk size only makes sense for IO objects.
            if chunk_size and chunk_size != default_chunk_size:
                raise ValueError(f"Chunk size has been provided yet, the supplied `stream` is not an IO/file-like object. Got {type(stream)} instance.")

    @classmethod
    def file_io_stream(cls, filepath: str, chunk_size: int = 2 * 1024 * 1024) -> FileIOStream:
        """
        Creates an IOBase-like stream from a file for efficient handling of large files.

        Args:
            filepath (str): The path to the file that should be streamed.
            chunk_size (int, optional): The size of each chunk in bytes. Defaults to 2MB.

        Returns:
            FileIOStream: A stream object created with `open_now=False`, i.e. not opened yet.

        Example:
        ```py
        stream = StreamingHttpResponse.file_io_stream('large_file.txt')
        response = StreamingHttpResponse(stream)
        ```
        """
        return FileIOStream(filepath, chunk_size, open_now=False)

    def _read_from_file(self, file_obj: io.IOBase, chunk_size: int = 2 * 1024 * 1024) -> Generator:
        """
        Helper method to read the content from a file-like object in chunks.

        Args:
            file_obj (io.IOBase): The file-like object to stream content from.
            chunk_size (int): The size of each chunk to read (default is 2 MiB).

        Yields:
            bytes: A chunk of data read from the file-like object, until `read` returns
                a falsy value.
        """
        while chunk := file_obj.read(chunk_size):
            yield chunk

    def iter_content(self) -> Iterable[bytes]:
        """
        Returns an iterable (like a generator) over the response content.

        Raises:
            TypeError: If the content provider returns a raw `str`/`bytes` or anything
                that is not an iterable.
        """
        content = self.content_provider()
        if isinstance(content, (str, bytes)):
            raise TypeError(
                "Expected iterable or generator yielding bytes, got raw string or bytes. "
                "Wrap your content in a generator or iterable."
            )
        if not isinstance(content, Iterable):
            raise TypeError(
                f"Expected an iterable or generator, got {type(content).__name__}"
            )
        return content

    # ASYNCHRONOUS IMPLEMENTATIONS

    @classmethod
    def async_file_io_stream(cls, filepath: str, chunk_size: int = 2 * 1024 * 1024) -> AsyncFileIOStream:
        """
        Creates an asynchronous IOBase-like stream from a file for efficient handling of large files.

        Args:
            filepath (str): The path to the file that should be streamed.
            chunk_size (int, optional): The size of each chunk in bytes. Defaults to 2MB.

        Returns:
            AsyncFileIOStream: A stream object created with `open_now=False`, i.e. not opened yet.

        Example:
        ```py
        stream = StreamingHttpResponse.async_file_io_stream('large_file.txt')
        response = StreamingHttpResponse(stream)
        ```
        """
        return AsyncFileIOStream(filepath, chunk_size, open_now=False)

    async def _async_read_from_file(self, file_obj: io.IOBase, chunk_size: int = 2 * 1024 * 1024) -> AsyncGenerator:
        """
        Asynchronous helper method to read the content from a file-like object in chunks.

        Args:
            file_obj (io.IOBase): The asynchronous file-like object to stream content from.
            chunk_size (int): The size of each chunk to read (default is 2 MiB).

        Raises:
            AsyncViolationError: If `file_obj.read` or `file_obj.close` is not a coroutine function.

        Notes:
            - `close` is only validated here; this method does not close the file-like object.

        Yields:
            bytes: A chunk of data read from the file-like object.
        """
        read_func = file_obj.read
        if not iscoroutinefunction(read_func):
            raise AsyncViolationError(
                f"Expected an asynchronous file-like object with an 'async def read(...)' method, "
                f"but received a synchronous method '{type(file_obj).__name__}.read'. "
                f"Consider using libraries like 'aiofiles' or implementing an async-compatible interface."
            )
        if not iscoroutinefunction(file_obj.close):
            raise AsyncViolationError(
                f"Expected an asynchronous file-like object with an 'async def close(...)' method, "
                f"but received a synchronous method '{type(file_obj).__name__}.close'. "
                f"Consider using libraries like 'aiofiles' or implementing an async-compatible interface."
            )
        while chunk := await read_func(chunk_size):
            yield chunk

    async def async_iter_content(self) -> Union[Iterable[bytes], AsyncGenerator]:
        """
        Coroutine which returns an iterable or asynchronous generator over the response content.

        A synchronous `FileIOStream` is first converted to its asynchronous counterpart.

        Raises:
            TypeError: If the content provider returns a raw `str`/`bytes` or something
                that is neither an iterable nor an async generator.
        """
        if isinstance(self.stream, FileIOStream) and not isinstance(self.stream, AsyncFileIOStream):
            self.stream = to_async_fileio_stream(self.stream)

        content = self.content_provider()
        if isinstance(content, (str, bytes)):
            raise TypeError(
                "Expected iterable or generator yielding bytes, got raw string or bytes. "
                "Wrap your content in a generator or iterable."
            )
        if not isinstance(content, Iterable) and not isasyncgen(content):
            raise TypeError(
                f"Expected an iterable, generator or async_generator, got {type(content).__name__}"
            )
        return content

    def __repr__(self):
        # Angle brackets of the stream's repr are swapped for square brackets.
        stream_repr = repr(self.stream).replace('<', '[').replace('>', ']')
        return f"<{self.__class__.__name__} ('{self.status_code}') {stream_repr}>"
[docs]
class StreamingRangeHttpResponse(StreamingHttpResponse):
    """
    Streaming response that serves only a byte range of a seekable stream
    (partial content), e.g. for resumable downloads or media streaming.

    Example:
    ```py
    response = StreamingRangeHttpResponse(
        stream=my_file,
        start_pos=-1000, # Last 1000 bytes
        chunk_size=1024,
    )
    return response
    ```
    """

    def __init__(
        self,
        stream: io.IOBase,
        status_code: int = 206,
        headers: Dict = {},
        content_type: Optional[str] = 'application/octet-stream',
        chunk_size: int = 2 * 1024 * 1024,
        start_pos: int = 0,
        end_pos: Optional[int] = -1,
    ):
        """
        Set up the ranged streaming response.

        Args:
            stream (io.IOBase): File-like object to read the response data from.
            status_code (int): HTTP status code. Defaults to 206 (Partial Content).
            headers (Dict): Extra HTTP headers. Defaults to an empty dict.
            content_type (str): MIME type of the content. Defaults to 'application/octet-stream'.
            chunk_size (int): Maximum number of bytes read per chunk. Defaults to 2 * 1024 * 1024.
            start_pos (int): First byte position of the range. Negative values count from the end.
            end_pos (int): End position of the range. -1 is replaced by the stream length.

        Raises:
            ValueError: If `stream` is not an `io.IOBase` instance or lacks a required method.
            AssertionError: If `start_pos` or `end_pos` is not an integer.
        """
        if not isinstance(stream, io.IOBase):
            raise ValueError(f"The 'stream' argument must be a file-like object (io.IOBase) not {type(stream)}.")

        # The stream is checked before the base class stores it.
        self.validate_stream(stream)

        assert isinstance(start_pos, int), "Argument start_pos must be an integer"
        assert isinstance(end_pos, int), "Argument end_pos must be an integer"

        super().__init__(
            stream=stream,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
            chunk_size=chunk_size,
        )

        # Resolve the range and install the ranged content provider.
        self.parse_range(start_pos, end_pos)

    def validate_stream(self, stream):
        """
        Check that `stream` exposes `read`, `seek`, `tell` and `close`, and that
        `read`/`close` are coroutine functions when running in an async context
        with `SETTINGS["ASYNC_HANDLING"]` enabled.

        Raises:
            ValueError: If a required method is missing.
            AsyncViolationError: If a required method is not asynchronous in async context.
        """
        needed = {"read", "seek", "tell", "close"}  # `open` is not required; the stream is assumed open.
        must_be_async = {"read", "close"}

        for attr_name in needed:
            attr = getattr(stream, attr_name, None)
            if attr is None:
                raise ValueError(f"Missing required method `{attr_name}` in stream object.")

            if not (in_async_context() and SETTINGS["ASYNC_HANDLING"]):
                continue

            if attr_name in must_be_async and not iscoroutinefunction(attr):
                raise AsyncViolationError(
                    f"In async context, stream method `{attr_name}` must be asynchronous."
                )

    def parse_range(self, start_pos: int, end_pos: int) -> int:
        """
        Resolve the requested range against the stream length, store it on the
        instance, set the content range headers and install the content provider.

        Returns:
            int: The stream size or length.
        """
        seekable = hasattr(self.stream, 'seek') and hasattr(self.stream, 'tell')
        if not seekable:
            raise ValueError("Stream must support seeking to determine start and end position. Must have `seek` and `tell` methods.")

        # Measure the stream by jumping to its end, remembering where we were.
        original_offset = self.stream.tell()
        self.stream.seek(0, io.SEEK_END)
        total_size = self.stream.tell()

        if end_pos == -1:
            end_pos = total_size

        if start_pos < 0:
            # -n means "the last n bytes"; clamp so the start never goes below 0.
            start_pos = max(0, total_size + start_pos)

        # Put the stream back at the offset it had on entry.
        self.stream.seek(original_offset)

        self.start_pos, self.end_pos = start_pos, end_pos

        # NOTE(review): `set_content_range_headers` is not defined in the visible
        # part of this file; presumably provided elsewhere -- confirm.
        self.set_content_range_headers()

        # Pick the sync or async range reader at call time.
        self.content_provider = lambda: (
            self._async_get_range_stream() if in_async_context()
            else self._get_range_stream()
        )

        return total_size

    def _get_range_stream(self) -> Generator:
        """
        Yield chunks of at most `chunk_size` bytes, starting at `start_pos`, until
        `end_pos - start_pos` bytes were produced or the stream is exhausted.
        """
        if not (hasattr(self.stream, 'seek') and hasattr(self.stream, 'tell')):
            raise ValueError("Stream must support seeking to handle partial content.")

        self.stream.seek(self.start_pos)

        # An empty span (start_pos == end_pos) is treated as a request for one byte.
        bytes_left = (self.end_pos - self.start_pos) or 1
        while bytes_left > 0:
            data = self.stream.read(min(self.chunk_size, bytes_left))
            if not data:
                return  # stream exhausted
            yield data
            bytes_left -= len(data)

    async def _async_get_range_stream(self) -> AsyncGenerator:
        """
        Asynchronous twin of `_get_range_stream`: `read` is awaited, `seek` is not.
        """
        if not (hasattr(self.stream, 'seek') and hasattr(self.stream, 'tell')):
            raise ValueError("Stream must support seeking to handle partial content.")

        self.stream.seek(self.start_pos)

        # An empty span (start_pos == end_pos) is treated as a request for one byte.
        bytes_left = (self.end_pos - self.start_pos) or 1
        while bytes_left > 0:
            data = await self.stream.read(min(self.chunk_size, bytes_left))
            if not data:
                return  # stream exhausted
            yield data
            bytes_left -= len(data)

    def __repr__(self):
        stream_repr = repr(self.stream).replace('<', '[').replace('>', ']')
        return f"<{self.__class__.__name__} ('{self.status_code}') {stream_repr}>"
[docs]
class FileResponse(StreamingRangeHttpResponse):
    """
    Class representing an http file response
    """

    def __init__(
        self,
        filepath: str,
        headers: Optional[Dict] = None,
        status_code: int = 206,
        content_type: Optional[str] = None,
        chunk_size: int = 2 * 1024 * 1024,
        start_pos: int = 0,
        end_pos: Optional[int] = -1,
    ):
        """
        Initializes a ranged streaming HTTP response for serving a file.

        Args:
            filepath (str): The path to the file to be streamed.
            headers (Optional[Dict]): Additional HTTP headers to include in the response.
                Defaults to no extra headers.
            status_code (int, optional): The HTTP status code for the response.
                Defaults to 206 (Partial Content).
            content_type (Optional[str], optional): The MIME type of the response content.
                If not provided, it is recalculated from the file path after initialization.
            chunk_size (int, optional): The size of chunks (in bytes) for streaming the file.
                Defaults to 2 MiB (2 * 1024 * 1024 bytes).
            start_pos (int): The starting byte position for the range request. Defaults to 0.
            end_pos (int): The ending byte position for the range request. Defaults to -1,
                meaning the entire stream is used.

        Raises:
            FileNotFoundResponseError: If the specified file does not exist.
            ValueError: Propagated from the parent initializer, e.g. when the stream
                fails validation.

        Example:
        ```py
        response = FileResponse(
            filepath="/path/to/file.txt",
            headers={"Content-Disposition": "attachment; filename=file.txt"},
            content_type="text/plain",
            chunk_size=8192,
        )
        ```
        """
        file_stream: Union[FileIOStream, AsyncFileIOStream]

        try:
            if SETTINGS['ASYNC_HANDLING']:
                file_stream = StreamingHttpResponse.async_file_io_stream(filepath)
            else:
                file_stream = StreamingHttpResponse.file_io_stream(filepath)

            # The factories create the stream unopened; the range parser needs it open.
            if not file_stream.is_open():
                file_stream.open()
        except FileNotFoundError as e:
            # Raise our custom error which Duck knows how to handle,
            # keeping the original error as the explicit cause.
            raise FileNotFoundResponseError(str(e)) from e

        super().__init__(
            stream=file_stream,
            status_code=status_code,
            headers=headers or {},
            content_type=content_type,
            chunk_size=chunk_size,
            start_pos=start_pos,
            end_pos=end_pos,
        )
        self.file_size = self.stream._file_size

        if not content_type:
            # No explicit content type: derive it from the file path and
            # refresh the content type header accordingly.
            self.content_obj.filepath = filepath
            self.content_obj.parse_type(None)
            self.set_content_type_header()
[docs]
class HttpRedirectResponse(HttpResponse):
    """
    HTTP redirect response: 302 by default, 301 when `permanent` is true.
    """

    def __init__(
        self,
        location: str,
        headers: Dict = {},
        content_type: Optional[str] = None,
        permanent: bool = False,
    ):
        """
        Build the redirect response.

        Args:
            location (str): Target of the redirect, sent as the `Location` header.
            headers (Dict): Additional headers; entries here take precedence over
                the generated `Location` header.
            content_type (Optional[str]): Content type passed to the parent class.
            permanent (bool): Use status 301 instead of 302.
        """
        redirect_headers = {"Location": "%s" % location}
        self.location = location
        super().__init__(
            "",
            301 if permanent else 302,  # permanent vs temporary redirect
            headers={**redirect_headers, **headers},
            content_type=content_type,
        )
[docs]
class HttpSwitchProtocolResponse(HttpResponse):
    """
    HTTP 101 (Switching Protocols) response.
    """

    def __init__(
        self,
        upgrade_to: str,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Build the 101 response with `Connection: Upgrade` and `Upgrade` headers.

        Args:
            upgrade_to (str): The protocol to upgrade to (e.g., "h2c" for HTTP/2 cleartext).
            headers (Dict): Additional headers; entries here take precedence over
                the generated upgrade headers.
            content_type (str): Content-Type header (not usually needed for 101 response).
        """
        upgrade_headers = {"Connection": "Upgrade", "Upgrade": upgrade_to}
        super().__init__(
            content="",
            status_code=101,
            headers={**upgrade_headers, **headers},
            content_type=content_type,
        )
[docs]
class JsonResponse(HttpResponse):
    """
    Class representing a json response.
    """

    def __init__(
        self,
        content: Optional[Any] = None,
        status_code: int = 200,
        headers: Optional[Dict] = None,
        content_type = "application/json",
    ):
        """
        Serialize `content` to JSON and use it as the response body.

        Args:
            content (Optional[Any]): JSON-serializable object. `None` is serialized as an
                empty object (`{}`); every other value is serialized as given.
            status_code (int): HTTP status code. Defaults to 200.
            headers (Optional[Dict]): Additional response headers. Defaults to no extra headers.
            content_type: Content type of the body. Defaults to "application/json".
        """
        # Only a missing body falls back to `{}`; valid falsy JSON values such as
        # `[]`, `0`, `False` or `""` must be serialized as they are.
        self.json_obj = {} if content is None else content
        self.json_content = json.dumps(self.json_obj).encode('utf-8')
        super().__init__(
            self.json_content,
            status_code,
            headers=headers or {},
            content_type=content_type,
        )
[docs]
class HttpErrorRequestResponse(HttpResponse):
    """
    Class representing an http error response.
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        status_code: int = 400,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Build the error response.

        Args:
            content (Optional[Union[str, bytes]]): Response body. When falsy, the second
                item of the `responses` entry for `status_code` is used, or a generic
                message if the status code has no entry.
            status_code (int): HTTP status code. Defaults to 400.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        if not content:
            fallback = (f"{status_code}", "Sorry, There is an error in request")
            _, content = responses.get(status_code, fallback)

        super().__init__(
            content=content,
            status_code=status_code,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpRangeNotSatisfiableResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 416 (range not satisfiable).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=416,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpBadRequestResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 400 (bad request).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=400,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpForbiddenRequestResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 403 (forbidden).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=403,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpBadRequestSyntaxResponse(HttpErrorRequestResponse):
    """
    Error response for bad request syntax; status code fixed to 400.
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = "Bad Request Syntax",
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body. Defaults to "Bad Request Syntax".
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=400,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpUnsupportedVersionResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 505 (unsupported HTTP version).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=505,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpNotFoundResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 404 (not found).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=404,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpMethodNotAllowedResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 405 (method not allowed).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=405,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpServerErrorResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 500 (server error).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=500,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpBadGatewayResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 502 (bad gateway).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=502,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpTooManyRequestsResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 429 (too many requests).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = "Too many requests",
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body. Defaults to "Too many requests".
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=429,
            headers=headers,
            content_type=content_type,
        )
[docs]
class HttpRequestTimeoutResponse(HttpErrorRequestResponse):
    """
    Error response with its status code fixed to 408 (request timeout).
    """

    def __init__(
        self,
        content: Optional[Union[str, bytes]] = None,
        headers: Dict = {},
        content_type: Optional[str] = None,
    ):
        """
        Args:
            content (Optional[Union[str, bytes]]): Response body, passed to the parent class.
            headers (Dict): Additional response headers.
            content_type (Optional[str]): Content type of the body.
        """
        super().__init__(
            content=content,
            status_code=408,
            headers=headers,
            content_type=content_type,
        )
[docs]
class TemplateResponse(HttpResponse):
    """
    TemplateResponse class representing an http template response.
    """

    def __init__(
        self,
        request: HttpRequest,
        template: str,
        context: Optional[Dict] = None,
        status_code: int = 200,
        headers: Optional[Dict] = None,
        content_type: str = "text/html",
        engine: Optional[Engine] = None,
    ):
        """
        Render `template` with `context` and use the result as the response body.

        Args:
            request (HttpRequest): The current request; exposed to the template as `request`.
            template (str): Name of the template to render.
            context (Optional[Dict]): Template context. It is copied, so the caller's
                dict is never modified. Defaults to an empty context.
            status_code (int): HTTP status code. Defaults to 200.
            headers (Optional[Dict]): Additional response headers. Defaults to no extra headers.
            content_type (str): Content type of the body. Defaults to "text/html".
            engine (Optional[Engine]): Template engine passed to the `Template` object.
        """
        self.template = template
        self.request = request
        self.engine = engine

        # Build a fresh dict instead of updating the argument in place: injecting
        # `request` must not leak into a context dict owned by the caller.
        self.context = {**(context or {}), "request": request}

        # Create template object for rendering.
        self._template_obj = Template(
            name=template,
            context=self.context,
            engine=engine,
        )
        rendered = self._template_obj.render_template()

        super().__init__(
            rendered,
            status_code,
            headers=headers or {},
            content_type=content_type,
        )
[docs]
class ComponentResponse(StreamingHttpResponse):
    """
    HTTP response that streams the rendered output of an HTML component.

    Args:
        component (Component): The component to be rendered in the response.
        status_code (int, optional): HTTP status code. Defaults to 200.
        headers (Dict, optional): Response headers. Defaults to empty dict.
        content_type (str, optional): Content-Type of the response. Defaults to "text/html".

    Raises:
        ValueError: If the component is None.
        TypeError: If the component is not an instance of Component.
    """

    def __init__(
        self,
        component: Component,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
        content_type: Optional[str] = "text/html",
    ):
        if component is None:
            raise ValueError("Component is required for this response.")
        if not isinstance(component, Component):
            raise TypeError(f"Component should be an instance of Component, not {type(component).__name__}.")

        self.component = component
        self._rendered_component = None  # cache of the rendered markup

        # The stream is a placeholder; content comes from the overridden
        # `iter_content` / `async_iter_content` below.
        super().__init__(
            stream=[],
            status_code=status_code,
            headers=headers or {},
            content_type=content_type,
        )

        # Flag the component for a full page reload when any of its
        # `fullpage_reload_headers` is present in the response headers.
        if hasattr(component, "fullpage_reload_headers"):
            if any(h.lower() in self.headers for h in component.fullpage_reload_headers):
                component.fullpage_reload = True

    def iter_content(self) -> Generator[bytes, None, None]:
        """
        Render the component (loading it first if needed) and yield it as one
        UTF-8 encoded chunk. The rendered markup is cached on the response.
        """
        if not self._rendered_component:
            if not self.component.is_loaded():
                # This is a lazy component
                if self.component.is_loading():
                    # Waiting is a component operation (mirrors `async_wait_for_load`
                    # in the async path); the response has no `wait_for_load` of its own.
                    self.component.wait_for_load()
                else:
                    self.component.load()
            self._rendered_component = self.component.render()
        yield self._rendered_component.encode('utf-8')

    async def async_iter_content(self) -> AsyncGenerator[bytes, None]:
        """
        Asynchronous twin of `iter_content`, using the component's async
        load/render methods.
        """
        if not self._rendered_component:
            if not self.component.is_loaded():
                # This is a lazy component
                if self.component.is_loading():
                    await self.component.async_wait_for_load()
                else:
                    await self.component.async_load()
            self._rendered_component = await self.component.async_render()
        yield self._rendered_component.encode('utf-8')