Creating State Snapshots

To capture and process snapshots of your Celery cluster state at regular intervals (for example, to record task states in a database or external storage), implement a custom "camera" by subclassing the Polaroid class.

Implementing a Custom Camera

To create a camera, subclass celery.events.snapshot.Polaroid and override the on_shutter method. This method receives a state object (an instance of celery.events.State) containing the current view of workers and tasks.

from celery.events.snapshot import Polaroid

class MyCamera(Polaroid):
    # If True, the state is cleared after every snapshot (default: False)
    clear_after = True

    def on_shutter(self, state):
        """Called at every 'freq' interval."""
        for task_id, task in state.tasks.items():
            print(f'Task {task_id} is currently in state {task.state}')

    def on_cleanup(self):
        """Called at every 'cleanup_freq' interval (default: 1 hour)."""
        print("Performing periodic maintenance on stored snapshots...")
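Because on_shutter only needs the state.tasks mapping, the persistence logic can be written and tested apart from the event stream. The sketch below assumes SQLite as the storage backend; the task_snapshots table, its columns, and the helpers open_store, record_snapshot, and prune_snapshots are all hypothetical. Like the example above, it reads only task.state.

```python
import sqlite3
import time


def open_store(path=":memory:"):
    """Open (or create) a hypothetical task_snapshots table."""
    # check_same_thread=False: the camera's hooks may be called from
    # Celery's timer thread rather than the thread that opened the connection.
    conn = sqlite3.connect(path, check_same_thread=False)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_snapshots ("
        " task_id TEXT NOT NULL,"
        " state TEXT,"
        " taken_at REAL NOT NULL)"
    )
    return conn


def record_snapshot(conn, tasks, now=None):
    """Insert one row per task; `tasks` maps task IDs to objects with a .state."""
    now = time.time() if now is None else now
    rows = [(task_id, task.state, now) for task_id, task in tasks.items()]
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO task_snapshots (task_id, state, taken_at) VALUES (?, ?, ?)",
            rows,
        )
    return len(rows)


def prune_snapshots(conn, max_age, now=None):
    """Delete rows older than `max_age` seconds and return how many were removed."""
    now = time.time() if now is None else now
    with conn:
        cur = conn.execute(
            "DELETE FROM task_snapshots WHERE taken_at < ?", (now - max_age,)
        )
    return cur.rowcount
```

Inside MyCamera, on_shutter could call record_snapshot(self.conn, state.tasks) and on_cleanup could call prune_snapshots(self.conn, max_age=86400), with self.conn opened once by open_store.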

Running the Snapshot Process

You can run your custom camera using the evcam helper function or via the Celery command-line interface.

Using evcam programmatically

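The evcam function in celery.events.snapshot creates a State, instantiates your camera, installs its timers, and then consumes events from the broker until interrupted, so the call blocks. It uses the current default app unless you pass app=.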

from celery.events.snapshot import evcam

# camera: fully qualified path to your Polaroid subclass
evcam(camera='proj.cameras.MyCamera', freq=2.0, loglevel='INFO')

Using the Command Line

You can also start the camera process with the celery events command, pointing -A at your Celery application:

celery -A proj events --camera=proj.cameras.MyCamera --frequency=2.0

Throttling Snapshots

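If you need to cap how many snapshots are actually taken (for example, so that a short freq does not overload the database you write to), use the maxrate parameter. It accepts a rate string such as '10/m' (with /s, /m, or /h) and is enforced with a kombu TokenBucket: on each tick, the shutter fires only if a token is available; otherwise that tick is skipped. The celery events command exposes the same setting as --maxrate.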

# Limit to 10 snapshots per minute
evcam(camera='proj.cameras.MyCamera', freq=0.1, maxrate='10/m')

Internally, the Polaroid constructor converts maxrate into a TokenBucket (when maxrate is None, no limit is applied):

# Excerpt from celery/events/snapshot.py
def __init__(self, state, freq=1.0, maxrate=None,
             cleanup_freq=3600.0, timer=None, app=None):
    # ...
    self.maxrate = maxrate and TokenBucket(rate(maxrate))
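To make the throttling behavior concrete, here is a simplified, standard-library re-implementation for illustration only; it is not kombu's TokenBucket or Celery's rate() helper. parse_rate and SimpleTokenBucket are hypothetical names. The sketch assumes a capacity of one token and a bucket that starts full, and it takes an injectable clock so the behavior can be checked deterministically.

```python
import time


def parse_rate(spec):
    """Convert '10/m'-style strings to events per second ('s', 'm', 'h' units)."""
    if isinstance(spec, (int, float)):
        return float(spec)
    count, _, unit = spec.partition("/")
    divisors = {"s": 1.0, "m": 60.0, "h": 3600.0}
    return float(count) / divisors[unit or "s"]


class SimpleTokenBucket:
    """Illustrative token bucket: starts full, refills at fill_rate tokens/second."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = fill_rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity
        self._timestamp = clock()

    def can_consume(self, tokens=1):
        # Refill according to the time elapsed since the last check.
        now = self._clock()
        elapsed = now - self._timestamp
        self._tokens = min(self.capacity, self._tokens + elapsed * self.fill_rate)
        self._timestamp = now
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False
```

With freq=0.1 and maxrate='10/m', a token is refilled every 6 seconds, so roughly one tick in sixty triggers the shutter. Note that capture() still wraps every tick in state.freeze_while(..., clear_after=self.clear_after), so with clear_after = True the state is cleared even on ticks whose shutter was skipped.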

Using Signals for Decoupled Processing

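Instead of (or in addition to) overriding on_shutter and on_cleanup, you can connect receivers to the shutter_signal and cleanup_signal attributes of the Polaroid class; both are sent with the state object as sender. This is useful for attaching several independent observers to the same snapshot process. Because the signals are class attributes defined on Polaroid, a receiver connected through MyCamera.shutter_signal is notified by every Polaroid-based camera in the process.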

from proj.cameras import MyCamera

def log_snapshot(sender, **kwargs):
    # sender is the celery.events.State instance being snapshotted
    print(f"Snapshot captured for {len(sender.tasks)} tasks")

def log_cleanup(sender, **kwargs):
    print("Cleanup interval reached")

# Receivers run in addition to on_shutter/on_cleanup
MyCamera.shutter_signal.connect(log_snapshot)
MyCamera.cleanup_signal.connect(log_cleanup)
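Receivers are called with keyword arguments: Celery's Signal.send invokes each one as receiver(signal=..., sender=..., **named) and logs, rather than propagates, an exception raised by a receiver. That is why handlers should accept **kwargs, and why one failing observer does not silence the others. The following standard-library model illustrates that dispatch pattern; MiniSignal is a hypothetical toy class, not celery.utils.dispatch.Signal.

```python
import logging

logger = logging.getLogger(__name__)


class MiniSignal:
    """Toy observer registry mimicking the send() behavior described above."""

    def __init__(self, name):
        self.name = name
        self.receivers = []

    def connect(self, receiver):
        # Connecting the same receiver twice has no extra effect.
        if receiver not in self.receivers:
            self.receivers.append(receiver)
        return receiver

    def send(self, sender, **named):
        responses = []
        for receiver in self.receivers:
            try:
                result = receiver(signal=self, sender=sender, **named)
            except Exception as exc:
                # A failing receiver is logged; the remaining receivers still run.
                logger.exception("Receiver %r raised: %r", receiver, exc)
                result = exc
            responses.append((receiver, result))
        return responses
```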

Troubleshooting and Behavior

  • Empty Hooks: The base Polaroid class has empty on_shutter and on_cleanup methods. You must override them or connect to signals to perform any actual work.
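  • State Consistency: capture() calls state.freeze_while(self.shutter, clear_after=self.clear_after), which holds the state's lock while your on_shutter logic runs, so incoming events cannot modify the cluster state mid-snapshot. As a consequence, a slow on_shutter delays event processing for its duration.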
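  • Cleanup Frequency: By default, on_cleanup is called every 3600 seconds (1 hour). You can change this with the cleanup_freq argument of the Polaroid constructor. evcam passes only the state, app, freq, maxrate, and timer to the camera, so when running through evcam or celery events, set cleanup_freq in your subclass instead, for example by overriding __init__ to supply a different value before calling the parent constructor.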
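  • Manual Control: If you use Polaroid outside of evcam, you must call install() to start the timers and cancel() to stop them, and you are responsible for feeding events into the state. Polaroid also supports the context manager pattern:
    state = app.events.State()  # `app` is your Celery application
    with MyCamera(state, freq=1.0) as camera:
        # Timers are active here; feed events into `state` meanwhile
        pass
    # Timers are cancelled automatically on exit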
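To illustrate what freeze_while guarantees, here is a toy model using only the standard library. SnapshotState is hypothetical: it keeps a lock-protected task map, routes incoming events through the same lock, and clears the map after a snapshot when clear_after is set. The real celery.events.State also tracks workers and resets more when cleared.

```python
import threading


class SnapshotState:
    """Toy stand-in for celery.events.State: a lock-protected task map."""

    def __init__(self):
        self._mutex = threading.Lock()
        self.tasks = {}

    def event(self, task_id, state):
        # Incoming events take the same lock, so they wait while a snapshot runs.
        with self._mutex:
            self.tasks[task_id] = state

    def freeze_while(self, fun, *args, clear_after=False, **kwargs):
        # Run `fun` with the lock held; optionally clear the state afterwards.
        with self._mutex:
            try:
                return fun(*args, **kwargs)
            finally:
                if clear_after:
                    self.tasks.clear()
```

Because events and snapshots share one lock, long database writes inside on_shutter stall event processing. One option is to copy what you need inside the hook and hand it to a separate writer thread or queue.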