Source code for duck.http.core.httpd.task_executor
"""
Module containing RequestHandlingExecutor which handles execution of async coroutines and threaded tasks efficiently.
"""
import os
import asyncio
import platform
import threading
import concurrent.futures
from typing import (
Optional,
Union,
Callable,
Coroutine,
Any,
)
from collections import deque
from duck.contrib.sync import (
iscoroutine,
iscoroutinefunction,
)
from duck.settings import SETTINGS
from duck.logging import logger
from duck.utils.asyncio import create_task
from duck.utils.asyncio.eventloop import get_or_create_loop_manager
from duck.utils.threading.threadpool import get_or_create_thread_manager
[docs]
class RequestHandlingExecutor:
    """
    A hybrid task executor for request handling.

    Coroutines are submitted to the request-handling event loop manager and
    synchronous callables are submitted to the request-handling thread pool
    manager. ``SETTINGS['ASYNC_HANDLING']`` decides which of the two kinds of
    task is accepted by ``execute``.
    """

    def __init__(self):
        """
        Initialize the RequestHandlingExecutor.

        The executor stores no state of its own; the loop/thread managers are
        looked up on every ``execute`` call.
        """

    def on_task_complete(self, future: Union[concurrent.futures.Future, asyncio.Future]) -> None:
        """
        Callback to handle completion or failure of a task.

        A failed task is logged as a warning; when ``SETTINGS['DEBUG']`` is
        truthy the exception itself is logged as well. Nothing is re-raised.

        Args:
            future (Union[concurrent.futures.Future, asyncio.Future]): Future object for the task.
        """
        # NOTE(review): asyncio.CancelledError derives from BaseException on
        # Python 3.8+, so a cancelled asyncio future is not handled by the
        # ``except Exception`` clause below -- confirm whether cancellation
        # needs explicit handling here.
        try:
            # Raises if the task failed
            future.result()
        except Exception as e:
            error_msg = f"Request handling task error: {e}"

            # Enhance the exception with task name for debugging
            if hasattr(future, 'name'):
                # Exceptions raised without arguments have empty ``args``;
                # indexing them would raise IndexError inside this callback
                # and hide the real failure, so fall back to the class name.
                detail = e.args[0] if e.args else type(e).__name__
                e.args = (f"{detail}: [{future.name}]", )
                error_msg = f"Request handling task error [{future.name}]: {e}"

            # Log exception.
            logger.log(error_msg, level=logger.WARNING)
            if SETTINGS['DEBUG']:
                logger.log_exception(e)

    def execute(self, task: Union[Callable, Coroutine]) -> None:
        """
        Public interface to execute a task. It routes the task to either
        the event loop manager or the thread pool manager, depending on its type.

        Args:
            task (Callable or Coroutine): The task to run.
                - If async, it's submitted to the request-handling event loop manager.
                - If sync, it's submitted to the request-handling thread pool manager.

        Raises:
            RuntimeError: If the kind of task does not match
                ``SETTINGS['ASYNC_HANDLING']`` (a coroutine while it is falsy,
                or a non-coroutine while it is truthy).
        """
        if iscoroutine(task):
            if not SETTINGS['ASYNC_HANDLING']:
                raise RuntimeError(
                    "ASYNC_HANDLING is set to False yet a coroutine task has been submitted. "
                    "Expected a synchronous callable."
                )

            # NOTE(review): the wrapper returns right after ``create_task``
            # without awaiting the created task, so the future obtained from
            # ``submit_task`` presumably resolves once scheduling is done, not
            # when the request handler finishes -- confirm that failures of
            # the inner task are reported elsewhere.
            async def request_handler_wrapper(coro):
                create_task(coro)

            # Get manager and submit task
            loop_manager = get_or_create_loop_manager(
                id="request-handling-eventloop-manager",
                strictly_get=True,
            )
            future = loop_manager.submit_task(
                request_handler_wrapper(task),
                task_type="request-handling-task",
            )
        else:
            if SETTINGS['ASYNC_HANDLING']:
                raise RuntimeError(
                    "ASYNC_HANDLING is set to True yet a non-coroutine task has been submitted. "
                    "Expected a coroutine."
                )

            # Submit the synchronous task to the thread pool
            threadpool_manager = get_or_create_thread_manager(
                id="request-handling-threadpool-manager",
                strictly_get=True,
            )
            future = threadpool_manager.submit_task(task, task_type="request-handling-task")

        # Attach name and callback for error handling; the name falls back to
        # repr(task) when the task has no ``name`` attribute.
        future.name = getattr(task, 'name', repr(task))
        future.add_done_callback(self.on_task_complete)