Workflows & Canvas
The Canvas API in this codebase provides a functional domain-specific language (DSL) for composing complex distributed workflows from individual tasks. By treating tasks as first-class objects called Signatures, the system allows developers to define execution logic—such as sequences, parallel execution, and synchronization—independently of the task's actual execution.
The Atomic Unit: Signature
The foundation of the Canvas API is the Signature class (defined in celery/canvas.py), which inherits from dict. This design choice ensures that a signature is inherently serializable, as it simply wraps the task name, arguments, and execution options into a dictionary structure.
A Signature can be created using the signature() function or via task shortcuts like .s() (for standard signatures) and .si() (for immutable signatures).
# From celery/canvas.py
class Signature(dict):
"""Payload for a task invocation."""
# ...
def __init__(self, task=None, args=None, kwargs=None, options=None,
type=None, subtask_type=None, immutable=False, app=None, **ex):
# ...
The distinction between .s() and .si() is critical for workflow composition:
- .s(*args): Creates a signature that can accept additional arguments from a previous task in a chain.
- .si(*args): Creates an immutable signature that ignores any results passed from preceding tasks.
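The behavioral difference can be illustrated with a minimal standalone sketch (not Celery's actual implementation): a dict-backed signature holds the task name, arguments, and an immutable flag, and "calling" it with an upstream result either prepends that result or ignores it.

```python
# Hypothetical sketch of the dict-backed Signature idea. Because the
# signature is just a dict, it serializes trivially; the `immutable`
# flag decides whether an upstream result is accepted.

class Sig(dict):
    def __init__(self, task, args=(), immutable=False):
        super().__init__(task=task, args=list(args), immutable=immutable)

    def apply(self, func, prev_result=None):
        # Mutable signatures prepend the previous task's result as the
        # first argument; immutable ones ignore it.
        args = list(self["args"])
        if prev_result is not None and not self["immutable"]:
            args = [prev_result] + args
        return func(*args)

def add(x, y):
    return x + y

s = Sig("add", (8,))                      # analogous to add.s(8)
si = Sig("add", (1, 1), immutable=True)   # analogous to add.si(1, 1)
print(s.apply(add, prev_result=4))   # 12: prev result is prepended
print(si.apply(add, prev_result=4))  # 2: prev result is ignored
```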
Sequential Workflows: Chains
A chain (or the | operator) represents a sequence of tasks where the output of one task is passed as the first argument to the next. In celery/canvas.py, the chain class manages this by linking tasks together.
The implementation of chain uses the | operator (via __or__) to facilitate a natural syntax:
# Example from t/integration/test_canvas.py
# add(4, 4) -> 8; add(8, 8) -> 16; add(16, 16) -> 32
c = add.s(4, 4) | add.s(8) | add.s(16)
assert c().get(timeout=TIMEOUT) == 32
Internally, chain handles the complexity of "flattening" nested structures. If a chain contains another chain, it is unrolled into a single flat sequence to prevent deep recursion during serialization and execution.
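The flattening idea can be sketched in isolation (this is an illustrative recursion, not the code in celery/canvas.py): a nested chain is unrolled into a single flat task list so that serialization and execution never recurse deeply.

```python
# Hypothetical flattening sketch: nested chains are represented here
# as nested lists, and flatten() unrolls them into one flat sequence.

def flatten(tasks):
    flat = []
    for t in tasks:
        if isinstance(t, list):  # a nested chain
            flat.extend(flatten(t))
        else:
            flat.append(t)
    return flat

nested = ["a", ["b", ["c", "d"]], "e"]
print(flatten(nested))  # ['a', 'b', 'c', 'd', 'e']
```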
Parallel Workflows: Groups
The group primitive (defined in celery/canvas.py) allows for the parallel execution of multiple tasks. Unlike a chain, a group returns a GroupResult, which enables tracking the status of all subtasks collectively.
# Example from t/integration/test_canvas.py
g = group(add.s(i, i) for i in range(4))
res = g()
assert res.get(timeout=TIMEOUT) == [0, 2, 4, 6]
A key constraint in the implementation is that groups cannot have direct callbacks (link). If you attempt to link a task to a group, the system raises a TypeError, suggesting the use of a chord instead.
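The semantics of a group can be approximated locally with a thread pool (an analogy only, not how Celery dispatches work): each call runs concurrently, and results are collected in submission order, which is the same ordering guarantee the group example above relies on.

```python
from concurrent.futures import ThreadPoolExecutor

# Local approximation of group semantics: run each call concurrently
# and collect results in submission order.

def add(x, y):
    return x + y

calls = [(i, i) for i in range(4)]  # like group(add.s(i, i) for i in range(4))
with ThreadPoolExecutor() as pool:
    results = list(pool.map(lambda a: add(*a), calls))
print(results)  # [0, 2, 4, 6]
```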
Synchronization: Chords
A chord is a primitive that consists of a "header" (a group of tasks) and a "body" (a callback task). The body is executed only after all tasks in the header have successfully completed.
The codebase provides an "upgrade" path where piping a group into a task automatically creates a chord:
# From celery/canvas.py
def __or__(self, other):
# group() | task -> chord
return chord(self, body=other, app=self._app)
This allows for complex synchronization patterns:
# Example from t/integration/test_canvas.py
# Sum the results of two parallel additions
c = chord([add.s(1, 2), add.s(3, 4)])(tsum.s())
assert c.get() == 10
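The header/body contract can be sketched as a plain function (a simplification of what the chord machinery coordinates across workers): run every header task, then invoke the body exactly once with the ordered list of header results.

```python
# Hypothetical chord sketch: execute each header task, then call the
# body with the ordered list of header results.

def run_chord(header, body):
    results = [func(*args) for func, args in header]
    return body(results)

def add(x, y):
    return x + y

# Mirrors chord([add.s(1, 2), add.s(3, 4)])(tsum.s())
total = run_chord([(add, (1, 2)), (add, (3, 4))], sum)
print(total)  # 10
```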
Advanced Composition and Optimization
The prepare_steps method in _chain (the base class for chain) is responsible for the sophisticated logic of "upgrading" workflows. For example, if a group is followed by another task in a chain, it is automatically converted into a chord.
# Logic from celery/canvas.py: prepare_steps
if isinstance(task, group) and prev_task:
# automatically upgrade group(...) | s to chord(group, s)
tasks.pop()
results.pop()
task = chord(task, body=prev_task, ...)
Task Protocols
The implementation supports two versions of the task protocol:
- Protocol 1: Uses the link attribute to create a tree-like structure of callbacks. This can lead to recursion depth issues during serialization (e.g., with pickle).
- Protocol 2: Uses a chain field in the message payload. The prepare_steps method reverses the task list so the worker can simply pop() the next task, avoiding recursive nesting.
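The protocol-2 idea reduces to a simple data-structure trick, sketched here in isolation: storing the remaining chain as a reversed list lets the worker take the next task with an O(1) pop() from the end, instead of walking a nested callback tree.

```python
# Sketch of the reversed-chain trick: the message carries the
# remaining steps reversed, so the next task is always at the end.

message_chain = list(reversed(["step1", "step2", "step3"]))
executed = []
while message_chain:
    executed.append(message_chain.pop())  # O(1) pop of the next task
print(executed)  # ['step1', 'step2', 'step3']
```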
Metadata and Lineage: Stamping
For monitoring and tracking, the Canvas API includes a "stamping" mechanism. The StampingVisitor (an abstract base class in celery/canvas.py) allows developers to inject custom metadata (stamps) into every task within a canvas.
When canvas.stamp(visitor) is called, it traverses the entire structure—including nested chains, groups, and chords—applying headers or metadata at each step.
# From examples/stamping/examples.py
def run_example1():
s1 = chain(identity_task.si("foo11"), identity_task.si("foo12"))
# ...
canvas = group([s1, s2])
canvas.stamp(MonitoringIdStampingVisitor())
canvas.delay()
This visitor pattern ensures that cross-cutting concerns like request IDs or tracing headers can be propagated through complex workflows without manually modifying every task signature.
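The traversal itself can be sketched as a small recursive walk (a toy model, not the real StampingVisitor API): nested chains and groups are visited recursively, and each leaf task has the stamp merged into its headers.

```python
# Hypothetical visitor sketch: recursively walk a nested canvas
# structure and merge a stamp dict into every leaf task's headers.

def stamp(node, headers):
    if isinstance(node, list):  # chain/group: recurse into children
        for child in node:
            stamp(child, headers)
    else:  # leaf task: a dict with optional "headers"
        node.setdefault("headers", {}).update(headers)

canvas = [{"task": "a"}, [{"task": "b"}, {"task": "c"}]]
stamp(canvas, {"monitoring_id": "abc123"})
print(canvas[1][0]["headers"])  # {'monitoring_id': 'abc123'}
```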
Design Tradeoffs
- Eager Mode: When task_always_eager is enabled, the canvas primitives attempt to simulate distributed execution locally. However, this bypasses the actual message broker, which can sometimes mask issues related to serialization or network latency.
- Chord Error Handling: Historically, error callbacks on chords only applied to the body. The codebase is currently transitioning (via task_allow_error_cb_on_chord_header) to allow error callbacks on the header tasks as well, reflecting a move toward more robust error propagation in complex workflows.
- Result Persistence: Because Signature inherits from dict, it does not hold a reference to its result until it is executed. This separation of definition and execution is powerful, but it requires developers to manage the AsyncResult objects returned at runtime to retrieve data.