Stamping Workflows with Metadata
In complex distributed workflows, tracking the execution of a single request across multiple tasks, groups, and chords is a significant challenge. Standard task headers are often transient or overwritten during the cloning and merging operations that Celery performs to build its canvas structures.
The Stamping API provides a structured mechanism to attach persistent metadata—"stamps"—to every component of a canvas. This ensures that tracking information, such as monitoring IDs or request contexts, survives the recursive transformations inherent in Celery's workflow engine.
The Visitor Pattern in Canvas
Celery implements stamping using the Visitor design pattern. Instead of embedding specific tracking logic within the Signature class, the logic is encapsulated in a StampingVisitor. This decouples the "what" (the metadata to attach) from the "how" (traversing the recursive tree of signatures, chains, and groups).
The StampingVisitor (defined in celery.canvas) is an abstract base class providing hooks for every stage of canvas traversal:
- on_signature(sig, **headers): Called for individual task signatures.
- on_group_start(group, **headers) / on_group_end: Called when entering and exiting a group.
- on_chain_start(chain, **headers) / on_chain_end: Called when entering and exiting a chain.
- on_chord_header_start(sig, **headers) / on_chord_body: Hooks for the header tasks and the final callback (body) of a chord.
- on_callback(callback, **headers) / on_errback: Hooks for linked success and error handlers.
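The start, signature, chord, and callback hooks each return a dictionary of headers that is merged into the task's execution options. The *_end hooks only signal that traversal of a group or chain has finished and return nothing.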
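The hook structure can be modelled without Celery to show how a subclass overrides only the hooks it needs. The sketch below is a toy, not celery.canvas.StampingVisitor itself: ToyStampingVisitor and TenantStampingVisitor are hypothetical names, only a subset of hooks is shown, and marking on_signature as abstract is an assumption that mirrors every visitor having to handle individual tasks.

```python
from abc import ABC, abstractmethod


class ToyStampingVisitor(ABC):
    """Toy stand-in for a stamping visitor base class (not Celery's code)."""

    def on_group_start(self, group, **headers) -> dict:
        return {}  # start hooks add no stamps unless overridden

    def on_group_end(self, group, **headers) -> None:
        pass  # end hooks are notifications only

    def on_chain_start(self, chain, **headers) -> dict:
        return {}

    def on_chain_end(self, chain, **headers) -> None:
        pass

    @abstractmethod
    def on_signature(self, sig, **headers) -> dict:
        """Concrete visitors must decide how to stamp a single task."""


class TenantStampingVisitor(ToyStampingVisitor):
    # Hypothetical visitor: tags every task with the tenant that submitted it.
    def __init__(self, tenant: str):
        self.tenant = tenant

    def on_signature(self, sig, **headers) -> dict:
        return {"tenant": self.tenant}


options = {"queue": "default"}
# The dict returned by the hook is merged into the task's execution options.
options.update(TenantStampingVisitor("acme").on_signature(sig=None))
print(options)  # {'queue': 'default', 'tenant': 'acme'}
```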
Implementing a Custom Visitor
To use the stamping API, you subclass StampingVisitor and implement the hooks relevant to your tracking needs. A common use case is attaching a unique monitoring ID to every task in a workflow.
from uuid import uuid4
from celery.canvas import StampingVisitor, Signature

class MonitoringIdStampingVisitor(StampingVisitor):
    def on_signature(self, sig: Signature, **headers) -> dict:
        # Generate a unique ID for every task signature encountered
        mtask_id = str(uuid4())
        return {"mtask_id": mtask_id}
Applying Stamps to Workflows
Stamping is initiated by calling the .stamp() method on any canvas object. This method recursively traverses the structure, applying the visitor's logic to every nested element.
from celery import group
from my_project.tasks import add
# Create a complex canvas
canvas = group(add.s(i, i) for i in range(10))
# Apply the visitor
canvas.stamp(MonitoringIdStampingVisitor())
# Execute the stamped canvas
canvas.delay()
When canvas.stamp() is called:
- It calls the visitor's start hooks (e.g., on_group_start).
- It iterates through the nested tasks.
- For each task, it calls on_signature and merges the resulting headers into the signature's options.
- It recursively stamps any linked callbacks (link) and errbacks (link_error).
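The traversal steps listed above can be illustrated with a small recursive function. This is a toy model under stated assumptions, not Celery's implementation: Sig, Group, MonitoringIdVisitor, and stamp are hypothetical names, and the real stamp() methods on Signature, group, chain, and chord also handle chains, chords, header passing, and stamped_headers bookkeeping, all omitted here.

```python
from uuid import uuid4


class Sig:
    """Toy task signature: a name, execution options, and linked callbacks."""

    def __init__(self, name, link=None, link_error=None):
        self.name = name
        self.options = {}
        self.link = list(link or [])
        self.link_error = list(link_error or [])


class Group:
    """Toy group: an options dict plus a list of nested nodes."""

    def __init__(self, tasks):
        self.options = {}
        self.tasks = list(tasks)


class MonitoringIdVisitor:
    def on_group_start(self, group, **headers) -> dict:
        return {}

    def on_group_end(self, group, **headers) -> None:
        pass

    def on_signature(self, sig, **headers) -> dict:
        return {"mtask_id": str(uuid4())}


def stamp(node, visitor) -> None:
    """Hypothetical recursive stamper mirroring the four traversal steps."""
    if isinstance(node, Group):
        node.options.update(visitor.on_group_start(node))  # start hook
        for child in node.tasks:  # iterate nested tasks
            stamp(child, visitor)
        visitor.on_group_end(node)
        return
    node.options.update(visitor.on_signature(node))  # merge returned headers
    for callback in node.link + node.link_error:  # recurse into link/link_error
        stamp(callback, visitor)


on_error = Sig("report_failure")
first = Sig("add", link_error=[on_error])
nested = Sig("mul")
canvas = Group([first, Sig("add"), Group([nested])])
stamp(canvas, MonitoringIdVisitor())
```

After the call, every signature, including the errback reachable only through link_error, carries its own mtask_id.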
Handling Duplicate Stamps
The stamp() method accepts an append_stamps boolean argument. When it is True and a stamp key already exists on the signature, the new value is appended to a list alongside the existing value instead of overwriting it. When it is False (the default), the most recent stamp replaces the earlier one. Appending is useful for audit trails where you want to see every visitor that has touched a task.
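The two duplicate-handling modes can be compared with a toy merge function. merge_stamps is a hypothetical helper, not Celery's code, and Celery's exact list-normalisation rules may differ; in Celery itself the flag is simply passed as canvas.stamp(visitor, append_stamps=True).

```python
def merge_stamps(options: dict, new_stamps: dict, append_stamps: bool = False) -> dict:
    """Toy model of duplicate-stamp handling; returns a new dict."""
    merged = dict(options)
    for key, value in new_stamps.items():
        if append_stamps and key in merged:
            existing = merged[key]
            # Normalise the old value to a list, then append the new one.
            merged[key] = (existing if isinstance(existing, list) else [existing]) + [value]
        else:
            merged[key] = value  # default behaviour: last stamp wins
    return merged


opts = {"visited_by": "audit-v1"}
print(merge_stamps(opts, {"visited_by": "audit-v2"}))
# {'visited_by': 'audit-v2'}
print(merge_stamps(opts, {"visited_by": "audit-v2"}, append_stamps=True))
# {'visited_by': ['audit-v1', 'audit-v2']}
```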
Internal Mechanics and Metadata Integrity
To prevent stamps from being accidentally lost during signature operations (like sig.clone() or sig.merge()), Celery maintains an internal list called stamped_headers within the signature's options.
When a visitor returns headers, Celery performs the following in Signature._stamp_headers:
- Adds the new header keys to the stamped_headers list.
- Merges the values into the signature's options dictionary.
- Ensures that these specific keys are treated as "stamps" during subsequent merges, protecting them from being overwritten by standard execution options.
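The bookkeeping described above can be sketched as two small functions: one that records stamp keys in stamped_headers while merging their values, and one that merges ordinary options without clobbering those keys. Both functions are hypothetical simplifications, not the code of Signature._stamp_headers or Signature.merge.

```python
def stamp_headers(options: dict, visitor_headers: dict) -> dict:
    """Record stamp keys in options['stamped_headers'] and merge their values."""
    stamped = list(options.get("stamped_headers", []))
    for key in visitor_headers:
        if key not in stamped:
            stamped.append(key)
    return {**options, **visitor_headers, "stamped_headers": stamped}


def merge_options(options: dict, overrides: dict) -> dict:
    """Merge ordinary execution options, skipping any key listed as a stamp."""
    protected = set(options.get("stamped_headers", []))
    safe = {k: v for k, v in overrides.items() if k not in protected}
    return {**options, **safe}


opts = stamp_headers({"queue": "default"}, {"mtask_id": "abc"})
opts = merge_options(opts, {"mtask_id": "overwritten", "queue": "high"})
print(opts)
# {'queue': 'high', 'mtask_id': 'abc', 'stamped_headers': ['mtask_id']}
```

The queue option is updated normally, while the stamped mtask_id survives the later merge.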
Stamping and Generators
Celery workflows often use generators to handle large groups (e.g., group(add.s(i) for i in range(10000))). To avoid exhausting the generator during the stamping process, Celery uses a specialized _stamp_regen_task function. When a group or chord contains a _regen object (a wrapper for generators), the stamping logic maps the visitor over the generator lazily, ensuring that tasks are stamped only as they are yielded for execution.
Design Tradeoffs
While stamping provides robust tracking, it introduces a recursive traversal of the canvas structure. For extremely large or deeply nested workflows, this traversal adds a small overhead before the tasks are dispatched to the broker. However, this design choice favors metadata integrity and workflow observability over the marginal performance gain of flat, unstamped signatures.
Additionally, because stamping modifies the options of signatures in-place (or via cloning in chains), it is intended to be the final step before calling .delay() or .apply_async(). Applying stamps to a signature that is subsequently heavily modified by manual option overrides may lead to unexpected header states if the stamped_headers list is not respected.