"""
This module provides the core application class, `App`, for setting up and running a **Duck-based web application**. It supports various features, including:
- **HTTP/HTTPS Server**: Configures and starts an HTTP or HTTPS server based on application settings.
- **Django Integration**: Can forward requests to a Django server, supporting custom commands on startup.
- **SSL Management**: Checks and manages SSL certificates for secure communication.
- **Force HTTPS**: Redirects all HTTP traffic to HTTPS when enabled.
- **Automations**: Supports running automation scripts during runtime.
- **Ducksight Reloader**: Watches for file changes and enables dynamic reloading in **DEBUG mode**.
- **Port Management**: Ensures that application ports are available.
- **Signal Handling**: Gracefully handles termination signals (e.g., `Ctrl+C`) for clean shutdown.
"""
import os
import sys
import json
import time
import signal
import threading
import setproctitle
import multiprocessing
from typing import (
Optional,
Dict,
Any,
Union,
Callable,
)
from concurrent.futures import Future
from duck.settings import SETTINGS
from duck.settings.loaded import SettingsLoaded
from duck.exceptions.all import (
SSLError,
ApplicationError,
SettingsError,
)
from duck.http.core.httpd.servers import HTTPServer
from duck.logging import logger
from duck.meta import Meta
from duck.csp import csp_nonce_flag
from duck.version import version
from duck.utils.lazy import Lazy
from duck.utils.asyncio.eventloop import get_or_create_loop_manager
from duck.utils.threading.threadpool import get_or_create_thread_manager
from duck.app.base import BaseApp
[docs]
class App(BaseApp):
"""
Initializes and configures the **Duck** application.

This is the main application class. `__init__` permits a single instance
and points to `MicroApp` for additional services or sub-applications.
"""
# Bind address used when `addr` is not passed to `__init__`.
DEFAULT_ADDR = "localhost"
# Bind port used when `port` is not passed to `__init__`.
DEFAULT_PORT = 8000
# Number of main applications created so far (incremented by `__init__`).
__instances__ = 0
# The main application instance (assigned by `__init__`), or None.
__mainapp__ = None
def __init__(
    self,
    name: Optional[str] = None,
    # Network configuration
    addr: str = DEFAULT_ADDR,
    port: int = DEFAULT_PORT,
    domain: Optional[str] = None,
    server_url: Optional[str] = None,
    uses_ipv6: bool = False,
    # Process configuration
    process_name: Optional[str] = None,
    workers: Optional[int] = None,
    force_worker_processes: bool = False,
    # HTTPS redirect server
    https_redirect_logs: bool = False,
    https_redirect_workers: Optional[int] = None,
    https_redirect_force_worker_processes: bool = False,
    # Runtime behavior
    start_bg_eventloop_if_wsgi: bool = True,
    disable_signal_handler: bool = False,
    disable_ipc_handler: bool = False,
    # Development / internal flags
    no_checks: bool = False,
    skip_setup: bool = False,
    events: Optional[Dict[str, Optional[Callable]]] = None,
) -> None:
    """
    Initialize the main Duck application instance.

    Sets up the application runtime, networking configuration, worker handling,
    HTTPS redirect support, and internal runtime services.

    Args:
        name:
            Optional application name.
        addr:
            Address or hostname to bind the server to.
        port:
            Port to bind the server to.
        domain:
            Public-facing domain name for the application. If not provided,
            the bound address is used.
        server_url:
            Optional public-facing absolute server URL.
        uses_ipv6:
            Whether to use IPv6 networking.
        process_name:
            Optional process name used for runtime identification.
            Falls back to `"duck-server"`.
        workers:
            Number of workers to use for the main application server.
            `None` disables workers.
        force_worker_processes:
            Whether to use worker processes instead of threads for the
            main application server.
            By default, Duck uses threads because they avoid cross-process
            synchronization issues involving shared in-memory state such as
            component registries.
            Enable this only when process isolation is explicitly required.
        https_redirect_logs:
            Whether to enable console logs for the HTTPS redirect server.
        https_redirect_workers:
            Number of workers to use for the HTTPS redirect server.
            `None` disables workers.
        https_redirect_force_worker_processes:
            Whether to use worker processes instead of threads for the
            HTTPS redirect server.
        start_bg_eventloop_if_wsgi:
            Whether to start a background asyncio event loop when running
            in a WSGI environment. This allows async protocols such as
            WebSockets and HTTP/2 to run under WSGI.
        disable_signal_handler:
            Whether to disable OS signal handlers.
        disable_ipc_handler:
            Whether to disable the internal IPC handler.
        no_checks:
            Whether to skip startup and environment checks.
        skip_setup:
            Whether to skip automatic framework setup such as URL and
            blueprint registration.
        events:
            Events to handle e.g. {"on_start": some_callable}. Defaults to None.

    Raises:
        ApplicationError:
            Raised if the provided address is invalid or if multiple
            main application instances are created.

    Notes:
        - Only one main `App` instance should exist per process.
          Use `MicroApp` for additional services or sub-applications.
        - Disabling the IPC handler is intended mainly for testing.
          In normal environments, the IPC handler keeps the runtime
          alive and coordinated correctly.
    """
    from duck.app.microapp import HttpsRedirectMicroApp

    # Main app singleton guard. Checked before anything else so that a
    # rejected instance does not rename the process or create servers
    # before failing.
    # NOTE(review): the counter is read/written through `type(self)`, so a
    # subclass keeps its own count once it has been instantiated - confirm
    # this is intended.
    if type(self).__instances__ > 0:
        raise ApplicationError(
            "Application limit reached: only one main application is permitted. "
            "Use MicroApp for additional services or sub-applications."
        )

    # Runtime behavior
    self.skip_setup = skip_setup
    self.disable_signal_handler = disable_signal_handler
    self.disable_ipc_handler = disable_ipc_handler
    self.start_bg_eventloop_if_wsgi = start_bg_eventloop_if_wsgi

    # Process configuration
    self.process_name = process_name or "duck-server"

    # Django configuration
    self.use_django = SETTINGS["USE_DJANGO"]
    self.django_server_wait_time = SETTINGS["DJANGO_SERVER_WAIT_TIME"]
    self.django_addr = addr
    self.django_bind_port = SETTINGS["DJANGO_BIND_PORT"]

    # HTTPS redirect configuration
    self.https_redirect = SETTINGS["HTTPS_REDIRECT"]
    self.https_redirect_addr = addr
    self.https_redirect_port = SETTINGS["HTTPS_REDIRECT_BIND_PORT"]
    self.https_redirect_logs = https_redirect_logs
    self.https_redirect_workers = https_redirect_workers
    self.https_redirect_force_worker_processes = https_redirect_force_worker_processes

    # Automation configuration
    self.run_automations = SETTINGS["RUN_AUTOMATIONS"]

    # Runtime objects
    self.automations_dispatcher = None
    self.ducksight_reloader = None
    self.https_redirect_app = None

    # Concurrent futures / threads
    self.server_future = None
    self.django_server_future = None
    self.automations_dispatcher_future = None
    self.ducksight_reloader_thread = None

    # Child processes
    self.https_redirect_process = None

    # Shared integer flag (0/1) written by the redirect app's `on_start`
    # callback below. It is created before the micro app, and regardless of
    # whether redirecting is enabled, so the attribute always exists.
    self.https_redirect_process_running_state = multiprocessing.Value("i", 0)

    # Runtime executor
    self.runtime_executor = get_or_create_thread_manager(id="app-runtime-executor")

    # Super initialize
    super().__init__(
        name=name,
        addr=addr,
        port=port,
        domain=domain,
        server_url=server_url,
        uses_ipv6=uses_ipv6,
        enable_https=SETTINGS['ENABLE_HTTPS'],
        no_checks=no_checks,
        workers=workers,
        force_worker_processes=force_worker_processes,
        events=events,
    )

    # HTTPS redirect app
    if self.https_redirect:
        self.https_redirect_app = HttpsRedirectMicroApp(
            server_url=self.server_url,
            addr=self.https_redirect_addr,
            port=self.https_redirect_port,
            domain=self.domain,
            uses_ipv6=self.uses_ipv6,
            enable_https=False,
            no_logs=not self.https_redirect_logs,
            workers=self.https_redirect_workers,
            force_worker_processes=self.https_redirect_force_worker_processes,
            # Mirror the redirect server's `running` flag into the shared value.
            events={"on_start": lambda *_: setattr(self.https_redirect_process_running_state, 'value', int(self.https_redirect_app.server.running))}
        )

    # Main HTTP server
    self.server = HTTPServer(
        (addr, port),
        application=self,
        domain=self.domain,
        uses_ipv6=self.uses_ipv6,
        enable_ssl=self.enable_https,
        no_logs=False,
        workers=self.workers,
        force_worker_processes=self.force_worker_processes,
    )

    # Process setup
    self.set_process_name()

    # Record this instance as the main application.
    type(self).__instances__ += 1
    type(self).__mainapp__ = self
