# Source code for duck.http.core.httpd.http2

"""
Base server implementation for HTTP/2
"""
import re
import os
import h2
import ssl
import time
import socket
import queue
import asyncio
import base64

from typing import (
    Tuple,
    Dict,
    Optional,
    List,
    Callable,
)

from h2.config import H2Configuration
from h2.connection import H2Connection

from duck.settings import SETTINGS
from duck.http.response import (
    HttpResponse,
    HttpSwitchProtocolResponse,
)
from duck.http.request_data import RequestData, RawRequestData
from duck.http.core.httpd.httpd import (
    BaseServer,
    response_handler,
)
from duck.logging import logger
from duck.utils.xsocket import xsocket
from duck.utils.xsocket.io import SocketIO
from duck.utils.asyncio.eventloop import get_or_create_loop_manager


# Regex to match "Upgrade: h2" or "Upgrade: h2c" in headers only.
# Used with `.match()`, so the whole request head (up to the blank line) must
# be present for the pattern to succeed.
H2_UPGRADE_REGEX = re.compile(
    rb'^(?:[^\r\n]*\r\n)*Upgrade:\s*h2c?\s*(?:\r\n[^\r\n]*)*\r\n\r\n',
    re.IGNORECASE | re.MULTILINE
)

# Regex to extract the HTTP2-Settings header value (group 1).
#
# - The value is base64url encoded, so `-` and `_` must be accepted; the
#   standard-alphabet `+` and `/` are still tolerated for lenient clients.
# - `^` (with MULTILINE) anchors the name at the start of a header line so
#   that e.g. `X-HTTP2-Settings` is not mistaken for this header.
# - Only spaces/tabs are skipped after the colon; `\s` would also cross the
#   CRLF and capture the beginning of the next header on an empty value.
H2_SETTINGS_REGEX = re.compile(
    rb"^HTTP2-Settings:[ \t]*([A-Za-z0-9+/=_-]+)",
    re.IGNORECASE | re.MULTILINE,
)


class SyncH2ProtocolStartWarning(UserWarning):
    """
    Flagged when the `H2` protocol fails to start within a specific timeframe
    in a synchronous environment.
    """
