Source code for duck.http.core.httpd.http2.event_handler

"""
H2 Event handler module.
"""
import ssl
import time
import asyncio
import threading

from typing import (
    Dict,
    Optional,
    List,
    Tuple,
    Callable,
)
from functools import partial
from h2.events import (
    ConnectionTerminated,
    DataReceived,
    RemoteSettingsChanged,
    RequestReceived,
    StreamEnded,
    StreamReset,
    WindowUpdated
)
from h2.exceptions import ProtocolError
from h2.errors import ErrorCodes
from h2.settings import SettingCodes

from duck.settings import SETTINGS
from duck.http.request_data import RequestData
from duck.http.response import HttpResponse
from duck.contrib.sync import iscoroutinefunction, convert_to_async_if_needed
from duck.utils.threading import SyncFuture
from duck.utils.asyncio import create_task
from duck.logging import logger


class EventHandler:
    """
    HTTP/2 event handler.

    Feeds raw bytes into the `h2` connection state machine and dispatches
    every resulting `h2` event to the matching ``on_*`` method.

    Attributes are documented in ``__init__``.
    """

    __slots__ = (
        "protocol",
        "conn",
        "server",
        "stream_data",
        "flow_control_futures",
        "async_tasks",
        "event_map",
    )

    def __init__(self, protocol, server):
        """
        Args:
            protocol: Protocol object owning the connection. This class reads
                ``conn``, ``sock``, ``addr`` and ``sync_queue`` from it and calls
                ``async_send_pending_data()`` and ``connection_lost()`` on it.
            server: Server object providing ``handle_request_data`` and
                ``async_handle_request_data``.
        """
        self.protocol = protocol
        self.conn = protocol.conn
        self.server = server
        self.stream_data = {}  # {stream_id: RequestData}
        self.flow_control_futures = {}  # {stream_id: asyncio.Future}
        self.async_tasks = {}  # {stream_id: [asyncio.Task, ...]}

        # Maps an h2 event class to the callable handling it. Lambdas adapt
        # events to handlers that take plain values instead of the event.
        self.event_map = {
            RequestReceived: self.on_new_request,
            DataReceived: self.on_request_body,
            StreamEnded: self.on_request_complete,
            ConnectionTerminated: self.on_connection_terminated,
            StreamReset: lambda e: self.on_stream_reset(e.stream_id),
            WindowUpdated: lambda e: self.on_window_updated(e.stream_id, e.delta),
            RemoteSettingsChanged: self.on_remote_settings_changed,
        }

    @staticmethod
    def _to_text(value):
        """Return `value` as `str`, decoding it as UTF-8 when it is bytes-like."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8")
        return value

    async def entry(self, data: bytes):
        """
        Entry method for processing incoming data.

        Args:
            data (bytes): Raw bytes read from the connection.
        """
        events = self.conn.receive_data(data)
        await self.dispatch_events(events)

    def on_new_request(self, event):
        """
        Received headers for a new request.

        Builds a `RequestData` from the event headers and stores it under the
        event's stream ID until the stream ends.
        """
        # h2 hands out bytes unless a header encoding is configured on the
        # connection, in which case it hands out str; accept both.
        to_text = self._to_text
        headers = {to_text(name): to_text(value) for name, value in event.headers}
        self.stream_data[event.stream_id] = RequestData(headers)

    async def on_request_body(self, event):
        """
        Called when we received a request body chunk.

        Appends the chunk to the stream's `RequestData` content, or resets the
        stream with ``PROTOCOL_ERROR`` when no request is known for it, then
        flushes pending data.
        """
        stream_id = event.stream_id
        request_data = self.stream_data.get(stream_id)

        if not request_data:
            self.conn.reset_stream(stream_id, error_code=ErrorCodes.PROTOCOL_ERROR)
        else:
            request_data.content += event.data
            # NOTE(review): this enlarges the stream window by 5 MiB on every
            # DATA frame in addition to the acknowledgement below - confirm
            # this is intended.
            self.conn.increment_flow_control_window(5 * 1024 * 1024, stream_id)
            # Acknowledge the flow-controlled length (payload plus padding),
            # not just the payload, so padded frames do not leak window.
            self.conn.acknowledge_received_data(event.flow_controlled_length, stream_id)

        await self.protocol.async_send_pending_data()

    async def on_request_complete(self, event):
        """
        Full request received.

        Pops the stream's `RequestData`, rewrites the HTTP/2 pseudo-headers into
        a ``topheader`` entry plus a ``host`` header, tags the request store with
        the stream ID and hands the request to the server.
        """
        stream_id = event.stream_id
        request_data = self.stream_data.pop(stream_id, None)

        if not request_data:
            return

        headers = request_data.headers
        method = headers.pop(":method")
        path = headers.pop(":path")
        topheader = f"{method} {path} HTTP/1.1"

        if ":authority" in headers:
            headers["host"] = headers.pop(":authority")

        if ":scheme" in headers:
            headers.pop(":scheme")  #: Important

        # Set the request data topheader in headers
        request_data.headers["topheader"] = topheader

        # Set request data stream ID and h2_handling
        request_data.request_store["stream_id"] = stream_id
        request_data.request_store["h2_handling"] = True

        if SETTINGS['ASYNC_HANDLING']:
            # We are in async context
            await self.server.async_handle_request_data(
                self.protocol.sock,
                self.protocol.addr,
                request_data,
            )
            return

        # The server is using threads to manage the connection, so the request
        # is handed back to the current thread to be handled synchronously
        # rather than in async context.
        await self.execute_synchronously_in_current_thread(
            partial(
                self.server.handle_request_data,
                self.protocol.sock,
                self.protocol.addr,
                request_data,
            )
        )

    async def execute_synchronously_in_current_thread(self, func: Callable):
        """
        Adds a callable to `sync_queue` so that it will be executed outside async context,
        useful in multithreaded environment where threads are created for each
        connection and `ASYNC_HANDLING=False`.

        Args:
            func (Callable): Callable function or method which doesn't accept any arguments.

        Raises:
            TypeError: If the protocol has no sync queue.
            SettingsError: If `ASYNC_HANDLING` is enabled.
        """
        if not self.protocol.sync_queue:
            raise TypeError("Sync queue is not set, it is required for adding tasks to queue.")

        if SETTINGS['ASYNC_HANDLING']:
            # NOTE(review): SettingsError was referenced without being imported
            # (NameError at runtime). Assumed to live in duck.exceptions.all -
            # confirm the module path.
            from duck.exceptions.all import SettingsError

            raise SettingsError(
                "ASYNC_HANDLING is set to True so no thread will be available to handle this task. "
                "This method must be called in a multithreaded environment."
            )

        # Add task to queue so that will be executed synchronously.
        future = SyncFuture()
        self.protocol.sync_queue.put((func, future))
        await convert_to_async_if_needed(future.result)()

    def on_stream_reset(self, stream_id: int):
        """
        Called when the client resets a stream.

        Cleans up any cached request data, pending flow control future and
        in-progress tasks associated with the given stream.

        Args:
            stream_id (int): The HTTP/2 stream ID that was reset.
        """
        # Remove stream request data if exists
        self.stream_data.pop(stream_id, None)

        # Cancel flow control future if one exists
        waiter = self.flow_control_futures.pop(stream_id, None)
        if waiter and not waiter.done():
            waiter.cancel()

        # Cancel any tasks still running for the stream
        for task in self.async_tasks.pop(stream_id, []):
            if task and not task.done():
                task.cancel()

    def on_window_updated(self, stream_id, delta):
        """
        A window update frame was received.

        Wakes the waiter of `stream_id`, or every waiter when `stream_id` is
        falsy (connection-level update). Woken waiters are removed.

        Args:
            stream_id: The HTTP/2 stream ID, falsy for the whole connection.
            delta: Value the woken futures are resolved with.
        """
        if stream_id:
            waiter = self.flow_control_futures.pop(stream_id, None)
            waiters = [] if waiter is None else [waiter]
        else:
            waiters = list(self.flow_control_futures.values())
            self.flow_control_futures = {}

        for waiter in waiters:
            # A waiter may already be cancelled (e.g. by a timeout);
            # set_result() on it would raise InvalidStateError and keep the
            # remaining waiters from being woken.
            if not waiter.done():
                waiter.set_result(delta)

    def on_remote_settings_changed(self, event):
        """
        On RemoteSettingsChanged event handler method.

        A changed initial window size is treated like a connection-level
        window update: every flow control waiter is woken with a delta of 0.
        """
        if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
            self.on_window_updated(None, 0)

    def on_connection_terminated(self, event):
        """
        Connection terminated.

        Cancels every flow control future and tracked task, clears both
        registries and notifies the protocol through ``connection_lost()``.
        """
        for waiter in self.flow_control_futures.values():
            waiter.cancel()

        # Cancel any tasks if any
        for tasks in self.async_tasks.values():
            for task in tasks:
                task.cancel()

        self.flow_control_futures = {}
        self.async_tasks = {}
        self.protocol.connection_lost()

    async def wait_for_flow_control(self, stream_id: int):
        """
        Waits for a Future that fires when the flow control window is opened.

        The future is registered under `stream_id`, replacing any future
        previously registered for that stream.

        Args:
            stream_id (int): The HTTP/2 stream ID.
        """
        waiter = asyncio.get_running_loop().create_future()
        self.flow_control_futures[stream_id] = waiter
        try:
            await waiter
        finally:
            # When the wait ends without a window update (cancellation or
            # timeout of the caller) the future is still registered; drop it
            # unless a newer waiter has taken the slot.
            if self.flow_control_futures.get(stream_id) is waiter:
                del self.flow_control_futures[stream_id]

    async def dispatch_events(self, events: List):
        """
        Dispatch all received events.

        Pending data is flushed before and after the events are handled.
        Events without a registered handler are ignored.

        Args:
            events (List): Events returned by the `h2` connection.
        """
        await self.protocol.async_send_pending_data()

        for event in events:
            handler = self.event_map.get(type(event))
            if not handler:
                continue

            try:
                if iscoroutinefunction(handler):
                    # Handlers are awaited inline on purpose.
                    # TODO: Find a way to use tasks; running StreamEnded,
                    # RequestReceived & DataReceived handlers in background tasks
                    # was messing up the data/request and periodically causing
                    # SSL errors.
                    await handler(event)
                else:
                    handler(event)
            except Exception as e:
                # For every event failure, just log the exception and
                # continue with other events as not doing so may stall the connection
                logger.log_exception(e)

        # Send any pending data.
        await self.protocol.async_send_pending_data()