[docs]
@classmethod
def instances(cls) -> int:
"""
Returns number of Application instances.
"""
return cls.__instances__
[docs]
@classmethod
def get_main_app(cls) -> "App":
"""
Returns the main application instance if set.
"""
if not cls.__mainapp__:
raise ApplicationError("Main application not set, there is no running application.")
return cls.__mainapp__
[docs]
@staticmethod
def start_background_workers(
    application: BaseApp,
    start_request_handling_threadpool_manager: bool,
    start_request_handling_eventloop_manager: bool,
    start_component_threadpool_manager: bool = True,
    start_automations_eventloop_manager: bool = False,
    recreate_managers: bool = False,
):
    """
    Starts or restarts background workers, e.g. `AsyncioLoopManager` & `ThreadPoolManager`.

    Args:
        application (BaseApp): The target application.
        start_request_handling_threadpool_manager (bool): Whether to start the request handling
            threadpool. This is only valid in a `WSGI` environment.
        start_request_handling_eventloop_manager (bool): Whether to start the asyncio event loop
            used for request handling, either in a `WSGI` or an `ASGI` environment.
        start_component_threadpool_manager (bool): Whether to start the background threadpool manager
            for HTML components rendering, assistance, etc. Defaults to True.
        start_automations_eventloop_manager (bool): Whether to start a dedicated event loop for
            running automations. Defaults to False.
        recreate_managers (bool): Whether to recreate managers for the current thread and all its
            descendants. Defaults to False. This applies to every manager except the automations
            event loop manager.

    Raises:
        ApplicationError: If the request handling threadpool is requested while `ASYNC_HANDLING`
            is enabled, or if the request handling event loop is requested in a WSGI environment
            while the application's `start_bg_eventloop_if_wsgi` is False.

    Notes:
        - This is usually useful when starting new worker processes/threads.
        - Use `get_or_create_loop_manager` and `get_or_create_thread_manager` to create new
          managers before calling this function if new managers are needed.
        - This only focuses on the default `AsyncioLoopManager` & `ThreadPoolManager`.
        - The thread manager is only run in `WSGI` mode but the loop manager can be run in any
          environment (ASGI or WSGI).
    """
    from duck.utils.threading import get_max_workers
    from duck.setup import set_asyncio_loop

    async_ = SETTINGS['ASYNC_HANDLING']
    start_bg_eventloop_if_wsgi = getattr(application, "start_bg_eventloop_if_wsgi", True)
    max_threadpool_workers = get_max_workers()

    if async_ and start_request_handling_threadpool_manager:
        raise ApplicationError("Argument 'start_request_handling_threadpool_manager' can only be True in a WSGI environment.")

    if not async_ and start_request_handling_eventloop_manager and not start_bg_eventloop_if_wsgi:
        # The message names the argument this condition actually checks.
        raise ApplicationError(
            "Argument 'start_request_handling_eventloop_manager' can only be True in a WSGI environment "
            "if app's `start_bg_eventloop_if_wsgi` is set to True."
        )

    if multiprocessing.parent_process() is not None:
        # Not in main process; this is a child process.
        # Reset asyncio event loop
        set_asyncio_loop()

    # This block is not affected by recreate managers.
    if start_automations_eventloop_manager:
        loop_manager = get_or_create_loop_manager(id="automations-eventloop-manager")
        loop_manager.start()  # Do not restrict task types for automations

    # Start component threadpool manager
    if start_component_threadpool_manager:
        component_threadpool_manager = get_or_create_thread_manager(
            id="component-threadpool-manager",
            force_create=recreate_managers,
        )
        component_threadpool_manager.start(
            task_type="component-task",  # Restrict task to only `component-task` type.
            # Half of the request pool size, but never zero workers.
            max_workers=max(1, max_threadpool_workers // 2),
            daemon=True,
            thread_name_prefix="component-task",
        )

    # In WSGI environment
    if not async_ and start_request_handling_threadpool_manager:
        # Start request handling threadpool
        request_handling_threadpool_manager = get_or_create_thread_manager(
            id="request-handling-threadpool-manager",
            force_create=recreate_managers,
        )
        request_handling_threadpool_manager.start(
            task_type="request-handling-task",  # Restrict task to only `request-handling-task` type.
            max_workers=max_threadpool_workers,
            daemon=True,
            thread_name_prefix="request-handling-task",
        )

    # In any environment WSGI/ASGI
    if start_request_handling_eventloop_manager:
        request_handling_loop_manager = get_or_create_loop_manager(
            id="request-handling-eventloop-manager",
            force_create=recreate_managers,
        )
        request_handling_loop_manager.start(task_type="request-handling-task")  # Restrict to only request handling tasks
[docs]
@staticmethod
def check_ssl_credentials():
    """
    Verify that the configured SSL certificate and private key files exist.

    The paths are read from `SSL_CERTFILE_LOCATION` and
    `SSL_PRIVATE_KEY_LOCATION`; the certificate is checked first.

    Raises:
        SSLError: Either certfile or private key file is not found.
    """
    required_files = (
        (
            SETTINGS["SSL_CERTFILE_LOCATION"],
            "SSL certfile provided in settings.py not found. You may use command `python3 -m duck ssl-gen` to "
            "generate a new self signed certificate and key pair.",
        ),
        (
            SETTINGS["SSL_PRIVATE_KEY_LOCATION"],
            "SSL private key provided in settings.py not found. You may use command `python3 -m duck ssl-gen` to "
            "generate a new self signed certificate and key pair.",
        ),
    )
    for location, not_found_message in required_files:
        if os.path.isfile(location):
            continue
        raise SSLError(not_found_message)
@property
def meta(self) -> Dict[str, Any]:
    """
    Global application metadata, as produced by `Meta.compile()`.
    """
    compiled_meta = Meta.compile()
    return compiled_meta
@property
def process_id(self) -> int:
"""
Returns the application main process ID.
"""
if not hasattr(self, "_main_process_id"):
self._main_process_id = os.getpid()
return self._main_process_id
@property
def absolute_uri(self) -> str:
    """
    Absolute server `URL` of the application, obtained from
    `Meta.get_absolute_server_url()`.
    """
    server_url = Meta.get_absolute_server_url()
    return server_url
@property
def absolute_ws_uri(self) -> str:
    """
    Absolute WebSockets `URL` of the application server, obtained from
    `Meta.get_absolute_ws_server_url()`.
    """
    ws_server_url = Meta.get_absolute_ws_server_url()
    return ws_server_url
@property
def django_server_up(self) -> bool:
    """
    Probe the Django server that requests are forwarded to.

    A GET request with a one second timeout is sent to the `/admin` path of
    the Django bind address. Any response counts as "up"; any exception
    raised while probing counts as "down".

    Returns:
        started (bool): True if up else False
    """
    import requests

    try:
        probe_host = self.django_addr
        probe_port = self.django_bind_port

        if probe_host.startswith("0") and not self.uses_ipv6:
            # Host 0.0.0.0 not allowed on windows
            probe_host = "127.0.0.1"

        # Note: /admin is probed rather than / because / may be served by a real
        # Duck path on the Django side, which can be slow enough to hit the timeout.
        if self.uses_ipv6:
            netloc = f"[{probe_host}]:{probe_port}"
        else:
            netloc = f"{probe_host}:{probe_port}"

        requests.get(
            url=f"http://{netloc}/admin",
            headers={"Host": SETTINGS["DJANGO_SHARED_SECRET_DOMAIN"]},
            timeout=1,
        )
    except Exception:
        return False

    # Reaching this point means a response has been received.
    return True
@property
def https_redirect_server_up(self) -> bool:
"""
Checks whether HTTPS redirect micro application is running.
Returns:
bool: True if up else False
"""
if self.https_redirect_process:
if self.https_redirect_process.is_alive():
return bool(self.https_redirect_process_running_state.value)
else:
return False
else:
return self.https_redirect_app.server.running
[docs]
def register_ports(self):
    """
    Register the ports occupied by this application.

    Delegates to the parent class first, then additionally registers the
    Django bind port under `DJANGO_BIND_PORT` when Django is in use.
    """
    from duck.utils.port_registry import PortRegistry

    super().register_ports()

    if not self.use_django:
        return
    PortRegistry.register_port(self.django_bind_port, "DJANGO_BIND_PORT")
[docs]
def run_checks(self):
"""
Runs application checks.
"""
if self.enable_https:
self.check_ssl_credentials()
[docs]
def set_process_name(self):
    """
    Apply `self.process_name` as the title of the current process.
    """
    title = self.process_name
    setproctitle.setproctitle(title)
[docs]
def register_signals(self):
"""
Register and bind signals to appropriate signal handler.
"""
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)
[docs]
def handle_signal(self, sig, frame):
"""
Method for handling process signals.
Signals:
- `SIGINT` (Ctrl-C), `SIGTERM` (Terminate): Quits the server/application.
"""
if sig in [signal.SIGINT, signal.SIGTERM]:
self.stop(wait_for_runtime_executor_shutdown=False)
[docs]
def handle_ipc_messages(self):
"""
Poll the shared IPC file and stop the application when asked to.

The shared file is first cleared by writing an empty message. Messages are
then read in a loop; a message equal to `bye`, `quit` or `exit` (compared
case-insensitively) calls `stop()` and ends the loop. A one second
`time.sleep` throttles the polling.

Notes:
- This usually holds the main process from exiting because of the blocking behavior.
"""
from duck.utils import ipc
# Reader and writer are obtained from `duck.utils.ipc` as context managers.
with ipc.get_reader() as reader:
with ipc.get_writer() as writer:
# Clear the shared IPC file by writing an empty message.
writer.write_message("")
while True:
# Read the next message, ignoring surrounding whitespace.
message = reader.read_message().strip()
if message:
# Only the quit keywords are acted upon; other messages are ignored.
if any({message.lower() == i for i in ["bye", "quit", "exit"]}):
self.stop()
break
# Pause before polling again.
time.sleep(1)
[docs]
def start_server(self) -> None:
"""
Starts the app server in new thread.
"""
if not self.server_future or not self.server_future.running():
self.server_future = self.runtime_executor.submit_task(self.server.start_server)
[docs]
def start_django_server(self) -> None:
    """
    Starts Django server and use Duck as reverse proxy server for Django.

    Nothing happens unless `self.use_django` is set. The server is started
    as a task on the runtime executor, and a new task is only submitted when
    there is no previous Django task or the previous one is no longer running.
    """
    from duck.backend.django import bridge

    # Django is deliberately started inside this process (on the runtime
    # executor) rather than in a new process: a separate process isolates
    # memory, which makes synchronizing the Lively component registry between
    # Django and the main process nearly impossible and leads to components
    # not being found.
    def start_django_server():
        """
        Starts Django application server
        """
        bridge.start_django_server(
            self.django_addr,
            self.django_bind_port,
            uses_ipv6=self.uses_ipv6,
        )

    if not self.use_django:
        return

    current = self.django_server_future
    # `concurrent.futures.Future` exposes `running()`, not `is_running()`;
    # this matches how the other runtime futures are checked.
    if current and current.running():
        return
    self.django_server_future = self.runtime_executor.submit_task(start_django_server)
