Introduction to the Event System
Celery's event system provides a decoupled, real-time monitoring architecture for tracking cluster activity. Instead of polling workers for their status, the system relies on an event-driven model where workers and clients push state changes to a dedicated exchange, and monitoring tools consume these events to reconstruct the cluster's health and task history.
The primary entry point for interacting with this system is the Events class, typically accessed via app.events.
The Events Factory
The celery.app.events.Events class acts as a central factory and proxy for the three pillars of the event system:
- Dispatcher: For sending events.
- Receiver: For capturing events.
- State: For maintaining an in-memory representation of the cluster.
These components are lazily loaded and automatically configured using the application's settings (e.g., event_exchange, event_serializer).
# Accessing the core components via the app instance
dispatcher = app.events.Dispatcher(connection)
receiver = app.events.Receiver(connection, handlers={'*': my_handler})
state = app.events.State()
Sending Events: EventDispatcher
The EventDispatcher (in celery.events.dispatcher) is responsible for publishing event messages to the broker. It is used extensively by workers to signal task lifecycles (e.g., task-received, task-started) and worker heartbeats.
Key Features
- Buffering: If buffer_while_offline is enabled, the dispatcher stores events in an internal _outbound_buffer when the broker connection is lost. These can be sent later using flush().
- Event Groups: Events can be categorized into groups (like task or worker). The dispatcher can be configured to send only specific groups, or to buffer certain groups (like task) so they are sent in batches, reducing message overhead.
- Logical Clock: It integrates with Celery's logical clock (self.app.clock) so that events can be ordered correctly even across different machines.
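The offline-buffering behavior can be illustrated with a small stdlib-only sketch. TinyDispatcher and its attributes are hypothetical stand-ins for the idea described above, not Celery's actual EventDispatcher:

```python
from collections import deque

class TinyDispatcher:
    """Illustrative sketch of offline buffering; not Celery's real class."""

    def __init__(self, buffer_while_offline=True):
        self.connected = False
        self.buffer_while_offline = buffer_while_offline
        self._outbound_buffer = deque()
        self.published = []  # stands in for messages sent to the broker

    def send(self, type, **fields):
        event = {'type': type, **fields}
        if not self.connected and self.buffer_while_offline:
            self._outbound_buffer.append(event)  # hold until flush()
        else:
            self.published.append(event)

    def flush(self):
        # Drain everything buffered while the connection was down.
        while self._outbound_buffer:
            self.published.append(self._outbound_buffer.popleft())

d = TinyDispatcher()
d.send('task-started', uuid='1a2b')  # connection down: event is buffered
d.connected = True
d.flush()                            # buffered event is now published
```

The real dispatcher applies the same pattern per event group, which is what lets it batch high-volume task events.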
Implementation in Workers
Workers initialize the dispatcher during the Events bootstep in celery/worker/consumer/events.py:
class Events(bootsteps.StartStopStep):
def start(self, c):
dis = c.event_dispatcher = c.app.events.Dispatcher(
c.connection_for_write(),
hostname=c.hostname,
enabled=self.send_events,
groups=self.groups,
buffer_group=['task'] if c.hub else None,
)
Capturing Events: EventReceiver
The EventReceiver (in celery.events.receiver) is a ConsumerMixin that listens to the event exchange. It allows developers to register handlers for specific event types or a catch-all handler using the "*" key.
Real-time Monitoring Example
The evdump utility in celery/events/dumper.py demonstrates how to use the Receiver to capture and print every event in the cluster:
def evdump(app=None, out=sys.stdout):
app = app_or_default(app)
dumper = Dumper(out=out)
conn = app.connection_for_read().clone()
# Initialize the receiver with a catch-all handler
recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
# Start the consumer loop
recv.capture()
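The handler-resolution rule described above (a specific event type wins over the "*" catch-all) can be sketched without a broker. get_handler is an illustrative helper, not part of Celery's API:

```python
def get_handler(handlers, event_type):
    """Resolve a handler: exact event type first, then the '*' catch-all.
    (Illustrative helper mirroring the lookup the text describes.)"""
    return handlers.get(event_type) or handlers.get('*')

seen = []
handlers = {
    'task-succeeded': lambda ev: seen.append(('succeeded', ev['uuid'])),
    '*': lambda ev: seen.append(('other', ev['type'])),
}

get_handler(handlers, 'task-succeeded')({'type': 'task-succeeded', 'uuid': '42'})
get_handler(handlers, 'worker-heartbeat')({'type': 'worker-heartbeat'})
```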
Tracking Cluster State: State
The State class (in celery.events.state) provides a high-level API for tracking the current status of all workers and tasks in the cluster. It consumes raw events and updates internal Worker and Task objects.
State Management Features
- In-Memory Storage: Uses LRUCache to cap memory usage for workers (max_workers_in_memory) and tasks (max_tasks_in_memory).
- Clock Skew Detection: The Worker class monitors the drift between an event's timestamp and the local reception time. If the drift exceeds HEARTBEAT_DRIFT_MAX (default 16 seconds), a warning is logged.
- Task Indexing: Provides optimized lookups via tasks_by_type and tasks_by_worker, using WeakSet to avoid memory leaks.
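The bounded-cache idea behind max_tasks_in_memory can be sketched with the standard library. BoundedCache is a hypothetical stand-in for the LRUCache the State class uses:

```python
from collections import OrderedDict

class BoundedCache(OrderedDict):
    """Minimal LRU-style bound, in the spirit of the cache used by State
    (hypothetical stand-in, stdlib only)."""

    def __init__(self, limit):
        super().__init__()
        self.limit = limit

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)
        if len(self) > self.limit:
            self.popitem(last=False)  # evict the oldest entry

tasks = BoundedCache(limit=2)
tasks['uuid-1'] = 'PENDING'
tasks['uuid-2'] = 'STARTED'
tasks['uuid-3'] = 'SUCCESS'   # cap reached: uuid-1 is evicted
```

Capping both task and worker maps is what lets a long-running monitor consume an unbounded event stream with bounded memory.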
Reconstructing Task History
When an event like task-succeeded arrives, the State object finds the corresponding Task instance and updates its attributes. It handles out-of-order events using merge_rules, ensuring that critical metadata (like task names or arguments) from a RECEIVED event isn't overwritten by a later event that might lack that data.
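The merge idea can be sketched as follows. The field names and merge_event helper are illustrative, not Celery's actual merge_rules implementation:

```python
# Fields that only the task-received event carries are protected from
# being clobbered by later (or out-of-order) events.
PROTECTED_FIELDS = {'name', 'args', 'kwargs'}

def merge_event(task, event):
    """Apply an event to a task dict, keeping protected metadata (sketch)."""
    for key, value in event.items():
        if key in PROTECTED_FIELDS and task.get(key) is not None:
            continue  # keep the metadata set by the RECEIVED event
        task[key] = value
    return task

task = {'name': 'proj.add', 'args': '(2, 2)', 'state': 'RECEIVED'}
# A task-succeeded event typically lacks name/args:
merge_event(task, {'state': 'SUCCESS', 'result': '4', 'name': None})
```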
Operational Constraints
Transport Limitations
The EventDispatcher explicitly disables event sending for certain broker transports that are not suitable for high-volume eventing. Specifically, SQL-based transports are blocked:
DISABLED_TRANSPORTS = {'sql'}
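The effect of this gate can be sketched in a few lines; the events_allowed helper is hypothetical, but the DISABLED_TRANSPORTS set is taken from the source above:

```python
DISABLED_TRANSPORTS = {'sql'}

def events_allowed(transport, enabled=True):
    """Eventing stays off on unsuitable transports (illustrative helper)."""
    return enabled and transport not in DISABLED_TRANSPORTS
```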
Queue Configuration
Event queues are designed to be transient. The EventReceiver enforces a constraint where a queue cannot be both exclusive and durable. Attempting to configure both will result in an ImproperlyConfigured exception.
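A minimal sketch of that validation, assuming only what the text states (the validate_event_queue helper is hypothetical; the exception name is from the text):

```python
class ImproperlyConfigured(Exception):
    """Raised when queue options conflict (name taken from the text)."""

def validate_event_queue(exclusive, durable):
    # An event queue may be exclusive or durable, but not both.
    if exclusive and durable:
        raise ImproperlyConfigured(
            'exclusive and durable cannot both be set on an event queue')
    return True
```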
Exchange and Serializer
By default, events are sent to the celeryev exchange using json serialization. These can be customized via the event_exchange and event_serializer settings in the Celery configuration.
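For example, both settings can be overridden on the app configuration. This is a configuration sketch assuming an existing Celery app instance named app; the exchange name 'myapp.ev' is illustrative:

```python
# Settings named in the text; defaults shown in the comments.
app.conf.event_exchange = 'myapp.ev'   # default: 'celeryev'
app.conf.event_serializer = 'json'     # default: 'json'
```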