
Inspecting Worker Health and Activity

To monitor the state of your workers, including active tasks, resource usage, and configuration, use the Inspect API. It queries workers through a broadcast mechanism without affecting their operation.

Basic Worker Inspection

The entry point for inspection is app.control.inspect(). By default, commands are broadcast to all workers.

from proj.celery import app

# Create an inspect instance
i = app.control.inspect()

# Ping all workers to check availability
# Returns: {'celery@node1': {'ok': 'pong'}, 'celery@node2': {'ok': 'pong'}}
print(i.ping())

# Get a human-readable report of worker status
# Returns: {'celery@node1': {'ok': '...report string...'}}
print(i.report())
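Replies are plain dictionaries keyed by node name. A worker that does not answer before the timeout is simply absent from the result, and the call returns None when no worker answers at all. The following is a minimal sketch, assuming you know which node names to expect, that turns a ping() reply into a list of unresponsive workers. The helper name find_unresponsive is hypothetical, not part of Celery.

```python
from typing import Iterable, Mapping, Optional


def find_unresponsive(expected: Iterable[str],
                      ping_reply: Optional[Mapping[str, dict]]) -> list:
    """Return expected node names that did not answer ping() with 'pong'.

    ping_reply is the value returned by app.control.inspect().ping(),
    which may be None when no worker replied before the timeout.
    """
    replies = ping_reply or {}
    return sorted(
        name for name in expected
        if replies.get(name, {}).get('ok') != 'pong'
    )


# Example using a reply shaped like the one documented above.
reply = {'celery@node1': {'ok': 'pong'}}
print(find_unresponsive(['celery@node1', 'celery@node2'], reply))
# → ['celery@node2']
```

In practice you would pass the result of i.ping() directly as the second argument.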

Targeting Specific Workers

You can limit the scope of your inspection to specific nodes or use patterns to match worker names.

# Target specific nodes by name
i = app.control.inspect(destination=['celery@node1', 'celery@node2'])
stats = i.stats()

# Use glob patterns to match worker names
# Requires 'matcher' to be set (e.g., 'glob' or 'pcre')
i = app.control.inspect(pattern='celery@node*', matcher='glob')
active = i.active()
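Before broadcasting with a glob pattern, it can help to check locally which node names the pattern would select. This sketch approximates the glob matcher with Python's fnmatch; treating the two as equivalent is an assumption to verify for your Celery/Kombu version. The helper name preview_matches and the node names are hypothetical.

```python
from fnmatch import fnmatch


def preview_matches(pattern: str, node_names: list) -> list:
    """Return the node names a glob pattern would select (approximation)."""
    return [name for name in node_names if fnmatch(name, pattern)]


nodes = ['celery@node1', 'celery@node2', 'celery@worker-a']
print(preview_matches('celery@node*', nodes))
# → ['celery@node1', 'celery@node2']
```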

Monitoring Task Activity

The Inspect class provides methods for tracking tasks in different states: active (currently executing), scheduled (waiting for an ETA or countdown), and reserved (received by the worker but still waiting to be executed).

i = app.control.inspect()

# List tasks currently being executed
# Returns: {'celery@node1': [{'id': 'uuid', 'name': 'tasks.add', 'args': [1, 2], ...}]}
active_tasks = i.active()

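# List tasks with an ETA or countdown
# Returns: {'celery@node1': [{'eta': '...', 'priority': 0, 'request': {...task info...}}]}
scheduled_tasks = i.scheduled()

# List tasks received by the worker but still waiting to be executed
reserved_tasks = i.reserved()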

# Query the status of specific task IDs
# Returns: {'celery@node1': {'task-id-123': ['active', {...task info...}]}}
task_status = i.query_task('task-id-123', 'task-id-456')
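Each of these methods returns a dictionary that maps node names to a list of entries, or None if no worker replied. This minimal sketch merges the active, reserved, and scheduled replies into per-worker counts, which is often enough to spot an overloaded or idle node. The helper name count_tasks is hypothetical, and it relies only on each reply holding one list per node.

```python
from typing import Mapping, Optional, Sequence

Reply = Optional[Mapping[str, Sequence[dict]]]


def count_tasks(active: Reply, reserved: Reply, scheduled: Reply) -> dict:
    """Merge active/reserved/scheduled replies into per-node counts."""
    counts = {}
    for state, reply in (('active', active),
                         ('reserved', reserved),
                         ('scheduled', scheduled)):
        # A None reply means no worker answered that particular call.
        for node, entries in (reply or {}).items():
            per_node = counts.setdefault(
                node, {'active': 0, 'reserved': 0, 'scheduled': 0})
            per_node[state] = len(entries)
    return counts


active = {'celery@node1': [{'id': 'a1', 'name': 'tasks.add'}]}
reserved = {'celery@node1': [], 'celery@node2': [{'id': 'r1'}]}
print(count_tasks(active, reserved, None))
# → {'celery@node1': {'active': 1, 'reserved': 0, 'scheduled': 0},
#    'celery@node2': {'active': 0, 'reserved': 1, 'scheduled': 0}}
```

With a live cluster you would pass i.active(), i.reserved(), and i.scheduled() from the same Inspect instance.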

Retrieving Worker Statistics and Health

Use stats() to get detailed information about the worker's internal state, including broker connection details, pool concurrency, and system resource usage (rusage).

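i = app.control.inspect()
# stats() returns None if no worker replies before the timeout
stats = i.stats() or {}

# Access specific stats from one node
node_stats = stats.get('celery@node1')
if node_stats:
    print(f"Uptime: {node_stats['uptime']} seconds")
    # 'total' maps each task name to the number of tasks accepted since startup
    print(f"Total tasks accepted: {sum(node_stats['total'].values())}")
    print(f"Broker transport: {node_stats['broker']['transport']}")

    # System resource usage (requires getrusage(2) support)
    rusage = node_stats.get('rusage')
    if isinstance(rusage, dict):
        print(f"Max RSS: {rusage['maxrss']} KB")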
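When you poll many workers, it helps to reduce stats() to a few comparable numbers. This sketch assumes the key layout used above ('uptime', 'total', 'rusage'). Because 'total' maps task names to counts, the helper sums it, and it treats a missing or non-dict 'rusage' value as unknown. The function name summarize_stats and the max_rss_kb threshold are hypothetical, not Celery settings.

```python
from typing import Mapping, Optional


def summarize_stats(stats: Optional[Mapping[str, dict]],
                    max_rss_kb: int = 500_000) -> dict:
    """Reduce a stats() reply to uptime, accepted-task count and a memory flag.

    max_rss_kb is an illustrative threshold, not a Celery setting.
    """
    summary = {}
    for node, data in (stats or {}).items():
        rusage = data.get('rusage')
        # rusage may be absent or not a dict; treat it as unknown then.
        maxrss = rusage.get('maxrss') if isinstance(rusage, dict) else None
        summary[node] = {
            'uptime': data.get('uptime'),
            # 'total' maps task name -> count, so add the counts up.
            'accepted': sum(data.get('total', {}).values()),
            'maxrss_kb': maxrss,
            'high_memory': maxrss is not None and maxrss > max_rss_kb,
        }
    return summary


sample = {'celery@node1': {'uptime': 120,
                           'total': {'tasks.add': 7, 'tasks.mul': 3},
                           'rusage': {'maxrss': 812_000}}}
print(summarize_stats(sample)['celery@node1'])
# → {'uptime': 120, 'accepted': 10, 'maxrss_kb': 812000, 'high_memory': True}
```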

Inspecting Worker Configuration

You can verify which tasks are registered on a worker and which queues it is currently consuming from.

i = app.control.inspect()

# List all registered task types
# Returns: {'celery@node1': ['tasks.add', 'tasks.mul']}
print(i.registered())

# List active queues and their configurations (exchange, routing_key, etc.)
print(i.active_queues())

# Retrieve the worker's configuration
# With with_defaults=False, only settings that differ from the defaults are returned;
# set with_defaults=True to include default values as well
print(i.conf(with_defaults=False))
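A common use of registered() is a deployment check that every worker knows the tasks you are about to send. This minimal sketch works over the reply shape shown above. The helper name missing_tasks and the task names are hypothetical. Entries may carry extra detail after the task name when extra info items are requested, so the sketch keeps only the first whitespace-separated token.

```python
from typing import Iterable, Mapping, Optional, Sequence


def missing_tasks(registered: Optional[Mapping[str, Sequence[str]]],
                  required: Iterable[str]) -> dict:
    """Map each replying node to the required task names it lacks."""
    wanted = set(required)
    result = {}
    for node, entries in (registered or {}).items():
        # Keep only the task name in case an entry carries extra info.
        names = {entry.split()[0] for entry in entries if entry.strip()}
        lacking = sorted(wanted - names)
        if lacking:
            result[node] = lacking
    return result


reply = {'celery@node1': ['tasks.add', 'tasks.mul'],
         'celery@node2': ['tasks.add']}
print(missing_tasks(reply, ['tasks.add', 'tasks.mul']))
# → {'celery@node2': ['tasks.mul']}
```

Workers that did not reply at all do not appear in the result, so pair this with a ping() check if you need to catch them too.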

Troubleshooting and Requirements

  • Transport Support: The Inspect API relies on the remote control mailbox, which is primarily supported by RabbitMQ (AMQP) and Redis.
  • External Libraries: memsample() and memdump() require the psutil library, and objgraph() requires the objgraph library, installed on the worker nodes.
  • Timeouts: The default timeout for receiving replies is 1.0 second. If you have many workers or a slow network, increase this in the constructor:
    i = app.control.inspect(timeout=5.0)
  • Control Queue Configuration: Ensure your configuration does not set both control_queue_durable and control_queue_exclusive to True simultaneously, as this is an invalid combination for the underlying mailbox.
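Instead of always paying for a long timeout, you can start with a short one and retry with a longer one only when expected workers are missing. This sketch assumes you pass a factory such as lambda t: app.control.inspect(timeout=t). The helper name inspect_until_complete, the timeout ladder, and the FakeInspect stand-in used for the demo are all hypothetical.

```python
from typing import Callable, Iterable, Optional


def inspect_until_complete(make_inspect: Callable[[float], object],
                           method: str,
                           expected: Iterable[str],
                           timeouts: Iterable[float] = (1.0, 3.0, 5.0)
                           ) -> Optional[dict]:
    """Call an Inspect method with growing timeouts until all nodes reply.

    Returns the last reply (possibly partial or None) if some expected
    nodes never answer.
    """
    wanted = set(expected)
    reply = None
    for timeout in timeouts:
        reply = getattr(make_inspect(timeout), method)()
        # Stop as soon as every expected node name is present.
        if reply and wanted <= set(reply):
            break
    return reply


class FakeInspect:
    """Stand-in for app.control.inspect(); node2 only answers slowly."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout

    def ping(self) -> dict:
        reply = {'celery@node1': {'ok': 'pong'}}
        if self.timeout >= 3.0:
            reply['celery@node2'] = {'ok': 'pong'}
        return reply


result = inspect_until_complete(FakeInspect, 'ping',
                                ['celery@node1', 'celery@node2'])
print(sorted(result))
# → ['celery@node1', 'celery@node2']
```

Against a real cluster the call would look like inspect_until_complete(lambda t: app.control.inspect(timeout=t), 'stats', [...]). Each retry is a fresh broadcast, so keep the ladder short when many workers are involved.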