Monitoring & Events
The monitoring and events system provides a real-time stream of cluster activity, allowing for live observation of worker health, task progress, and overall cluster state. This system is built on an event-driven architecture where producers (dispatchers) broadcast state changes and consumers (receivers) process these events to build a coherent view of the system.
Event Dispatching
The EventDispatcher class in celery/events/dispatcher.py is the primary interface for sending events to the broker. It is used by workers to signal heartbeats and task state transitions, and by clients to signal when a task has been sent.
Core Features
- Buffering: The dispatcher can buffer events while the connection is down if `buffer_while_offline` is enabled. It also supports grouping events (e.g., sending multiple task events in one message) via `buffer_group` and `buffer_limit`.
- Logical Clock: Every event includes a logical clock value (`self.clock.forward()`), ensuring that events can be ordered correctly even if physical clocks across the cluster are not perfectly synchronized.
- Groups: Events can be filtered by groups (e.g., "task", "worker"). The dispatcher will only send events belonging to the groups specified in its `groups` argument.
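The interaction between group filtering, clock stamping, and offline buffering can be illustrated with a small standalone model. This is a simplified sketch and not Celery's implementation: `ToyDispatcher`, `max_pending`, the `publish` callable, and the `broker` dictionary are hypothetical names introduced only for this example.

```python
from collections import deque


class ToyDispatcher:
    """Illustrative stand-in for an event dispatcher; not Celery's class."""

    def __init__(self, publish, groups=None, max_pending=100):
        self.publish = publish            # callable; raises ConnectionError when offline
        self.groups = set(groups or ())   # empty set means "send every group"
        self.clock = 0                    # simple logical clock counter
        self.pending = deque(maxlen=max_pending)  # hypothetical bounded buffer

    def send(self, type_, **fields):
        group = type_.split('-', 1)[0]    # 'task-started' -> 'task'
        if self.groups and group not in self.groups:
            return False                  # dropped by the group filter
        self.clock += 1                   # every event gets a fresh clock value
        event = {'type': type_, 'clock': self.clock, **fields}
        try:
            self.publish(event)
        except ConnectionError:
            self.pending.append(event)    # hold it until the broker is back
        return True

    def flush(self):
        """Publish buffered events, oldest first."""
        while self.pending:
            self.publish(self.pending[0])
            self.pending.popleft()


broker = {'online': False, 'received': []}


def publish(event):
    if not broker['online']:
        raise ConnectionError('broker unreachable')
    broker['received'].append(event)


dispatcher = ToyDispatcher(publish, groups={'task'})
dispatcher.send('worker-heartbeat')        # filtered: not in the 'task' group
dispatcher.send('task-started', uuid='a')  # buffered: broker is offline
broker['online'] = True
dispatcher.flush()                         # buffered event is delivered now
```

In this model the event group is taken from the part of the event type before the first hyphen, which matches the "task" and "worker" groups mentioned above.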
Usage Example
Workers typically use the dispatcher within a context manager to ensure resources are released:
```python
from celery.events.dispatcher import EventDispatcher

# `connection` is an already-established broker connection.
with EventDispatcher(connection, enabled=True) as dispatcher:
    dispatcher.send('worker-online', custom_field='value')
```
Event Receiving
The EventReceiver class in celery/events/receiver.py consumes events from the broker. It creates a temporary, often exclusive queue to receive the event stream from the exchange defined by event_exchange (defaulting to celeryev).
Clock Synchronization
The receiver is responsible for maintaining the local view of the logical clock. When an event is received, it adjusts the local clock using self.clock.adjust(clock).
A notable exception is the task-sent event. Since clients do not synchronize their clocks with the rest of the cluster, the receiver applies a CLIENT_CLOCK_SKEW to these events to prevent them from disrupting the logical ordering of worker-generated events.
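The clock handling described here can be sketched with a minimal Lamport-style clock. The example below is an approximation for illustration, not the receiver's actual code: `LogicalClock` and `stamp` are hypothetical names, and the skew value of `-1` is an assumption made for this sketch.

```python
class LogicalClock:
    """Minimal Lamport-style clock; a sketch, not the library's class."""

    def __init__(self, value=0):
        self.value = value

    def forward(self):
        # Local event: advance by one.
        self.value += 1
        return self.value

    def adjust(self, other):
        # Remote event: jump past whichever value is larger.
        self.value = max(self.value, other) + 1
        return self.value


CLIENT_CLOCK_SKEW = -1  # assumed value for this sketch


def stamp(clock, event):
    """Reconcile the local clock with an incoming event (hypothetical helper)."""
    if event['type'] == 'task-sent':
        # Clients never sync, so their clock value is replaced with one
        # placed just behind the receiver's current value.
        event['clock'] = (clock.value or 1) + CLIENT_CLOCK_SKEW
    clock.adjust(event['clock'])
    return event


clock = LogicalClock()
worker_event = stamp(clock, {'type': 'task-started', 'clock': 10})
client_event = stamp(clock, {'type': 'task-sent', 'clock': 999})
```

The client's own value of 999 is discarded, so a badly skewed client cannot push the receiver's clock far ahead of the workers.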
Capturing Events
The capture method starts a blocking loop that processes incoming events and dispatches them to registered handlers:
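```python
def my_handler(event):
    print(f"Received event: {event['type']}")

# `app` is an existing Celery application instance.
with app.connection() as conn:
    recv = app.events.Receiver(conn, handlers={'*': my_handler})
    recv.capture(limit=None)  # blocks and processes events until interrupted
```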
Cluster State Management
The State class in celery/events/state.py maintains an in-memory representation of the entire cluster. It processes the raw event stream and updates internal Worker and Task objects.
Tracking Entities
- Workers: Tracked via the `Worker` class. `State` monitors heartbeats to determine if a worker is `ONLINE` or `OFFLINE`. If the time since the last heartbeat exceeds the expiry window governed by `HEARTBEAT_EXPIRE_WINDOW`, the worker is considered offline.
- Tasks: Tracked via the `Task` class. `State` maintains relationships between tasks, including `parent_id` and `children` (using `WeakSet` to avoid memory leaks).
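The heartbeat check for workers can be sketched as follows. This is an illustrative approximation rather than the code in `celery/events/state.py`: it assumes the expiry window is expressed as a percentage of the heartbeat interval, that the value is 200, and that the interval is 60 seconds. The `status` helper is a hypothetical name.

```python
import time

HEARTBEAT_EXPIRE_WINDOW = 200  # assumed: percent of the heartbeat interval


def heartbeat_expires(last_heartbeat, freq=60.0,
                      expire_window=HEARTBEAT_EXPIRE_WINDOW):
    """Return the timestamp after which the worker counts as offline."""
    return last_heartbeat + freq * (expire_window / 100.0)


def status(last_heartbeat, now=None, freq=60.0):
    """Classify a worker from its most recent heartbeat (hypothetical helper)."""
    now = time.time() if now is None else now
    if now < heartbeat_expires(last_heartbeat, freq):
        return 'ONLINE'
    return 'OFFLINE'


# A heartbeat at t=1000 with a 60 s interval expires at t=1120.
fresh = status(1000.0, now=1100.0)
stale = status(1000.0, now=1121.0)
```

Under these assumptions a worker may miss one full heartbeat interval before it is reported offline.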
Memory Management
To prevent unbounded memory growth in long-running monitoring tools, State uses LRUCache for both workers and tasks. The limits are configurable via max_workers_in_memory and max_tasks_in_memory.
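The eviction behavior of a least-recently-used cache can be shown with a short standard-library sketch. `BoundedCache` is a hypothetical class written for this example and is not the `LRUCache` used by `State`; it only demonstrates why memory stays bounded once the limit is reached.

```python
from collections import OrderedDict


class BoundedCache:
    """Tiny LRU cache: evicts the least recently used key when full."""

    def __init__(self, limit):
        self.limit = limit
        self._data = OrderedDict()

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)       # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)       # new or updated keys are most recent
        while len(self._data) > self.limit:
            self._data.popitem(last=False)  # drop the least recently used

    def keys(self):
        return list(self._data)


tasks = BoundedCache(limit=2)
tasks.put('t1', {'state': 'STARTED'})
tasks.put('t2', {'state': 'STARTED'})
tasks.get('t1')                           # touch t1 so t2 becomes the oldest
tasks.put('t3', {'state': 'RECEIVED'})    # exceeds the limit, evicts t2
```

A monitoring tool built on this pattern forgets the entities it has not seen activity from for the longest time, which is usually the desired trade-off for long-running dashboards.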
Event Ordering
State uses a taskheap (a list managed with bisect) to keep track of tasks ordered by time. This allows for efficient retrieval of the most recent tasks via tasks_by_time().
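The idea of a sorted list maintained with `bisect` can be sketched like this. The tuple layout `(clock, timestamp, task_id)` and the helpers `record` and `most_recent` are assumptions made for the example, not the structures used inside `State`.

```python
from bisect import insort


def record(heap, clock, timestamp, task_id):
    """Insert an entry while keeping the list sorted by (clock, timestamp)."""
    insort(heap, (clock, timestamp, task_id))


def most_recent(heap, limit=None):
    """Return task ids from newest to oldest, each id at most once."""
    seen = set()
    result = []
    for _clock, _timestamp, task_id in reversed(heap):
        if task_id in seen:
            continue
        seen.add(task_id)
        result.append(task_id)
        if limit is not None and len(result) >= limit:
            break
    return result


taskheap = []
record(taskheap, 3, 10.0, 'b')
record(taskheap, 1, 9.0, 'a')    # arrives late but is sorted into place
record(taskheap, 2, 9.5, 'c')
record(taskheap, 4, 11.0, 'a')   # a newer event for task 'a'

newest_first = most_recent(taskheap)
top_two = most_recent(taskheap, limit=2)
```

Because the list is always sorted, reading the newest entries is a reverse scan from the end, and out-of-order arrivals only cost one binary search plus an insert.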
Snapshots and Persistence
While the event stream is real-time, the Polaroid system in celery/events/snapshot.py allows for taking periodic "snapshots" of the cluster state. This is useful for persisting the state to a database for historical analysis.
The Polaroid Mechanism
A Polaroid instance uses a timer to periodically trigger a "shutter". When the shutter fires, it executes on_shutter(state), where the current State object can be inspected and saved.
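The shutter pattern can be modeled without a real timer. The sketch below is a simplified illustration and not the `Polaroid` class: `ToyCamera` and its `tick` method are hypothetical, and time is passed in explicitly so the behavior is deterministic.

```python
class ToyCamera:
    """Calls on_shutter(state) at most once every `freq` seconds."""

    def __init__(self, state, freq=1.0):
        self.state = state
        self.freq = freq
        self._last_shot = None
        self.snapshots = []

    def on_shutter(self, state):
        # Copy the state so later updates do not alter stored snapshots.
        # A real camera might write to a database here instead.
        self.snapshots.append(dict(state))

    def tick(self, now):
        """Fire the shutter if at least `freq` seconds have elapsed."""
        if self._last_shot is None or now - self._last_shot >= self.freq:
            self._last_shot = now
            self.on_shutter(self.state)
            return True
        return False


state = {'tasks_seen': 0}
camera = ToyCamera(state, freq=10.0)
camera.tick(0.0)            # first tick always takes a snapshot
state['tasks_seen'] = 5
camera.tick(4.0)            # too early, nothing is recorded
camera.tick(10.0)           # interval elapsed, second snapshot
```

Overriding `on_shutter` is the only part a custom camera needs to change in this model, which mirrors how the text above describes the hook.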
evcam
The evcam function is a high-level utility that ties together a State, an EventReceiver, and a Polaroid (referred to as a "camera"). It provides a complete monitoring loop that captures events, updates the state, and takes snapshots at a defined frequency.
```python
from celery.events.snapshot import evcam

# Starts a process that takes snapshots every 10 seconds
evcam(camera='my_project.Camera', freq=10.0, app=app)
```
Configuration Reference
The behavior of the monitoring system is controlled by several configuration options:
| Setting | Default | Description |
|---|---|---|
| worker_send_task_events | False | Enables sending of task-related events from workers. |
| task_send_sent_event | False | Enables sending the task-sent event from the client. |
| event_exchange | celeryev | The name of the exchange used for events. |
| event_queue_ttl | 5.0 | Message TTL (seconds) for the receiver's queue. |
| event_queue_prefix | celeryev | Prefix for the auto-generated receiver queue names. |
| event_serializer | json | Serializer used for event messages. |
> [!WARNING]
> `EventReceiver` queues cannot be configured as both `exclusive` and `durable`. Attempting to do so will raise an `ImproperlyConfigured` error in `celery/events/receiver.py`.