CLI, Security & Extensions
This guide covers the command-line interface (CLI), the bootstep mechanism for extending worker functionality, and message signing security.
Command-Line Interface
The Celery CLI is built using click and provides a unified entry point for all Celery commands.
Global vs. Subcommand Options
In Celery 5.0+, global options (like -A for the app instance) must be placed before the subcommand. Using them after the subcommand will result in an error.
# Correct usage
celery -A proj worker -l INFO
# Incorrect usage (will trigger a custom error message)
celery worker -A proj -l INFO
The CLI entry point is defined in celery/bin/celery.py. It supports several global options:
- -A, --app: The Celery app instance to use.
- -b, --broker: Broker URL.
- --result-backend: Result backend URL.
- --workdir: Directory to change to at startup.
- -C, --no-color: Disable colorized output.
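When a script launches Celery as a subprocess, the ordering rule above is easy to get wrong. The following is a minimal sketch, assuming a hypothetical helper named build_celery_argv (it is not part of Celery), that always places global options before the subcommand.

```python
import shlex


def build_celery_argv(subcommand, global_opts=(), sub_opts=()):
    """Return an argv list with global options placed before the subcommand.

    Hypothetical helper for illustration only; it is not part of Celery.
    """
    return ["celery", *global_opts, subcommand, *sub_opts]


argv = build_celery_argv(
    "worker",
    global_opts=["-A", "proj"],
    sub_opts=["-l", "INFO"],
)

# Join the list into a shell-safe string for logging or display.
command_line = shlex.join(argv)
print(command_line)
```

The resulting list can be handed to subprocess.run(argv) as-is, which avoids shell quoting issues.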
Starting a Worker
The worker subcommand (implemented in celery/bin/worker.py) is used to start a worker instance.
# Start a worker with 4 concurrent processes
celery -A proj worker --concurrency=4
# Start a worker using the eventlet pool
celery -A proj worker -P eventlet -c 1000
# Start a worker listening to specific queues
celery -A proj worker -Q hipri,lopri
Common worker options include:
- -n, --hostname: Set a custom node name.
- -c, --concurrency: Number of child processes or threads.
- -P, --pool: Pool implementation (prefork, eventlet, gevent, solo).
- -l, --loglevel: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
- -B, --beat: Also run the celery beat periodic task scheduler (note: not recommended for production).
Extending the Worker with Bootsteps
Bootsteps allow you to hook into the worker lifecycle to add custom components or services. The worker is composed of several default bootsteps defined in celery/worker/components.py.
Creating a Custom Bootstep
To create a bootstep, subclass celery.bootsteps.Step or celery.bootsteps.StartStopStep.
from celery import bootsteps


class MyCustomStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Pool',)

    def __init__(self, parent, **kwargs):
        # parent is the WorkController (Worker) instance
        print(f'Initializing custom step for {parent.hostname}')

    def start(self, parent):
        # Logic to run when the worker starts
        print('Custom step started')

    def stop(self, parent):
        # Logic to run when the worker stops
        print('Custom step stopped')
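The requires attribute declares that a step depends on another component, so MyCustomStep is started only after the Pool component. As a rough illustration of that ordering, the sketch below sorts steps by their dependencies using the standard library's graphlib. This is an assumption-laden model, not Celery's own dependency graph code, and the name ReportStep is hypothetical.

```python
from graphlib import TopologicalSorter

# Each step name maps to the set of steps it requires.
# 'Pool' and 'MyCustomStep' mirror the example above;
# 'ReportStep' is made up for illustration.
requires = {
    "Pool": set(),
    "MyCustomStep": {"Pool"},
    "ReportStep": {"MyCustomStep"},
}

# static_order() yields a step only after everything it requires.
start_order = list(TopologicalSorter(requires).static_order())
print(start_order)
```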
Registering Bootsteps
You can register your custom bootstep by adding it to the app.steps['worker'] set.
from celery import Celery
app = Celery('myapp')
app.steps['worker'].add(MyCustomStep)
Conditional Inclusion
Use include_if to decide whether a step should be loaded based on the worker configuration.
from celery import bootsteps


class OptionalStep(bootsteps.Step):
    def include_if(self, parent):
        # Load this step only when the custom setting is truthy
        return parent.app.conf.my_custom_setting

    def create(self, parent):
        # This is called if include_if returns True
        return self
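To make the relationship between include_if and create concrete, here is a simplified stand-in model written with plain Python objects. SketchStep and the SimpleNamespace parents are hypothetical and are not Celery classes; the sketch only shows that create runs when include_if returns a truthy value.

```python
from types import SimpleNamespace


class SketchStep:
    """Simplified stand-in for a bootstep; not Celery's Step class."""

    def include_if(self, parent):
        # Read a hypothetical setting, defaulting to False when unset.
        return bool(getattr(parent.app.conf, "my_custom_setting", False))

    def create(self, parent):
        return self

    def include(self, parent):
        # create() runs only when include_if() is truthy.
        if self.include_if(parent):
            parent.steps.append(self.create(parent))
            return True
        return False


def make_parent(**settings):
    """Build a fake worker object exposing app.conf and a steps list."""
    conf = SimpleNamespace(**settings)
    return SimpleNamespace(app=SimpleNamespace(conf=conf), steps=[])


enabled_parent = make_parent(my_custom_setting=True)
disabled_parent = make_parent()

included_when_enabled = SketchStep().include(enabled_parent)
included_when_disabled = SketchStep().include(disabled_parent)
print(included_when_enabled, included_when_disabled)
```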
Message Security
Celery provides a message signing system to ensure that tasks and results have not been tampered with. This is implemented via the auth serializer in celery/security/.
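The general sign-then-verify flow can be sketched with the standard library alone. Note the substitution: this example uses HMAC with a shared secret as a stand-in, whereas the message signing described here relies on a private key and certificates. The TRUSTED mapping, the envelope layout, and the function names are all assumptions made for illustration, not Celery's wire format or API.

```python
import hashlib
import hmac
import json

# Stand-in for a certificate store: signer id -> shared secret.
# Real message signing uses a private key and certificates instead.
TRUSTED = {"worker-1": b"demo-secret"}


def sign(signer, body, secret):
    """Serialize the body and attach a signature over the payload."""
    payload = json.dumps(body, sort_keys=True)
    signature = hmac.new(secret, payload.encode(), hashlib.sha256).hexdigest()
    return {"signer": signer, "payload": payload, "signature": signature}


def verify(envelope):
    """Return True only for an untampered payload from a trusted signer."""
    secret = TRUSTED.get(envelope["signer"])
    if secret is None:
        return False  # unknown signer
    expected = hmac.new(
        secret, envelope["payload"].encode(), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, envelope["signature"])


envelope = sign("worker-1", {"task": "add", "args": [2, 2]}, TRUSTED["worker-1"])

# Simulate tampering by changing the arguments after signing.
tampered = dict(envelope, payload=envelope["payload"].replace("2, 2", "2, 3"))

print(verify(envelope), verify(tampered))
```

A receiver that rejects every envelope failing verify never deserializes a modified payload, which is the property the auth serializer is meant to provide.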
Configuring Message Signing
To enable message signing, you must provide a private key, a certificate, and a certificate store (a glob pattern matching the trusted certificates).
from celery import Celery
app = Celery('secure_app')
app.conf.update(
    security_key='ssl/worker.key',
    security_certificate='ssl/worker.pem',
    security_cert_store='ssl/*.pem',
    task_serializer='auth',
    event_serializer='auth',
    accept_content=['auth'],
    result_accept_content=['json'],
)
# Initialize the security system
app.setup_security()
Requirements and Constraints
- Cryptography Library: The cryptography library must be installed.
- Serializer Configuration: You must set task_serializer to 'auth' and include 'auth' in accept_content.
- Certificate Store: The security_cert_store should be a glob pattern (e.g., /var/ssl/*.pem) pointing to the public certificates of all trusted clients/workers.
- Insecure Serializers: Calling setup_security() automatically disables insecure serializers (like pickle) unless they are explicitly allowed.
Troubleshooting Security
If the security settings are incomplete, app.setup_security() raises an ImproperlyConfigured exception. Check that security_key and security_certificate point to readable files, that the security_cert_store pattern matches the intended certificates, and that the serializer settings use 'auth'.
# Example of a common error if settings are missing
# celery.exceptions.ImproperlyConfigured: You have to configure a special task serializer...
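A pre-flight check can surface these problems before the worker starts. The sketch below assumes a hypothetical helper named check_security_settings that takes a plain dict of settings. It is not part of Celery and only mirrors the requirements listed above.

```python
import glob
import os


def check_security_settings(conf):
    """Return a list of problems found in a dict of security settings.

    Hypothetical pre-flight helper; it is not part of Celery.
    """
    problems = []

    # The key and certificate must each point to an existing file.
    for name in ("security_key", "security_certificate"):
        path = conf.get(name)
        if not path:
            problems.append(f"{name} is not set")
        elif not os.path.isfile(path):
            problems.append(f"{name} does not point to a file: {path}")

    # The certificate store is a glob pattern, so expand it.
    pattern = conf.get("security_cert_store")
    if not pattern:
        problems.append("security_cert_store is not set")
    elif not glob.glob(pattern):
        problems.append(f"security_cert_store matches no files: {pattern}")

    # Serializer requirements from the list above.
    if conf.get("task_serializer") != "auth":
        problems.append("task_serializer is not 'auth'")
    if "auth" not in (conf.get("accept_content") or []):
        problems.append("'auth' is missing from accept_content")

    return problems


example_problems = check_security_settings({"task_serializer": "json"})
for problem in example_problems:
    print(problem)
```

Running a check like this right before app.setup_security() produces a readable list of what is missing instead of a single exception.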