[docs]
def start_https_redirect_server(self, log_message: bool = True):
"""
Starts HTTPS redirect micro application in a new process.

Nothing happens unless `self.https_redirect` is set. A new
`multiprocessing.Process` named `duck-https-redirect-server` is created and
started only when there is no redirect process yet or the existing one is
not alive.

Args:
log_message (bool): Whether to log a DEBUG message describing the port redirection.
"""
def start_https_redirect(process_safe_running_state: multiprocessing.Value):
"""
Starts app for redirecting non encrypted traffic to main app using https.

Runs inside the redirect process and blocks until the micro app stops.

Args:
process_safe_running_state (multiprocessing.Value): The shared running-state value
passed in by the parent; it is not read or written in this function body.
"""
p = multiprocessing.current_process()
# Use the process name as the process title.
setproctitle.setproctitle(p.name)
# Stop the redirect app when this process receives SIGINT.
signal.signal(signal.SIGINT, lambda *a: self.https_redirect_app.stop())
# Restart background workers for this process.
# `recreate_managers=True` recreates and attaches new managers for the current
# thread and all its descendants.
# Only the request handling threadpool/eventloop managers are restarted here.
_async = SETTINGS['ASYNC_HANDLING']
start_bg_eventloop_if_wsgi = getattr(self, "start_bg_eventloop_if_wsgi", True)
# The event loop is needed in ASGI mode, or in WSGI mode when the app allows it.
start_eventloop = _async or (not _async and start_bg_eventloop_if_wsgi)
App.start_background_workers(
self,
start_request_handling_threadpool_manager=not _async,
start_request_handling_eventloop_manager=start_eventloop,
start_component_threadpool_manager=False,
start_automations_eventloop_manager=False,
recreate_managers=True,
)
# Start the microapp; `run_forever=True` makes this call blocking.
self.https_redirect_app.run(run_forever=True)
if self.https_redirect:
# Create the redirect process if there is none or the previous one has died.
if not self.https_redirect_process or not self.https_redirect_process.is_alive():
self.https_redirect_process = multiprocessing.Process(
target=start_https_redirect,
name="duck-https-redirect-server",
args=(self.https_redirect_process_running_state, ),
)
# Start the HTTPS redirect process
self.https_redirect_process.start()
# Log the redirect mapping if requested.
if log_message:
logger.log(
f"Redirecting incoming HTTP traffic [{self.https_redirect_port} -> {self.port}]",
level=logger.DEBUG,
)
[docs]
def start_automations_dispatcher(self, log_message: bool = True):
"""
Starts automations dispatcher for executing automations during runtime.

Nothing happens unless `self.run_automations` is set. The dispatcher is run
as a task on the runtime executor, and a new task is only submitted when
there is no previous dispatcher task or the previous one is no longer running.

Args:
log_message (bool): Whether to log a DEBUG message naming the dispatcher class.
"""
def start_automations_dispatcher():
"""
Starts automations dispatcher for running and managing automations on runtime.
"""
# Create the dispatcher from the configured dispatcher class on first use.
if not self.automations_dispatcher:
self.automations_dispatcher = SettingsLoaded.AUTOMATION_DISPATCHER(self)
if log_message:
automations_dispatcher_name = self.automations_dispatcher.__class__.__name__
logger.log(
f"Running all automations with {automations_dispatcher_name}",
level=logger.DEBUG,
)
# `SettingsLoaded.AUTOMATIONS` yields (trigger, automation) pairs.
for trigger, automation in SettingsLoaded.AUTOMATIONS:
# Register trigger and automation
self.automations_dispatcher.register(trigger, automation)
# Start automations dispatcher
self.automations_dispatcher.start()
if self.run_automations:
# Submit the dispatcher task unless one is already running.
if not self.automations_dispatcher_future or not self.automations_dispatcher_future.running():
self.automations_dispatcher_future = self.runtime_executor.submit_task(start_automations_dispatcher)
[docs]
def start_ducksight_reloader(self):
    """
    Starts the DuckSight Reloader for reloading app on file modifications, deletions, etc.

    The reloader is only started when both `DEBUG` and `AUTO_RELOAD` are
    enabled; a production server should not be restarted at any point.
    Nothing happens while a previously started reloader thread is still alive.

    Notes:
        - Unlike other tasks like starting Duck server, HTTPS redirect server, etc which are
          run by the app's `runtime_executor`, this runs in an independent background thread.
        - NOTE(review): the thread is created without `daemon=True` although earlier
          documentation described it as a daemon thread - confirm which is intended.
    """
    from duck.contrib.reloader.ducksight import DuckSightReloader

    def start_reloader():
        """
        Start the app's reloader.
        """
        if not self.ducksight_reloader:
            self.ducksight_reloader = DuckSightReloader(SETTINGS['BASE_DIR'])
        self.ducksight_reloader.run()

    if not (SETTINGS["DEBUG"] and SETTINGS['AUTO_RELOAD']):
        return

    reloader_thread = self.ducksight_reloader_thread
    if reloader_thread and reloader_thread.is_alive():
        return

    # A `threading.Thread` can only be started once, so a finished thread is
    # replaced by a fresh one instead of being started again.
    self.ducksight_reloader_thread = threading.Thread(target=start_reloader)
    self.ducksight_reloader_thread.start()
