Your First Remote Control Command

In this tutorial, you will build a management script that uses the Control class to verify worker health across your network. By the end of this guide, you will be able to programmatically check which workers are online and responsive using Celery's remote control system.

Prerequisites

To follow this tutorial, you need:

  • A running message broker that supports fanout (such as RabbitMQ or Redis).
  • Celery installed in your environment.
  • At least one active Celery worker running.

Step 1: Initialize the Celery Application

First, you must create a Celery application instance configured to connect to your broker. This instance provides the entry point for all control commands.

from celery import Celery

app = Celery(
    'management_app',
    broker='pyamqp://guest@localhost//'
)

The app object contains a control attribute, which is an instance of celery.app.control.Control. This class is the primary interface for sending commands that modify or query worker behavior at runtime.
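
To see this relationship concretely, you can import the class and check the attribute yourself. This short sketch assumes only the app defined above:

from celery.app.control import Control

# app.control is created on first access and cached on the app instance
assert isinstance(app.control, Control)
print(type(app.control))  # <class 'celery.app.control.Control'>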

Step 2: Broadcast a Ping Command

The simplest way to verify connectivity is to "ping" every worker currently connected to the broker.

# Send a ping to all workers and wait up to 1.0 second for replies
replies = app.control.ping(timeout=1.0)

print(f"Received {len(replies)} replies.")
for reply in replies:
    print(reply)

When you call app.control.ping(), the Control class uses its internal mailbox to broadcast a ping command over a fanout exchange (named celery.pidbox by default). Every worker listening on that exchange receives the message and sends back a "pong" response. The timeout argument ensures your script doesn't hang waiting on an unresponsive worker.
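
Because ping() is a thin wrapper, you can issue the same command through Control.broadcast directly. The sketch below is equivalent to the call above; reply=True is what tells the mailbox to collect the "pong" responses:

# The same ping, issued through the lower-level broadcast interface
replies = app.control.broadcast(
    'ping',
    reply=True,    # collect responses instead of fire-and-forget
    timeout=1.0,   # stop waiting after one second
)
print(replies)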

Step 3: Target Specific Workers

In a large cluster, you may want to check the status of only a specific node. You can do this with the destination argument.

# Target a specific worker by its node name
specific_replies = app.control.ping(
    destination=['celery@worker1.example.com'],
    timeout=2.0
)

if specific_replies:
    print("Worker 1 is online!")
else:
    print("Worker 1 did not respond.")

The destination parameter accepts a list of worker names. When provided, the Control.broadcast method (which ping calls internally) ensures that only the specified workers process the command, even though the message is technically broadcast to the exchange.
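
If you check the same nodes repeatedly, a small helper keeps call sites tidy. The function below is illustrative (which_online is not part of Celery's API); it relies only on the ping() call shown above:

def which_online(node_names, timeout=2.0):
    """Return the subset of node_names that answered a targeted ping."""
    replies = app.control.ping(destination=list(node_names), timeout=timeout)
    # Each reply is a single-entry dict keyed by the responding node's name
    return {name for reply in replies for name in reply}

online = which_online(['celery@worker1.example.com', 'celery@worker2.example.com'])
print(online)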

Step 4: Verify the Response Format

The Control.ping method returns a list of dictionaries, one per responding worker. Each is a single-entry dictionary mapping the worker's node name to its response status.

# Example of a successful response structure
# [
#     {'celery@node1': {'ok': 'pong'}},
#     {'celery@node2': {'ok': 'pong'}}
# ]

for response in replies:
    for worker_name, status in response.items():
        if status.get('ok') == 'pong':
            print(f"Worker {worker_name} is healthy.")

This structure allows you to easily iterate through your cluster and identify which specific nodes are failing. If a worker is under heavy load or its event loop is blocked, it may fail to respond within the timeout period and will be absent from the list.
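
You can turn that observation into a reusable check. The helper below (partition_health is a hypothetical name, not a Celery API) compares the replies against the set of nodes you expect to be online and reports the silent ones:

def partition_health(replies, expected):
    """Split `expected` node names into responders and silent nodes."""
    healthy = {
        name
        for reply in replies
        for name, status in reply.items()
        if status.get('ok') == 'pong'
    }
    return healthy, set(expected) - healthy

healthy, missing = partition_health(replies, {'celery@node1', 'celery@node2'})
print(f"Healthy: {healthy}, missing: {missing}")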

Step 5: Querying vs. Controlling

While app.control.ping() is a direct command, you can also use the inspect() method to perform more detailed queries without affecting worker state.

# Use the Inspect interface for more metadata
inspector = app.control.inspect()
stats = inspector.stats()

if stats:
    for node, data in stats.items():
        # 'total' is a mapping of task name to execution count
        total = sum(data['total'].values())
        print(f"Node {node} has processed {total} tasks.")

The app.control.inspect() method returns an instance of celery.app.control.Inspect. While Control methods like ping(), revoke(), and rate_limit() are designed to trigger actions, Inspect methods are designed to gather telemetry and state information from the workers.
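
Inspect exposes several read-only queries besides stats(). Each of the calls below returns a mapping of node name to data, or None if no worker replies within the timeout:

inspector = app.control.inspect(timeout=2.0)

# Tasks currently executing on each worker
print(inspector.active())

# Tasks prefetched from the queue but not yet started
print(inspector.reserved())

# Task names each worker has registered
print(inspector.registered())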

Troubleshooting Configuration

If you encounter an ImproperlyConfigured error when initializing your app, check your settings for control queues. The Control class enforces that control_queue_durable and control_queue_exclusive cannot both be True.

# This configuration will cause an error in app.control
app.conf.control_queue_durable = True
app.conf.control_queue_exclusive = True

Exclusive queues are automatically deleted by the broker when the connection is lost, which contradicts the "durable" requirement. Ensure only one (or neither) is set to True in your configuration.
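
A valid configuration sets at most one of the two flags. Using the same setting names as the error case above, either of these combinations is acceptable:

# Durable, non-exclusive control queues: valid
app.conf.control_queue_durable = True
app.conf.control_queue_exclusive = False

# Exclusive, non-durable control queues: also valid
app.conf.control_queue_durable = False
app.conf.control_queue_exclusive = True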

Complete Working Example

Here is the complete script combining these steps:

from celery import Celery

# 1. Setup
app = Celery('monitor', broker='pyamqp://guest@localhost//')

def check_cluster_health():
    # 2. Broadcast Ping
    print("Pinging cluster...")
    replies = app.control.ping(timeout=1.5)

    # 3. Process Results
    online_workers = []
    for reply in replies:
        for hostname, status in reply.items():
            if status.get('ok') == 'pong':
                online_workers.append(hostname)

    print(f"Online workers: {', '.join(online_workers) if online_workers else 'None'}")

    # 4. Targeted check if needed
    if 'celery@critical-node' not in online_workers:
        print("Warning: Critical node is offline!")

if __name__ == '__main__':
    check_cluster_health()

Now that you can verify worker connectivity, you can explore other Control methods such as app.control.revoke() to cancel tasks or app.control.pool_grow() to dynamically scale your workers.
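
Both follow the same calling pattern as ping(). As a quick preview (the task id below is a placeholder):

# Cancel a task by id; terminate=True also signals the process
# currently executing it
app.control.revoke('a1b2c3d4-0000-0000-0000-000000000000', terminate=True)

# Ask one worker to add two processes to its execution pool
app.control.pool_grow(2, destination=['celery@worker1.example.com'])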