Nesting and Composing Workflows

In the vik-advani-celery-8f0842b codebase, complex distributed workflows are built by composing four primary primitives: Signature, group, chain, and chord. The implementation relies on a recursive "unrolling" mechanism that flattens nested structures into a format the worker can execute efficiently.
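
As a minimal, hedged sketch (the task names here are hypothetical, not from the codebase), the four primitives compose like so:

from celery import chain, group, signature

fetch = signature('tasks.fetch')        # hypothetical tasks, for illustration
parse_a = signature('tasks.parse_a')
parse_b = signature('tasks.parse_b')
merge = signature('tasks.merge')

# Run fetch, fan out to both parsers in parallel, then merge.
wf = chain(fetch, group(parse_a, parse_b), merge)
# The pipe form is equivalent; the group | merge pair is upgraded to a
# chord during preparation, as described below.
wf = fetch | group(parse_a, parse_b) | merge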

The Composition Engine: The Pipe Operator

The primary way to compose workflows is through the pipe operator (|), which is implemented via the __or__ method in the Signature class and its subclasses. This operator does not just append tasks; it performs "workflow upgrades" based on the types being combined.

For example, when a group is piped into a Signature, the implementation in celery/canvas.py automatically upgrades the relationship to a chord:

# From celery/canvas.py: group.__or__
def __or__(self, other):
    # group() | task -> chord
    return chord(self, body=other, app=self._app)
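
A hedged usage sketch of that upgrade (add and tsum are the integration-test tasks used later in this section):

wf = group(add.s(1, 1), add.s(2, 2)) | tsum.s()
assert isinstance(wf, chord)   # the pipe produced a chord, not a chain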

Similarly, piping a chord into another task attaches that task to the chord's body, effectively extending the synchronization point:

# From celery/canvas.py: _chord.__or__
def __or__(self, other):
    if (not isinstance(other, (group, _chain)) and
            isinstance(other, Signature)):
        # chord | task -> attach to body
        sig = self.clone()
        sig.body = sig.body | other
        return sig
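
In usage terms (a sketch built on the same tasks):

c = chord([add.s(1, 1), add.s(2, 2)], tsum.s())
c = c | add.s(10)   # body is now the chain tsum.s() | add.s(10)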

Recursive Unrolling and Preparation

When a complex workflow like chain(group(A, B), C, group(D, E)) is executed, the codebase must transform this tree-like structure into a linear sequence of execution steps. This is handled by _chain.prepare_steps.

The prepare_steps method (found in celery/canvas.py) iterates through the tasks in reverse order. It performs several critical transformations:

  1. Splicing Nested Chains: If it encounters a nested _chain, it unrolls the tasks into the current stack.
  2. Upgrading Groups to Chords: If a group is followed by another task, it is converted into a chord where the group is the header and the subsequent task is the body.
  3. Linking Results: It sets the parent attribute on the AsyncResult objects to build the result graph.
# Simplified logic from _chain.prepare_steps in celery/canvas.py;
# steps_pop and steps_extend are local aliases for steps.pop and
# steps.extend, and prev_task tracks the previously processed task.
prev_task = None
while steps:
    task = steps_pop()
    if isinstance(task, _chain):
        # Splice the nested chain's tasks into the current stack.
        steps_extend(task.tasks)
        continue

    if isinstance(task, group) and prev_task:
        # Upgrade group | task to chord.
        task = chord(task, body=prev_task, ...)
    prev_task = task
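
The observable effect, sketched with the same integration-test tasks:

wf = chain(add.si(1, 1), group(add.s(1), add.s(2)), tsum.s())
result = wf.apply_async()
# The middle group runs as a chord header: tsum receives [3, 4] and
# returns 7 once both group members have completed.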

This unrolling ensures that even deeply nested workflows are flattened before they are sent to the broker, preventing excessive recursion during serialization.

Multi-Stage Workflow Patterns

The integration tests in t/integration/test_canvas.py demonstrate how these primitives are combined to create multi-stage pipelines. A common pattern is alternating between parallel execution and synchronization:

# Based on t/integration/test_canvas.py
c = add.si(1, 0) # Stage 1: Single task
c = c | group(add.s(1), add.s(1)) # Stage 2: Parallel group (upgraded to chord)
c = c | tsum.s() # Stage 3: Aggregate results
c = c | add.s(1) # Stage 4: Sequential step
c = c | group(add.s(1), add.s(1)) # Stage 5: Final parallel fan-out

In this example, the group(...) | tsum.s() step creates a chord. The tsum.s() task only executes once both tasks in the group have completed, receiving their results as a list.
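
Tracing values through the pipeline (a hedged walk-through, assuming add(x, y) returns x + y and tsum sums a list, as in the integration-test tasks):

res = c.apply_async()
# Stage 1: add.si(1, 0)       -> 1
# Stage 2: group on 1         -> [2, 2]  (chord header)
# Stage 3: tsum([2, 2])       -> 4       (chord body)
# Stage 4: add(4, 1)          -> 5
# Stage 5: final group on 5   -> [6, 6]
assert res.get(timeout=10) == [6, 6]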

Tradeoffs and Design Constraints

Serialization and Recursion

The implementation of _chain specifically addresses Python's recursion limits. In older versions of the protocol (Protocol 1), chains were built by linking tasks together in a tree structure, which could cause RecursionError during pickle serialization.

The current implementation (Protocol 2) uses a chain message field. As seen in _chain.prepare_steps, the tasks are reversed so the worker can simply pop() the next task from the list, avoiding deep recursive calls.
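
A conceptual sketch of the worker-side advance (not the codebase's literal trace code; the request object and chain field follow the protocol 2 message format):

from celery.canvas import maybe_signature

# The remaining pipeline travels with the message, already reversed,
# so the last element is always the next task to run.
remaining = request.chain
if remaining:
    next_sig = maybe_signature(remaining.pop(), app=app)
    # Pass the shrunken list along; each hop is O(1), no recursion.
    next_sig.apply_async(chain=remaining)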

Eager Mode Behavior

When task_always_eager is enabled, the composition logic changes its execution strategy. Instead of sending tasks to the broker, apply_async calls apply directly. In _chord.apply, the header is executed, its results are gathered, and then the body is applied immediately:

# From celery/canvas.py: _chord.apply
def apply(self, args=None, kwargs=None, propagate=True, body=None, **options):
    # ...
    return body.apply(
        args=(tasks.apply(args, kwargs).get(propagate=propagate),),
    )
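
A hedged usage sketch: with eager mode enabled, the entire chord runs synchronously in the calling process:

app.conf.task_always_eager = True

result = chord([add.s(1, 1), add.s(2, 2)])(tsum.s())
print(result.get())   # 6, computed without a broker round-trip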

Error Propagation

Error handling in composed workflows is complex. The link_error method is used to attach errbacks. In a group, the errback is cloned and attached to every task in the group, as any one of them failing should trigger the error logic. In a chord, the task_allow_error_cb_on_chord_header setting determines whether the errback is applied to the header tasks or just the body.

# From celery/canvas.py: _chord.link_error
if self.app.conf.task_allow_error_cb_on_chord_header:
    for task in maybe_list(self.tasks) or []:
        task.link_error(errback.clone(immutable=True))
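
A hedged usage sketch (the error handler follows Celery's documented errback convention; fail is an assumed task that raises):

@app.task
def on_chord_error(request, exc, traceback):
    # Errbacks receive the failing task's request, exception, and traceback.
    print(f'task {request.id} raised: {exc!r}')

app.conf.task_allow_error_cb_on_chord_header = True

wf = chord([add.s(1, 1), fail.s()], tsum.s())
wf.link_error(on_chord_error.s())
wf.apply_async()   # on_chord_error fires for the failing header task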

This design ensures that errors at any stage of a nested workflow can be captured, though it requires careful configuration to avoid duplicate error triggers in large groups.