Remote Control & Inspection
Remote control and inspection in this project are handled through a broadcast system that uses a fanout exchange (the "pidbox") to communicate with workers. The primary interface for these operations is the Control class, accessible via app.control.
Managing Workers Remotely
You can send commands to workers to change their behavior at runtime using app.control. These commands are broadcast to all workers by default, but can be targeted to specific nodes.
```python
from celery import Celery

app = Celery('myapp')

# Ping all workers to check availability
replies = app.control.ping(timeout=2.0)
# Returns: [{'worker1@hostname': {'ok': 'pong'}}, ...]

# Increase the pool size for a specific worker
app.control.pool_grow(n=2, destination=['worker1@hostname'])

# Set a new rate limit for a specific task type across all workers
app.control.rate_limit('tasks.add', '10/m')
```
Common Control Commands
The Control class in celery/app/control.py provides several methods for managing worker state:
- `pool_grow(n=1, destination=None)`: Increase the number of pool worker processes by `n`.
- `pool_shrink(n=1, destination=None)`: Decrease the number of pool worker processes by `n`.
- `add_consumer(queue, exchange=None, ...)`: Tell workers to start consuming from a new queue.
- `cancel_consumer(queue, destination=None)`: Tell workers to stop consuming from a specific queue.
- `time_limit(task_name, soft, hard)`: Update time limits for a task type at runtime.
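The broadcast mechanism behind these commands can be pictured as a registry of named handlers that each worker's pidbox consults. The sketch below is illustrative only, not Celery's actual implementation; the handler functions and `state` dict are invented for the example:

```python
# Illustrative sketch of broadcast-command dispatch -- NOT Celery's real
# pidbox code. A command name is looked up in a handler registry and
# invoked with keyword arguments; the handler returns a reply dict.

def pool_grow(state, n=1):
    state['pool_size'] += n
    return {'ok': 'pool will grow'}

def rate_limit(state, task_name, rate):
    state['rate_limits'][task_name] = rate
    return {'ok': 'new rate limit set successfully'}

HANDLERS = {'pool_grow': pool_grow, 'rate_limit': rate_limit}

def dispatch(state, command, **kwargs):
    """Route a control command to its handler, as a worker's pidbox would."""
    handler = HANDLERS.get(command)
    if handler is None:
        return {'error': f'unknown command: {command}'}
    return handler(state, **kwargs)

state = {'pool_size': 4, 'rate_limits': {}}
dispatch(state, 'pool_grow', n=2)                             # pool_size -> 6
dispatch(state, 'rate_limit', task_name='tasks.add', rate='10/m')
```

Unknown commands produce an error reply rather than an exception, mirroring how workers answer control requests they do not recognize.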
Inspecting Worker State
The Inspect class provides a read-only API to query workers for their current state, such as active tasks, statistics, and registered task types. You access it via app.control.inspect().
```python
# Create an inspection client
inspector = app.control.inspect()

# Get statistics from all workers
stats = inspector.stats()

# Get currently executing tasks
active_tasks = inspector.active()

# Get tasks that have been reserved (prefetched) by workers
# but not yet started
reserved_tasks = inspector.reserved()

# List all registered task types on each worker
registered = inspector.registered()
```
Handling Inspection Results
Inspection methods return a dictionary keyed by worker node name (or `None` if no workers replied). Even when you target a single node via the `destination` argument, the result is still a dictionary with one entry per replying node.
```python
# Targeting a specific node
node_stats = app.control.inspect(destination=['worker1@hostname']).stats()
# Returns: {'worker1@hostname': {... stats ...}}
```
The flatten_reply utility in celery/app/control.py is used internally to merge multiple responses into a single mapping.
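The following is an illustrative sketch of what a `flatten_reply`-style helper does (the real implementation lives in celery/app/control.py and may differ in details): it merges a list of single-entry `{node_name: reply}` dicts into one mapping, warning when a node name appears more than once.

```python
import warnings

def flatten_reply(replies):
    """Merge per-node reply dicts; warn about duplicate node names (sketch)."""
    nodes, dupes = {}, set()
    for reply in replies:
        for node_name in reply:
            if node_name in nodes:
                dupes.add(node_name)
        nodes.update(reply)
    if dupes:
        warnings.warn(
            f"Received multiple replies from node names: {', '.join(dupes)}. "
            "Make sure each worker uses a unique nodename."
        )
    return nodes

merged = flatten_reply([
    {'worker1@host': {'ok': 'pong'}},
    {'worker2@host': {'ok': 'pong'}},
])
# merged == {'worker1@host': {'ok': 'pong'}, 'worker2@host': {'ok': 'pong'}}
```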
Revoking and Terminating Tasks
Revoking a task prevents it from being executed if it hasn't started yet. If the task is already running, you must explicitly request termination.
Revoking by Task ID
You can revoke tasks using the app.control.revoke method or directly from an AsyncResult object.
```python
from celery.result import AsyncResult

# Revoke a task by ID
app.control.revoke('16f527de-1c72-47a6-b477-c472b92fef7a')

# Equivalent, via the result object
AsyncResult('16f527de-1c72-47a6-b477-c472b92fef7a').revoke()

# Terminate a task that is already running
app.control.revoke('16f527de-1c72-47a6-b477-c472b92fef7a',
                   terminate=True, signal='SIGKILL')

# Shortcut for termination
app.control.terminate('16f527de-1c72-47a6-b477-c472b92fef7a')
```
Revoking by Stamped Headers
This project supports revoking tasks based on custom metadata (stamps) attached to the task headers.
```python
# Revoke all tasks that have a specific header value
headers = {'request_id': '12345'}
app.control.revoke_by_stamped_headers(headers, terminate=True)
```
This method first broadcasts a request to find matching task IDs on workers and then revokes those specific IDs.
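The matching phase of that two-step process can be sketched as follows. This is an illustration of the idea, not Celery's internal code; the request dicts and `find_matching_ids` helper are invented for the example:

```python
# Phase 1 (sketch): scan task requests for matching stamped headers.
# Phase 2 would pass the collected IDs to the normal revoke machinery.

def find_matching_ids(requests, headers):
    """Return IDs of requests whose stamps match every requested header."""
    matched = []
    for req in requests:
        stamps = req.get('stamps', {})
        if all(stamps.get(k) == v for k, v in headers.items()):
            matched.append(req['id'])
    return matched

requests = [
    {'id': 'task-a', 'stamps': {'request_id': '12345'}},
    {'id': 'task-b', 'stamps': {'request_id': '99999'}},
    {'id': 'task-c', 'stamps': {'request_id': '12345', 'tenant': 'acme'}},
]

ids = find_matching_ids(requests, {'request_id': '12345'})
# ids == ['task-a', 'task-c']
```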
Configuration
The control system behavior is governed by several configuration options in the Celery app:
| Setting | Default | Description |
|---|---|---|
| `control_exchange` | `'celery'` | Name of the fanout exchange for control commands. |
| `control_queue_ttl` | `300.0` | Time-to-live for control queues, in seconds. |
| `control_queue_expires` | `10.0` | Expiry time for control queues. |
| `control_queue_exclusive` | `True` | Whether control queues are exclusive to the worker. |
| `control_queue_durable` | `False` | Whether control queues should survive broker restarts. |
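These are ordinary Celery settings and can be adjusted like any other; for example (the specific values here are illustrative):

```python
from celery import Celery

app = Celery('myapp')
# Keep the default exclusivity, but shorten the control queue TTL.
app.conf.update(
    control_exchange='celery',
    control_queue_ttl=60.0,
)
```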
Important Constraints
In celery/app/control.py, there is a strict check on queue durability:
```python
if (app.conf.control_queue_durable and
        app.conf.control_queue_exclusive):
    raise ImproperlyConfigured(
        "control_queue_durable and control_queue_exclusive cannot both be True "
        "(exclusive queues are automatically deleted and cannot be durable).",
    )
```
If you need durable control queues, you must set control_queue_exclusive to False.
Troubleshooting
Duplicate Node Names
If multiple workers share the same node name, the control system will receive multiple replies for the same key. The flatten_reply function will issue a DuplicateNodenameWarning in this case. Always ensure workers are started with unique names using the -n flag:
```shell
celery -A myapp worker -n worker1@%h
```
Missing External Dependencies
Some inspection commands require additional libraries:
- `memsample` and `memdump` require `psutil`.
- `objgraph` requires the `objgraph` library.
If these are missing, the worker will return an error in the reply dictionary for that specific command.
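When processing replies programmatically, it helps to separate successful payloads from per-worker errors before using them. The exact shape of the error payload below (a dict with an `'error'` key) is an assumption for illustration; check the actual replies from your workers:

```python
# Defensive handling of per-command errors in inspection replies.
# The {'error': ...} payload shape is an ASSUMPTION for illustration.

def split_replies(reply):
    """Separate successful replies from per-worker error payloads."""
    ok, failed = {}, {}
    for node, payload in (reply or {}).items():
        if isinstance(payload, dict) and 'error' in payload:
            failed[node] = payload['error']
        else:
            ok[node] = payload
    return ok, failed

sample = {
    'worker1@host': {'rss': 1024},
    'worker2@host': {'error': 'psutil is not installed'},
}
ok, failed = split_replies(sample)
# ok     == {'worker1@host': {'rss': 1024}}
# failed == {'worker2@host': 'psutil is not installed'}
```

Passing `None` (no workers replied) yields two empty dicts rather than raising.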