# Source code for duck.http.core.asgi

"""
This module provides the **ASGI** (Asynchronous Server Gateway Interface) for the Duck HTTP server.

ASGI is a specification that describes how a web server communicates with web applications.
This module will define the ASGI callable that the server will use to serve requests.
"""
import ssl
import time
import socket
import asyncio

from typing import (
    Any,
    Dict,
    Optional,
    Tuple,
    Union,
    Callable,
)

from duck.exceptions.all import (
    SettingsError,
    RouteNotFoundError,
    MethodNotAllowedError,
    RequestError,
    RequestUnsupportedVersionError,
    RequestSyntaxError,
    ExpectingNoResponse,
    FileNotFoundResponseError,
)
from duck.http.core.handler import response_handler
from duck.http.core.proxyhandler import (
    HttpProxyResponse,
    BadGatewayError,
)
from duck.http.core.response_finalizer import async_response_finalizer
from duck.http.response import HttpResponse
from duck.http.request import HttpRequest
from duck.http.request_data import RequestData
from duck.logging import logger
from duck.contrib.responses.errors import (
    get_server_error_response,
    get_bad_gateway_error_response,
    get_404_error_response,
    get_method_not_allowed_error_response,
    get_bad_request_error_response,
)
from duck.contrib.sync import convert_to_async_if_needed, iscoroutinefunction
from duck.utils.xsocket import xsocket
from duck.utils.xsocket.io import SocketIO
from duck.utils.asyncio import create_task
from duck.settings import SETTINGS


