# Worker Lifecycle and Controller
The worker lifecycle in this codebase is managed by the WorkController class, which serves as the core engine for a Celery worker instance. It orchestrates the initialization, execution, and shutdown of various worker components through a modular system of bootsteps defined in a Blueprint.
## The WorkController Engine
The WorkController (found in celery/worker/worker.py) is described as an "unmanaged worker instance." While it contains all the logic necessary to run a worker, it is typically wrapped by a "program" class like celery.apps.worker.Worker which adds external concerns such as signal handling (SIGTERM, SIGINT) and CLI logging setup.
## Core Components and the Blueprint
The WorkController uses an internal Blueprint class to define the components that make up the worker. This blueprint specifies the execution order and dependencies of the worker's sub-systems.
```python
class Blueprint(bootsteps.Blueprint):
    """Worker bootstep blueprint."""

    name = 'Worker'
    default_steps = {
        'celery.worker.components:Hub',
        'celery.worker.components:Pool',
        'celery.worker.components:Beat',
        'celery.worker.components:Timer',
        'celery.worker.components:StateDB',
        'celery.worker.components:Consumer',
        'celery.worker.autoscale:WorkerComponent',
    }
```
These components represent the functional units of the worker:
- `Hub`: The event loop (using `kombu.asynchronous.Hub`).
- `Pool`: The execution pool (prefork, threads, gevent, etc.).
- `Consumer`: The component that consumes messages from the broker.
- `Timer`: Manages scheduled internal events.
- `Beat`: The embedded beat scheduler, when enabled.
- `StateDB`: Optional persistence of worker state (such as revoked tasks) across restarts.
- `autoscale:WorkerComponent`: The autoscaler, which grows and shrinks the pool on demand.
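The dependency-driven startup that the blueprint performs can be sketched with a toy model. All names below are hypothetical stand-ins, not Celery's actual bootstep API; the point is only that each step's `requires` is satisfied before the step itself starts:

```python
# Toy bootstep model: steps declare dependencies, and a depth-first
# topological sort ensures dependencies start first.
class Step:
    requires = ()

    def start(self, parent):
        parent.started.append(type(self).__name__)

class Timer(Step): pass
class Hub(Step): requires = (Timer,)
class Pool(Step): requires = (Hub,)
class Consumer(Step): requires = (Pool,)

def ordered(steps):
    """Return steps so that every step follows its dependencies."""
    seen, out = set(), []

    def visit(cls):
        if cls in seen:
            return
        seen.add(cls)
        for dep in cls.requires:
            visit(dep)
        out.append(cls)

    for cls in steps:
        visit(cls)
    return out

class ToyWorker:
    def __init__(self):
        self.started = []

worker = ToyWorker()
for cls in ordered([Consumer, Hub, Pool, Timer]):
    cls().start(worker)

print(worker.started)  # ['Timer', 'Hub', 'Pool', 'Consumer']
```

Regardless of the order steps are registered in, the sort guarantees the consumer comes up only after the pool it dispatches to.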
## Initialization Flow
When a WorkController is instantiated, it follows a strict sequence to prepare the environment:
- Loader Initialization: Calls `self.app.loader.init_worker()` to prepare the application loader.
- Default Configuration: The `setup_defaults` method resolves configuration settings using `self.app.either`, which prioritizes passed arguments over application settings (e.g., `worker_concurrency`, `worker_pool`).
- Instance Setup: The `setup_instance` method performs the heavy lifting:
  - Configures queues via `setup_queues`.
  - Imports task modules via `setup_includes`.
  - Determines if the event loop should be used via `should_use_eventloop`.
  - Sends the `signals.worker_init` signal.
  - Initializes the `Blueprint` with steps from `self.app.steps['worker']`.
  - Applies the blueprint using `self.blueprint.apply(self)`.
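The precedence rule behind `setup_defaults` can be illustrated with a small stand-in for `app.either`. This is a deliberate simplification, not Celery's implementation:

```python
# Hypothetical stand-in for app.either: return the first explicitly
# provided value, otherwise fall back to the application setting.
def either(conf, key, *values):
    for value in values:
        if value is not None:
            return value
    return conf.get(key)

conf = {'worker_concurrency': 8, 'worker_pool': 'prefork'}

# No explicit argument passed: the app setting wins.
print(either(conf, 'worker_concurrency', None))  # 8

# An explicit constructor argument overrides the setting.
print(either(conf, 'worker_concurrency', 4))     # 4
```

This is why `WorkController(concurrency=4)` beats a configured `worker_concurrency`, while omitting the argument falls back to the app's configuration.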
```python
def setup_instance(self, ...):
    # ... (queue and include setup)
    self.on_init_blueprint()
    self.blueprint = self.Blueprint(
        steps=self.app.steps['worker'],
        on_start=self.on_start,
        on_close=self.on_close,
        on_stopped=self.on_stopped,
    )
    self.blueprint.apply(self, **kwargs)
```
## Startup and Execution
The worker starts when the `start()` method is called. This method delegates the actual startup to the blueprint:

```python
def start(self):
    try:
        self.blueprint.start(self)
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        logger.critical('Unrecoverable error: %r', exc, exc_info=True)
        self.stop(exitcode=EX_FAILURE)
    # ... (handling SystemExit and KeyboardInterrupt)
```
The `self.blueprint.start(self)` call triggers the `start` method of every bootstep defined in the blueprint, bringing up the pool, the consumer, and the other services in the correct order.
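The error-containment pattern in `start()` can be modeled with a toy controller (all names below are hypothetical): a failing step becomes a clean stop with a failure exit code rather than an unhandled crash.

```python
# Toy model of start()'s error containment, not Celery's code.
EX_FAILURE = 1

class WorkerTerminate(Exception):
    """Raised by a step to request an immediate cold shutdown."""

class ToyController:
    def __init__(self, steps):
        self.steps = steps
        self.exitcode = None
        self.log = []

    def start(self):
        try:
            for step in self.steps:
                step(self)
        except WorkerTerminate:
            self.terminate()
        except Exception:
            # Any other error is unrecoverable: stop with a failure code.
            self.stop(exitcode=EX_FAILURE)

    def stop(self, exitcode=None):
        self.exitcode = exitcode
        self.log.append('stopped')

    def terminate(self):
        self.log.append('terminated')

def consumer_ok(worker):
    worker.log.append('consumer started')

def pool_crash(worker):
    raise RuntimeError('pool failed to start')

worker = ToyController([consumer_ok, pool_crash])
worker.start()
print(worker.log, worker.exitcode)  # ['consumer started', 'stopped'] 1
```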
## Shutdown Mechanisms
The WorkController supports different levels of shutdown, ranging from graceful to forced.
### Warm Shutdown (Graceful)
Triggered by `stop()`, a warm shutdown closes the consumer first so that no new tasks are accepted, then shuts down the remaining components.

```python
def stop(self, in_sighandler=False, exitcode=None):
    """Graceful shutdown of the worker server (Warm shutdown)."""
    if exitcode is not None:
        self.exitcode = exitcode
    if self.blueprint.state == RUN:
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)
    self._send_worker_shutdown()
```
### Cold Shutdown (Forced)
Triggered by `terminate()`, a cold shutdown bypasses some of the graceful waiting periods. Both `stop` and `terminate` eventually call the internal `_shutdown` method, which uses the blueprint to stop all steps.

```python
def _shutdown(self, warm=True):
    if self.blueprint is not None:
        with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):
            self.blueprint.stop(self, terminate=not warm)
            self.blueprint.join()
```
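The warm/cold distinction can be sketched with stand-in classes (not Celery's real blueprint) to show how the `warm` flag becomes `terminate=not warm`:

```python
# Toy contrast of warm vs. cold shutdown: both funnel into one
# _shutdown, and only the `warm` flag decides whether steps are
# asked to terminate outright.
class ToyBlueprint:
    def __init__(self):
        self.calls = []

    def stop(self, parent, terminate=False):
        self.calls.append(('stop', terminate))

    def join(self):
        self.calls.append(('join', None))

class ToyWorker:
    def __init__(self):
        self.blueprint = ToyBlueprint()

    def _shutdown(self, warm=True):
        self.blueprint.stop(self, terminate=not warm)
        self.blueprint.join()

worker = ToyWorker()
worker._shutdown(warm=True)    # graceful: steps stopped, not terminated
worker._shutdown(warm=False)   # forced: steps terminated
print(worker.blueprint.calls)
```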
### Soft Shutdown Timeout
If `worker_soft_shutdown_timeout` is configured and there are active requests, the worker can be made to wait before initiating a cold shutdown. This is handled in `wait_for_soft_shutdown`, which checks `state.active_requests` before sleeping for the configured duration.
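A simplified model of this check (the helper below is illustrative, not Celery's implementation) might look like:

```python
import time

# Illustrative soft-shutdown wait: only pause before a cold shutdown
# when a timeout is configured AND requests are still in flight.
def wait_for_soft_shutdown(active_requests, timeout, sleep=time.sleep):
    if timeout and active_requests:
        sleep(timeout)
        return True   # waited for in-flight work
    return False      # nothing configured or nothing running

# Injecting a fake sleep records the wait instead of blocking.
waits = []
wait_for_soft_shutdown({'task-abc'}, 5.0, sleep=waits.append)
print(waits)  # [5.0]
```

With no active requests (or a zero timeout), the worker proceeds straight to the cold shutdown without pausing.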
## Integration with the Worker Program
While `WorkController` manages the internal mechanics, the `Worker` class in `celery/apps/worker.py` provides the operational wrapper. It adds:
- Signal Handling: Installs handlers for `TERM`, `INT`, and `HUP`.
- Platform Tweaks: Calls `install_platform_tweaks` (e.g., for handling specific OS behaviors).
- Logging: Sets up the worker's logging system.
- Optimization: Calls `trace.setup_worker_optimizations` during `on_before_init`.
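How such a wrapper might map process signals onto the controller's shutdown methods can be sketched as follows. This is a simplified policy for illustration, not Celery's actual handler logic:

```python
import signal

# Hypothetical sketch: a program wrapper routes signals to the
# controller's warm/cold shutdown methods.
class ToyController:
    def __init__(self):
        self.events = []

    def stop(self):          # warm shutdown
        self.events.append('stop')

    def terminate(self):     # cold shutdown
        self.events.append('terminate')

def install_handlers(controller):
    # TERM -> warm shutdown, INT -> cold shutdown (simplified mapping).
    signal.signal(signal.SIGTERM, lambda signum, frame: controller.stop())
    signal.signal(signal.SIGINT, lambda signum, frame: controller.terminate())

controller = ToyController()
install_handlers(controller)
```

Keeping this mapping in the wrapper, rather than in `WorkController`, is what lets the engine run in hosts that must not have their signal handlers replaced.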
This separation allows the `WorkController` to remain a pure engine that can be instantiated in various environments (like the unit tests in `t/unit/worker/test_worker.py`) without triggering global side effects such as signal hijacking.