Managing Tasks and Queues Remotely
To manage workers and tasks remotely without restarting the service, use the Control interface provided by app.control. This API allows you to broadcast commands to all workers or specific nodes to revoke tasks, manage consumer queues, and adjust worker resources dynamically.
Revoking and Terminating Tasks
You can revoke tasks to prevent them from executing. If a task is already running, you must explicitly terminate it.
from proj.celery import app
# Revoke a task by ID (prevents it from starting)
app.control.revoke('d85ad3d0-64ad-4791-aa05-39426560256a')
# Terminate a task that is currently executing
app.control.terminate('d85ad3d0-64ad-4791-aa05-39426560256a', signal='SIGKILL')
# Revoke multiple tasks at once
app.control.revoke(['id1', 'id2', 'id3'], terminate=True)
You can also revoke tasks directly from the AsyncResult object returned when the task was delayed:
result = add.delay(2, 2)
result.revoke(terminate=True, signal='SIGKILL')
For advanced use cases, Control.revoke_by_stamped_headers allows revoking tasks based on custom headers:
# Revoke tasks matching specific stamped headers
app.control.revoke_by_stamped_headers(
headers={'priority': 'low', 'user': 'guest'},
terminate=True
)
Managing Consumer Queues Dynamically
You can tell workers to start or stop consuming from specific queues at runtime using add_consumer and cancel_consumer. This is useful for rebalancing workloads without a restart.
# Tell specific workers to start consuming from a new queue
app.control.add_consumer(
queue='priority_high',
exchange='tasks',
exchange_type='topic',
routing_key='high.#',
destination=['worker1@example.com']
)
# Stop consuming from a queue
app.control.cancel_consumer('priority_high', destination=['worker1@example.com'])
Adjusting Worker Resources
The Control API provides methods to modify worker limits and pool sizes on the fly.
# Change the rate limit for a specific task type
app.control.rate_limit('tasks.add', '10/m')
# Adjust the execution pool size
app.control.pool_grow(n=2, destination=['worker1@example.com'])
app.control.pool_shrink(n=1)
# Change autoscale settings
app.control.autoscale(max=10, min=2)
# Set new time limits for a task
app.control.time_limit('tasks.add', soft=30, hard=60)
Inspecting Worker State
Use app.control.inspect() to query the current state of the cluster. Unlike Control methods, Inspect methods are read-only and return the results from the workers.
inspector = app.control.inspect()
# Get active tasks across all workers
active_tasks = inspector.active()
# Get worker statistics (uptime, PID, pool info)
stats = inspector.stats()
# Check which queues each worker is consuming from
queues = inspector.active_queues()
# Query specific tasks by ID
task_info = inspector.query_task('d85ad3d0-64ad-4791-aa05-39426560256a')
Troubleshooting and Best Practices
- Transport Support: Remote control commands require a broker that supports fanout exchanges (e.g., RabbitMQ or Redis).
- Termination: Simply calling
revoke()will not stop a task that has already started. You must passterminate=True. - Queue Configuration: The
add_consumermethod does not use the default queue/exchange settings from your configuration. You must provideexchange,exchange_type, and other options explicitly if they differ from the queue name. - Configuration Conflict: In your Celery configuration,
control_queue_durableandcontrol_queue_exclusivecannot both beTrue. Exclusive queues are automatically deleted and cannot be durable. - Purging Tasks: To remove all waiting tasks from all queues, use
app.control.purge()(also aliased asdiscard_all()):count = app.control.purge()
print(f"Discarded {count} tasks.")