[docs]
def get_runtime_futures(self) -> Dict[str, Optional[Future]]:
"""
Returns a dictionary of all application runtime concurrent futures.
Notes:
- The `DuckSightReloader` task is run on an independent thread, so no future for it will be included.
"""
return {
"duck_server": self.server_future,
"automations_dispatcher": self.automations_dispatcher_future,
"django_server": self.django_server_future,
}
[docs]
def stop_servers(
self,
stop_https_redirect_server: bool = True,
log_to_console: bool = True,
wait: bool = True,
):
"""
Stop all running servers i.e., Duck main server, HTTPS redirect server & Django server.
Args:
stop_https_redirect_server (bool): Whether to stop HTTPS redirect microapp server.
log_to_console (bool): Whether to an exit message log to console.
wait (bool): Whether to wait for termination. Defaults to True but with a timeout.
"""
self.server.stop_server(log_to_console=log_to_console, wait=wait)
if (
stop_https_redirect_server and self.https_redirect_process
and self.https_redirect_process.is_alive()
):
# Terminate https redirect app process
self.https_redirect_process.terminate()
if wait:
# Wait for termination
self.https_redirect_process.join(1)
[docs]
def stop(
    self,
    log_to_console: bool = True,
    no_exit: bool = False,
    dispatch_pre_stop_event: bool = True,
    kill_ducksight_reloader: bool = True,
    wait_for_runtime_executor_shutdown: bool = True,
    close_log_file: bool = True,
):
    """
    Stops the application and terminates the whole program.

    Each shutdown step is attempted independently: a failure in one step is
    logged and the remaining steps still run.

    Args:
        log_to_console (bool): Whether to log an exit message. Forced to False when
            `--is-reload` is present in `sys.argv`.
        no_exit (bool): Whether to terminate everything but keep the program running.
            When False the process is ended with `os._exit(0)`.
        dispatch_pre_stop_event (bool): Whether to dispatch the `on_pre_stop` event. Defaults to True.
        kill_ducksight_reloader (bool): This attempts to stop the `DuckSightReloader`.
            Useful if `no_exit=True`.
        wait_for_runtime_executor_shutdown (bool): Whether to wait for the runtime executor to
            complete shutdown, i.e. for current tasks to finish/cancel.
            This is only used if argument `no_exit=True` else it is automatically `False`.
        close_log_file (bool): Whether to close the log file. Defaults to True.
    """
    if "--is-reload" in sys.argv:
        log_to_console = False

    # Waiting is pointless when the process is about to be force-exited.
    wait = wait_for_runtime_executor_shutdown if no_exit else False

    def stop_future(future):
        """
        Cancels a future that has not finished yet.
        """
        if not future:
            return
        try:
            # A future that is already executing cannot be cancelled (`cancel()`
            # just returns False), so the useful targets are the pending ones.
            if not future.done():
                future.cancel()
        except Exception:
            # Ignore as the runtime executor is going to be shut down anyway.
            pass

    if dispatch_pre_stop_event:
        # Dispatch pre stop event.
        self.dispatch_event("on_pre_stop")

    try:
        # Close the session storage connector.
        SettingsLoaded.SESSION_STORAGE_CONNECTOR.close()
    except Exception as e:
        logger.log_raw('\n')
        logger.log(f"Error closing session storage: {e}", level=logger.WARNING)

    try:
        # Stop all started servers
        self.stop_servers(log_to_console=log_to_console, wait=wait)
    except Exception as e:
        logger.log_raw('\n')
        logger.log(f"Error stopping servers: {e}", level=logger.ERROR)
        if SETTINGS['DEBUG']:
            logger.log_exception(e)

    # Stop the automations dispatcher, if one was ever created.
    if self.run_automations and self.automations_dispatcher:
        try:
            self.automations_dispatcher.stop()
        except Exception as e:
            logger.log_exception(e)

    # Stop the reloader, if one was ever created.
    if SETTINGS['DEBUG'] and SETTINGS['AUTO_RELOAD'] and kill_ducksight_reloader:
        try:
            if self.ducksight_reloader:
                self.ducksight_reloader.stop()
        except Exception as e:
            logger.log_exception(e)

    # Cancel all unfinished runtime futures.
    for future in self.get_runtime_futures().values():
        stop_future(future)

    # Shutdown runtime executor if not stopped.
    if self.runtime_executor:
        try:
            self.runtime_executor.stop(wait=wait)
        except Exception as e:
            logger.log_exception(e)

    if no_exit and kill_ducksight_reloader and wait_for_runtime_executor_shutdown:
        # The program keeps running, so wait for the reloader thread to finish.
        reloader_thread = self.ducksight_reloader_thread
        try:
            if reloader_thread and reloader_thread.is_alive():
                reloader_thread.join()
        except Exception as e:
            logger.log_exception(e)

    if close_log_file and SETTINGS['LOG_TO_FILE']:
        # Close the logging file
        try:
            logger.Logger.close_logfile()
        except Exception as e:
            logger.log(f"Error closing log file: {e}", level=logger.WARNING)

    if not no_exit:
        # Force exit (avoids lingering threads/processes)
        os._exit(0)