class ASGI:
    """
    Asynchronous Server Gateway Interface for the Duck HTTP server.

    **Notes:**
    - The ASGI callable is the entry point for the server to handle requests.
    - The ASGI callable will be called for each incoming request.
    - The ASGI callable will handle the request and send the response to the client.
    - The ASGI callable will be called with the following arguments:
        - application: The Duck application instance.
        - client_socket: The client xsocket object.
        - client_address: The client address tuple.
        - request_data: The raw request data from the client.
    - The ASGI is also responsible for sending request to remote servers like Django for processing.

    Implement methods `get_request`, `start_response` and `__call__` to create
    your custom ASGI callable.
    """

    def __init__(self, settings: Dict[str, Any]):
        """
        Store the settings mapping on the instance.

        Args:
            settings (Dict[str, Any]): Settings mapping kept as `self.settings`.
        """
        self.settings = settings

    @staticmethod
    async def get_request(
        client_socket: xsocket,
        client_address: Tuple[str, int],
        request_data: RequestData,
    ) -> HttpRequest:
        """
        Construct a request object from the data received from the client.

        Args:
            client_socket (xsocket): The client xsocket object.
            client_address (tuple): The client address tuple.
            request_data (RequestData): The request data object.

        Returns:
            HttpRequest: The parsed request object.

        Raises:
            SettingsError: If the configured `REQUEST_CLASS` is not a subclass
                of `HttpRequest` (including when it is not a class at all).
        """
        from duck.settings.loaded import SettingsLoaded

        request_class = SettingsLoaded.REQUEST_CLASS

        # `issubclass` raises a bare TypeError on non-class values, so check that
        # first and report every misconfiguration as a SettingsError.
        if not (isinstance(request_class, type) and issubclass(request_class, HttpRequest)):
            raise SettingsError(
                f"REQUEST_CLASS set in settings.py should be a subclass of Duck HttpRequest not {request_class}"
            )

        request = request_class(
            client_socket=client_socket,
            client_address=client_address,
        )

        # `parse` may be sync or async; the helper makes it awaitable either way.
        await convert_to_async_if_needed(request.parse)(request_data)
        return request

    async def finalize_response(
        self,
        response,
        request: Optional[HttpRequest] = None,
    ):
        """
        Run the response finalizer on `response`.

        Any exception raised by the finalizer is logged and suppressed, so this
        method never raises because of a finalizer failure.

        Args:
            response: The response to finalize.
            request (Optional[HttpRequest]): The request the response belongs to.
        """
        try:
            await async_response_finalizer.finalize_response(response, request)
        except Exception as e:
            logger.log_exception(e)

    async def send_response(
        self,
        response,
        client_socket: xsocket,
        request: Optional[HttpRequest] = None,
        disable_logging: bool = False,
    ):
        """
        Asynchronously send `response` to the client.

        Args:
            response: The response to send.
            client_socket (xsocket): Socket the response is written to.
            request (Optional[HttpRequest]): The originating request, if any.
            disable_logging (bool): Forwarded to the response handler.
        """
        await response_handler.async_send_response(
            response,
            client_socket,
            request=request,
            disable_logging=disable_logging,
        )

    async def apply_middlewares_to_response(self, response, request):
        """
        Run `process_response` of the configured middlewares in reverse order.

        When `request.META["FAILED_MIDDLEWARE"]` is set, only the middlewares
        positioned before it in the configured list are applied.

        Args:
            response: The response handed to each middleware.
            request: The request; its `META` mapping is read for `FAILED_MIDDLEWARE`.
        """
        from duck.settings.loaded import SettingsLoaded

        middlewares = SettingsLoaded.MIDDLEWARES
        failed_middleware = request.META.get("FAILED_MIDDLEWARE")

        if failed_middleware:
            # Drop the middlewares the request never reached.
            # NOTE(review): the slice also excludes the failed middleware itself,
            # while the original docstring said processing starts *from* it.
            # Confirm which is intended before changing.
            index = middlewares.index(failed_middleware)
            middlewares = middlewares[:index]

        for middleware in reversed(middlewares):
            await convert_to_async_if_needed(middleware.process_response)(response, request)

    async def django_apply_middlewares_to_response(self, response: HttpProxyResponse, request):
        """
        Apply middlewares to a proxied (Django) response.

        Does nothing when `SETTINGS["DJANGO_SIDE_URLS_SKIP_MIDDLEWARES"]` is truthy.

        Args:
            response (HttpProxyResponse): The proxied response.
            request: The request the response belongs to.
        """
        if SETTINGS["DJANGO_SIDE_URLS_SKIP_MIDDLEWARES"]:
            return

        # The request reached the Django server and a response came back, so all
        # request-side middlewares succeeded.
        await self.apply_middlewares_to_response(response, request)

    async def produce_final_response_failsafe(
        self,
        request: HttpRequest,
        response_producer_callable: Callable,
        processor: Optional["AsyncRequestProcessor"] = None,
    ) -> HttpResponse:
        """
        Produce a finalized response with the given callable, whatever happens.

        The producer is awaited and middlewares are applied to its result. Known
        failures are translated into the matching error response; any other
        exception becomes a server error response and is logged. The resulting
        response is always finalized before being returned.

        Args:
            request (HttpRequest): Target HTTP request.
            response_producer_callable (Callable): Coroutine function, called with
                no arguments, that produces the HTTP response.
            processor (Optional[AsyncRequestProcessor]): Request processor for the
                request; only used to look up `route_info` for the
                method-not-allowed response.

        Returns:
            HttpResponse: The corresponding HTTP response.

        Raises:
            ExpectingNoResponse: Raised if we are never going to get a response,
                e.g. when we reach a WebSocketView. Such a view handles everything
                on its own and never returns a response.
        """
        try:
            # Kept inside the `try` on purpose: a bad producer is reported to the
            # client as a server error rather than propagating.
            assert iscoroutinefunction(response_producer_callable), "Response producer callable must be a coroutine function."
            response: Union[HttpResponse, HttpProxyResponse] = await response_producer_callable()

            # Apply middlewares in reverse order.
            if isinstance(response, HttpProxyResponse):
                await self.django_apply_middlewares_to_response(response, request)
            else:
                await self.apply_middlewares_to_response(response, request)

        except (RouteNotFoundError, FileNotFoundResponseError):
            # The request URL cannot match any registered route.
            response = get_404_error_response(request)

        except MethodNotAllowedError:
            # The requested method is not allowed for the current route.
            route_info = None
            try:
                if processor:
                    route_info = processor.route_info
            except Exception:
                # Best effort: the error response can be built without route info.
                pass
            response = get_method_not_allowed_error_response(request, route_info=route_info)

        except RequestError as e:
            # The request itself is malformed.
            response = get_bad_request_error_response(e, request)

        except BadGatewayError as e:
            response = get_bad_gateway_error_response(e, request)

        except ExpectingNoResponse:
            # Not an error: the client is being handled at a lower level
            # (e.g. websocket protocol), so no response should be expected.
            raise

        except Exception as e:
            response = get_server_error_response(e, request)
            logger.log_exception(e)

        await self.finalize_response(response, request)
        return response

    async def get_response(self, request: HttpRequest) -> HttpResponse:
        """
        Return the full response for a request, with all middlewares and other
        configurations applied.

        Args:
            request (HttpRequest): The request to resolve.

        Returns:
            HttpResponse: The corresponding HTTP response.

        Raises:
            ExpectingNoResponse: Raised if we are never going to get a response,
                e.g. when we reach a WebSocketView.
        """
        from duck.http.core.processor import AsyncRequestProcessor

        processor = AsyncRequestProcessor(request)

        async def produce_response() -> Union[HttpResponse, HttpProxyResponse]:
            """Resolve the response through Django or through Duck itself."""
            if SETTINGS["USE_DJANGO"]:
                return await processor.process_django_request()
            return await processor.process_request()

        return await self.produce_final_response_failsafe(request, produce_response, processor)

    async def start_response(self, request: HttpRequest, response: Optional[HttpResponse] = None):
        """
        Start the response to the client.

        This method should be called for handling and sending the response to
        the client.

        Args:
            request (HttpRequest): The request object.
            response (Optional[HttpResponse]): A response to send as-is. When
                `None`, the response is resolved through `get_response`.
        """
        if response is None:
            # Compare against None explicitly so that a caller-supplied response
            # is never discarded just because it evaluates falsy.
            try:
                response = await self.get_response(request)
            except ExpectingNoResponse:
                # The client is handled elsewhere; there is nothing to send.
                return

        await self.send_response(
            response,
            request.client_socket,
            request,
            disable_logging=False,
        )

        # A 101 coming back from the Django remote server means another protocol
        # (e.g. websockets) takes over the connection; relay it in the background.
        if isinstance(response, HttpProxyResponse) and response.status_code == 101:
            create_task(self.handle_django_connection_upgrade(request, response))

    async def handle_django_connection_upgrade(
        self,
        upgrade_request: HttpRequest,
        upgrade_response: HttpProxyResponse,
        protocol_receive_timeout: Union[int, float] = 10,
        protocol_receive_buffer: int = 4096,
    ):
        """
        Relay an upgraded connection (e.g. `WebSockets`) between client and Django.

        The upgrade response has already been sent when this runs. Each loop
        iteration first receives from the client and forwards any data to Django,
        then receives from Django and forwards any data to the client. The loop
        ends when one of the connection-related errors listed below is raised.

        Args:
            upgrade_request (HttpRequest): The request carrying the `upgrade` header.
            upgrade_response (HttpProxyResponse): The proxied response; its
                `target_socket` is used as the Django-side socket.
            protocol_receive_timeout (Union[int, float]): Timeout passed to each receive.
            protocol_receive_buffer (int): Buffer size passed to each receive.
        """
        from duck.contrib.websockets import log_message

        upgrade = upgrade_request.get_header("upgrade", "").strip()

        if not upgrade or upgrade_response.status_code != 101:
            # Not an upgraded connection; nothing to relay.
            return

        sock: xsocket = upgrade_request.client_socket
        django_server_sock: xsocket = upgrade_response.target_socket
        path = upgrade_request.path
        is_ws_upgrade = upgrade.lower() == "websocket"
        protocol_label = "[WebSocket]" if is_ws_upgrade else upgrade.capitalize()

        log_message(upgrade_request, f"{path} [Django] {protocol_label} [OPEN]")

        try:
            # Ensure both sockets are not in blocking mode.
            sock.setblocking(False)
            django_server_sock.setblocking(False)

            while True:
                # Yield to the event loop once per iteration.
                await asyncio.sleep(0)

                # Client -> Django
                client_data = await SocketIO.async_receive(
                    sock,
                    protocol_receive_buffer,
                    protocol_receive_timeout,
                )
                if client_data:
                    await SocketIO.async_send(
                        sock=django_server_sock,
                        data=client_data,
                        ignore_error_list=[ssl.SSLError],
                    )

                # Django -> client
                django_data = await SocketIO.async_receive(
                    django_server_sock,
                    protocol_receive_buffer,
                    protocol_receive_timeout,
                )
                if django_data:
                    # Use the module-level handler (as `send_response` does); this
                    # class never defines a `response_handler` attribute.
                    await response_handler.async_send_data(
                        django_data,
                        sock,
                        ignore_error_list=[ssl.SSLError],
                    )

        except (
            ConnectionResetError,
            OSError,
            BrokenPipeError,
            TimeoutError,
            asyncio.CancelledError,
        ):
            # Ignore connection related errors.
            # NOTE(review): CancelledError is swallowed here too, so a cancelled
            # relay task ends normally instead of as cancelled - confirm intended.
            pass

        finally:
            log_message(upgrade_request, f"{path} [Django] {protocol_label} [CLOSE]")

    async def __call__(
        self,
        application,
        client_socket: socket.socket,
        client_address: Tuple[str, int],
        request_data: RequestData,
    ) -> Optional[HttpRequest]:
        """
        ASGI application callable for handling requests.

        Notes:
        - Per the original notes, this method is meant to be wrapped by a
          decorator (`if_error_log_then_raise`) that handles errors raised here;
          the decorator is not applied in this module.
        - If building the request fails, a server error response is sent to the
          client and the original exception is re-raised.

        Args:
            application (App): The Duck application instance.
            client_socket (socket.socket): The client socket object.
            client_address (tuple): The client address tuple.
            request_data (RequestData): The request data object.

        Returns:
            HttpRequest: The handled request object.
        """
        from duck.app.app import App
        from duck.app.microapp import MicroApp

        # Sanity checks on the arguments supplied by the server.
        assert application is not None, "Application not provided."
        assert isinstance(application, (App, MicroApp)), "Invalid application instance provided."
        assert isinstance(request_data, RequestData), f'Request data should be an instance of RequestData not "{type(request_data)}"'

        try:
            request = await self.get_request(
                client_socket,
                client_address,
                request_data,
            )
        except Exception as e:
            # Internal server error: tell the client, then re-raise so that the
            # error gets logged by the caller.
            response = get_server_error_response(e, None)
            await self.send_response(
                response,
                client_socket,
                request=None,
            )
            raise

        request.application = application
        request.asgi = self
        application.last_request = request

        # Process the request and send the response.
        await self.start_response(request)
        return request