"""
This module provides the **ASGI** (Asynchronous Server Gateway Interface) for the Duck HTTP server.
ASGI is a specification that describes how a web server communicates with web applications.
This module will define the ASGI callable that the server will use to serve requests.
"""
import ssl
import time
import socket
import asyncio
from typing import (
Any,
Dict,
Optional,
Tuple,
Union,
Callable,
)
from duck.exceptions.all import (
SettingsError,
RouteNotFoundError,
MethodNotAllowedError,
RequestError,
RequestUnsupportedVersionError,
RequestSyntaxError,
ExpectingNoResponse,
FileNotFoundResponseError,
)
from duck.http.core.handler import response_handler
from duck.http.core.proxyhandler import (
HttpProxyResponse,
BadGatewayError,
)
from duck.http.core.response_finalizer import async_response_finalizer
from duck.http.response import HttpResponse
from duck.http.request import HttpRequest
from duck.http.request_data import RequestData
from duck.logging import logger
from duck.contrib.responses.errors import (
get_server_error_response,
get_bad_gateway_error_response,
get_404_error_response,
get_method_not_allowed_error_response,
get_bad_request_error_response,
)
from duck.contrib.sync import convert_to_async_if_needed, iscoroutinefunction
from duck.utils.xsocket import xsocket
from duck.utils.xsocket.io import SocketIO
from duck.utils.asyncio import create_task
from duck.settings import SETTINGS
class ASGI:
    """
    Asynchronous Server Gateway Interface for the Duck HTTP server.

    **Notes:**
    - The ASGI callable is the entry point for the server to handle requests.
    - The ASGI callable will be called for each incoming request.
    - The ASGI callable will handle the request and send the response to the client.
    - The ASGI callable will be called with the following arguments:
        - application: The Duck application instance.
        - client_socket: The client xsocket object.
        - client_address: The client address tuple.
        - request_data: The raw request data from the client.
    - The ASGI is also responsible for sending request to remote servers like Django for processing.

    Implement methods get_request, start_response and __call__ to create your custom ASGI callable.
    """

    def __init__(self, settings: Dict[str, Any]):
        """
        Store the settings mapping on the instance.

        Args:
            settings (Dict[str, Any]): Settings mapping, kept as ``self.settings``.
        """
        self.settings = settings

    @staticmethod
    async def get_request(
        client_socket: xsocket,
        client_address: Tuple[str, int],
        request_data: RequestData,
    ) -> HttpRequest:
        """
        Construct a request object from the data received from the client.

        The request class is read from ``SettingsLoaded.REQUEST_CLASS``, instantiated
        with the client socket and address, and then asked to parse ``request_data``.

        Args:
            client_socket (xsocket): The client xsocket object.
            client_address (tuple): The client address tuple.
            request_data (RequestData): The request data object.

        Returns:
            HttpRequest: The parsed request object.

        Raises:
            SettingsError: If ``REQUEST_CLASS`` is not a subclass of ``HttpRequest``
                (including when it is not a class at all).
        """
        from duck.settings.loaded import SettingsLoaded

        request_class = SettingsLoaded.REQUEST_CLASS

        # issubclass() raises TypeError when its first argument is not a class,
        # so verify that first to always surface a SettingsError for bad settings.
        if not (isinstance(request_class, type) and issubclass(request_class, HttpRequest)):
            raise SettingsError(
                f"REQUEST_CLASS set in settings.py should be a subclass of Duck HttpRequest not {request_class}"
            )

        request = request_class(
            client_socket=client_socket,
            client_address=client_address,
        )

        # request.parse may be sync or async; the helper makes it awaitable either way.
        await convert_to_async_if_needed(request.parse)(request_data)
        return request

    async def finalize_response(
        self,
        response,
        request: Optional[HttpRequest] = None,
    ):
        """
        Finalize the response by delegating to ``async_response_finalizer``.

        Any exception raised by the finalizer is logged and not propagated.

        Args:
            response: The response to finalize.
            request (Optional[HttpRequest]): The request the response belongs to, if any.
        """
        try:
            await async_response_finalizer.finalize_response(response, request)
        except Exception as e:
            logger.log_exception(e)

    async def send_response(
        self,
        response,
        client_socket: xsocket,
        request: Optional[HttpRequest] = None,
        disable_logging: bool = False,
    ):
        """
        Asynchronously send the response to the client.

        Args:
            response: The response to send.
            client_socket (xsocket): The socket to write the response to.
            request (Optional[HttpRequest]): The request being answered, if any.
            disable_logging (bool): Forwarded to the response handler as-is.
        """
        await response_handler.async_send_response(
            response,
            client_socket,
            request=request,
            disable_logging=disable_logging,
        )

    async def apply_middlewares_to_response(self, response, request):
        """
        Run ``process_response`` of the configured middlewares in reverse order.

        When ``request.META["FAILED_MIDDLEWARE"]`` is set, only the middlewares that
        come *before* it in ``SettingsLoaded.MIDDLEWARES`` are applied (the slice
        excludes the failed middleware itself); otherwise the whole list is used.

        Args:
            response: The response passed to each middleware.
            request: The request passed to each middleware.
        """
        from duck.settings.loaded import SettingsLoaded

        middlewares = SettingsLoaded.MIDDLEWARES
        failed_middleware = request.META.get("FAILED_MIDDLEWARE")

        if failed_middleware:
            # Strip the middlewares the request never reached or passed through.
            index = middlewares.index(failed_middleware)
            middlewares = middlewares[:index]

        for middleware in reversed(middlewares):
            await convert_to_async_if_needed(middleware.process_response)(response, request)

    async def django_apply_middlewares_to_response(self, response: HttpProxyResponse, request):
        """
        Apply middlewares to an HTTP proxy response.

        Nothing is applied when ``SETTINGS["DJANGO_SIDE_URLS_SKIP_MIDDLEWARES"]`` is truthy.

        Args:
            response (HttpProxyResponse): The proxy response.
            request: The request the response belongs to.
        """
        if not SETTINGS["DJANGO_SIDE_URLS_SKIP_MIDDLEWARES"]:
            # If the request reached the Django server and we got a response,
            # this means all middlewares were successful.
            await self.apply_middlewares_to_response(response, request)

    async def produce_final_response_failsafe(
        self,
        request: HttpRequest,
        response_producer_callable: Callable,
        processor: Optional["AsyncRequestProcessor"] = None,
    ) -> HttpResponse:
        """
        Produce a response using the given callable, converting failures into error responses.

        On success the middlewares are applied to the produced response. Known
        failures are mapped to their error responses (404, method not allowed,
        bad request, bad gateway); any other exception becomes a server error
        response and is logged. The resulting response is finalized before it is
        returned.

        Args:
            request (HttpRequest): Target HTTP request.
            response_producer_callable (Callable): Coroutine function (no arguments) producing the HTTP response.
            processor (Optional[AsyncRequestProcessor]): Asynchronous request processor for the request;
                used to look up ``route_info`` for the method-not-allowed response.

        Returns:
            HttpResponse: The corresponding HTTP response.

        Raises:
            ExpectingNoResponse: Raised if we are never going to get a response e.g. when we reach a WebSocketView.
                This handles everything on its own and it will never return a response.
        """
        try:
            # Kept inside the try on purpose: a failed check is reported as a server error response.
            assert iscoroutinefunction(response_producer_callable), "Response producer callable must be a coroutine function."
            response: Union[HttpResponse, HttpProxyResponse] = await response_producer_callable()

            # Apply middlewares in reverse order.
            if isinstance(response, HttpProxyResponse):
                await self.django_apply_middlewares_to_response(response, request)
            else:
                await self.apply_middlewares_to_response(response, request)

        except (RouteNotFoundError, FileNotFoundResponseError):
            # The request url cannot match any registered routes.
            response = get_404_error_response(request)

        except MethodNotAllowedError:
            # The requested method is not allowed for the current route.
            route_info = None
            try:
                if processor:
                    # Obtain the request route info.
                    route_info = processor.route_info
            except Exception:
                # Best effort only: the error response is built without route info.
                pass
            response = get_method_not_allowed_error_response(request, route_info=route_info)

        except RequestError as e:
            # The request has some errors.
            response = get_bad_request_error_response(e, request)

        except BadGatewayError as e:
            response = get_bad_gateway_error_response(e, request)

        except ExpectingNoResponse:
            # Not an error as such: it tells the server not to expect a response because
            # the client is handled at a lower level, e.g. the websocket protocol.
            raise

        except Exception as e:
            response = get_server_error_response(e, request)
            logger.log_exception(e)

        # Finalize and return the response.
        await self.finalize_response(response, request)
        return response

    async def get_response(self, request: HttpRequest) -> HttpResponse:
        """
        Return the full response for a request, with all middlewares and other configurations applied.

        The request is processed through ``AsyncRequestProcessor``; the Django
        path is used when ``SETTINGS["USE_DJANGO"]`` is truthy.

        Args:
            request (HttpRequest): The request to answer.

        Returns:
            HttpResponse: The corresponding HTTP response.

        Raises:
            ExpectingNoResponse: Raised if we are never going to get a response e.g. when we reach a WebSocketView.
                This handles everything on its own and it will never return a response.
        """
        from duck.http.core.processor import AsyncRequestProcessor

        processor = AsyncRequestProcessor(request)

        async def produce_response() -> Union[HttpResponse, HttpProxyResponse]:
            """
            Produce the raw response for the request.
            """
            if SETTINGS["USE_DJANGO"]:
                return await processor.process_django_request()
            return await processor.process_request()

        return await self.produce_final_response_failsafe(request, produce_response, processor)

    async def start_response(self, request: HttpRequest, response: Optional[HttpResponse] = None):
        """
        Start the response to the client. This method should be called for handling and sending the response to client.

        If the sent response is an ``HttpProxyResponse`` with status code 101, a
        task is created to relay the upgraded protocol between the client and Django.

        Args:
            request (HttpRequest): The request object.
            response (Optional[HttpResponse]): An optional response you want to send or the response will just be resolved by normal means.
        """
        try:
            # Compare against None explicitly so a caller-supplied response that
            # happens to evaluate falsy is still the one that gets sent.
            if response is None:
                response = await self.get_response(request)
        except ExpectingNoResponse:
            # Nothing to send: the client is being handled elsewhere.
            return

        # Send response to client.
        await self.send_response(
            response,
            request.client_socket,
            request,
            disable_logging=False,
        )

        # An HttpProxyResponse comes from the Django remote server; 101 means the
        # connection switches to another protocol (e.g. websockets).
        if isinstance(response, HttpProxyResponse) and response.status_code == 101:
            create_task(self.handle_django_connection_upgrade(request, response))

    async def handle_django_connection_upgrade(
        self,
        upgrade_request: HttpRequest,
        upgrade_response: HttpProxyResponse,
        protocol_receive_timeout: Union[int, float] = 10,
        protocol_receive_buffer: int = 4096,
    ):
        """
        Start a loop for handling any external protocol like `WebSockets` for Django.

        Does nothing unless the request carries a non-empty ``Upgrade`` header and
        the response status code is 101.

        Notes:
            Each iteration first receives from the client and forwards any data to
            Django, then receives from Django and forwards any data to the client.
            Connection-related errors, timeouts and task cancellation end the loop
            silently; an OPEN and a CLOSE message are logged around it.

        Args:
            upgrade_request (HttpRequest): The request that asked for the upgrade.
            upgrade_response (HttpProxyResponse): The 101 response obtained from Django.
            protocol_receive_timeout (Union[int, float]): Timeout passed to every receive call.
            protocol_receive_buffer (int): Buffer size passed to every receive call.
        """
        # Upgrade response already sent at this point.
        from duck.contrib.websockets import log_message

        upgrade = upgrade_request.get_header("upgrade", "").strip()
        if not upgrade or upgrade_response.status_code != 101:
            return

        sock: xsocket = upgrade_request.client_socket
        django_server_sock: xsocket = upgrade_response.target_socket
        path = upgrade_request.path
        protocol_label = "[WebSocket]" if upgrade.lower() == "websocket" else upgrade.capitalize()

        # This class never assigns `self.response_handler`; fall back to the module-level
        # handler (the one send_response uses) while still honouring a subclass override.
        data_sender = getattr(self, "response_handler", response_handler)

        log_message(upgrade_request, f"{path} [Django] {protocol_label} [OPEN]")

        try:
            # Ensure both sockets are not in blocking mode.
            sock.setblocking(False)
            django_server_sock.setblocking(False)

            while True:
                # Yield to the event loop on every iteration.
                await asyncio.sleep(0)

                # First receive something from the client.
                client_data = await SocketIO.async_receive(
                    sock,
                    protocol_receive_buffer,
                    protocol_receive_timeout,
                )
                if client_data:
                    # Send data to Django immediately.
                    await SocketIO.async_send(
                        sock=django_server_sock,
                        data=client_data,
                        ignore_error_list=[ssl.SSLError],
                    )

                # Receive data from Django and send it to the client.
                django_data = await SocketIO.async_receive(
                    django_server_sock,
                    protocol_receive_buffer,
                    protocol_receive_timeout,
                )
                if django_data:
                    # Send data to client immediately.
                    await data_sender.async_send_data(
                        django_data,
                        sock,
                        ignore_error_list=[ssl.SSLError],
                    )

        except (
            ConnectionResetError,
            OSError,
            BrokenPipeError,
            TimeoutError,
            asyncio.CancelledError,
        ):
            # Ignore connection related errors.
            pass

        finally:
            log_message(upgrade_request, f"{path} [Django] {protocol_label} [CLOSE]")

    async def __call__(
        self,
        application,
        client_socket: socket.socket,
        client_address: Tuple[str, int],
        request_data: RequestData,
    ) -> Optional[HttpRequest]:
        """
        ASGI application callable for handling requests.

        Notes:
            - If the request cannot be built, a server error response is sent to the
              client and the original exception is re-raised so the caller can log it.

        Args:
            application (App): The Duck application instance.
            client_socket (socket.socket): The client socket object.
            client_address (tuple): The client address tuple.
            request_data (RequestData): The request data object.

        Returns:
            HttpRequest: The handled request object.

        Raises:
            AssertionError: If ``application`` is missing or not an ``App``/``MicroApp``,
                or if ``request_data`` is not a ``RequestData`` instance.
        """
        from duck.app.app import App
        from duck.app.microapp import MicroApp

        # Sanity checks on the arguments (note: assert statements are skipped under `python -O`).
        assert application is not None, "Application not provided."
        assert isinstance(application, (App, MicroApp)), "Invalid application instance provided."
        assert isinstance(request_data, RequestData), f'Request data should be an instance of RequestData not "{type(request_data)}"'

        try:
            request = await self.get_request(
                client_socket,
                client_address,
                request_data,
            )
        except Exception as e:
            # Internal server error: tell the client, then re-raise so the error gets logged.
            response = get_server_error_response(e, None)
            await self.send_response(
                response,
                client_socket,
                request=None,
            )
            raise

        request.application = application
        request.asgi = self
        application.last_request = request

        # Process and start sending response.
        await self.start_response(request)
        return request