[docs]
def log_startup_warnings(self):
    """
    Log configuration warnings before the server starts.

    Three things are checked: a wildcard (`*`) entry in `ALLOWED_HOSTS`
    outside DEBUG mode, a missing original domain, and - when the dashboard
    is enabled - the issues reported by `get_dashboard_security_issues()`.
    """
    from duck.security.dashboard import get_dashboard_security_issues

    production = not SETTINGS['DEBUG']
    if production and "*" in SETTINGS['ALLOWED_HOSTS']:
        logger.log(
            "WARNING: ALLOWED_HOSTS seem to have global host (*)",
            level=logger.WARNING,
        )

    if not self.original_domain:
        logger.log(
            f'WARNING: Domain not set, using "{self.domain}" ',
            level=logger.WARNING,
        )

    if not SETTINGS['ENABLE_DASHBOARD']:
        return

    # Report every dashboard security issue, followed by a summary line.
    dashboard_issues = get_dashboard_security_issues() or []
    for dashboard_issue in dashboard_issues:
        logger.log(dashboard_issue, level=logger.WARNING)

    if dashboard_issues:
        logger.log(f"Dashboard disabled for security reasons: {len(dashboard_issues)} issue(s) found", level=logger.WARNING)
[docs]
def log_component_system_warnings(self):
    """
    Warns about missing CSP flags required by the Lively component system.

    Checks script-src for 'unsafe-eval' and style-src for 'unsafe-inline'
    (and for the presence of `csp_nonce_flag`). The script-src and style-src
    checks are independent, so a problem in one does not hide a problem in
    the other. Nothing is checked when `ENABLE_HEADERS_SECURITY_POLICY` is
    off or `CSP_TRUSTED_SOURCES` is empty.
    """
    logger.log("Lively Component System active", level=logger.DEBUG)

    # Build CSP directives.
    csp_directives = SETTINGS['CSP_TRUSTED_SOURCES']
    if not (SETTINGS['ENABLE_HEADERS_SECURITY_POLICY'] and csp_directives):
        return

    script_src = set(csp_directives.get("script-src", []))
    style_src = set(csp_directives.get("style-src", []))

    required_script_flag = "'unsafe-eval'"
    required_style_flag = "'unsafe-inline'"

    # Script flags
    if required_script_flag not in script_src:
        logger.log(
            (
                "Component system active but script flag unsafe-eval "
                "is missing from script-src. "
                "This may prevent JS execution from lively components."
            ),
            level=logger.WARNING,
        )

    # Style flags
    if csp_nonce_flag in style_src:
        logger.log(
            (
                "Component system active but `csp_nonce_flag` is in style-src. "
                "This may block inline styles from components."
            ),
            level=logger.WARNING,
        )
    elif required_style_flag not in style_src:
        logger.log(
            (
                f"Component system active but flag {required_style_flag} "
                "is missing from style-src. "
                "This may block inline styles from components."
            ),
            level=logger.WARNING,
        )