class BaseHTTP2Server(BaseServer):
    """
    Base HTTP/2 server with HTTP/1.1 backward compatibility.

    Notes:
    - Supports both HTTP/2 and HTTP/1.1 protocols.
    - The `H2Protocol` is implemented using asynchronous I/O, even in WSGI environments.
    - In WSGI mode, request handling may be offloaded to a synchronous thread for
      execution outside the async context.
    """

    # How long (seconds) the synchronous dispatch loop blocks on the task queue
    # before re-checking whether the protocol is closing. `Queue.get` returns as
    # soon as an item is put, so this bounds shutdown latency, not task latency.
    _SYNC_QUEUE_POLL_INTERVAL = 0.05

    def set_h2_settings(self, h2_conn):
        """
        Sets the H2 settings on the uninitiated h2 connection.

        Args:
            h2_conn: The h2 connection whose local settings are updated.
        """
        h2_conn.update_settings({
            h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 2**24,  # e.g., 16MB window
            h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
        })

    def _new_h2_connection(self) -> H2Connection:
        """
        Creates a server-side h2 connection with the base settings applied.
        The returned connection has not been initiated yet.
        """
        h2_connection = H2Connection(config=H2Configuration(client_side=False))
        self.set_h2_settings(h2_connection)
        return h2_connection

    def _initiate_h2c_upgrade(self, data: bytes) -> H2Connection:
        """
        Creates and initiates the h2 connection for an `Upgrade: h2c` request.

        Args:
            data (bytes): The raw HTTP/1.1 request that asked for the upgrade.

        Returns:
            H2Connection: The initiated connection; its pending data has not
                been sent yet.
        """
        h2_connection = self._new_h2_connection()
        settings_match = H2_SETTINGS_REGEX.search(data)

        if settings_match:
            # h2 base64url-decodes the header value itself, so it must be handed
            # over still encoded. Clients send it unpadded, therefore normalise
            # the padding to a multiple of four characters first.
            settings_header = settings_match.group(1).rstrip(b"=")
            settings_header += b"=" * (-len(settings_header) % 4)
            h2_connection.initiate_upgrade_connection(settings_header=settings_header)
        else:
            h2_connection.initiate_connection()
        return h2_connection

    def handle_conn(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        flowinfo: Optional = None,
        scopeid: Optional = None,
        strictly_http2: bool = False,
    ) -> None:
        """
        Main entry point to handle new connection (supports both ipv6 and ipv4).

        When neither ALPN selected `h2` nor `strictly_http2` is set, the first
        request is read as HTTP/1.1: it is either upgraded to h2c (if it carries
        an `Upgrade: h2`/`h2c` header) or processed as plain HTTP/1.1.

        Args:
            sock (xsocket): The client socket object.
            addr (Tuple[str, int]): Client ip and port.
            flowinfo (Optional): Flow info if IPv6 (not used in this method).
            scopeid (Optional): Scope id if IPv6 (not used in this method).
            strictly_http2 (bool): Whether to strictly use `HTTP/2` without checking if user selected it.
        """
        has_h2_alpn = False

        try:
            has_h2_alpn = sock.selected_alpn_protocol() == 'h2'
        except AttributeError:
            # Socket exposes no ALPN information.
            pass

        if not has_h2_alpn and not strictly_http2:
            # The selected alpn protocol is not h2, switch to default HTTP/1
            # only if Upgrade to h2c is not set.
            try:
                # Receive the full request (in bytes)
                data = SocketIO.receive_full_request(sock=sock)
            except TimeoutError:
                # For the first request, client took too long to respond.
                self.do_request_timeout(sock, addr)
                return

            if not data:
                # Client sent an empty request, terminate the connection immediately
                SocketIO.close(sock)
                return

            if not H2_UPGRADE_REGEX.match(data):
                # No HTTP/2 Upgrade, strictly HTTP/1.1
                request_data = RawRequestData(data)
                request_data.request_store["h2_handling"] = False
                self.process_data(sock, addr, request_data)
                return

            try:
                # Lets upgrade to HTTP/2
                h2_connection = self._initiate_h2c_upgrade(data)

                # Send switching protocols response
                switching_proto_response = HttpSwitchProtocolResponse(upgrade_to="h2c")
                response_handler.send_response(
                    switching_proto_response,
                    sock=sock,
                    request=None,
                    strictly_http1=True,
                )

                # Send pending HTTP/2 data.
                try:
                    SocketIO.send(
                        sock=sock,
                        data=h2_connection.data_to_send(),
                        suppress_errors=False,
                    )
                except (BrokenPipeError, ConnectionResetError):
                    # Client disconnected
                    return
            except Exception as e:
                # The upgrade could not be completed; report it and drop the
                # connection instead of leaving the client hanging.
                logger.log_exception(e)
                SocketIO.close(sock)
                return

            # Continue on the upgraded connection rather than creating a second
            # one, which would send another preamble to the client.
            self.start_http2_loop(sock, addr, h2_connection)
            return

        # Initiate and send HTTP/2 preamble
        h2_connection = self._new_h2_connection()
        h2_connection.initiate_connection()

        try:
            SocketIO.send(
                sock=sock,
                data=h2_connection.data_to_send(),
                suppress_errors=False,
            )
        except (BrokenPipeError, ConnectionResetError):
            # Client disconnected
            return

        # Start handling H2 frames
        self.start_http2_loop(sock, addr, h2_connection)

    @logger.handle_exception
    def handle_request_data(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        request_data: RequestData,
    ) -> None:
        """
        Processes and handles a request but logs any encountered error (but doesn't raise exception).
        """
        super().handle_request_data(sock, addr, request_data)

    def start_http2_loop(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        h2_connection: H2Connection,
    ) -> None:
        """
        Starts the loop for handling HTTP/2 connection.

        The protocol coroutine is submitted to the already running
        request-handling loop manager, while the calling thread executes the
        callables the protocol places on its `sync_queue` until the protocol
        reports that it is closing.

        Args:
            sock (xsocket): The client socket object.
            addr (Tuple[str, int]): Client ip and port.
            h2_connection (H2Connection): The initiated h2 connection.
        """
        from duck.http.core.httpd.http2.protocol import H2Protocol
        from duck.http.core.httpd.http2.event_handler import EventHandler

        # Assume client supports HTTP/2; the loop manager must be already running.
        loop_manager = get_or_create_loop_manager(
            id="request-handling-eventloop-manager",
            strictly_get=True,
        )
        event_loop = loop_manager.get_event_loop()

        protocol = H2Protocol(
            sock=sock,
            addr=addr,
            conn=h2_connection,
            event_handler=None,
            event_loop=event_loop,  # Required in sync mode.
            sync_queue=queue.Queue(),
        )

        # Send H2 pending data
        protocol.send_pending_data()

        # Set some values.
        protocol.event_handler = EventHandler(protocol=protocol, server=self)
        sock.h2_protocol = protocol

        # Submit coroutine for sending/receiving data asynchronously (fire and
        # forget); synchronous tasks are executed below, in the current thread.
        coro = protocol.run_forever()
        loop_manager.submit_task(coro, task_type="request-handling-task")

        # Wait for protocol to start but for limited time.
        # NOTE(review): with max_wait_time = 0 this never actually sleeps; it
        # only warns (in DEBUG) when the protocol still reports closing. Confirm
        # whether a non-zero grace period was intended.
        wait_time_interval = .1
        total_wait_time = 0
        max_wait_time = 0

        while protocol.closing:
            if total_wait_time >= max_wait_time:
                # The protocol did not start within expected time
                if SETTINGS['DEBUG']:
                    logger.warn(f"H2 protocol failed to start within {max_wait_time} seconds for addr {addr}", SyncH2ProtocolStartWarning)
                break
            time.sleep(wait_time_interval)
            total_wait_time += wait_time_interval

        # As h2 is asynchronous, some tasks may require to be executed directly
        # in the current thread; that is the purpose of the following loop.
        while not protocol.closing:
            try:
                # Block briefly instead of spinning so an idle connection does
                # not consume a full CPU core.
                func, future = protocol.sync_queue.get(timeout=self._SYNC_QUEUE_POLL_INTERVAL)
            except queue.Empty:
                continue

            try:
                # Execute function synchronously in current thread
                result = func()
            except Exception as e:
                future.set_exception(e)
                logger.log_exception(e)
            else:
                future.set_result(result)
            finally:
                protocol.sync_queue.task_done()

    # ASYNCHRONOUS IMPLEMENTATIONS

    async def async_handle_conn(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        flowinfo: Optional = None,
        scopeid: Optional = None,
        strictly_http2: bool = False,
    ) -> None:
        """
        Main entry point to handle new connection asynchronously (supports both ipv6 and ipv4).

        When neither ALPN selected `h2` nor `strictly_http2` is set, the first
        request is read as HTTP/1.1: it is either upgraded to h2c (if it carries
        an `Upgrade: h2`/`h2c` header) or processed as plain HTTP/1.1.

        Args:
            sock (xsocket): The client socket object.
            addr (Tuple[str, int]): Client ip and port.
            flowinfo (Optional): Flow info if IPv6 (not used in this method).
            scopeid (Optional): Scope id if IPv6 (not used in this method).
            strictly_http2 (bool): Whether to strictly use `HTTP/2` without checking if user selected it.
        """
        has_h2_alpn = False

        try:
            has_h2_alpn = sock.selected_alpn_protocol() == 'h2'
        except AttributeError:
            # Socket exposes no ALPN information.
            pass

        if not has_h2_alpn and not strictly_http2:
            # The selected alpn protocol is not h2, switch to default HTTP/1
            # only if Upgrade to h2c is not set.
            try:
                # Receive the full request (in bytes)
                data = await SocketIO.async_receive_full_request(sock=sock)
            except TimeoutError:
                # For the first request, client took too long to respond.
                await self.async_do_request_timeout(sock, addr)
                return

            if not data:
                # Client sent an empty request, terminate the connection immediately
                SocketIO.close(sock)
                return

            if not H2_UPGRADE_REGEX.match(data):
                # No HTTP/2 Upgrade, strictly HTTP/1.1
                request_data = RawRequestData(data)
                request_data.request_store["h2_handling"] = False
                await self.async_process_data(sock, addr, request_data)
                return

            try:
                # Lets upgrade to HTTP/2
                h2_connection = self._initiate_h2c_upgrade(data)

                # Send switching protocols response
                switching_proto_response = HttpSwitchProtocolResponse(upgrade_to="h2c")
                await response_handler.async_send_response(
                    switching_proto_response,
                    sock=sock,
                    request=None,
                    strictly_http1=True,
                )

                # Send pending HTTP/2 data.
                try:
                    await SocketIO.async_send(
                        sock=sock,
                        data=h2_connection.data_to_send(),
                        suppress_errors=False,
                    )
                except (BrokenPipeError, ConnectionResetError):
                    # Client disconnected
                    return
            except Exception as e:
                # The upgrade could not be completed; report it and drop the
                # connection instead of leaving the client hanging.
                logger.log_exception(e)
                SocketIO.close(sock)
                return

            # Continue on the upgraded connection rather than creating a second
            # one, which would send another preamble to the client.
            await self.async_start_http2_loop(sock, addr, h2_connection)
            return

        # Initiate and send HTTP/2 preamble
        h2_connection = self._new_h2_connection()
        h2_connection.initiate_connection()

        try:
            await SocketIO.async_send(
                sock=sock,
                data=h2_connection.data_to_send(),
                suppress_errors=False,
            )
        except (BrokenPipeError, ConnectionResetError):
            # Client disconnected
            return

        # Start handling H2 frames
        await self.async_start_http2_loop(sock, addr, h2_connection)

    @logger.handle_exception
    async def async_handle_request_data(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        request_data: RequestData
    ) -> None:
        """
        Processes and handles a request asynchronously but logs any encountered error (but doesn't raise exception).
        """
        await super().async_handle_request_data(sock, addr, request_data)

    async def async_start_http2_loop(
        self,
        sock: xsocket,
        addr: Tuple[str, int],
        h2_connection: H2Connection
    ) -> None:
        """
        This starts the asynchronous loop for handling HTTP/2 connection.

        Unlike the synchronous variant, the protocol coroutine is awaited
        directly, so this returns only once `run_forever()` completes.

        Args:
            sock (xsocket): The client socket object.
            addr (Tuple[str, int]): Client ip and port.
            h2_connection (H2Connection): The initiated h2 connection.
        """
        from duck.http.core.httpd.http2.protocol import H2Protocol
        from duck.http.core.httpd.http2.event_handler import EventHandler

        protocol = H2Protocol(
            sock=sock,
            addr=addr,
            conn=h2_connection,
            event_handler=None,
            event_loop=None,  # optional in async mode.
        )

        # Send pending H2 data.
        await protocol.async_send_pending_data()

        # Set some values.
        protocol.event_handler = EventHandler(protocol=protocol, server=self)
        sock.h2_protocol = protocol

        # Wait for coroutine to finish
        await protocol.run_forever()