Skip to main content

Remote Control & Inspection

Remote control and inspection in this project are handled through a broadcast system that uses a fanout exchange (the "pidbox") to communicate with workers. The primary interface for these operations is the Control class, accessible via app.control.

Managing Workers Remotely

You can send commands to workers to change their behavior at runtime using app.control. These commands are broadcast to all workers by default, but can be targeted to specific nodes.

from celery import Celery

app = Celery('myapp')

# Ping all workers to check availability
replies = app.control.ping(timeout=2.0)
# Returns: [{'worker1@hostname': {'ok': 'pong'}}, ...]

# Increase the pool size for a specific worker
app.control.pool_grow(n=2, destination=['worker1@hostname'])

# Set a new rate limit for a specific task type across all workers
app.control.rate_limit('tasks.add', '10/m')

Common Control Commands

The Control class in celery/app/control.py provides several methods for managing worker state:

  • pool_grow(n=1, destination=None): Increase the number of pool worker processes by n.
  • pool_shrink(n=1, destination=None): Decrease the number of pool worker processes by n.
  • add_consumer(queue, exchange=None, ...): Tell workers to start consuming from a new queue.
  • cancel_consumer(queue, destination=None): Tell workers to stop consuming from a specific queue.
  • time_limit(task_name, soft, hard): Update time limits for a task type at runtime.

Inspecting Worker State

The Inspect class provides a read-only API to query workers for their current state, such as active tasks, statistics, and registered task types. You access it via app.control.inspect().

# Create an inspection client
inspector = app.control.inspect()

# Get statistics from all workers
stats = inspector.stats()

# Get currently executing tasks
active_tasks = inspector.active()

# Get tasks currently in the reserved queue (waiting to be executed)
reserved_tasks = inspector.reserved()

# List all registered task types on each worker
registered = inspector.registered()

Handling Inspection Results

Inspection methods return a dictionary keyed by the worker node name. If you target a single destination, the result is still a dictionary unless you use the destination argument in the Inspect constructor.

# Targeting a specific node
node_stats = app.control.inspect(destination=['worker1@hostname']).stats()
# Returns: {'worker1@hostname': {... stats ...}}

The flatten_reply utility in celery/app/control.py is used internally to merge multiple responses into a single mapping.

Revoking and Terminating Tasks

Revoking a task prevents it from being executed if it hasn't started yet. If the task is already running, you must explicitly request termination.

Revoking by Task ID

You can revoke tasks using the app.control.revoke method or directly from an AsyncResult object.

# Revoke a task by ID
app.control.revoke('16f527de-1c72-47a6-b477-c472b92fef7a')

# Terminate a task that is already running
app.control.revoke('16f527de-1c72-47a6-b477-c472b92fef7a', terminate=True, signal='SIGKILL')

# Shortcut for termination
app.control.terminate('16f527de-1c72-47a6-b477-c472b92fef7a')

Revoking by Stamped Headers

This project supports revoking tasks based on custom metadata (stamps) attached to the task headers.

# Revoke all tasks that have a specific header value
headers = {'request_id': '12345'}
app.control.revoke_by_stamped_headers(headers, terminate=True)

This method first broadcasts a request to find matching task IDs on workers and then revokes those specific IDs.

Configuration

The control system behavior is governed by several configuration options in the Celery app:

SettingDefaultDescription
control_exchange'celery'Name of the fanout exchange for control commands.
control_queue_ttl300.0Time-to-live for control queues in seconds.
control_queue_expires10.0Expiry time for control queues.
control_queue_exclusiveTrueWhether control queues are exclusive to the worker.
control_queue_durableFalseWhether control queues should survive broker restarts.

Important Constraints

In celery/app/control.py, there is a strict check on queue durability:

if (app.conf.control_queue_durable and
app.conf.control_queue_exclusive):
raise ImproperlyConfigured(
"control_queue_durable and control_queue_exclusive cannot both be True "
"(exclusive queues are automatically deleted and cannot be durable).",
)

If you need durable control queues, you must set control_queue_exclusive to False.

Troubleshooting

Duplicate Node Names

If multiple workers share the same node name, the control system will receive multiple replies for the same key. The flatten_reply function will issue a DuplicateNodenameWarning in this case. Always ensure workers are started with unique names using the -n flag:

celery -A myapp worker -n worker1@%h

Missing External Dependencies

Some inspection commands require additional libraries:

  • memsample and memdump require psutil.
  • objgraph requires the objgraph library.

If these are missing, the worker will return an error in the reply dictionary for that specific command.