Source code for duck.http.core.httpd.http2.protocol

"""
H2 Protocol responsible for handling H2 Connections.
"""
import ssl
import time
import select
import queue
import socket
import asyncio

from functools import partial
from inspect import isasyncgen
from typing import (
    Tuple,
    Union,
    Optional,
    Callable,
)

from h2.config import H2Configuration
from h2.connection import H2Connection, ConnectionState
from h2.exceptions import ProtocolError, StreamClosedError

from duck.settings import SETTINGS
from duck.http.request import HttpRequest
from duck.http.response import (
    BaseResponse,
    HttpResponse,
    StreamingHttpResponse,
)
from duck.http.core.handler import (
    ResponseHandler,
)
from duck.http.core.httpd.http2.event_handler import EventHandler
from duck.logging import logger
from duck.contrib.sync import iscoroutinefunction
from duck.utils.xsocket import xsocket, ssl_xsocket
from duck.utils.xsocket.io import SocketIO
from duck.utils.asyncio.eventloop import (
    SyncFuture,
    get_or_create_loop_manager,
)


[docs] class H2Protocol: """ Asynchronous H2 Protocol class. """ __slots__ = ( "sock", "addr", "conn", "server", "event_handler", "event_loop", "sync_queue", "__closing", ) def __init__( self, sock: xsocket, addr: Tuple[str, int], conn: H2Connection, event_handler: EventHandler, event_loop: asyncio.BaseEventLoop = None, sync_queue: Optional[queue.Queue] = None, ): """ H2Protocol initialization. Args: sock (xsocket): Client socket object. addr (Tuple[str, int]): Client address. conn (H2Connection): H2 Connection object. event_handler (EventHandler): Asynchrous `H2` event handler class. event_loop (asyncio.BaseEventLoop): The target event loop, only used in `WSGI` mode.. sync_queue (Optional[queue.Queue]): A queue for adding tasks that needs to be executed outside async context, useful when threads are used and `ASYNC_HANDLING=False` """ self.sock = sock self.addr = addr self.conn = conn self.event_handler = event_handler self.event_loop = event_loop self.sync_queue = sync_queue self.__closing = False @property def closing(self) -> bool: """ Returns a boolean on whether if protocol is in closing state. """ return self.__closing @closing.setter def closing(self, state: bool): self.__closing = state
[docs] async def run_forever(self): """ Runs the loop for handling further client requests. """ # Set socket to non-blocking mode if not set. self.sock.setblocking(False) # Send any pending H2 data if any await self.async_send_pending_data() async def async_read_and_handle_data(): """ Receive and handle data asynchrously. """ data = await SocketIO.async_receive(self.sock, timeout=.5) if data: await self.event_handler.entry(data) # Read and handle H2 frames. while not self.closing: try: # Read & handle H2 frames await async_read_and_handle_data() except ( ssl.SSLWantReadError, ssl.SSLWantWriteError, TimeoutError, BlockingIOError, ): # Ignore SSL errors and other errors that are raised because of SSL protocol pass except ( ConnectionError, ConnectionResetError, BrokenPipeError, ): # Connection errors self.closing = True break except ProtocolError as e: logger.log(f"Protocol Error: {e}", level=logger.WARNING) if SETTINGS['DEBUG']: logger.log_exception(e) # Break H2 loop break except Exception as e: logger.log(f"HTTP/2 Error: {e}", level=logger.WARNING) if SETTINGS['DEBUG']: logger.log_exception(e) # Break H2 loop break # Yield control to the eventloop await asyncio.sleep(0.1) # Connection closed SocketIO.close(self.sock) # ensure socket is closed.
[docs] def connection_lost(self, *_): """ Called on socket connection lost. """ self.closing = True
[docs] def close_connection(self, error_code: int = 0, debug_message: bytes = None): """ Closes the socket connection. """ self.closing = True SocketIO.close(self.sock)
[docs] def send_pending_data(self): """ Synchronously sends any pending data from `H2Connection.data_to_send()`. """ pending = self.conn.data_to_send() if pending: SocketIO.send(sock=self.sock, data=pending)
[docs] def send_response( self, response: Union[BaseResponse, HttpResponse], stream_id: int, request: Optional[HttpRequest] = None, disable_logging: bool = False, suppress_errors: bool = False, return_future: bool = False, ) -> Optional[SyncFuture]: """ This sends an HTTP/2 response to the H2Connection. Notes: - This sends data asynchronously but the difference between this method and `async_send_response` is that it submits the coroutine for sending response to `AsyncioLoopManager` and returns a `SyncFuture` you can wait for. Args: response (Union[BaseResponse, HttpResponse]): The HTTP response object containing the response data. stream_id (int): The target H2 stream ID. request (Optional[HttpRequest]): The request object associated with the response. Used for logging and debugging purposes. disable_logging (bool): If True, disables logging of the response. Defaults to False. suppress_errors (bool): If True, suppresses any errors that occur during the sending process (only sending data). Defaults to False. return_future (bool): Whether to return sync future. Returns: Optional[SyncFuture]: A synchronous future you can wait for upon response send completion. Raises: DisallowedAction: If ASYNC_HANDLING is True in settings. Exception: If there is an error during the data sending process (e.g., socket errors), unless suppressed. """ if SETTINGS['ASYNC_HANDLING']: raise DisallowedAction( "This method must be used in a multithreaded environment. " "Set ASYNC_HANDLING to False or just await `async_send_response` instead." ) coro = self.async_send_response( response, stream_id, request, disable_logging, suppress_errors, ) # Return synchronous future. loop_manager = get_or_create_loop_manager(id="request-handling-eventloop-manager", strictly_get=True) if return_future: future = loop_manager.submit_task(coro, return_sync_future=True, task_type="request-handling-task") return future # Fire and forget task loop_manager.submit_task(coro)
# ASYNCHRONOUS IMPLEMENTATIONS
[docs] async def async_send_pending_data(self): """ Asynchronously sends any pending data from `H2Connection.data_to_send()` asynchronously. """ pending = self.conn.data_to_send() if pending: await SocketIO.async_send(sock=self.sock, data=pending)
[docs] async def async_send_data( self, data: bytes, stream_id: int, end_stream: bool = False, on_data_sent: Callable = None, flow_control_timeout: float = 30.0, ): """ Asynchronously send data for a given HTTP/2 stream, respecting flow control. This method automatically chunks data according to the current flow control window and the maximum outbound frame size. It also ensures that sending pauses until the remote peer grants enough window updates. Args: data (bytes): The payload to send. stream_id (int): The ID of the target HTTP/2 stream. end_stream (bool, optional): Whether to close the stream after sending all data. Defaults to False. on_data_sent (Optional[Callable], optional): Optional callback or coroutine to execute once the data has been fully sent. The callback receives `original_data` as its only argument. flow_control_timeout (float, optional): Max time (in seconds) to wait for flow control window updates before giving up. Defaults to 30.0s. Raises: asyncio.CancelledError: If task is cancelled during sending. """ if not isinstance(data, (bytes, bytearray, memoryview)): raise TypeError("Data must be bytes-like") original_data = data view = memoryview(data) time_left = flow_control_timeout interval_timeout = .1 # Continue sending until all data is transmitted while view: # Wait for enough flow control window space but keep on receiving data while self.conn.local_flow_control_window(stream_id) < 1: if time_left <= 0: # Timeout reached if SETTINGS['DEBUG']: logger.log( f"Flow control timeout on stream {stream_id} " f"after {flow_control_timeout:.1f}s", level=logger.WARNING, ) raise asyncio.TimeoutError(f"Flow control timeout on stream {stream_id}") try: await asyncio.wait_for( self.event_handler.wait_for_flow_control(stream_id), timeout=interval_timeout, ) except asyncio.TimeoutError: time_left -= interval_timeout continue except asyncio.CancelledError: return try: # Determine maximum chunk size to respect window and frame limits chunk_size = min( self.conn.local_flow_control_window(stream_id), len(view), self.conn.max_outbound_frame_size, ) chunk = view[:chunk_size] view = view[chunk_size:] # Send chunk (do not close stream until final chunk) await self.async_flush_response_data( data=chunk.tobytes(), stream_id=stream_id, end_stream=(not view and end_stream), ) except StreamClosedError: self.event_handler.on_stream_reset(stream_id) except (ProtocolError, KeyError) as e: if SETTINGS['DEBUG']: logger.log(f"Error sending H2 data: {e}", level=logger.WARNING) # Handle case where no data was sent but stream must be closed if not original_data and end_stream: try: self.conn.end_stream(stream_id) await self.async_send_pending_data() except StreamClosedError: self.event_handler.on_stream_reset(stream_id) except (ProtocolError, KeyError) as e: if SETTINGS['DEBUG']: logger.log(f"Error sending H2 data: {e}", level=logger.WARNING) # Trigger post-send callback if provided if on_data_sent: try: if iscoroutinefunction(on_data_sent): await on_data_sent(original_data) else: on_data_sent(original_data) except Exception as e: logger.exception(e)
[docs] async def async_send_response( self, response: Union[BaseResponse, HttpResponse], stream_id: int, request: Optional[HttpRequest] = None, disable_logging: bool = False, suppress_errors: bool = False, ) -> None: """ Asynchronously sends an HTTP/2 response to the H2Connection. Args: response (Union[BaseResponse, HttpResponse]): The HTTP response object containing the response data. stream_id (int): The target H2 stream ID. request (Optional[HttpRequest]): The request object associated with the response. Used for logging and debugging purposes. disable_logging (bool): If True, disables logging of the response. Defaults to False. suppress_errors (bool): If True, suppresses any errors that occur during the sending process (only sending data). Defaults to False. Raises: Exception: If there is an error during the data sending process (e.g., socket errors), unless suppressed. This method calls `async_send_data` to transmit the raw response data to the client and performs logging if `disable_logging` is False. If the request object contains debug information or failed middleware details, they are included in the logs. """ def log_response(response, request: Optional[HttpResponse], *_): """ Logs the response to console after successful sending. """ if not disable_logging: # Log response (if applicable) ResponseHandler.auto_log_response(response, request) def on_chunk_sent(chunk): """ This increments the response fake content length size (useful for logging) everytime the chunk is sent. ``` {note} - This is very useful for tracking how much of response data have been delivered to the client. ``` """ content_size = response.content_obj.size or 0 # Set response content fake size for logging purposes response.content_obj.set_fake_size(content_size + len(chunk)) async def on_final_data_sent(_): # Lets close streaming response if not done. try: await ResponseHandler.async_close_streaming_response(response) finally: log_response(response, request) if self.conn.state_machine.state == ConnectionState.CLOSED: # Call close_streaming_response if not called. await ResponseHandler.async_close_streaming_response(response) return try: # Run some tests if we need to increase flow control real_content_length = response.get_header("content-length") if real_content_length: real_content_length = len(real_content_length) # Increment flow control window max_incr = 2147483647 incr = min(real_content_length, max_incr) if real_content_length > 635535: self.conn.increment_flow_control_window(incr, stream_id) # Send any pending h2 data await self.async_send_pending_data() # Parse and send H2 headers headers = [(k.lower(), v) for k, v in response.headers.items()] for _, morsel in response.cookies.items(): headers.append(("set-cookie", morsel.output(header="").strip())) # Insert status code in headers headers.insert(0, (":status", str(response.status_code))) # Send headers self.conn.send_headers(stream_id, headers) if not isinstance(response, StreamingHttpResponse): # Send response directly to client socket await self.async_send_data( response.content, stream_id, end_stream=False, on_data_sent=on_chunk_sent, ) # End or close stream await self.async_send_data( b"", stream_id, end_stream=True, on_data_sent=on_final_data_sent, ) # Send any pending data await self.async_send_pending_data() else: # Send response in chunks content = response.async_iter_content() if not isasyncgen(content): # The content is not an async generator so lets await the result. content = await content if not isasyncgen(content): for chunk in content: if isinstance(chunk, str): chunk = chunk.encode("utf-8") # Send chunk to client socket. await self.async_send_data( chunk, stream_id, end_stream=False, on_data_sent=on_chunk_sent, ) else: async for chunk in content: if isinstance(chunk, str): chunk = chunk.encode("utf-8") # Send chunk to client socket. await self.async_send_data( chunk, stream_id, end_stream=False, on_data_sent=on_chunk_sent, ) # End or close stream await self.async_send_data( b"", stream_id, end_stream=True, on_data_sent=on_final_data_sent, ) # Send any pending data await self.async_send_pending_data() except StreamClosedError: self.event_handler.on_stream_reset(stream_id) except Exception as e: # Something interrupted the process, manually call close_streaming_response if not called. await ResponseHandler.async_close_streaming_response(response) if not suppress_errors: raise e
[docs] async def async_send_goaway(self, error_code, debug_message: bytes = None): """ Sends a `GOAWAY` frame with the given error code and debug_message. """ if self.conn.state_machine.state == ConnectionState.CLOSED: self.close_connection() return try: self.conn.close_connection( error_code, additional_data=debug_message or b"" ) await self.async_send_pending_data() except (BrokenPipeError, ConnectionResetError): pass except Exception as e: if "EOF occurred in violation of protocol" not in str(e) and "bad length" not in str(e): logger.log_raw(f"Error sending GOAWAY: {e}", level=logger.WARNING) finally: self.close_connection()
[docs] async def async_flush_response_data( self, data: bytes, stream_id: int, end_stream: bool = False, ): """ Asynchronously sends response data directly to client socket. Notes: - This first response data to queue by sending it to `H2Connection` then fetch the response data from `H2Connection.data_to_send()` then sends the data. """ self.conn.send_data( stream_id, data=data, end_stream=end_stream, ) # Send pending data added to H2Connection. await self.async_send_pending_data() # Remove task from task list (if available in list). self.event_handler.async_tasks.pop(stream_id, None)