
Monitoring & Events

The monitoring and events system provides a real-time stream of cluster activity, allowing for live observation of worker health, task progress, and overall cluster state. This system is built on an event-driven architecture where producers (dispatchers) broadcast state changes and consumers (receivers) process these events to build a coherent view of the system.

Event Dispatching

The EventDispatcher class in celery/events/dispatcher.py is the primary interface for sending events to the broker. It is used by workers to signal heartbeats and task state transitions, and by clients to signal when a task has been sent.

Core Features

  • Buffering: The dispatcher can buffer events while the connection is down if buffer_while_offline is enabled. It also supports grouping events (e.g., sending multiple task events in one message) via buffer_group and buffer_limit.
  • Logical Clock: Every event includes a logical clock value (self.clock.forward()), ensuring that events can be ordered correctly even if physical clocks across the cluster are not perfectly synchronized.
  • Groups: Events can be filtered by groups (e.g., "task", "worker"). The dispatcher will only send events belonging to the groups specified in its groups argument.
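The three features above can be pictured without a broker. The following is a minimal sketch, not Celery's implementation: `ToyDispatcher`, its `publish` callback, and the way the group is derived from the event type prefix are assumptions made for illustration only.

```python
import itertools


class ToyDispatcher:
    """Hypothetical stand-in for an event dispatcher; not Celery's EventDispatcher."""

    def __init__(self, publish, groups=None, buffer_group=None, buffer_limit=3):
        self.publish = publish                       # callable that "sends" a list of events
        self.groups = set(groups or ())              # empty set means "send every group"
        self.buffer_group = set(buffer_group or ())  # groups whose events are batched
        self.buffer_limit = buffer_limit
        self._clock = itertools.count(1)             # simple logical clock
        self._buffer = []

    def send(self, event_type, **fields):
        group = event_type.split('-', 1)[0]          # 'task-started' -> 'task'
        if self.groups and group not in self.groups:
            return                                   # filtered out by group
        event = {'type': event_type, 'clock': next(self._clock), **fields}
        if group in self.buffer_group:
            self._buffer.append(event)
            if len(self._buffer) >= self.buffer_limit:
                self.flush()
        else:
            self.publish([event])

    def flush(self):
        """Send all buffered events as one batch."""
        if self._buffer:
            self.publish(self._buffer[:])
            self._buffer.clear()


sent = []
d = ToyDispatcher(sent.append, groups={'task'}, buffer_group={'task'}, buffer_limit=2)
d.send('worker-heartbeat')          # dropped: 'worker' is not in groups
d.send('task-started', uuid='a')    # buffered
d.send('task-succeeded', uuid='a')  # buffer limit reached, both sent as one batch
```

In this sketch the two task events leave the dispatcher together, each carrying its own clock value, while the worker event is never published.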

Usage Example

Workers typically use the dispatcher within a context manager to ensure resources are released:

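from celery.events.dispatcher import EventDispatcher

# `connection` is an open broker connection, e.g. obtained from app.connection().
with EventDispatcher(connection, enabled=True) as dispatcher:
    # Extra keyword arguments become fields on the event.
    dispatcher.send('worker-online', custom_field='value')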

Event Receiving

The EventReceiver class in celery/events/receiver.py consumes events from the broker. It creates a temporary, often exclusive queue to receive the event stream from the exchange defined by event_exchange (defaulting to celeryev).

Clock Synchronization

The receiver is responsible for maintaining the local view of the logical clock. When an event is received, it adjusts the local clock using self.clock.adjust(clock).

A notable exception is the task-sent event. Since clients do not synchronize their clocks with the rest of the cluster, the receiver applies a CLIENT_CLOCK_SKEW to these events to prevent them from disrupting the logical ordering of worker-generated events.
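The clock handling described here can be sketched in a few lines. This is an illustration under stated assumptions, not the receiver's actual code: `LogicalClock` and `stamp_incoming` are hypothetical names, and the skew value of -1 is assumed for the example.

```python
CLIENT_CLOCK_SKEW = -1  # assumed value for this sketch


class LogicalClock:
    """Minimal Lamport-style clock (hypothetical helper)."""

    def __init__(self):
        self.value = 0

    def forward(self):
        self.value += 1
        return self.value

    def adjust(self, other):
        # Move to just past the larger of the local and received clocks.
        self.value = max(self.value, other) + 1
        return self.value


def stamp_incoming(clock, event):
    """Adjust the local clock for one received event."""
    if event['type'] == 'task-sent':
        # Ignore the client's own clock and place the event just before "now".
        event['clock'] = (clock.value or 1) + CLIENT_CLOCK_SKEW
    clock.adjust(event['clock'])
    return event


clock = LogicalClock()
stamp_incoming(clock, {'type': 'worker-heartbeat', 'clock': 10})
sent = stamp_incoming(clock, {'type': 'task-sent', 'clock': 9999})
```

The client's wildly different clock value (9999) is discarded, so the task-sent event cannot push the local clock far ahead of the workers.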

Capturing Events

The capture method starts a blocking loop that processes incoming events and dispatches them to registered handlers:

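def my_handler(event):
    print(f"Received event: {event['type']}")

# `app` is the application's Celery instance.
with app.connection() as conn:
    # '*' registers a catch-all handler for every event type.
    recv = app.events.Receiver(conn, handlers={'*': my_handler})
    # Blocks until interrupted; limit=None places no cap on the number of events.
    recv.capture(limit=None)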
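To picture how a handlers mapping might route events, here is a small broker-free sketch. `dispatch` is a hypothetical helper, and the rule that an exact type match takes precedence over `'*'` is an assumption of this example.

```python
def dispatch(handlers, event):
    """Route one event to its handler (hypothetical helper, not a Celery API)."""
    # Assumption: an exact match on the event type takes precedence over '*'.
    handler = handlers.get(event['type']) or handlers.get('*')
    if handler is not None:
        handler(event)


failures, everything_else = [], []
handlers = {
    'task-failed': failures.append,   # specific handler
    '*': everything_else.append,      # catch-all for any other type
}

for event in ({'type': 'task-failed', 'uuid': 'a'},
              {'type': 'worker-heartbeat', 'hostname': 'w1'}):
    dispatch(handlers, event)
```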

Cluster State Management

The State class in celery/events/state.py maintains an in-memory representation of the entire cluster. It processes the raw event stream and updates internal Worker and Task objects.

Tracking Entities

  • Workers: Tracked via the Worker class. State uses heartbeats to decide whether a worker is ONLINE or OFFLINE: a worker is considered offline once no heartbeat has arrived within its expiry window, which is the worker's heartbeat interval scaled by HEARTBEAT_EXPIRE_WINDOW (a percentage).
  • Tasks: Tracked via the Task class. State maintains relationships between tasks, including parent_id and children (using WeakSet to avoid memory leaks).
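The heartbeat check for workers can be sketched as follows. This is a simplified illustration, assuming the expiry window is a percentage applied to the heartbeat interval and that its value is 200; `is_alive` is a hypothetical helper.

```python
import time

HEARTBEAT_EXPIRE_WINDOW = 200  # percent of the heartbeat interval; assumed value


def heartbeat_expires(last_heartbeat, freq=60.0, expire_window=HEARTBEAT_EXPIRE_WINDOW):
    """Return the timestamp after which the worker counts as offline."""
    return last_heartbeat + freq * (expire_window / 100.0)


def is_alive(last_heartbeat, freq=60.0, now=None):
    """True while the last heartbeat is still inside the expiry window."""
    now = time.time() if now is None else now
    return now < heartbeat_expires(last_heartbeat, freq)


# A worker that beats every 2 seconds and was last heard from at t=1000.
deadline = heartbeat_expires(1000.0, freq=2.0)
online_at_1003 = is_alive(1000.0, freq=2.0, now=1003.0)
online_at_1005 = is_alive(1000.0, freq=2.0, now=1005.0)
```

Scaling by the heartbeat interval means a worker can miss a beat without being reported offline immediately.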

Memory Management

To prevent unbounded memory growth in long-running monitoring tools, State uses LRUCache for both workers and tasks. The limits are configurable via max_workers_in_memory and max_tasks_in_memory.
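The eviction behaviour of a bounded, least-recently-used mapping can be sketched with the standard library. `TinyLRU` is a hypothetical class written for this example; it is not the LRUCache that State uses.

```python
from collections import OrderedDict


class TinyLRU:
    """Bounded mapping that evicts the least recently used key (illustrative only)."""

    def __init__(self, limit):
        self.limit = limit
        self._data = OrderedDict()

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)          # mark as most recently used
        while len(self._data) > self.limit:
            self._data.popitem(last=False)   # evict the least recently used entry

    def __getitem__(self, key):
        value = self._data[key]
        self._data.move_to_end(key)          # reads also count as "use"
        return value

    def __contains__(self, key):
        return key in self._data

    def __len__(self):
        return len(self._data)


tasks = TinyLRU(limit=2)            # plays the role of max_tasks_in_memory
tasks['t1'] = 'PENDING'
tasks['t2'] = 'STARTED'
_ = tasks['t1']                     # touch t1 so that t2 becomes the oldest entry
tasks['t3'] = 'SUCCESS'             # exceeds the limit, so t2 is evicted
```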

Event Ordering

State uses a taskheap (a list managed with bisect) to keep track of tasks ordered by time. This allows for efficient retrieval of the most recent tasks via tasks_by_time().
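A sorted list maintained with bisect can be sketched as below. The tuple layout `(clock, timestamp, task_id)` and the helper names `record` and `recent_tasks` are assumptions made for illustration, not the structure State actually stores.

```python
from bisect import insort

taskheap = []  # kept sorted by (clock, timestamp, task_id)


def record(clock, timestamp, task_id):
    """Insert one event at its sorted position."""
    insort(taskheap, (clock, timestamp, task_id))


def recent_tasks(limit=None):
    """Return distinct task ids, most recent first."""
    seen, result = set(), []
    for _, _, task_id in reversed(taskheap):
        if limit is not None and len(result) >= limit:
            break
        if task_id not in seen:
            seen.add(task_id)
            result.append(task_id)
    return result


# Events may arrive out of order; insort keeps the list sorted regardless.
record(3, 100.0, 'b')
record(1, 99.0, 'a')
record(2, 101.0, 'c')
record(4, 102.0, 'a')

all_recent = recent_tasks()
two_recent = recent_tasks(limit=2)
```

Because the list is always sorted, the newest entries sit at the end and can be read by walking backwards, without sorting on every query.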

Snapshots and Persistence

While the event stream is real-time, the Polaroid system in celery/events/snapshot.py allows for taking periodic "snapshots" of the cluster state. This is useful for persisting the state to a database for historical analysis.

The Polaroid Mechanism

A Polaroid instance uses a timer to periodically trigger a "shutter". When the shutter fires, it executes on_shutter(state), where the current State object can be inspected and saved.
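The shutter idea can be sketched without a timer. `ToyCamera` is a hypothetical class for illustration and the state is a plain dict; in a real camera a timer would call `shutter()` at the configured frequency instead of the manual calls shown here.

```python
class ToyCamera:
    """Hypothetical snapshot camera; not Celery's Polaroid."""

    def __init__(self, state):
        self.state = state
        self.snapshots = []

    def on_shutter(self, state):
        # Override this hook to persist the state, e.g. to a database.
        # Here a copy is kept in memory so later changes do not alter it.
        self.snapshots.append(dict(state))

    def shutter(self):
        """Take one snapshot of the current state."""
        self.on_shutter(self.state)


state = {'workers': 1, 'tasks': 0}
camera = ToyCamera(state)
camera.shutter()        # first snapshot
state['tasks'] = 5      # the cluster state changes between shutters
camera.shutter()        # second snapshot reflects the change
```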

evcam

The evcam function is a high-level utility that ties together a State, an EventReceiver, and a Polaroid (referred to as a "camera"). It provides a complete monitoring loop that captures events, updates the state, and takes snapshots at a defined frequency.

from celery.events.snapshot import evcam

# Starts a process that takes snapshots every 10 seconds
evcam(camera='my_project.Camera', freq=10.0, app=app)

Configuration Reference

The behavior of the monitoring system is controlled by several configuration options:

| Setting | Default | Description |
| --- | --- | --- |
| worker_send_task_events | False | Enables sending of task-related events from workers. |
| task_send_sent_event | False | Enables sending the task-sent event from the client. |
| event_exchange | celeryev | The name of the exchange used for events. |
| event_queue_ttl | 5.0 | Message TTL (seconds) for the receiver's queue. |
| event_queue_prefix | celeryev | Prefix for the auto-generated receiver queue names. |
| event_serializer | json | Serializer used for event messages. |
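As a rough illustration, these settings could be collected in a configuration module. The module name `celeryconfig.py` and the chosen values are assumptions for this sketch; only the setting names come from the reference above.

```python
# celeryconfig.py (hypothetical module name)

# Turn on the two event sources that are disabled by default.
worker_send_task_events = True
task_send_sent_event = True

# The remaining values restate the documented defaults.
event_exchange = 'celeryev'
event_queue_ttl = 5.0
event_queue_prefix = 'celeryev'
event_serializer = 'json'
```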

Warning: EventReceiver queues cannot be configured as both exclusive and durable. Attempting to do so will raise an ImproperlyConfigured error in celery/events/receiver.py.