Extending Celery with Bootsteps
The Celery worker and consumer are built using a modular system called Bootsteps. This system allows you to extend Celery's internal components by adding your own services, message consumers, or initialization logic.
In this tutorial, you will build a custom worker extension that tracks worker initialization and a specialized consumer that listens for custom control messages.
Prerequisites
To follow this tutorial, you need a configured Celery application.
from celery import Celery
app = Celery('my_app', broker='redis://localhost:6379/0')
Step 1: Creating a Basic Initialization Step
The simplest way to extend the worker is by subclassing Step. This is useful for setting up attributes on the worker instance before it starts.
from celery import bootsteps
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
class WorkerMetadata(bootsteps.Step):
    """A bootstep that adds custom metadata to the worker."""

    label = 'Metadata'

    def __init__(self, parent, **kwargs):
        # parent is the WorkController (the worker instance)
        print(f'Initializing {self.label} for {parent.hostname}')

    def create(self, parent):
        # This is called when the worker is being built
        parent.custom_metadata = {'version': '1.0.0', 'env': 'production'}
        logger.info('Worker metadata initialized')
In this step:
- label provides a short name used in logs and graph outputs.
- __init__ is called when the step is bound to the worker.
- create is where you perform the actual work. The parent argument is the WorkController (the worker object).
Step 2: Managing Lifecycle with StartStopStep
If your extension needs to start and stop a service (like a background thread or a database connection), use StartStopStep. This class automatically calls start() and stop() on the object returned by create().
class MyService:
    def start(self):
        print("Background service started.")

    def stop(self):
        print("Background service stopped.")


class CustomServiceStep(bootsteps.StartStopStep):
    """A bootstep that manages a background service."""

    requires = (WorkerMetadata,)  # Ensure Metadata is initialized first

    def create(self, worker):
        # The object returned here is stored in self.obj
        return MyService()

    def start(self, worker):
        # StartStopStep calls self.obj.start() by default
        return super().start(worker)

    def stop(self, worker):
        # StartStopStep calls self.obj.stop() by default
        return super().stop(worker)
Key features used here:
- requires: A tuple of other steps that must be initialized before this one. Celery uses a DependencyGraph to resolve the order.
- self.obj: StartStopStep automatically assigns the result of create() to self.obj and delegates start() and stop() calls to it.
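As a rough illustration of what a more realistic MyService might look like, the sketch below runs a background thread behind the same start()/stop() interface that StartStopStep delegates to. The HeartbeatService name, the interval parameter, and the beats counter are hypothetical and not part of Celery; the sketch uses only the standard library.

```python
import threading


class HeartbeatService:
    """Hypothetical service exposing the start()/stop() pair a StartStopStep expects."""

    def __init__(self, interval=0.01):
        self.interval = interval  # seconds between beats (illustrative)
        self.beats = 0
        self._stop_event = threading.Event()
        self._thread = None

    def _run(self):
        # Event.wait() returns False on timeout and True once stop() sets the event.
        while not self._stop_event.wait(self.interval):
            self.beats += 1

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)
            self._thread = None

    @property
    def running(self):
        return self._thread is not None and self._thread.is_alive()
```

Returning HeartbeatService() from create() in place of MyService() would let the step manage the thread. Waiting on an Event rather than sleeping lets stop() return promptly instead of waiting out a full interval.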
Step 3: Adding a Custom Message Consumer
To listen to a specific queue independently of the standard task consumer, use ConsumerStep. This is ideal for building custom control planes or monitoring tools.
from kombu import Consumer, Queue
from celery import bootsteps
class MyConsumerStep(bootsteps.ConsumerStep):
    """A bootstep that adds a custom message consumer."""

    def get_consumers(self, channel):
        return [
            Consumer(channel,
                     queues=[Queue('custom_control')],
                     callbacks=[self.on_message],
                     accept=['json'])
        ]

    def on_message(self, body, message):
        print(f'Received custom message: {body}')
        message.ack()
ConsumerStep handles the boilerplate of creating a kombu consumer. You only need to implement get_consumers, which returns a list of kombu.Consumer objects.
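The callback is a natural place to validate and dispatch control messages. The sketch below is one possible shape for that logic, kept free of kombu imports so it can be exercised in isolation. The command field, the HANDLERS table, and FakeMessage are assumptions made for illustration; FakeMessage only mimics the ack() and reject() methods of a kombu message.

```python
class FakeMessage:
    """Hypothetical stand-in for a kombu message; records how it was settled."""

    def __init__(self):
        self.state = None

    def ack(self):
        self.state = 'acked'

    def reject(self, requeue=False):
        self.state = 'requeued' if requeue else 'rejected'


def handle_ping(payload):
    return 'pong'


def handle_set_level(payload):
    return f"level set to {payload.get('level', 'info')}"


# Assumed message format: {'command': <name>, ...extra fields}
HANDLERS = {'ping': handle_ping, 'set_level': handle_set_level}


def on_message(body, message):
    """Dispatch on body['command']; reject anything malformed or unknown."""
    handler = HANDLERS.get(body.get('command')) if isinstance(body, dict) else None
    if handler is None:
        message.reject()  # settle the message without requeueing it
        return None
    result = handler(body)
    message.ack()
    return result
```

Inside MyConsumerStep, the on_message method could delegate to a function like this one, which keeps the dispatch logic testable without a broker.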
Step 4: Registering Steps with the Application
To activate your bootsteps, you must add them to the worker or consumer blueprints.
# Add worker-level steps
app.steps['worker'].add(WorkerMetadata)
app.steps['worker'].add(CustomServiceStep)
# Add consumer-level steps
app.steps['consumer'].add(MyConsumerStep)
- app.steps['worker'] targets the WorkController, which manages the pool, timer, and overall worker process.
- app.steps['consumer'] targets the Consumer component, which manages the connection to the broker and message processing.
Step 5: Verifying the Result
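When you start your worker, the output printed and logged by your custom steps appears during startup. At the debug log level (-l debug), the worker also logs the boot order it resolved.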
celery -A my_app worker -l info
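You can also programmatically inspect which custom steps are registered. Note that app.steps['worker'] is a plain set of step classes, not a Blueprint, so it does not reflect the boot order; the worker resolves that order from each step's requires when it builds its blueprint. Celery uses the StepFormatter to visualize the dependency graph.
# app.steps['worker'] holds the custom step classes registered on the app
registered_steps = app.steps['worker']
print(f"Registered worker steps: {sorted(step.name for step in registered_steps)}")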
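To build intuition for how requires declarations turn into a boot order, the sketch below resolves a small dependency map with the standard library's graphlib.TopologicalSorter (Python 3.9+). It is a simplified model, not Celery's own DependencyGraph, and the step names are plain strings chosen for illustration; ReportingStep is hypothetical.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it requires (its predecessors).
requires = {
    'WorkerMetadata': set(),
    'CustomServiceStep': {'WorkerMetadata'},
    'ReportingStep': {'CustomServiceStep', 'WorkerMetadata'},  # hypothetical third step
}


def boot_order(requires):
    """Return step names ordered so that every step follows its requirements."""
    return list(TopologicalSorter(requires).static_order())


order = boot_order(requires)
print(order)  # ['WorkerMetadata', 'CustomServiceStep', 'ReportingStep']
```

In this model, a circular requires chain raises graphlib.CycleError, which is a useful reminder to keep step dependencies acyclic.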
Advanced: Conditional Inclusion
You can control whether a step is included based on the worker's configuration using include_if.
class ConditionalStep(bootsteps.Step):
    def include_if(self, worker):
        # Only include if a specific setting is present
        return worker.app.conf.get('my_custom_setting', False)
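The Blueprint evaluates include_if for every step during the apply phase. If it returns a false value, create() is not called and the step takes no part in the worker's start and stop sequence. The default implementation returns the step's enabled attribute.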
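The sequence of calls can be pictured with a small stand-alone model. This is a deliberately simplified sketch, not Celery's Blueprint: the MiniStep, ToyWorker, and apply_steps names are hypothetical, and the only behavior modeled is that every step is instantiated while create() runs only for steps whose include_if() is truthy.

```python
class MiniStep:
    """Toy step: include_if() gates create(), defaulting to an `enabled` flag."""

    enabled = True

    def __init__(self, parent):
        parent.log.append(f'init {type(self).__name__}')

    def include_if(self, parent):
        return self.enabled

    def create(self, parent):
        parent.log.append(f'create {type(self).__name__}')


class AlwaysOn(MiniStep):
    pass


class FeatureFlagged(MiniStep):
    def include_if(self, parent):
        # Mirrors ConditionalStep: include only when the setting is truthy.
        return bool(parent.conf.get('my_custom_setting', False))


class ToyWorker:
    """Hypothetical stand-in for the worker: holds settings and a call log."""

    def __init__(self, conf):
        self.conf = conf
        self.log = []


def apply_steps(parent, step_classes):
    """Instantiate every step, then call create() on those that opt in."""
    steps = [cls(parent) for cls in step_classes]
    included = []
    for step in steps:
        if step.include_if(parent):
            step.create(parent)
            included.append(step)
    return included
```

Calling apply_steps(ToyWorker({}), [AlwaysOn, FeatureFlagged]) records an init entry for both steps but a create entry only for AlwaysOn, which is the gating behavior include_if is meant to provide.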
Summary
By using Step, StartStopStep, and ConsumerStep, you have:
- Initialized custom metadata on the worker.
- Created a service with a managed lifecycle.
- Attached a custom message consumer to a specific queue.
- Integrated these components into the Celery worker's dependency graph.
Next, you can explore celery.worker.components to see how Celery implements its own core features like the Timer, Hub, and Pool using these same classes.