duck.utils.multiprocessing.process_manager

WorkerProcessManager manages and monitors a pool of worker processes.

Features:

  • Automatic restart of dead or unhealthy workers.

  • Customizable health-check hooks per worker.

  • Threaded non-blocking monitoring loop.

  • Configurable logging and verbosity.

  • Status inspection and graceful shutdown.

Example use cases:

  • WSGI/ASGI server worker orchestration.

  • ML/AI multi-process task runner watchdog.

  • Long-running web backend with process self-repair.

Usage Example:

import random
import sys
import time

from duck.utils.multiprocessing.process_manager import WorkerProcessManager

def sample_worker(idx, *args):
    print(f"Worker {idx} started...")

    while True:
        time.sleep(1)
        print(f"[Worker {idx}] Sleeping")

        # Simulate a random crash; the manager will restart this worker
        if random.random() < 0.03:
            print(f"[Worker {idx}] Simulating crash")
            sys.exit(1)

def health_check_fn(proc, idx):
    # Returns True if the process is alive; override for custom checks
    return proc.is_alive()

# The __main__ guard is required with the "spawn" and "forkserver" start
# methods, which import this module again in every new process.
if __name__ == "__main__":
    manager = WorkerProcessManager(
        worker_fn=sample_worker,
        num_workers=4,
        args_fn=lambda idx: (...),
        worker_name_fn=lambda idx: f"duck-worker-{idx}",
        health_check_fn=health_check_fn,  # Or use a HeartbeatHealthCheck object.
        restart_timeout=2,
        enable_logs=True,
        verbose_logs=False,
        enable_monitoring=True,
        process_stop_timeout=3,
    )

    try:
        manager.start()
        for _ in range(20):  # Monitor for a while
            print("Worker status:", manager.status())
            time.sleep(2)
    finally:
        manager.stop()

Module Contents

Classes

HeartbeatHealthCheck

Process health check using a heartbeat approach.

WorkerProcessManager

WorkerProcessManager manages and monitors a pool of worker processes.

API

class duck.utils.multiprocessing.process_manager.HeartbeatHealthCheck(heartbeat_timeout: float)

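Process health check using a heartbeat approach: each worker records heartbeats by calling update_heartbeat, and the check fails when a worker's last heartbeat is older than heartbeat_timeout.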

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...
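The heartbeat approach can be sketched with the standard library alone. This is not duck's implementation: the class name SimpleHeartbeatCheck, the num_workers constructor argument and the shared multiprocessing.Array of timestamps are assumptions made for illustration, and the main-process guard that update_heartbeat documents is left out to keep the sketch short.

```python
import multiprocessing
import time


class HeartbeatUpdateNeverCalled(Exception):
    """Raised when health is checked before any heartbeat was recorded."""


class SimpleHeartbeatCheck:
    """Hypothetical heartbeat health check; a sketch, not duck's implementation.

    Assumes a fixed number of workers, each owning one shared timestamp slot.
    A slot value of 0.0 means "no heartbeat recorded yet".
    """

    def __init__(self, heartbeat_timeout: float, num_workers: int):
        self.heartbeat_timeout = heartbeat_timeout
        # Zero-initialised shared array of C doubles. Child processes share the
        # same memory when this object is handed to them as a Process argument.
        self._beats = multiprocessing.Array("d", num_workers)

    def update_heartbeat(self, idx: int) -> None:
        # Wall-clock time is used because time.monotonic() has an undefined
        # reference point, so its values are not guaranteed comparable across processes.
        self._beats[idx] = time.time()

    def check_health(self, process, idx: int) -> bool:
        last = self._beats[idx]
        if last == 0.0:
            raise HeartbeatUpdateNeverCalled(f"worker {idx} has not sent a heartbeat yet")
        # Healthy while the newest heartbeat is younger than the timeout.
        return time.time() - last <= self.heartbeat_timeout

    # Make instances usable directly as health_check_fn(process, idx).
    __call__ = check_health
```

A worker would receive the object through its arguments (for example via args_fn) and call update_heartbeat(idx) at the top of each loop iteration, while the manager calls the object as health_check_fn(process, idx).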

Initialization

Initialize heartbeat health check.

__call__(process: multiprocessing.Process, idx: int) → bool

Checks whether the worker's last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy process.

Returns:

True if the last heartbeat is within the timeout, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled

Raised if update_heartbeat has never been called. This guards against choosing the heartbeat approach but never updating heartbeats. In a worker loop, call update_heartbeat first, before handling any tasks.

Example:

healthcheck = HeartbeatHealthCheck(...)

manager = WorkerProcessManager(
    health_check_fn=healthcheck,
    ...
)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

check_health(process: multiprocessing.Process, idx: int) → bool

Checks whether the worker's last heartbeat is still within the timeout. A heartbeat older than the timeout may indicate an unhealthy process.

Returns:

True if the last heartbeat is within the timeout, otherwise False.

Return type:

bool

Raises:

HeartbeatUpdateNeverCalled

Raised if update_heartbeat has never been called. This guards against choosing the heartbeat approach but never updating heartbeats. In a worker loop, call update_heartbeat first, before handling any tasks.

Example:

healthcheck = HeartbeatHealthCheck(...)

def worker_fn(idx, healthcheck, *args):
    while True:
        healthcheck.update_heartbeat(idx)
        # Some tasks here
        ...

update_heartbeat(idx: int)

Record a new heartbeat for the worker at index idx.

Parameters:

idx – Index of the process, usually provided to worker_fn.

Raises:

RuntimeError – If called from the main process instead of a child (worker) process.
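One way to implement this guard is multiprocessing.parent_process(), available since Python 3.8, which returns None in the main process. The helper name require_child_process is hypothetical; duck may detect child processes differently.

```python
import multiprocessing


def require_child_process(func_name: str) -> None:
    """Raise RuntimeError when called from the main process (hypothetical helper)."""
    # parent_process() returns None in the main process and a process object
    # describing the parent when called inside a child process.
    if multiprocessing.parent_process() is None:
        raise RuntimeError(f"{func_name}() must be called from a worker (child) process")
```

A worker-side method would call require_child_process("update_heartbeat") before writing its heartbeat.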

exception duck.utils.multiprocessing.process_manager.HeartbeatUpdateNeverCalled

Bases: Exception

Raised by HeartbeatHealthCheck.check_health if no heartbeats have been recorded.

class duck.utils.multiprocessing.process_manager.WorkerProcessManager(worker_fn: Callable, num_workers: int, args_fn: Optional[Callable[[int], tuple]] = None, worker_name_fn: Optional[Callable[[int], str]] = None, health_check_fn: Optional[Union[Callable[[multiprocessing.Process], bool], duck.utils.multiprocessing.process_manager.HeartbeatHealthCheck]] = None, restart_timeout: Union[int, float] = 5, enable_logs: bool = True, verbose_logs: bool = True, enable_monitoring: bool = True, process_stop_timeout: Optional[float] = 5.0, daemon: bool = False)

WorkerProcessManager manages and monitors a pool of worker processes.

Initialization

Parameters:
  • worker_fn – Function executed by each worker process.

  • num_workers – Number of worker processes to spawn.

  • args_fn – Callable (idx) => tuple giving the arguments for the worker at index idx.

  • worker_name_fn – Callable (idx) => str; worker process name.

  • health_check_fn – Callable (Process, idx) => bool used to check worker health; it must return True if the worker is healthy and False otherwise. Alternatively, pass a HeartbeatHealthCheck instance to use heartbeat-based health checks.

  • restart_timeout – Seconds to wait before restarting a worker whose process has died.

  • enable_logs – Enable info/warning logging.

  • verbose_logs – Enable full exception trace logs.

  • enable_monitoring – Start monitor thread automatically.

  • process_stop_timeout – Maximum seconds to wait for a worker to stop; passed to join() as its timeout.

  • daemon – Whether to start daemon processes. Defaults to False.
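As a rough sketch of how these parameters could map onto the standard library, the helper below assumes each worker receives its index followed by the tuple from args_fn, as sample_worker(idx, *args) in the usage example suggests. build_worker_process is a hypothetical name, and the manager's real wiring may differ.

```python
import multiprocessing
from typing import Callable, Optional


def build_worker_process(
    idx: int,
    worker_fn: Callable,
    args_fn: Optional[Callable[[int], tuple]] = None,
    worker_name_fn: Optional[Callable[[int], str]] = None,
    daemon: bool = False,
) -> multiprocessing.Process:
    """Create, but do not start, the process for worker `idx` (hypothetical helper)."""
    # Assumption: the worker index comes first, followed by args_fn(idx).
    extra_args = tuple(args_fn(idx)) if args_fn is not None else ()
    # With name=None, multiprocessing assigns a default name such as "Process-1".
    name = worker_name_fn(idx) if worker_name_fn is not None else None
    return multiprocessing.Process(
        target=worker_fn,
        args=(idx, *extra_args),
        name=name,
        daemon=daemon,
    )
```

Because args_fn and worker_name_fn run in the parent, they can be lambdas; only worker_fn and the produced arguments need to be picklable under the "spawn" start method.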

_monitor_loop()

Body of the monitor thread: checks each worker's liveness and health and restarts dead or unhealthy workers. It runs in a background thread, so it does not block the main thread.
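A monitor loop of this kind can be sketched as a periodic pass over the workers, run in a background thread and stopped through a threading.Event. The function names, the polling interval and the restart callback are assumptions; the health check is called as health_check_fn(process, idx), matching the usage example.

```python
import threading
from typing import Callable, List, Optional


def monitor_once(
    workers: List,
    health_check_fn: Optional[Callable],
    restart: Callable[[int], None],
) -> List[int]:
    """Run one monitoring pass and return the indices that were restarted."""
    restarted = []
    for idx, proc in enumerate(workers):
        healthy = proc.is_alive()
        # Only ask the custom check about processes that are still running.
        if healthy and health_check_fn is not None:
            healthy = bool(health_check_fn(proc, idx))
        if not healthy:
            restart(idx)  # expected to replace workers[idx]
            restarted.append(idx)
    return restarted


def monitor_loop(
    workers: List,
    health_check_fn: Optional[Callable],
    restart: Callable[[int], None],
    stop_event: threading.Event,
    interval: float = 1.0,
) -> None:
    """Repeat monitor_once() every `interval` seconds until stop_event is set."""
    # Event.wait() acts as an interruptible sleep: it returns True as soon as
    # the event is set, which ends the loop without waiting a full interval.
    while not stop_event.wait(interval):
        monitor_once(workers, health_check_fn, restart)
```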

_restart_worker(idx: int)

Restart a worker process by index.
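A restart could look roughly like this, assuming restart_timeout is a delay applied before the replacement process starts and that an unhealthy but still running worker is terminated first. restart_worker and its make_process factory are hypothetical names.

```python
import time
from typing import Callable, List


def restart_worker(
    workers: List,
    idx: int,
    make_process: Callable[[int], object],
    restart_timeout: float = 5.0,
    stop_timeout: float = 5.0,
) -> None:
    """Replace workers[idx] with a freshly started process (hypothetical sketch)."""
    old = workers[idx]
    if old.is_alive():
        # An unhealthy worker may still be running, so ask it to stop first.
        old.terminate()
    # Wait until the old process exits, or at most stop_timeout seconds.
    old.join(stop_timeout)
    time.sleep(restart_timeout)  # back off before restarting
    new = make_process(idx)
    new.start()
    workers[idx] = new
```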

running_pids()

Returns a list of PIDs for currently alive worker processes.

start()

Start worker processes and non-blocking monitor loop.
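The non-blocking behaviour of start() can be sketched by starting the processes and then handing the monitor loop to a daemon thread, so control returns to the caller immediately. start_pool, make_process and monitor_target are hypothetical names, and gating the thread on enable_monitoring is an assumption based on that parameter's description.

```python
import threading
from typing import Callable, List, Optional, Tuple


def start_pool(
    num_workers: int,
    make_process: Callable[[int], object],
    monitor_target: Callable[[], None],
    enable_monitoring: bool = True,
) -> Tuple[List, Optional[threading.Thread]]:
    """Start all workers, then the monitor thread; never blocks on the workers."""
    workers = [make_process(idx) for idx in range(num_workers)]
    for proc in workers:
        proc.start()
    monitor = None
    if enable_monitoring:
        # daemon=True so a still-running monitor thread does not keep the
        # interpreter alive at exit.
        monitor = threading.Thread(target=monitor_target, name="worker-monitor", daemon=True)
        monitor.start()  # returns immediately; the loop runs in the background
    return workers, monitor
```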

status()

Returns a list of status dicts, one per worker process; each dict contains the worker's name, pid and alive state.
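Both status() and running_pids() can be derived from the worker Process objects. This sketch assumes the dict keys are literally "name", "pid" and "alive"; the helper names worker_status and running_pids are hypothetical.

```python
from typing import Dict, List


def worker_status(workers: List) -> List[Dict]:
    """One dict per worker with name, pid and alive keys (assumed key names)."""
    return [{"name": p.name, "pid": p.pid, "alive": p.is_alive()} for p in workers]


def running_pids(workers: List) -> List[int]:
    """PIDs of the workers that are currently alive."""
    return [s["pid"] for s in worker_status(workers) if s["alive"]]
```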

stop(graceful: bool = True, wait: bool = True, monitor_stop_timeout: float = 0.5, no_logging: bool = False)

Stop all worker processes and monitoring thread.

Parameters:
  • graceful – Use terminate() for workers (soft shutdown).

  • wait – Whether to wait for processes to finish stopping. Defaults to True.

  • monitor_stop_timeout – Seconds to wait for the monitor thread to stop.

  • no_logging – If True, do not log the stop message. Use this to temporarily suppress it.
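A shutdown sequence matching these parameters might look like the sketch below. It assumes that graceful=False falls back to Process.kill(), and it stops the monitor thread before signalling the workers so the monitor cannot restart them mid-shutdown; stop_workers is a hypothetical name and no_logging is omitted.

```python
import threading
from typing import List, Optional


def stop_workers(
    workers: List,
    stop_event: threading.Event,
    monitor_thread: Optional[threading.Thread],
    graceful: bool = True,
    wait: bool = True,
    process_stop_timeout: Optional[float] = 5.0,
    monitor_stop_timeout: float = 0.5,
) -> None:
    """Stop the monitor first, then signal and optionally join every worker."""
    # Stop the monitor before touching workers so it cannot restart them.
    stop_event.set()
    if monitor_thread is not None:
        monitor_thread.join(monitor_stop_timeout)
    for proc in workers:
        if proc.is_alive():
            if graceful:
                proc.terminate()  # SIGTERM on POSIX
            else:
                proc.kill()  # SIGKILL on POSIX (assumed non-graceful path)
    if wait:
        for proc in workers:
            proc.join(process_stop_timeout)
```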