Source code for duck.app.app

"""
This module provides the core application class, `App`, for setting up and running a **Duck-based web application**. It supports various features, including:

- **HTTP/HTTPS Server**: Configures and starts an HTTP or HTTPS server based on application settings.
- **Django Integration**: Can forward requests to a Django server, supporting custom commands on startup.
- **SSL Management**: Checks and manages SSL certificates for secure communication.
- **Force HTTPS**: Redirects all HTTP traffic to HTTPS when enabled.
- **Automations**: Supports running automation scripts during runtime.
- **Ducksight Reloader**: Watches for file changes and enables dynamic reloading in **DEBUG mode**.
- **Port Management**: Ensures that application ports are available.
- **Signal Handling**: Gracefully handles termination signals (e.g., `Ctrl+C`) for clean shutdown.

---

## Attributes

| Attribute                  | Description |
|----------------------------|-------------|
| `DJANGO_ADDR`              | The address and port for the Django server. |
| `DOMAIN`                   | The domain name for the application. |
| `DJANGO_SERVER_WAIT_TIME`  | Time to wait for the Django server to start. |
| `server_up`                | Indicates if the main application server is running. |
| `django_server_up`         | Indicates if the Django server is responsive. |

---

## Methods

### **Application Control**
- `run()`: Starts the application and all services.
- `stop()`: Stops the application.
- `restart()`: Restarts the application.

### **Server Management**
- `start_server()`: Starts the main application server.
- `start_django_server()`: Starts Django and configures Duck as a reverse proxy.
- `start_force_https_server()`: Launches the HTTPS redirection service.

### **Background Services**
- `start_ducksight_reloader()`: Monitors file changes for live reloading.
- `start_automations_dispatcher()`: Handles scheduled automation scripts.

### **Event Handling & Security**
- `register_signals()`: Registers signal handlers for clean exits.
- `on_app_start()`: Event triggered when the application setup is complete.

---

## **Application Instance Management**
The `App` class ensures that only **one instance** of the application is running at a time.  
For **microservices or smaller applications**, use the `MicroApp` class instead.

---

## **Exceptions Handled**
- **`ApplicationError`**: Raised if multiple instances of `App` are created.
- **`SettingsError`**: Raised for misconfigurations in application settings.
- **`SSLError`**: Raised if SSL certificates or private keys are missing/invalid.
"""

import os
import sys
import json
import time
import signal
import socket
import threading
import setproctitle
import multiprocessing

from typing import (
    Optional,
    Dict,
    Any,
    Union,
)
from concurrent.futures import ThreadPoolExecutor, Future

from duck import processes
from duck.settings import SETTINGS
from duck.settings.loaded import SettingsLoaded
from duck.app.microapp import HttpsRedirectMicroApp
from duck.contrib.reloader.ducksight import DuckSightReloader
from duck.exceptions.all import (
    ApplicationError,
    SettingsError,
    SSLError,
)
from duck.http.core.httpd.servers import HTTPServer
from duck.logging import logger
from duck.meta import Meta
from duck.setup import setup, set_asyncio_loop
from duck.csp import csp_nonce_flag
from duck.art import display_duck_art
from duck.version import version
from duck.utils import ipc
from duck.utils.net import (is_ipv4, is_ipv6)
from duck.utils.path import url_normalize
from duck.utils.port_recorder import PortRecorder
from duck.utils.lazy import Lazy
from duck.utils.asyncio.eventloop import get_or_create_loop_manager
from duck.utils.threading.threadpool import get_or_create_thread_manager


if SETTINGS['USE_DJANGO']:
    from duck.backend.django import bridge
else:
    # Bridge for starting Django server.
    bridge = None