[docs]
def start_background_event_loop(self):
    """
    Start background thread pools and event loop(s) according to the worker configuration.

    With workers enabled, only the automations event loop is started here;
    the remaining managers are handled per worker. Without workers, every
    manager is started directly, and the request handling event loop depends
    on `ASYNC_HANDLING` and `start_bg_eventloop_if_wsgi`.
    """
    if self.workers:
        # Workers handle their own request loops; only start automations here
        worker_mode_options = {
            "start_request_handling_threadpool_manager": False,
            "start_request_handling_eventloop_manager": False,
            "start_component_threadpool_manager": False,
            "start_automations_eventloop_manager": True,
        }
        App.start_background_workers(self, **worker_mode_options)
        return

    # No workers - start everything ourselves
    async_ = SETTINGS['ASYNC_HANDLING']
    bg_loop_in_wsgi = getattr(self, "start_bg_eventloop_if_wsgi", True)

    standalone_options = {
        "start_request_handling_threadpool_manager": not async_,
        "start_request_handling_eventloop_manager": async_ or bg_loop_in_wsgi,
        "start_component_threadpool_manager": True,
        "start_automations_eventloop_manager": True,
    }
    App.start_background_workers(self, **standalone_options)

    if async_:
        return

    # WSGI: report whether the background event loop is available.
    if not bg_loop_in_wsgi:
        logger.log(
            "App argument `start_bg_eventloop_if_wsgi` is set to False. "
            "This may prevent protocols like `HTTP/2` or `WebSockets` from working correctly\n",
            level=logger.WARNING,
        )
        return
    logger.log("Background event loop scheduled", level=logger.DEBUG)
