Inspecting Worker Health and Activity
To monitor the state of your workers, including active tasks, resource usage, and configuration, use the Inspect API. This API allows you to query workers via a broadcast mechanism without affecting their operation.
Basic Worker Inspection
The entry point for inspection is app.control.inspect(). By default, commands are broadcast to all workers.
from proj.celery import app
# Create an inspect instance
i = app.control.inspect()
# Ping all workers to check availability
# Returns: {'celery@node1': {'ok': 'pong'}, 'celery@node2': {'ok': 'pong'}}
print(i.ping())
# Get a human-readable diagnostic report (software versions and settings) from each worker
# Returns: {'celery@node1': {'ok': '...report string...'}}
print(i.report())
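Inspect methods return None when no worker replies within the timeout, so it helps to guard against that before indexing into the result. The following is a minimal sketch, not part of Celery itself: missing_workers is a hypothetical helper, and the sample dictionary stands in for a real i.ping() result in the shape shown above.

```python
def missing_workers(expected, ping_reply):
    """Return the expected worker names that did not answer a ping."""
    # Inspect methods return None when no worker replies in time.
    replies = ping_reply or {}
    return sorted(
        name for name in expected
        if replies.get(name, {}).get('ok') != 'pong'
    )


# Sample reply in the shape shown above; in practice pass i.ping().
sample = {'celery@node1': {'ok': 'pong'}}
print(missing_workers(['celery@node1', 'celery@node2'], sample))
print(missing_workers(['celery@node1'], None))
```

In real use you would call missing_workers(expected_names, i.ping()), where expected_names is your own list of node names.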
Targeting Specific Workers
You can limit the scope of your inspection to specific nodes or use patterns to match worker names.
# Target specific nodes by name
i = app.control.inspect(destination=['celery@node1', 'celery@node2'])
stats = i.stats()
# Use glob patterns to match worker names
# Requires 'matcher' to be set (e.g., 'glob' or 'pcre')
i = app.control.inspect(pattern='celery@node*', matcher='glob')
active = i.active()
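Before broadcasting with a pattern, it can be useful to preview which node names it would select. The sketch below assumes the 'glob' matcher follows the same shell-style rules as Python's fnmatch module; that is an assumption about the matcher, and preview_glob is a hypothetical local helper that never contacts any worker.

```python
from fnmatch import fnmatch


def preview_glob(pattern, known_workers):
    """Return the names from known_workers that a shell-style pattern matches."""
    return [name for name in known_workers if fnmatch(name, pattern)]


# Hypothetical node names used only for illustration.
workers = ['celery@node1', 'celery@node2', 'celery@db1']
print(preview_glob('celery@node*', workers))
```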
Monitoring Task Activity
The Inspect class provides several methods to track tasks in different states: active (currently running), scheduled (ETA/Countdown), and reserved (received but waiting for a pool slot).
i = app.control.inspect()
# List tasks currently being executed
# Returns: {'celery@node1': [{'id': 'uuid', 'name': 'tasks.add', 'args': [1, 2], ...}]}
active_tasks = i.active()
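# List tasks with an ETA or countdown
scheduled_tasks = i.scheduled()
# List tasks received by the worker but still waiting for a pool slot
reserved_tasks = i.reserved()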
# Query the status of specific task IDs
# Returns: {'celery@node1': {'task-id-123': ['active', {...task info...}]}}
task_status = i.query_task('task-id-123', 'task-id-456')
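The per-worker replies are plain dictionaries, so they are easy to aggregate. As a rough sketch, the hypothetical helper below counts running tasks by name across all workers. It assumes the reply shape shown above for active() (a list of task dictionaries with a 'name' key per worker) and treats a None reply as "no workers answered".

```python
from collections import Counter


def tasks_by_name(reply):
    """Count tasks per task name across all workers in an active()-style reply."""
    counts = Counter()
    # A None reply means no worker answered within the timeout.
    for tasks in (reply or {}).values():
        for task in tasks:
            counts[task.get('name', '<unknown>')] += 1
    return counts


# Sample data in the documented shape; in practice pass i.active().
sample_active = {
    'celery@node1': [
        {'id': 'a1', 'name': 'tasks.add', 'args': [1, 2]},
        {'id': 'a2', 'name': 'tasks.mul', 'args': [3, 4]},
    ],
    'celery@node2': [
        {'id': 'b1', 'name': 'tasks.add', 'args': [5, 6]},
    ],
}
print(tasks_by_name(sample_active))
```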
Retrieving Worker Statistics and Health
Use stats() to get detailed information about the worker's internal state, including broker connection details, pool concurrency, and system resource usage (rusage).
i = app.control.inspect()
stats = i.stats()
# Accessing specific stats from a node
node_stats = stats.get('celery@node1')
if node_stats:
    print(f"Uptime: {node_stats['uptime']} seconds")
    # 'total' maps each task name to the number of tasks accepted
    print(f"Tasks accepted, by name: {node_stats['total']}")
    print(f"Broker transport: {node_stats['broker']['transport']}")
    # System resource usage (requires getrusage(2) support)
    print(f"Max RSS: {node_stats['rusage']['maxrss']} KB")
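Direct indexing raises KeyError if a field is missing, for example on a platform without resource usage data. The following sketch shows a more defensive way to pull out the same fields for every node. summarize_stats is a hypothetical helper, and it assumes that 'total' maps task names to counts and that the other keys match those used above.

```python
def summarize_stats(stats_reply):
    """Extract a few headline fields per node, tolerating missing keys."""
    summary = {}
    # A None reply means no worker answered within the timeout.
    for node, stats in (stats_reply or {}).items():
        total = stats.get('total') or {}
        rusage = stats.get('rusage')
        summary[node] = {
            'uptime': stats.get('uptime'),
            'tasks_accepted': sum(total.values()),
            'transport': (stats.get('broker') or {}).get('transport'),
            'maxrss': rusage.get('maxrss') if isinstance(rusage, dict) else None,
        }
    return summary


# Sample data for illustration; in practice pass i.stats().
sample_stats = {
    'celery@node1': {
        'uptime': 120,
        'total': {'tasks.add': 3, 'tasks.mul': 2},
        'broker': {'transport': 'amqp'},
        'rusage': {'maxrss': 50000},
    },
    'celery@node2': {'uptime': 5},
}
for node, fields in summarize_stats(sample_stats).items():
    print(node, fields)
```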
Inspecting Worker Configuration
You can verify which tasks are registered on a worker and which queues it is currently consuming from.
i = app.control.inspect()
# List all registered task types
# Returns: {'celery@node1': ['tasks.add', 'tasks.mul']}
print(i.registered())
# List active queues and their configurations (exchange, routing_key, etc.)
print(i.active_queues())
# Retrieve the full configuration of the worker
# Set with_defaults=True to include default values
print(i.conf(with_defaults=False))
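One common use of registered() is checking that every worker has loaded the tasks you expect. The sketch below assumes the reply shape shown above (a list of task names per worker). workers_missing_tasks and the task names are hypothetical and used only for illustration.

```python
def workers_missing_tasks(registered_reply, required):
    """Map each worker to the required task names it has not registered."""
    required = set(required)
    missing = {}
    # A None reply means no worker answered within the timeout.
    for node, names in (registered_reply or {}).items():
        absent = required - set(names)
        if absent:
            missing[node] = sorted(absent)
    return missing


# Sample data in the documented shape; in practice pass i.registered().
sample_registered = {
    'celery@node1': ['tasks.add', 'tasks.mul'],
    'celery@node2': ['tasks.add'],
}
print(workers_missing_tasks(sample_registered, ['tasks.add', 'tasks.mul']))
```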
Troubleshooting and Requirements
- Transport Support: The Inspect API relies on the remote control mailbox, which is primarily supported by RabbitMQ (AMQP) and Redis.
- External Libraries: Methods like memsample(), memdump(), and objgraph() require the psutil or objgraph libraries to be installed on the worker nodes.
- Timeouts: The default timeout for receiving replies is 1.0 second. If you have many workers or a slow network, increase this in the constructor:
  i = app.control.inspect(timeout=5.0)
- Control Queue Configuration: Ensure your configuration does not set both control_queue_durable and control_queue_exclusive to True simultaneously, as this is an invalid combination for the underlying mailbox.