Creating State Snapshots
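To periodically capture and process snapshots of your Celery cluster state (for example, to log task statuses to a database or external storage), you can implement a custom "camera" by subclassing the Polaroid class. The camera builds its view of the cluster from worker events, so workers must be sending events: start them with the -E option or enable the worker_send_task_events setting.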
Implementing a Custom Camera
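To create a camera, subclass celery.events.snapshot.Polaroid and override the on_shutter method. This method receives a state object (an instance of celery.events.state.State) that holds the current in-memory view of workers and tasks, built from the events received so far. You can also override on_cleanup for periodic maintenance work.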
```python
from celery.events.snapshot import Polaroid


class MyCamera(Polaroid):
    # If True, the state is cleared after every snapshot (default: False)
    clear_after = True

    def on_shutter(self, state):
        """Called every `freq` seconds (unless throttled by maxrate)."""
        for task_id, task in state.tasks.items():
            print(f'Task {task_id} is currently in state {task.state}')

    def on_cleanup(self):
        """Called every `cleanup_freq` seconds (default: 1 hour)."""
        print('Performing periodic maintenance on stored snapshots...')
```
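In practice, on_shutter is where a camera writes to storage. The sketch below shows one possible body for it, upserting each task's latest state into SQLite. To keep it runnable without a broker, it uses a hypothetical FakeTask stand-in and a plain dict in place of state.tasks; the table name task_snapshots and the helper save_snapshot are also illustrative, not part of Celery. Because on_shutter runs in the camera's timer thread, a real camera would open the SQLite connection in that thread (or pass check_same_thread=False) rather than reuse one created elsewhere.

```python
import sqlite3
from dataclasses import dataclass
from typing import Mapping


@dataclass
class FakeTask:
    """Hypothetical stand-in for a Celery event-state Task (only .state is used)."""
    state: str


def save_snapshot(conn: sqlite3.Connection, tasks: Mapping[str, FakeTask]) -> int:
    """Upsert the latest known state of every task; return the number of rows written.

    Hypothetical helper: a camera's on_shutter could call it with a connection
    opened in the timer thread and state.tasks.
    """
    conn.execute(
        'CREATE TABLE IF NOT EXISTS task_snapshots ('
        ' task_id TEXT PRIMARY KEY,'
        ' state TEXT)'
    )
    rows = [(task_id, task.state) for task_id, task in tasks.items()]
    # INSERT OR REPLACE keeps one row per task, overwritten on each snapshot.
    conn.executemany(
        'INSERT OR REPLACE INTO task_snapshots (task_id, state) VALUES (?, ?)',
        rows,
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(':memory:')
save_snapshot(conn, {'a1': FakeTask('STARTED'), 'b2': FakeTask('PENDING')})
save_snapshot(conn, {'a1': FakeTask('SUCCESS')})
print(conn.execute('SELECT task_id, state FROM task_snapshots ORDER BY task_id').fetchall())
# → [('a1', 'SUCCESS'), ('b2', 'PENDING')]
```

Upserting rather than replacing the whole table matters when clear_after = True: each snapshot then only contains tasks that received events since the previous one, so rows from earlier snapshots must be kept.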
Running the Snapshot Process
You can run your custom camera using the evcam helper function or via the Celery command-line interface.
Using evcam programmatically
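The evcam function in celery.events.snapshot creates a State, instantiates your camera with it, starts the camera's timers, and then consumes events from the broker until it is interrupted; on exit it cancels the timers and closes the connection. This is a blocking call. Pass your application with the app argument; otherwise evcam falls back to the default app.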
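```python
from celery.events.snapshot import evcam

from proj.celery import app  # assumption: your Celery app is defined in proj/celery.py

# camera: fully qualified path to your Polaroid subclass.
# Blocks until interrupted (e.g. with Ctrl+C).
evcam(camera='proj.cameras.MyCamera', freq=2.0, loglevel='INFO', app=app)
```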
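Conceptually, evcam runs two loops at once: the calling thread blocks while applying events to the State, and a timer thread calls the camera's capture every freq seconds. The stdlib-only model below is a simplified illustration of that structure, not Celery's implementation: a queue.Queue stands in for the broker connection, a dict for State, and MiniCamera, run_receiver, and the None end-of-stream sentinel are hypothetical names.

```python
import queue
import threading
from typing import Dict, List, Optional


class MiniCamera:
    """Hypothetical model of a Polaroid: captures `state` every `freq` seconds."""

    def __init__(self, state: Dict[str, str], lock: threading.Lock, freq: float = 1.0):
        self.state = state
        self.lock = lock
        self.freq = freq
        self.snapshots: List[Dict[str, str]] = []
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def capture(self) -> None:
        # Hold the lock the receiver uses, so no event is applied mid-snapshot.
        with self.lock:
            self.snapshots.append(dict(self.state))

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once cancel() sets the flag.
        while not self._stop.wait(self.freq):
            self.capture()

    def install(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def cancel(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
        self.capture()  # this model's choice: one final snapshot so nothing is lost


def run_receiver(events: "queue.Queue[Optional[tuple]]", state, lock) -> None:
    """Blocking loop: apply (task_id, task_state) events until a None sentinel."""
    while True:
        event = events.get()
        if event is None:
            return
        task_id, task_state = event
        with lock:
            state[task_id] = task_state


state: Dict[str, str] = {}
lock = threading.Lock()
camera = MiniCamera(state, lock, freq=0.05)
events: "queue.Queue[Optional[tuple]]" = queue.Queue()
for event in [('t1', 'RECEIVED'), ('t1', 'STARTED'), ('t2', 'RECEIVED'),
              ('t1', 'SUCCESS'), None]:
    events.put(event)

camera.install()
try:
    run_receiver(events, state, lock)  # blocks, like evcam
finally:
    camera.cancel()  # stop the timer thread on exit, like evcam does

print(camera.snapshots[-1])  # → {'t1': 'SUCCESS', 't2': 'RECEIVED'}
```

The try/finally mirrors why evcam is safe to interrupt: however the blocking loop ends, the timer thread is stopped.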
Using the Command Line
You can start the camera process with the celery events command, using -A to point at your application:
```shell
celery -A proj events --camera=proj.cameras.MyCamera --frequency=2.0
```
Throttling Snapshots
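If you need to limit the rate at which snapshots are taken (e.g., to avoid overloading a database during event spikes), use the maxrate parameter (--maxrate on the command line). It takes a rate string such as '10/m', with /s, /m, or /h units, and is enforced with a token bucket: when no token is available at a given tick, that snapshot is skipped, so neither the shutter signal nor on_shutter runs.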
```python
# Attempt a capture every 0.1 s, but take at most 10 snapshots per minute
evcam(camera='proj.cameras.MyCamera', freq=0.1, maxrate='10/m')
```
Internally, the Polaroid constructor converts the rate string to a per-second rate with celery.utils.time.rate and wraps it in kombu's TokenBucket:
```python
# From celery/events/snapshot.py
def __init__(self, state, freq=1.0, maxrate=None,
             cleanup_freq=3600.0, timer=None, app=None):
    # ...
    self.maxrate = maxrate and TokenBucket(rate(maxrate))
```
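The throttling arithmetic can be checked with a simplified, stdlib-only model. It assumes a bucket capacity of one token (kombu's TokenBucket default, which the constructor above does not override), so snapshots cannot burst. parse_rate, SimpleTokenBucket, and ThrottledShutter are hypothetical names; the real pieces are celery.utils.time.rate and kombu.utils.limits.TokenBucket. A controllable clock replaces time.monotonic so the result is deterministic.

```python
import time
from typing import Callable, Optional

# Seconds per unit for the '/s', '/m' and '/h' suffixes.
_UNIT_SECONDS = {'s': 1.0, 'm': 60.0, 'h': 3600.0}


def parse_rate(spec: str) -> float:
    """Convert a rate string such as '10/m' to tokens per second."""
    count, _, unit = spec.partition('/')
    return float(count) / _UNIT_SECONDS[unit or 's']


class SimpleTokenBucket:
    """Token bucket that refills continuously up to `capacity` tokens."""

    def __init__(self, fill_rate: float, capacity: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.fill_rate = fill_rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity  # starts full, so the first snapshot is allowed
        self.timestamp = clock()

    def can_consume(self, tokens: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to the elapsed time, never above capacity.
        self.tokens = min(self.capacity,
                          self.tokens + self.fill_rate * (now - self.timestamp))
        self.timestamp = now
        if tokens <= self.tokens:
            self.tokens -= tokens
            return True
        return False


class ThrottledShutter:
    """Hypothetical model of the maxrate check made before each snapshot."""

    def __init__(self, maxrate: Optional[str] = None,
                 clock: Callable[[], float] = time.monotonic):
        # Same idiom as the constructor above: None stays None.
        self.maxrate = maxrate and SimpleTokenBucket(parse_rate(maxrate), clock=clock)
        self.taken = 0

    def shutter(self) -> None:
        if self.maxrate is None or self.maxrate.can_consume():
            self.taken += 1  # a real camera would send the signal and run on_shutter


# Simulated clock: one capture attempt per second for 10 seconds.
now = [0.0]
throttled = ThrottledShutter('30/m', clock=lambda: now[0])  # 0.5 snapshots per second
unthrottled = ThrottledShutter(None)
for second in range(10):
    now[0] = float(second)
    throttled.shutter()
    unthrottled.shutter()

print(throttled.taken, unthrottled.taken)  # → 5 10
```

With '30/m' the bucket gains half a token per second, so only every other attempt finds a full token. The same reasoning applies to the '10/m' example: at freq=0.1 there are about 600 capture attempts per minute, of which roughly 10 produce a snapshot.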
Using Signals for Decoupled Processing
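Instead of (or in addition to) overriding on_shutter and on_cleanup, you can connect receivers to the shutter_signal and cleanup_signal signals defined on the Polaroid class. Both are sent with the State object as sender, just before the corresponding on_shutter or on_cleanup hook runs. This is useful for attaching multiple observers to the same snapshot process. Note that these signals are class attributes of Polaroid itself, so a receiver connected through MyCamera.shutter_signal also fires for any other Polaroid subclass in the same process, and the connection must be made in the process that runs the camera.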
```python
from proj.cameras import MyCamera


def log_snapshot(sender, **kwargs):
    # sender is the State object; kwargs also include the signal itself
    print(f'Snapshot captured for {len(sender.tasks)} tasks')


# Connect to the shutter signal. This must run in the process that runs
# the camera, for example in a module imported before evcam starts.
MyCamera.shutter_signal.connect(log_snapshot)
```
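Celery's signals call each receiver with keyword arguments only (including signal and sender), which is why log_snapshot must accept **kwargs. The toy dispatcher below is a stdlib-only illustration of that fan-out to several observers, not Celery's Signal class; MiniSignal, count_failures, and the dict standing in for State are hypothetical, and the model omits weak references, dispatch_uid, and sender filtering.

```python
from typing import Any, Callable, List, Tuple

Receiver = Callable[..., Any]


class MiniSignal:
    """Toy signal: every connected receiver is called on send()."""

    def __init__(self, name: str):
        self.name = name
        self.receivers: List[Receiver] = []

    def connect(self, receiver: Receiver) -> Receiver:
        if receiver not in self.receivers:  # ignore duplicate connections
            self.receivers.append(receiver)
        return receiver  # returning it allows use as a decorator

    def send(self, sender: Any, **named: Any) -> List[Tuple[Receiver, Any]]:
        # Receivers get keyword arguments only, so they should accept **kwargs.
        return [(r, r(signal=self, sender=sender, **named)) for r in self.receivers]


shutter_signal = MiniSignal('shutter_signal')
seen: List[str] = []


@shutter_signal.connect
def log_snapshot(sender, **kwargs):
    seen.append(f'Snapshot captured for {len(sender["tasks"])} tasks')


@shutter_signal.connect
def count_failures(sender, **kwargs):
    return sum(1 for s in sender['tasks'].values() if s == 'FAILURE')


fake_state = {'tasks': {'t1': 'SUCCESS', 't2': 'FAILURE'}}  # stand-in for State
results = shutter_signal.send(sender=fake_state)
print(seen, [value for _, value in results])
# → ['Snapshot captured for 2 tasks'] [None, 1]
```

Each observer stays independent of the camera class: adding a new one is a single connect call, with no change to on_shutter.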
Troubleshooting and Behavior
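- Empty Hooks: The base Polaroid class has empty on_shutter and on_cleanup methods. You must override them or connect to the signals to perform any actual work.
- State Consistency: The capture method calls state.freeze_while(self.shutter, clear_after=self.clear_after), which holds the state's lock while your on_shutter logic executes, so incoming events cannot modify the cluster state mid-snapshot. When clear_after is True, the state is cleared once the snapshot finishes.
- Cleanup Frequency: By default, on_cleanup is called every 3600 seconds (1 hour). You can change this by passing cleanup_freq to the Polaroid constructor. evcam (and therefore celery events) does not forward a cleanup_freq argument, so to change it there, override __init__ in your subclass and pass a different cleanup_freq to super().__init__().
- Manual Control: If you are using Polaroid outside of evcam, you must call install() to start the timers and cancel() to stop them. Polaroid also supports the context manager pattern, which calls install() on entry and cancel() on exit:

```python
state = app.events.State()  # `app` is your Celery application

with MyCamera(state, freq=1.0, app=app) as camera:
    # Timers are active here. Snapshots only contain what has been
    # fed into `state`, e.g. by an event receiver calling state.event.
    pass
# Timers are cancelled automatically on exit
```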
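The State Consistency and clear_after behavior can be modeled with a single lock shared by event handling and snapshots. MiniState below is a hypothetical, stdlib-only sketch of those semantics, not celery.events.state.State; its event method is a simplified stand-in that just records a task's latest state.

```python
import threading
from typing import Any, Callable, Dict


class MiniState:
    """Hypothetical model of State.freeze_while and clear_after."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self.tasks: Dict[str, str] = {}

    def event(self, task_id: str, task_state: str) -> None:
        # Incoming events take the same lock, so they wait during a snapshot.
        with self._mutex:
            self.tasks[task_id] = task_state

    def freeze_while(self, fun: Callable[..., Any], *args: Any,
                     clear_after: bool = False, **kwargs: Any) -> Any:
        with self._mutex:
            try:
                return fun(*args, **kwargs)
            finally:
                # Clear even if fun raised, so the next snapshot starts fresh.
                if clear_after:
                    self.tasks.clear()


state = MiniState()
state.event('t1', 'STARTED')
state.event('t2', 'SUCCESS')

first = state.freeze_while(lambda: dict(state.tasks), clear_after=True)
state.event('t3', 'RECEIVED')
second = state.freeze_while(lambda: dict(state.tasks), clear_after=False)

print(first)   # → {'t1': 'STARTED', 't2': 'SUCCESS'}
print(second)  # → {'t3': 'RECEIVED'}
```

A consequence worth keeping in mind: because events wait on the same lock, a slow on_shutter also delays event processing, so it pays to keep the work done inside the snapshot short.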