Workflows & Canvas

The Canvas API in this codebase provides a functional domain-specific language (DSL) for composing complex distributed workflows from individual tasks. By treating tasks as first-class objects called Signatures, the system lets developers define execution logic (sequences, parallel execution, and synchronization) independently of how and where the tasks actually run.

The Atomic Unit: Signature

The foundation of the Canvas API is the Signature class (defined in celery/canvas.py), which inherits from dict. This design choice ensures that a signature is inherently serializable, as it simply wraps the task name, arguments, and execution options into a dictionary structure.
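
Because a Signature is just a dict, it serializes like one. A minimal stand-in (an illustrative toy, not Celery's actual class) makes the point:

```python
import json

class MiniSignature(dict):
    """Toy stand-in for Signature: a dict wrapping task metadata."""
    def __init__(self, task, args=(), kwargs=None, options=None):
        super().__init__(
            task=task,
            args=list(args),
            kwargs=kwargs or {},
            options=options or {},
        )

sig = MiniSignature("tasks.add", args=(2, 2))
payload = json.dumps(sig)                     # serializes like any plain dict
restored = MiniSignature(**json.loads(payload))
assert restored["task"] == "tasks.add"
assert restored["args"] == [2, 2]
```

Any JSON- or pickle-capable transport can carry such an object with no custom encoder, which is exactly what a message broker needs.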

A Signature can be created using the signature() function or via task shortcuts like .s() (for standard signatures) and .si() (for immutable signatures).

# From celery/canvas.py
class Signature(dict):
    """Payload for a task invocation."""
    # ...
    def __init__(self, task=None, args=None, kwargs=None, options=None,
                 type=None, subtask_type=None, immutable=False, app=None, **ex):
        # ...

The distinction between .s() and .si() is critical for workflow composition:

  • .s(*args): Creates a signature that can accept additional arguments from a previous task in a chain.
  • .si(*args): Creates an immutable signature that ignores any results passed from preceding tasks.
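
The difference can be sketched in plain Python. This is a toy model of the assumed argument-merging rule, not Celery's internals: a mutable step prepends the previous result to its own arguments, while an immutable step discards it.

```python
def apply_step(sig, previous_result):
    """Run one chain step, merging in the previous result unless immutable."""
    if sig["immutable"]:
        args = sig["args"]                     # .si(): ignore the prior result
    else:
        args = [previous_result, *sig["args"]]  # .s(): prepend the prior result
    return sig["func"](*args)

add = lambda x, y: x + y
mutable = {"func": add, "args": [8], "immutable": False}     # like add.s(8)
immutable = {"func": add, "args": [3, 4], "immutable": True}  # like add.si(3, 4)

assert apply_step(mutable, 8) == 16    # 8 (previous result) + 8
assert apply_step(immutable, 8) == 7   # previous result discarded: 3 + 4
```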

Sequential Workflows: Chains

A chain (or the | operator) represents a sequence of tasks where the output of one task is passed as the first argument to the next. In celery/canvas.py, the chain class manages this by linking tasks together.

The implementation of chain uses the | operator (via __or__) to facilitate a natural syntax:

# Example from t/integration/test_canvas.py
# add(4, 4) -> 8; add(8, 8) -> 16; add(16, 16) -> 32
c = add.s(4, 4) | add.s(8) | add.s(16)
assert c().get(timeout=TIMEOUT) == 32

Internally, chain handles the complexity of "flattening" nested structures. If a chain contains another chain, it is unrolled into a single flat sequence to prevent deep recursion during serialization and execution.
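
The flattening idea can be sketched with nested lists standing in for chains (an illustrative model, not the actual implementation):

```python
def flatten_chain(tasks):
    """Unroll nested chains (modeled as lists) into one flat sequence."""
    flat = []
    for t in tasks:
        if isinstance(t, list):          # a nested "chain"
            flat.extend(flatten_chain(t))
        else:
            flat.append(t)
    return flat

nested = ["a", ["b", ["c", "d"]], "e"]
assert flatten_chain(nested) == ["a", "b", "c", "d", "e"]
```

The flat form keeps the serialized payload shallow no matter how the workflow was composed.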

Parallel Workflows: Groups

The group primitive (defined in celery/canvas.py) allows for the parallel execution of multiple tasks. Unlike a chain, a group returns a GroupResult, which enables tracking the status of all subtasks collectively.

# Example from t/integration/test_canvas.py
g = group(add.s(i, i) for i in range(4))
res = g()
assert res.get(timeout=TIMEOUT) == [0, 2, 4, 6]

A key constraint in the implementation is that groups cannot have direct callbacks (link). If you attempt to link a task to a group, the system raises a TypeError, suggesting the use of a chord instead.
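
A toy model of that constraint (a hypothetical MiniGroup, not Celery's group class) shows the intended failure mode:

```python
class MiniGroup:
    """Toy group mirroring the documented constraint on direct callbacks."""
    def __init__(self, tasks):
        self.tasks = list(tasks)

    def link(self, callback):
        # A group has many results, so a single direct callback is ambiguous;
        # synchronizing on all of them is exactly what a chord is for.
        raise TypeError("Cannot add link to group: use a chord")

g = MiniGroup(["t1", "t2"])
try:
    g.link("callback")
except TypeError as exc:
    assert "chord" in str(exc)
```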

Synchronization: Chords

A chord is a primitive that consists of a "header" (a group of tasks) and a "body" (a callback task). The body is executed only after all tasks in the header have successfully completed.

The codebase provides an "upgrade" path where piping a group into a task automatically creates a chord:

# From celery/canvas.py (group)
def __or__(self, other):
    # group() | task -> chord
    return chord(self, body=other, app=self._app)

This allows for complex synchronization patterns:

# Example from t/integration/test_canvas.py
# Sum the results of two parallel additions
c = chord([add.s(1, 2), add.s(3, 4)])(tsum.s())
assert c.get() == 10
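
Chord semantics can be simulated locally with a thread pool (an illustrative sketch; real chords run header tasks through the broker and synchronize via the result backend):

```python
from concurrent.futures import ThreadPoolExecutor

def run_chord(header, body):
    """Run all header callables in parallel, then pass the ordered
    list of their results to the body callable."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda f: f(), header))
    return body(results)

header = [lambda: 1 + 2, lambda: 3 + 4]   # like [add.s(1, 2), add.s(3, 4)]
assert run_chord(header, sum) == 10       # body sees [3, 7]
```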

Advanced Composition and Optimization

The prepare_steps method in _chain (the base class for chain) is responsible for the sophisticated logic of "upgrading" workflows. For example, if a group is followed by another task in a chain, it is automatically converted into a chord.

# Logic from celery/canvas.py: prepare_steps
if isinstance(task, group) and prev_task:
    # automatically upgrade group(...) | s to chord(group, s)
    tasks.pop()
    results.pop()
    task = chord(task, body=prev_task, ...)
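
The effect of this upgrade can be modeled with plain data, with sets standing in for groups (a sketch of the transformation, not the real prepare_steps signature):

```python
def upgrade_steps(steps):
    """Fuse a group that is immediately followed by another task
    into a ("chord", header, body) tuple."""
    out = []
    for step in steps:
        if out and isinstance(out[-1], set):   # previous step was a "group"
            header = out.pop()
            out.append(("chord", header, step))
        else:
            out.append(step)
    return out

steps = ["a", {"b1", "b2"}, "c"]
assert upgrade_steps(steps) == ["a", ("chord", {"b1", "b2"}, "c")]
```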

Task Protocols

The implementation supports two versions of the task protocol:

  • Protocol 1: Uses the link attribute to create a tree-like structure of callbacks. This can lead to recursion depth issues during serialization (e.g., with pickle).
  • Protocol 2: Uses a chain field in the message payload. The prepare_steps method reverses the task list so the worker can simply pop() the next task, avoiding recursive nesting.
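
The reversed-list trick can be shown with a simplified message shape (illustrative field names, not the exact protocol-2 payload):

```python
# The remaining chain travels inside the message, stored in reverse
# order so each worker takes the next task with an O(1) pop().
message = {"task": "first", "chain": ["third", "second"]}

executed = [message["task"]]
while message["chain"]:
    executed.append(message["chain"].pop())   # no recursion, no nesting

assert executed == ["first", "second", "third"]
```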

Metadata and Lineage: Stamping

For monitoring and tracking, the Canvas API includes a "stamping" mechanism. The StampingVisitor (an abstract base class in celery/canvas.py) allows developers to inject custom metadata (stamps) into every task within a canvas.

When canvas.stamp(visitor) is called, it traverses the entire structure—including nested chains, groups, and chords—applying headers or metadata at each step.

# From examples/stamping/examples.py
def run_example1():
    s1 = chain(identity_task.si("foo11"), identity_task.si("foo12"))
    # ...
    canvas = group([s1, s2])
    canvas.stamp(MonitoringIdStampingVisitor())
    canvas.delay()

This visitor pattern ensures that cross-cutting concerns like request IDs or tracing headers can be propagated through complex workflows without manually modifying every task signature.
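
The traversal can be sketched as a recursive walk over nested dicts (a simplified model of the idea, not the real StampingVisitor API):

```python
def stamp(node, visitor):
    """Walk a nested canvas structure (dicts whose children live under
    'tasks') and merge the visitor's headers into every node's options."""
    node.setdefault("options", {}).update(visitor())
    for child in node.get("tasks", []):
        stamp(child, visitor)

canvas = {"task": "group", "tasks": [{"task": "add"}, {"task": "mul"}]}
stamp(canvas, lambda: {"monitoring_id": "abc123"})

assert canvas["options"]["monitoring_id"] == "abc123"
assert canvas["tasks"][0]["options"]["monitoring_id"] == "abc123"
```

Because the stamp is applied during traversal rather than at definition time, the same canvas can be reused with different monitoring or tracing metadata on each dispatch.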

Design Tradeoffs

  1. Eager Mode: When task_always_eager is enabled, the canvas primitives attempt to simulate distributed execution locally. However, this bypasses the actual message broker, which can sometimes mask issues related to serialization or network latency.
  2. Chord Error Handling: Historically, error callbacks on chords only applied to the body. The codebase is currently transitioning (via task_allow_error_cb_on_chord_header) to allow error callbacks on the header tasks as well, reflecting a move toward more robust error propagation in complex workflows.
  3. Result Persistence: Because Signature inherits from dict, it does not hold a reference to its result until it is executed. This separation of definition and execution is powerful but requires developers to manage the AsyncResult objects returned at runtime to retrieve data.