# Capturing and Handling Events

To capture and process real-time events from a Celery cluster, use the `EventReceiver` class to subscribe to the event stream and dispatch messages to custom handlers.
## Basic Event Capture

The most common way to capture events is to use a catch-all handler and the `capture()` method, which starts a blocking event loop.
```python
from celery import Celery

app = Celery('monitoring_app')


def on_event(event):
    # event is a dictionary containing the event data
    print(f"Received event: {event['type']} from {event['hostname']}")


def start_monitoring():
    # Use the connection from the app to create a receiver
    with app.connection_for_read() as conn:
        recv = app.events.Receiver(conn, handlers={
            '*': on_event,
        })
        print("Starting event capture...")
        recv.capture()


if __name__ == '__main__':
    start_monitoring()
```
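Note that task events are only published when they are enabled on the workers, for example by starting them with `celery -A proj worker -E` (`--task-events`) or by setting `worker_send_task_events = True` in the worker configuration.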
## Handling Specific Event Types

Instead of a catch-all handler, you can map specific event types (like `task-received` or `worker-heartbeat`) to dedicated functions. The `EventReceiver` will automatically dispatch events based on the `type` field in the message body.
```python
def on_task_succeeded(event):
    print(f"Task {event['uuid']} succeeded in {event['runtime']}s")


def on_worker_online(event):
    print(f"Worker {event['hostname']} joined the cluster")


handlers = {
    'task-succeeded': on_task_succeeded,
    'worker-online': on_worker_online,
    '*': on_event,  # Optional catch-all for other event types
}

with app.connection_for_read() as conn:
    recv = app.events.Receiver(conn, handlers=handlers)
    recv.capture()
```
## Iterative Event Processing

If you need to perform other actions between processing events, or want to avoid a fully blocking loop, use `itercapture()`. It returns a generator that yields each time events have been drained from the connection and dispatched to your handlers.
```python
import socket

with app.connection_for_read() as conn:
    recv = app.events.Receiver(conn, handlers={'*': on_event})
    # limit: stop after this many capture iterations (None = unlimited)
    # timeout: raise socket.timeout if no event arrives within this many seconds
    # wakeup: ask all workers to send a heartbeat so they show up immediately
    try:
        for _ in recv.itercapture(limit=None, timeout=1.0, wakeup=True):
            print("Finished processing the latest events...")
            # Perform periodic maintenance here
    except socket.timeout:
        print("No events received before the timeout expired")
```
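Note that `capture()` accepts the same `limit`, `timeout`, and `wakeup` arguments, so `recv.capture(limit=100)` is a convenient way to bound the blocking loop when you do not need to interleave your own work.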
## Configuration Options

The `EventReceiver` behavior is influenced by several Celery configuration settings. You can also override these during initialization:
| Parameter | Config Setting | Description |
|---|---|---|
| `queue_prefix` | `event_queue_prefix` | Prefix for the temporary event queue name. |
| `routing_key` | N/A | The routing key to bind to (default: `#`). |
| `queue_ttl` | `event_queue_ttl` | Message TTL for the event queue. |
| `queue_expires` | `event_queue_expires` | Expiry time for the event queue itself. |
| `queue_exclusive` | `event_queue_exclusive` | Whether the queue is exclusive to this connection. |
| `queue_durable` | `event_queue_durable` | Whether the queue should survive broker restarts. |
Example with custom configuration:
```python
recv = app.events.Receiver(
    conn,
    handlers={'*': on_event},
    queue_prefix='custom_monitor',
    queue_ttl=60.0,        # 60 seconds
    queue_expires=120.0,   # 120 seconds
)
```
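Where a config setting exists (see the table above), the same defaults can also be set globally on the application configuration instead of per receiver. A minimal sketch using the standard `event_queue_*` settings:

```python
app.conf.event_queue_prefix = 'custom_monitor'
app.conf.event_queue_ttl = 60.0        # seconds
app.conf.event_queue_expires = 120.0   # seconds
```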
## Troubleshooting

### Queue Configuration Conflict

The `EventReceiver` raises an `ImproperlyConfigured` exception if you attempt to declare the event queue as both exclusive and durable. You must choose one or the other.
```python
# This will raise ImproperlyConfigured
recv = app.events.Receiver(
    conn,
    queue_exclusive=True,
    queue_durable=True,
)
```
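This restriction mirrors AMQP queue semantics: an exclusive queue is tied to a single connection and is removed when that connection closes, so declaring it durable (intended to survive broker restarts) would serve no purpose, and the receiver rejects the combination up front.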
### Connection Resilience

In production environments, it is recommended to wrap the receiver in a reconnection loop, as seen in `celery/events/dumper.py`:
```python
from celery.utils.time import humanize_seconds


def run_monitor(app):
    conn = app.connection_for_read().clone()

    def _error_handler(exc, interval):
        print(f"Connection error: {exc}. Retrying in {humanize_seconds(interval)}")

    while True:
        try:
            conn.ensure_connection(_error_handler)
            recv = app.events.Receiver(conn, handlers={'*': on_event})
            recv.capture()
        except (KeyboardInterrupt, SystemExit):
            conn.close()
            break
        except conn.connection_errors + conn.channel_errors:
            print("Connection lost, attempting reconnect...")
```
### Clock Synchronization

For `task-sent` events, the `EventReceiver` uses its own internal clock (adjusted by `CLIENT_CLOCK_SKEW`) because client clocks are often not synchronized with the rest of the cluster. For other events, it adjusts its internal clock based on the `clock` field in the event body.
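The dispatch logic is roughly the following (a simplified sketch paraphrased from `EventReceiver.event_from_message()` in `celery/events/receiver.py`; names and details vary between Celery versions):

```python
CLIENT_CLOCK_SKEW = -1  # module-level constant in celery/events/receiver.py


def adjust_event_clock(receiver, body):
    # Sketch only -- the real logic lives inside EventReceiver.event_from_message().
    if body['type'] == 'task-sent':
        # Clients do not take part in clock synchronization, so their clock
        # value is replaced with the receiver's own logical clock plus a skew.
        body['clock'] = (receiver.clock.value or 1) + CLIENT_CLOCK_SKEW
        receiver.adjust_clock(body['clock'])
    else:
        # Workers do synchronize, so trust the clock value carried in the event.
        receiver.adjust_clock(body.get('clock', 1))
    return body
```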