Skip to main content

Capturing and Handling Events

To capture and process real-time events from a Celery cluster, use the EventReceiver class to subscribe to the event stream and dispatch messages to custom handlers.

Basic Event Capture

The most common way to capture events is to use a catch-all handler and the capture() method, which starts a blocking event loop.

from celery import Celery

app = Celery('monitoring_app')

def on_event(event):
# event is a dictionary containing event data
print(f"Received event: {event['type']} from {event['hostname']}")

def start_monitoring():
# Use the connection from the app to create a receiver
with app.connection_for_read() as conn:
recv = app.events.Receiver(conn, handlers={
'*': on_event,
})
print("Starting event capture...")
recv.capture()

if __name__ == '__main__':
start_monitoring()

Handling Specific Event Types

Instead of a catch-all handler, you can map specific event types (like task-received or worker-heartbeat) to dedicated functions. The EventReceiver will automatically dispatch events based on the type field in the message body.

def on_task_succeeded(event):
print(f"Task {event['uuid']} succeeded in {event['runtime']}s")

def on_worker_online(event):
print(f"Worker {event['hostname']} joined the cluster")

handlers = {
'task-succeeded': on_task_succeeded,
'worker-online': on_worker_online,
'*': on_event, # Optional catch-all for other event types
}

with app.connection_for_read() as conn:
recv = app.events.Receiver(conn, handlers=handlers)
recv.capture()

Iterative Event Processing

If you need to perform other actions between processing events or want to avoid a blocking loop, use itercapture(). This method returns a generator that yields whenever an event is processed.

with app.connection_for_read() as conn:
recv = app.events.Receiver(conn, handlers={'*': on_event})

# Process events with a timeout
# limit: number of events to capture before stopping
# timeout: how long to wait for an event before yielding control
for _ in recv.itercapture(limit=None, timeout=1.0, wakeup=True):
print("Still waiting for events or just finished processing one...")
# Perform periodic maintenance here

Configuration Options

The EventReceiver behavior is influenced by several Celery configuration settings. You can also override these during initialization:

ParameterConfig SettingDescription
queue_prefixevent_queue_prefixPrefix for the temporary event queue name.
routing_keyN/AThe routing key to bind to (default: #).
queue_ttlevent_queue_ttlMessage TTL for the event queue.
queue_expiresevent_queue_expiresExpiry time for the event queue itself.
queue_exclusiveevent_queue_exclusiveWhether the queue is exclusive to this connection.
queue_durableevent_queue_durableWhether the queue should survive broker restarts.

Example with custom configuration:

recv = app.events.Receiver(
conn,
handlers={'*': on_event},
queue_prefix='custom_monitor',
queue_ttl=60.0, # 60 seconds
queue_expires=120.0 # 120 seconds
)

Troubleshooting

Queue Configuration Conflict

The EventReceiver will raise an ImproperlyConfigured exception if you attempt to set a queue as both exclusive and durable. You must choose one or the other.

# This will raise ImproperlyConfigured
recv = app.events.Receiver(
conn,
queue_exclusive=True,
queue_durable=True
)

Connection Resilience

In production environments, it is recommended to wrap the receiver in a reconnection loop, as seen in celery/events/dumper.py:

from celery.utils.time import humanize_seconds

def run_monitor(app):
conn = app.connection_for_read().clone()

def _error_handler(exc, interval):
print(f"Connection error: {exc}. Retrying in {humanize_seconds(interval)}")

while True:
try:
conn.ensure_connection(_error_handler)
recv = app.events.Receiver(conn, handlers={'*': on_event})
recv.capture()
except (KeyboardInterrupt, SystemExit):
conn.close()
break
except conn.connection_errors + conn.channel_errors:
print("Connection lost, attempting reconnect...")

Clock Synchronization

For task-sent events, the EventReceiver uses its own internal clock (adjusted by CLIENT_CLOCK_SKEW) because client clocks are often not synchronized with the rest of the cluster. For other events, it adjusts its internal clock based on the clock field in the event body.