[docs] class App: """ Initializes and configures the **Duck** application. """ DJANGO_ADDR: tuple[str, int] = None """ Specifies the host address and port for the Django server. For enhanced security, ensure that uncommon ports are used. """ DOMAIN: str = None """ Domain for the application used in building the application absolute URI. """ DJANGO_SERVER_WAIT_TIME: int = SETTINGS["DJANGO_SERVER_WAIT_TIME"] """ Time in seconds to wait before checking if the Django server is up and running. This variable is used to periodically verify the server's status during the initial startup or maintenance routines, ensuring that the server is ready to handle incoming requests. """ __instances__: int = 0 """ The number of App instances, must be <= 1. """ __mainapp__ = None """ This is the main application instance. """ def __init__( self, addr: str = "localhost", port: int = 8000, domain: str = None, uses_ipv6: bool = False, no_checks: bool = False, disable_signal_handler: bool = False, disable_ipc_handler: bool = False, skip_setup: bool = False, enable_force_https_logs: bool = False, start_bg_eventloop_if_wsgi: bool = True, process_name: str = "duck-server", workers: Optional[int] = None, force_https_workers: Optional[int] = None, force_worker_processes: bool = False, force_https_force_worker_processes: bool = False, ): """ Initializes the main Duck application instance. This constructor sets up the application server, including optional Django integration, HTTPS redirection, and automation dispatching. It validates IP configuration, initializes environment settings, performs startup checks, and prepares runtime threads for the core services. Args: addr (str): The IP address or hostname the server will bind to. Defaults to "localhost". port (int): The port number to run the application on. Defaults to 8000. domain (str, optional): The public-facing domain for the app. If not provided, defaults to `addr`. uses_ipv6 (bool): Whether to use IPv6 for networking. Defaults to False. no_checks (bool): If True, skips initial environment checks. Defaults to False. disable_signal_handler (bool): If True, disables setup of OS-level signal handlers. Defaults to False. disable_ipc_handler (bool): If True, disables setup of inter-process communication handlers. Defaults to False. skip_setup (bool): If True, skips setting up Duck environment, e.g. setting up urlpatterns and blueprints. enable_force_https_logs (bool): If True, force https microapp logs will be outputed to console. Defaults to False. start_bg_eventloop_if_wsgi (bool): If True, it starts a request handling event loop in background thread for offloading coroutines in `WSGI` environment. This is useful for running asynchronous protocols like `HTTP/2` and `WebSockets` even in `WSGI` environment. process_name (str): The name of the process for this application. Defaults to "duck-server". workers (Optional[int]): Number of workers to use. None will disable workers. force_https_workers (Optional[int]): Number of workers to use for HTTPS redirects. None will disable workers. force_worker_processes (bool): Determines whether to use worker **processes** instead of the default worker **threads**. By default, when `workers` is greater than 1, the server will use worker **threads**. Threads avoid cross-process synchronization issues—such as component registry mismatches (e.g., issues with Lively components) that occur when state lives in separate processes. Set this flag to `True` only when process isolation is explicitly desired **and** you do not require shared in-memory synchronization between workers. force_https_force_worker_processes (bool): Whether to force use of multiple processes for `HTTPS` redirect app. Defaults to False. Raises: ApplicationError: If the provided address is invalid or if multiple main application instances are created (only one is allowed). Side Effects: - Validates IP address format (IPv4 or IPv6). - Initializes HTTPS redirect server if `FORCE_HTTPS` is enabled. - Starts Django server if `USE_DJANGO` is set. - Starts the main application server. - Registers automation triggers if `RUN_AUTOMATIONS` is enabled. - Adds the application port to a port registry to prevent conflicts. Notes: - Only a single instance of the main `App` should be created. For additional services or sub-applications, use `MicroApp`. - Set `disable_ipc_handler=False` **only** in a test environment. The IPC handler introduces a blocking mechanism that keeps the main interpreter running. Disabling it in production may lead to unhandled or improperly managed requests, as the blocking behavior is essential for proper execution. The app will be run in background and `app.run` won't be blocking anymore. """ if uses_ipv6 and not is_ipv6(addr) and not str(addr).isalnum(): raise ApplicationError("Argument uses_ipv6=True yet addr provided is not a valid IPV6 address.") if not uses_ipv6 and not is_ipv4(addr) and not str(addr).isalnum(): raise ApplicationError("Argument `addr` is not a valid IPV4 address.") self.addr = addr self.port = port self.uses_ipv6 = uses_ipv6 self.no_checks = no_checks self.is_domain_set = True if domain else False self.started = False self._restart_success = False # state on whether last restart operation has been successfull # Set appropriate domain self.domain = domain or (addr if not uses_ipv6 else f"[{addr}]") if is_ipv4(self.domain) and self.domain.startswith("0"): # IP "0.x.x.x" not allowed as domain because most browsers cannot resolve this. self.domain = "localhost" self.enable_https: bool = SETTINGS["ENABLE_HTTPS"] self.DOMAIN = self.domain self.SETTINGS = SETTINGS self.DJANGO_ADDR = addr, SETTINGS["DJANGO_BIND_PORT"] self.force_https = SETTINGS["FORCE_HTTPS"] self.force_https_port = SETTINGS["FORCE_HTTPS_BIND_PORT"] self.enable_force_https_logs = enable_force_https_logs self.use_django = SETTINGS["USE_DJANGO"] self.run_automations = SETTINGS["RUN_AUTOMATIONS"] self.disable_signal_handler = disable_signal_handler self.disable_ipc_handler = disable_ipc_handler self.skip_setup = skip_setup self.start_bg_eventloop_if_wsgi = start_bg_eventloop_if_wsgi self.process_name = process_name or "duck-server" self.workers = workers self.force_https_workers = force_https_workers self.force_worker_processes = force_worker_processes self.force_https_force_worker_processes = force_https_force_worker_processes # Initialize some attributes self.automations_dispatcher = None self.ducksight_reloader = None self.force_https_app = None self.last_request = None # Will be updated everytime by either ASGI/WSGI # Set the app thread pool executor def thread_pool_submit(task) -> Future: # Custom submit callable to always handle exceptions gracefully. @logger.handle_exception def on_task_done(task): err = task.exception() # May sometimes raise exception if err: raise err # Reraise exception future = self.thread_pool_executor._super_submit(task) future.add_done_callback(on_task_done) return future # Modify default thread pool executor submit method self.thread_pool_executor = ThreadPoolExecutor() self.thread_pool_executor._super_submit = self.thread_pool_executor.submit self.thread_pool_executor.submit = thread_pool_submit # Initialize some concurrent futures self.duck_server_future = None self.django_server_future = None self.automations_dispatcher_future = None self.ducksight_reloader_thread = None # Create some child processes (will be set later when necessary) self.force_https_app_process = None # Add application port to used ports PortRecorder.add_new_occupied_port(port, f"{self}") PortRecorder.add_new_occupied_port( SETTINGS["DJANGO_BIND_PORT"], "DJANGO_BIND_PORT", ) if self.use_django else None if not no_checks: # Run some checks self.run_checks() # Vital objects creation if self.force_https: self.force_https_addr = addr self.force_https_app = Lazy( HttpsRedirectMicroApp, location_root_url=self.absolute_uri, addr=self.force_https_addr, port=self.force_https_port, parent_app=self, domain=self.domain, uses_ipv6=uses_ipv6, enable_https=False, no_logs=not enable_force_https_logs, workers=force_https_workers, force_worker_processes=force_https_force_worker_processes, ) # Create https redirect micro application. # Set the process safe state for the force https app. self.force_https_process_safe_running_state = multiprocessing.Value('i', 0) # Create a server object. self.server = HTTPServer( (addr, port), application=self, domain=self.domain, uses_ipv6=uses_ipv6, enable_ssl=self.enable_https, no_logs=False, workers=workers, force_worker_processes=force_worker_processes, ) # Set process title self.set_process_name() # Set app instances count if type(self).__instances__ == 0: type(self).__instances__ += 1 type(self).__mainapp__ = self else: raise ApplicationError( "Application limit reached: Only one main application is permitted. " "To create additional functionalities, consider using a MicroApp." )
[docs] @classmethod def instances(cls) -> int: """ Returns number of Application instances. """ return cls.__instances__
[docs] @classmethod def get_main_app(cls) -> "App": """ Returns the main application instance if set. """ if not cls.__mainapp__: raise ApplicationError("Main application not set, there is no running application.") return cls.__mainapp__
[docs] @staticmethod def start_background_workers( application: Union['App', 'MicroApp'], start_request_handling_threadpool_manager, start_request_handling_eventloop_manager, start_component_threadpool_manager: bool = True, start_automations_eventloop_manager: bool = False, recreate_managers: bool = False, ): """ Starts or restarts background workers, e.g. `AsyncioLoopManager` & `ThreadPoolManager`. Args: application (Union['App', 'MicroApp']): The target application. start_request_handling_threadpool_manager (bool): Whether to start request handling threadpool in `WSGI` environment. This is only valid in WSGI environment only. start_request_handling_eventloop_manager (bool): Whether to start asyncio event loop either in `WSGI` or `ASGI` environment. start_component_threadpool_manager (bool): Whether to start the background threadpool manager for HTML components rendering, assistance, etc. Defaults to True. start_automations_eventloop_manager (bool): Whether to start a dedicated event loop for running automations. Defaults to False. recreate_managers (bool): Whether to recreate managers for the current thread and all it's descendents. Defaults to False. This argument doesn't affect argument `start_automations_eventloop_manager`. Argument `recreate_managers` only applies to every other manager except the automations eventloop manager. Notes: - This is usually useful when starting new worker processes/threads. - Use methods `get_or_create_loop_manager` and `get_or_create_thread_manager` to create new managers before this function if new managers are needed. - This only focus on default `AsyncioLoopManager` & `ThreadPoolManager`. - The thread manager is only run in `WSGI` mode but loop manager can be run in any environment (ASGI or WSGI). """ from duck.utils.threading import get_max_workers _async = SETTINGS['ASYNC_HANDLING'] start_bg_eventloop_if_wsgi = getattr(application, "start_bg_eventloop_if_wsgi", True) max_threadpool_workers = get_max_workers() if _async and start_request_handling_threadpool_manager: raise ApplicationError("Argument 'start_request_handling_threadpool_manager' can only be True in a WSGI environment.") if not _async and start_request_handling_eventloop_manager and not start_bg_eventloop_if_wsgi: raise ApplicationError("Argument 'start_request_handling_threadpool_manager' can only be True if app's `start_bg_eventloop_if_wsgi` is set to True.") if multiprocessing.parent_process() != None: # Not in main process; this is a child process. # Reset asyncio event loop set_asyncio_loop() # This block is not affected by recreate managers. if start_automations_eventloop_manager: loop_manager = get_or_create_loop_manager(id="automations-eventloop-manager") loop_manager.start() # Do not restrict task types for automations # Start component threadpool manager if start_component_threadpool_manager: component_threadpool_manager = get_or_create_thread_manager(id="component-threadpool-manager", force_create=recreate_managers) component_threadpool_manager.start( task_type="component-task", # Restrict task to only `component-task` type. max_workers=int(max_threadpool_workers / 2), daemon=True, thread_name_prefix="component-task", ) # In WSGI environment if not _async: if start_request_handling_threadpool_manager: # Start request handling threadpool request_handling_threadpool_manager = get_or_create_thread_manager(id="request-handling-threadpool-manager", force_create=recreate_managers) request_handling_threadpool_manager.start( task_type="request-handling-task", # Restrict task to only `request-handling-task` type. max_workers=max_threadpool_workers, daemon=True, thread_name_prefix="request-handling-task", ) # In any environment WSGI/ASGI if start_request_handling_eventloop_manager: request_handling_loop_manager = get_or_create_loop_manager(id="request-handling-eventloop-manager", force_create=recreate_managers) request_handling_loop_manager.start(task_type="request-handling-task") # Restrict to only request handling tasks
[docs] @staticmethod def check_ssl_credentials(): """ This checks for ssl certfile and private key file existence. Raises: SSLError: Either certfile or private key file is not found. """ certfile_path = SETTINGS["SSL_CERTFILE_LOCATION"] private_key_path = SETTINGS["SSL_PRIVATE_KEY_LOCATION"] if not os.path.isfile(certfile_path): raise SSLError( "SSL certfile provided in settings.py not found. You may use command `python3 -m duck ssl-gen` to " "generate a new self signed certificate and key pair." ) if not os.path.isfile(private_key_path): raise SSLError( "SSL private key provided in settings.py not found. You may use command `python3 -m duck ssl-gen` to " "generate a new self signed certificate and key pair." )
@property def running(self) -> bool: """ Returns True if the main server running else False. """ return self.server.running @property def meta(self) -> Dict[str, Any]: """ Get application metadata. """ return Meta.compile() @property def process_id(self) -> int: """ Returns the application main process ID. """ if not hasattr(self, "_main_process_id"): self._main_process_id = os.getpid() return self._main_process_id @property def absolute_uri(self) -> str: """ Returns application server absolute `URL`. """ scheme = "http" if self.enable_https: scheme = "https" uri = f"{scheme}://{self.domain}" uri = uri.strip("/").strip("\\") if not (self.port == 80 or self.port == 443): uri += f":{self.port}" return uri @property def absolute_ws_uri(self) -> str: """ Returns application server absolute WebSockets `URL`. """ scheme = "ws" if self.enable_https: scheme = "wss" uri = f"{scheme}://{self.domain}" uri = uri.strip("/").strip("\\") if not (self.port == 80 or self.port == 443): uri += f":{self.port}" return uri @property def server_up(self) -> bool: """ Checks whether the main application server is up and running. Returns: bool: True if up else False """ return self.server.running @property def django_server_up(self) -> bool: """ Checks whether django server to forward requests to has started Returns: started (bool): True if up else False """ import requests try: host_addr, port = self.DJANGO_ADDR if host_addr.startswith("0") and not self.uses_ipv6: # Host 0.0.0.0 not allowed on windows host_addr = "127.0.0.1" # Note: Use /admin path as this is not usually altered like path /. Path / # may be accessing real Duck path from Django side, which might be slower than # /admin path, leading to ReadTimeoutError. if not self.uses_ipv6: url = f"http://{host_addr}:{port}/admin" else: url = f"http://[{host_addr}]:{port}/admin" response = requests.get( url=url, headers={"Host": SETTINGS["DJANGO_SHARED_SECRET_DOMAIN"]}, timeout=1, ) # If we reached here, a response has been received return True except Exception: pass return False @property def force_https_server_up(self) -> bool: """ Checks whether force HTTPS redirect micro application is running. Returns: bool: True if up else False """ if self.force_https_app_process: if self.force_https_app_process.is_alive(): return bool(self.force_https_process_safe_running_state.value) else: return False else: return self.force_https_app.server.running
[docs] def set_process_name(self): """ Set the whole process name. """ setproctitle.setproctitle(self.process_name)
[docs] def run_checks(self): """ Runs application checks. """ if self.enable_https: self.check_ssl_credentials() # HTTPS checks if self.force_https: if not self.enable_https: raise SettingsError("FORCE_HTTPS has been set in settings.py, also ensure ENABLE_HTTPS=True.")
[docs] def build_absolute_uri(self, path: str) -> str: """ Builds and returns absolute URL from provided path. """ return url_normalize(self.absolute_uri + "/" + path)
[docs] def build_absolute_ws_uri(self, path: str) -> str: """ Builds and returns absolute WebsSockets URL from provided path. """ return url_normalize(self.absolute_ws_uri + "/" + path)
[docs] def start_server(self): """ Starts the app server in new thread. """ def start_server(): """ Starts Duck application main server. """ self.server.start_server() if not self.duck_server_future or not self.duck_server_future.running(): self.duck_server_future = self.thread_pool_executor.submit(start_server)
[docs] def start_django_server(self): """ Starts Django server and use Duck as reverse proxy server for Django. """ # We were starting Django in new process but we shouldn't because it isolates # memory spaces which may make using Lively component system at Django side difficult. # If we used a new process, synchronization between django process and main process Lively Component System registry # is almost impossible now. This can lead to components not found all the time. def start_django_server(): """ Starts Django application server """ host = self.DJANGO_ADDR # Start django server bridge.start_django_server(*host, uses_ipv6=self.uses_ipv6) if self.use_django: if not self.django_server_future or not self.django_server_future.is_running(): self.django_server_future = self.thread_pool_executor.submit(start_django_server)
[docs] def start_force_https_server(self, log_message: bool = True): """ Starts force HTTPS redirect micro application. Args: log_message (bool): Whether to log something before starting the micro app. Conditions: - `ENABLE_HTTPS = True` - `FORCE_HTTPS = True` """ def start_force_https(process_safe_running_state: multiprocessing.Value): """ Starts app for redirecting non encrypted traffic to main app using https. """ p = multiprocessing.current_process() setproctitle.setproctitle(p.name) signal.signal(signal.SIGINT, lambda *a: self.force_https_app.stop()) # Restart background workers # Recreate managers recreates and attaches new managers fot the current # thread and all its descendents. # This only restarts request handling threadpool/eventloop manager plus component threadpool manager App.start_background_workers(self, recreate_managers=True) # Start the microapp self.force_https_app.on_app_start = lambda: setattr(process_safe_running_state, 'value', int(self.force_https_app.server.running)) self.force_https_app.run(run_forever=True) # This is blocking; run_forever=True is blocking. if self.enable_https and self.force_https: # Log something if applicable. if log_message: logger.log( f"Forcing HTTPS to all incoming traffic [{SETTINGS['FORCE_HTTPS_BIND_PORT']} -> {self.port}]", level=logger.DEBUG, ) # Start https redirect process if not self.force_https_app_process or not self.force_https_app_process.is_alive(): self.force_https_app_process = multiprocessing.Process( target=start_force_https, name="duck-force-https-server", args=(self.force_https_process_safe_running_state, ), ) # Start the force https process self.force_https_app_process.start()
[docs] def start_automations_dispatcher(self, log_message: bool = True): """ Starts automations dispatcher for executing automations during runtime. Args: log_message (bool): Whether to log something before starting the micro app. Conditions: - `RUN_AUTOMATIONS = True` """ def start_automations_dispatcher(): """ Starts automations dispatcher for running and managing automations on runtime. """ if not self.automations_dispatcher: self.automations_dispatcher = SettingsLoaded.AUTOMATION_DISPATCHER(self) if log_message: automations_dispatcher_name = self.automations_dispatcher.__class__.__name__ logger.log( f"Running all automations with {automations_dispatcher_name}", level=logger.DEBUG, ) for trigger, automation in SettingsLoaded.AUTOMATIONS: # Register trigger and automation self.automations_dispatcher.register(trigger, automation) self.automations_dispatcher.start() if self.run_automations: # Submit task to the pool if not self.automations_dispatcher_future or not self.automations_dispatcher_future.running(): self.automations_dispatcher_future = self.thread_pool_executor.submit(start_automations_dispatcher)
[docs] def start_ducksight_reloader(self): """ Starts the DuckSight Reloader for reloading app on file modifications, deletions, etc. Notes: - Unlike other tasks like starting Duck server, HTTPS redirect server, etc which will be run by the app `thread_pool_executor`, this runs in an independant background thread with `daemon=True`. Conditions: - `DEBUG = True` """ # Note: Production server should not be restarted at any point only start duck sight reloader on DEBUG def start_reloader(): if not self.ducksight_reloader: self.ducksight_reloader = DuckSightReloader(SETTINGS['BASE_DIR']) self.ducksight_reloader.run() if SETTINGS["DEBUG"] and SETTINGS['AUTO_RELOAD']: # Start ducksight reloader if not self.ducksight_reloader_thread or not self.ducksight_reloader_thread.is_alive(): if not self.ducksight_reloader_thread: self.ducksight_reloader_thread = threading.Thread(target=start_reloader) self.ducksight_reloader_thread.start()
[docs] def get_threadpool_futures(self) -> Dict[str, Optional[Future]]: """ Returns a dictionary of all application concurrent futures as a result of submitting tasks to the `ThreadPoolExector`. Notes: - The `DuckSightReloader` task is run on an independent thread, so no future for it will be included in the dictionary. """ return { "duck_server": self.duck_server_future, "automations_dispatcher": self.automations_dispatcher_future, "django_server": self.django_server_future, }
[docs] def register_signals(self): """ Register and bind signals to appropriate signal handler. """ signal.signal(signal.SIGINT, self.handle_signal) signal.signal(signal.SIGTERM, self.handle_signal)
[docs] def handle_ipc_messages(self): """ Handles incoming IPC messages from the shared file. Notes: - This usually holds the main process from exiting because of the blocking behavior. """ with ipc.get_reader() as reader: # Clear ipc writer file with ipc.get_writer() as writer: writer.write_message("") # clear ipc shared file # Handle any incoming message. while True: message = reader.read_message().strip() if message: if any({message.lower() == i for i in ["bye", "quit", "exit"]}): self.stop() break # Simulate some delay time.sleep(1)
[docs] def record_metadata(self): """ Sets or updates the metadata for the app, these changes will be globally available in `duck.meta.Meta` class. """ # Security reasons not mentioning real servername but 'webserver' is safer. data = { "DUCK_SERVER_NAME": "webserver", "DUCK_SERVER_PORT": self.port, "DUCK_SERVER_DOMAIN": self.domain, "DUCK_SERVER_PROTOCOL": ("https" if self.enable_https else "http"), "DUCK_DJANGO_ADDR": self.DJANGO_ADDR, "DUCK_USES_IPV6": self.uses_ipv6, "DUCK_SERVER_BUFFER": SETTINGS["SERVER_BUFFER"], "DUCK_WORKERS": int(self.workers or 0), # Meta.update doesnt support NoneType } Meta.update_meta(data)
[docs] def handle_signal(self, sig, frame): """ Method for handling process signals. Signals: - `SIGINT` (Ctrl-C), `SIGTERM` (Terminate): Quits the server/application. """ if sig in [signal.SIGINT, signal.SIGTERM]: logger.log_raw("") # print a blank line to separate ^C and Stop message. self.stop(wait_for_thread_pool_executor_shutdown=False)
[docs] def stop_servers( self, stop_force_https_server: bool = True, log_to_console: bool = True, wait: bool = True, ): """ Stop all running servers i.e., Duck main server, Force HTTPS server & Django server. Args: stop_force_https_server (bool): Whether to stop Force HTTPS redirect microapp. log_to_console (bool): Whether to an exit message log to console. wait (bool): Whether to wait for termination. Defaults to True but with a timeout. """ self.server.stop_server(log_to_console=log_to_console, wait=wait) if ( stop_force_https_server and self.force_https_app_process and self.force_https_app_process.is_alive() ): self.force_https_app_process.terminate() self.force_https_app_process.join(1)
[docs] def on_pre_stop(self): """ Event called before final application termination. """ if self.run_automations: self.automations_dispatcher.stop()
[docs] def stop( self, log_to_console: bool = True, no_exit: bool = False, call_on_pre_stop_handler: bool = True, kill_ducksight_reloader: bool = True, wait_for_thread_pool_executor_shutdown: bool = True, close_log_file: bool = True, ): """ Stops the application and terminates the whole program. Args: no_exit (bool): Whether to terminate everything but keep the program running. log_to_console (bool): Whether to log an exit message. call_on_pre_stop_handler (bool): Whether to call method `on_pre_stop`. Defaults to True. kill_ducksight_reloader (bool): This attempts to kill the `DuckSightReloader`. Useful if `no_exit=True`, wait_for_thread_pool_executor_shutdown (bool): Whether to wait for the thread pool executor to complete shutdown, meaning waiting for current tasks to finish/cancel. This is only used if argument `no_exit=True` else it is automatically `False`. close_log_file (bool): Whether to close the log file. Defaults to True. """ if "--is-reload" in sys.argv: log_to_console = False def stop_future(future): """ Stops a running running future safely. """ if future and future.running(): try: future.cancel() except Exception: # Ignore as the thread_pool_executor is going to be shut down anyway if it's still running. pass # Cleanup session cache try: # Close the session storage connector SettingsLoaded.SESSION_STORAGE_CONNECTOR.close() except Exception as e: logger.log_raw('\n') logger.log(f"Error while closing session storage: {e}", level=logger.WARNING) try: # Stop all servers self.stop_servers(log_to_console=log_to_console, wait=wait_for_thread_pool_executor_shutdown if no_exit else False) except Exception as e: logger.log_raw('\n') logger.log(f"Error stopping servers: {e}", level=logger.ERROR) if SETTINGS['DEBUG']: logger.log_exception(e) if call_on_pre_stop_handler: try: # Execute a pre stop method before final termination. self.on_pre_stop() except Exception as e: logger.log_exception(e) # Log the exception. # Try cancel other cancelable components. if self.run_automations: try: self.automations_dispatcher.stop() except Exception as e: logger.log_exception(e) # Try cancel other cancelable components. if SETTINGS['DEBUG'] and SETTINGS['AUTO_RELOAD'] and kill_ducksight_reloader: try: self.ducksight_reloader.stop() if self.ducksight_reloader else None except Exception as e: logger.log_exception(e) # Cancel all running futures concurrent_futures = self.get_threadpool_futures() for name, future in concurrent_futures.items(): stop_future(future) # Shutdown threadpool executor if not stopped. if self.thread_pool_executor: try: self.thread_pool_executor.shutdown(wait=wait_for_thread_pool_executor_shutdown if no_exit else False) except Exception as e: logger.log_exception(e) if no_exit and kill_ducksight_reloader and wait_for_thread_pool_executor_shutdown: try: if self.ducksight_reloader_thread.is_alive(): self.ducksight_reloader_thread.join() except Exception as e: logger.log_exception(e) if close_log_file: # Close logging file if SETTINGS['LOG_TO_FILE']: try: logger.Logger.close_logfile() except Exception as e: logger.log(f"Error closing log file: {e}", level=logger.WARNING) # Perform forceful termination if needed if not no_exit: # Force exit (avoids lingering threads/processes) os._exit(0)
[docs] def on_app_start(self): """ Event called when application has successfully been started. """ # Record main process data log_file = None is_reload = False if "--is-reload" in sys.argv: # App is being restarted somehow is_reload = True if SETTINGS["LOG_TO_FILE"]: log_file = logger.Logger.get_current_logfile() try: # Set process metadata in a file. processes.set_process_data( name="main", data={ "pid": self.process_id, "start_time": time.time(), "sys_argv": sys.argv, "log_file": log_file, }, clear_existing_data=True, ) except json.JSONDecodeError: # The file used by duck.processes is corrupted pass # ignore for now, maybe writting is still in progress if not self.disable_signal_handler: self.register_signals() # bind signals to appropriate signal handlers logger.log( f"Use Ctrl-C to quit [APP PID: {self.process_id}]", level=logger.DEBUG, custom_color=logger.Fore.GREEN, ) if SETTINGS["AUTO_RELOAD"] and SETTINGS["DEBUG"]: # Start ducksight reloader (if not running) self.start_ducksight_reloader() logger.log( f"Duck Sight Reloader watching file changes", level=logger.DEBUG, custom_color=logger.Fore.GREEN, ) # Continue logging logger.log( "Waiting for incoming requests :-) \n", level=logger.DEBUG, custom_color=logger.Fore.GREEN, ) # Update application state self.started = True # Handle any incoming IPC messages. if not self.disable_ipc_handler: self.handle_ipc_messages() # this is a blocking operation
[docs] def run(self, print_ansi_art: bool = True): """ Runs the Duck application. """ # Setup Duck environment and the entire application. if not self.skip_setup: setup() # Record application metadata and run the server self.record_metadata() return self._run(print_ansi_art=print_ansi_art)
[docs] def _run(self, print_ansi_art: bool = True): """ Runs the Duck application. """ # Fine tune threadpool for executing sync_to_async calls (Only in ASGI because in WSGI, the app is already flooding with many Threads). from duck.contrib.sync.smart_async import _TRANSACTION_THREAD_POOL default_sync_to_async_workers = _TRANSACTION_THREAD_POOL.max_threads # Update max_threads according to app workers _TRANSACTION_THREAD_POOL.max_threads = (default_sync_to_async_workers * self.workers) if self.workers else default_sync_to_async_workers # App is not in reload state, continue bold_start = "\033[1m" bold_end = "\033[0m" duck_start_failure_msg = f"{bold_start}Failed to start Duck server{bold_end}" is_reload = False if "--is-reload" in sys.argv: # App is being restarted somehow is_reload = True logger.log_raw("") if not is_reload and print_ansi_art and not SETTINGS["SILENT"]: display_duck_art() # print duck art settings_mod = "DUCK_SETTINGS_MODULE" settings_mod = os.environ.get(settings_mod, 'settings') print(f"{bold_start}VERSION {version}{bold_end}") # Redirect all console output to log file if SETTINGS["LOG_TO_FILE"]: logger.Logger.redirect_console_output() # Log the current settings module in use settings_mod = "DUCK_SETTINGS_MODULE" settings_mod = os.environ.get(settings_mod, 'settings') logger.log_raw(f'{bold_start}USING SETTINGS{bold_end} "{settings_mod}" \n') if not SETTINGS['DEBUG'] and "*" in SETTINGS['ALLOWED_HOSTS']: logger.log("WARNING: ALLOWED_HOSTS seem to have global host (*)", level=logger.WARNING) if not self.is_domain_set: logger.log( f'WARNING: Domain not set, using "{self.domain}" ', level=logger.WARNING, ) # Start background event loop or threadpool if needed if self.workers: # We are using workers, so eventloops or threadpools will be # started by httpd.httpd.start_server.start_server_loop_in_worker. # Only start automations asyncio loop (if applicable) that will be run in background outside worker App.start_background_workers( self, start_request_handling_threadpool_manager=False, start_request_handling_eventloop_manager=False, start_component_threadpool_manager=False, start_automations_eventloop_manager=True, ) else: # No worker processes/threads will be responsible for starting the threads/asyncio loop manager for us # Start everything that needs to be started _async = SETTINGS['ASYNC_HANDLING'] start_bg_eventloop_if_wsgi = getattr(self, "start_bg_eventloop_if_wsgi", True) start_eventloop = _async or (not _async and start_bg_eventloop_if_wsgi) App.start_background_workers( self, start_request_handling_threadpool_manager=not _async, start_request_handling_eventloop_manager=start_eventloop, start_component_threadpool_manager=True, start_automations_eventloop_manager=True, ) if not SETTINGS['ASYNC_HANDLING']: if self.start_bg_eventloop_if_wsgi: # Started asyncio loop in background logger.log( "Background event loop scheduled", level=logger.DEBUG, ) else: logger.log( "App argument `start_bg_eventloop_if_wsgi` is set to False. " "This may prevent protocols like `HTTP/2` or `WebSockets` from working correctly\n", level=logger.WARNING, ) if SETTINGS['ENABLE_COMPONENT_SYSTEM']: # Components are enabled logger.log("Lively Component System active", level=logger.DEBUG) # Check if CSP headers are set correctly for component system to run nicely. csp_directives = SETTINGS['CSP_TRUSTED_SOURCES'] if SETTINGS['ENABLE_HEADERS_SECURITY_POLICY'] and csp_directives: script_src = set(csp_directives.get("script-src", [])) style_src = set(csp_directives.get("style-src", [])) # For components, we often need 'unsafe-eval' in script-src or default-src required_script_flags = {"'unsafe-eval'"} missing_script_flags = [ flag for flag in required_script_flags if flag not in script_src ] # For styles, we often need 'unsafe-inline' in style-src required_style_flag = "'unsafe-inline'" if "'unsafe-eval'" not in script_src: logger.log( ( f"Component system active but script flag {'unsafe-eval'} is missing from script-src. " "This may prevent JS execution from lively components." ), level=logger.WARNING, ) elif missing_script_flags: logger.log( ( f"Component system active but script flag(s) {', '.join(missing_script_flags)} are missing from script-src. " "This may prevent dynamic components from loading correctly." ), level=logger.WARNING, ) elif csp_nonce_flag in style_src: logger.log( ( f"Component system active but `csp_nonce_flag` is in style-src. " "This may block inline styles from components." ), level=logger.WARNING, ) elif required_style_flag not in style_src: logger.log( ( f"Component system active but flag {required_style_flag} is missing from style-src. " "This may block inline styles from components." ), level=logger.WARNING, ) if self.run_automations: # Start the automations dispatcher self.start_automations_dispatcher() # Start the main application server. self.start_server() # Log some info message and sleep for 2 minutes. logger.log("Waiting 2s to read server state...", level=logger.DEBUG) time.sleep(2) # Check server start attempt if not self.server_up: logger.log(duck_start_failure_msg, level=logger.ERROR) self.stop() return if self.force_https: # Start force HTTPS redirect server & wait 2 seconds self.start_force_https_server() time.sleep(2) # Check force HTTPS server start attempt if not self.force_https_server_up: logger.log("HTTPS redirect app failed to start", level=logger.ERROR) self.stop() return if self.use_django: logger.log( "Requests will be forwarded to Django server", level=logger.DEBUG, ) logger.log( f"Starting Django server on port [{SETTINGS['DJANGO_BIND_PORT']}]", level=logger.DEBUG, ) if SETTINGS["DJANGO_COMMANDS_ON_STARTUP"]: try: logger.log_raw("\n") bridge.run_django_app_commands() except Exception as e: logger.log( f"Failed to run django commands: {e}\n", level=logger.ERROR, ) logger.log_exception(e) self.stop() return # Wait for Django server to start wait_t = self.DJANGO_SERVER_WAIT_TIME logger.log( f"Waiting for Django server to start ({wait_t} secs)\n", level=logger.DEBUG, ) self.start_django_server() time.sleep(wait_t) # Check if django is running if not self.django_server_up: logger.log(f"Failed to get response from Django server [{wait_t} secs]", level=logger.ERROR) self.stop() return else: host_url = "http://" if not self.server.enable_ssl else "https://" host, port = self.server.addr if host.startswith("0") and not self.uses_ipv6: # Convert host to browser's recognizeable host = "127.0.0.1" else: if self.uses_ipv6: host = f"[{host}]" # Log some info host_url += f"{host}:{port}" #logger.log_raw("") logger.log( "Django started yey, that's good!", level=logger.DEBUG, custom_color=logger.Fore.GREEN, ) logger.log( f"Duck Server listening on {host_url}", level=logger.DEBUG, custom_color=logger.Fore.GREEN, ) # Call on_app_start handler self.on_app_start()
if __name__ == "__main__": multiprocessing.freeze_support()