[docs]
def wait_for_main_server(self, start_failure_msg: str) -> bool:
    """
    Pause for one second, then check whether the main server came up.

    If the server is not up, the failure message is logged and the
    application is stopped.

    Args:
        start_failure_msg: Message to log if the server failed to start.

    Returns:
        True if the server is up, False otherwise.
    """
    delay_seconds = 1

    logger.log(f"Waiting {delay_seconds}s to read server state...", level=logger.DEBUG)
    time.sleep(delay_seconds)

    if self.server_up:
        return True

    # The server did not come up: report it and shut the application down.
    logger.log(start_failure_msg, level=logger.ERROR)
    self.stop()
    return False
[docs]
def start_https_redirect_if_needed(self, start_failure_msg: str) -> bool:
"""
Starts the HTTPS redirect server if configured.
Args:
start_failure_msg: Fallback message used if the redirect server fails.
Returns:
True if the redirect server started (or was not needed), False on failure.
"""
if not self.https_redirect:
return True
# Start the HTTPS redirect server.
self.start_https_redirect_server()
# Wait for sometime.
time.sleep(1)
if not self.https_redirect_server_up:
# Log a failure message and stop the application.
logger.log(start_failure_msg, level=logger.ERROR)
# Stop the app
self.stop()
# Return failure flag.
return False
return True
[docs]
def start_django_if_needed(self) -> bool:
    """
    Start the Django server and wait for it to become responsive.

    Any configured startup commands are run first. The Django server is then
    started, the configured wait time elapses, and Django's health is checked.
    On a clean start the listening address is logged; on any failure an error
    is logged and the application is stopped.

    Returns:
        True if Django started successfully (or is not in use), False otherwise.
    """
    from duck.backend.django import bridge

    if not self.use_django:
        return True

    for startup_note in (
        "Requests will be forwarded to Django server",
        f"Starting Django server on port [{self.django_bind_port}]",
    ):
        logger.log(startup_note, level=logger.DEBUG)

    if SETTINGS["DJANGO_COMMANDS_ON_STARTUP"]:
        # Run the configured Django startup commands.
        try:
            logger.log_raw("\n")
            bridge.run_django_app_commands()
        except Exception as e:
            logger.log(f"Failed to run django commands: {e}\n", level=logger.ERROR)
            logger.log_exception(e)
            self.stop()
            return False

    # Start Django and give it the configured grace period.
    grace_period = self.django_server_wait_time
    logger.log(
        f"Waiting for Django server to start ({grace_period} secs)\n",
        level=logger.DEBUG,
    )
    self.start_django_server()
    time.sleep(grace_period)

    if not self.django_server_up:
        logger.log(
            f"Failed to get response from Django server [{grace_period} secs]",
            level=logger.ERROR,
        )
        # Stopping the application usually kills the whole process.
        self.stop()
        return False

    # Resolve the host URL for logging
    scheme = "https://" if self.server.enable_ssl else "http://"
    host, port = self.server.addr
    if host.startswith("0") and not self.uses_ipv6:
        host = "127.0.0.1"
    elif self.uses_ipv6:
        host = f"[{host}]"
    host_url = f"{scheme}{host}:{port}"

    for success_note in (
        "Django started yey, that's good!",
        f"Duck Server listening on {host_url}",
    ):
        logger.log(
            success_note,
            level=logger.DEBUG,
            custom_color=logger.Fore.GREEN,
        )
    return True
[docs]
def run(self, print_ansi_art: bool = True):
    """
    Public entry point that prepares the environment and runs the application.

    Performs Duck setup unless `self.skip_setup` is set, records application
    metadata, starts the runtime executor with five workers and finally hands
    over to `_run`.

    Args:
        print_ansi_art: Forwarded to `_run`; whether to show the startup art.

    Returns:
        Whatever `_run` returns.
    """
    from duck.setup import setup

    setup_required = not self.skip_setup
    if setup_required:
        # Prepare the Duck environment and the whole application.
        setup()

    # Persist application metadata before anything starts serving.
    self.record_metadata()

    # Bring up the runtime executor.
    self.runtime_executor.start(max_workers=5)

    # Hand over to the actual startup sequence.
    outcome = self._run(print_ansi_art=print_ansi_art)
    return outcome
[docs]
def _run(self, print_ansi_art: bool = True):
    """
    Start every runtime piece of the Duck application, in order.

    Resizes the transaction thread pool, prints the startup banner, starts the
    background event loop and (optionally) the automations dispatcher, then
    boots the main server followed by the HTTPS redirect and Django servers.
    `_on_app_start` is invoked only when every startup check passed; if any
    check fails this method returns early.

    Args:
        print_ansi_art: Whether to display the Duck ASCII art on startup.
    """
    from duck.contrib.sync.smart_async import _TRANSACTION_THREAD_POOL
    from duck.art import display_duck_art

    # Scale the transaction thread pool by the number of configured workers.
    base_threads = _TRANSACTION_THREAD_POOL.max_threads
    if self.workers:
        pool_size = base_threads * self.workers
    else:
        pool_size = base_threads
    _TRANSACTION_THREAD_POOL.max_threads = pool_size

    # ANSI escape sequences used to embolden parts of the console output.
    bold_on = "\033[1m"
    bold_off = "\033[0m"
    start_failure_msg = f"{bold_on}Failed to start Duck server{bold_off}"
    settings_mod = os.environ.get("DUCK_SETTINGS_MODULE", "settings")

    # A reload only emits a raw empty log entry; a fresh start may show the art.
    if "--is-reload" in sys.argv:
        logger.log_raw("")
    elif print_ansi_art and not SETTINGS["SILENT"]:
        display_duck_art()

    if SETTINGS["LOG_TO_FILE"]:
        logger.Logger.redirect_console_output()

    # Announce which settings module is active.
    logger.log_raw(f'{bold_on}USING SETTINGS{bold_off} "{settings_mod}" \n')

    # Emit startup warnings, then bring up the background event loop.
    self.log_startup_warnings()
    self.start_background_event_loop()

    if SETTINGS['ENABLE_COMPONENT_SYSTEM']:
        self.log_component_system_warnings()

    if self.run_automations:
        self.start_automations_dispatcher()

    # Boot the main server, then verify each server in turn. Short-circuiting
    # guarantees that later servers are not started once an earlier check fails.
    self.start_server()
    everything_up = (
        self.wait_for_main_server(start_failure_msg)
        and self.start_https_redirect_if_needed("HTTPS redirect app failed to start")
        and self.start_django_if_needed()
    )
    if not everything_up:
        return

    # All servers verified: run the post-start hook.
    self._on_app_start()
[docs]
def _on_app_start(self):
    """
    Internal hook called once the application has started successfully.

    Records the main process data through `duck.processes`, registers signal
    handlers (unless `self.disable_signal_handler`), starts the Duck Sight
    reloader when both `AUTO_RELOAD` and `DEBUG` are enabled, dispatches the
    parent class' start hook and finally, unless `self.disable_ipc_handler`
    is set, enters `handle_ipc_messages()`, which blocks.
    """
    from duck import processes

    # Only resolve a log file when console output is being written to one.
    logfile = None
    if SETTINGS["LOG_TO_FILE"]:
        logfile = logger.Logger.get_current_logfile()

    try:
        # Persist metadata about the main process.
        processes.set_process_data(
            name="main",
            data={
                "pid": self.process_id,
                "sys_argv": sys.argv,
                "log_file": logfile,
                "start_time": time.time(),
            },
            clear_existing_data=True,
        )
    except json.JSONDecodeError as e:
        # The file used by duck.processes is corrupted, or a write may still
        # be in progress. This stays best-effort (startup continues), but the
        # failure is reported instead of being silently swallowed.
        logger.log(
            f"Could not record main process data, process data file is unreadable: {e}",
            level=logger.WARNING,
        )

    if not self.disable_signal_handler:
        # Bind signals to the appropriate signal handlers.
        self.register_signals()

    logger.log(
        f"Use Ctrl-C to quit [APP PID: {self.process_id}]",
        level=logger.DEBUG,
        custom_color=logger.Fore.GREEN,
    )

    if SETTINGS["AUTO_RELOAD"] and SETTINGS["DEBUG"]:
        # Start the Duck Sight reloader (if not running).
        self.start_ducksight_reloader()
        logger.log(
            "Duck Sight Reloader watching file changes",
            level=logger.DEBUG,
            custom_color=logger.Fore.GREEN,
        )

    logger.log(
        "Waiting for incoming requests :-) \n",
        level=logger.DEBUG,
        custom_color=logger.Fore.GREEN,
    )

    # Let the parent class dispatch its on-start event.
    super()._on_app_start()

    if not self.disable_ipc_handler:
        # Handle any incoming IPC messages.
        # This is a blocking operation, it prevents the app from exiting.
        self.handle_ipc_messages()
if __name__ == "__main__":
    # Runs only when this module is executed as the program entry point.
    # freeze_support() lets multiprocessing work in frozen Windows
    # executables and has no effect elsewhere.
    multiprocessing.freeze_support()