Chains: Sequential Task Execution
Chains allow you to link multiple task signatures together so they execute in a specific order. In a chain, each task is applied as a callback of the previous one, creating a sequential workflow where data can flow from one step to the next.
Creating Chains
There are two primary ways to create a chain in this codebase: using the celery.canvas.chain class or the pipe (|) operator.
The chain Class
The chain class (defined in celery/canvas.py) accepts a list of signatures or a generator expression. If you pass a single iterable to the constructor, it automatically expands it.
from celery import chain
from proj.tasks import add
# Using positional arguments
res = chain(add.s(2, 2), add.s(4), add.s(8))()
# Using a generator expression
lazy_chain = chain(add.s(i) for i in range(10))
res = lazy_chain(3)
The Pipe Operator
The _chain class implements the __or__ method, allowing you to use the | operator to compose workflows. This is often more readable for simple sequences.
# Equivalent to the chain() example above
workflow = add.s(2, 2) | add.s(4) | add.s(8)
res = workflow()
Data Flow and the "First Argument" Rule
By default, a chain passes the return value of the previous task as the first argument to the next task in the sequence.
In the example add.s(2, 2) | add.s(4), the following happens:
- add(2, 2) executes and returns 4.
- The result 4 is prepended to the arguments of the next signature.
- The next task effectively becomes add(4, 4), returning 8.
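The steps above can be modeled in a few lines of plain Python. This is an illustrative sketch of the "first argument" rule only, not Celery code; `run_chain` and the `(func, preset_args)` step format are inventions for this example.

```python
def add(x, y):
    return x + y

def run_chain(steps):
    """Each step is (func, preset_args). The previous step's return
    value is prepended to the next step's preset arguments."""
    func, preset = steps[0]
    result = func(*preset)          # the first step runs with its own args
    for func, preset in steps[1:]:
        result = func(result, *preset)  # prior result becomes the first arg
    return result

# Mirrors add.s(2, 2) | add.s(4) | add.s(8)
print(run_chain([(add, (2, 2)), (add, (4,)), (add, (8,))]))  # → 16
```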
Breaking the Flow with Immutable Signatures
If you want a task in the chain to execute without receiving the previous task's result, you must mark it as immutable using .si() (signature immutable) or .set(immutable=True).
# add(2, 2) returns 4, but the next task ignores it and runs add(10, 10)
workflow = add.s(2, 2) | add.si(10, 10)
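The effect of an immutable step can be sketched without Celery by carrying an `immutable` flag per step. This is a stand-alone illustration; the step tuples and `run_chain` helper are hypothetical.

```python
def add(x, y):
    return x + y

def run_chain(steps):
    """Each step is (func, preset_args, immutable)."""
    result = None
    for i, (func, preset, immutable) in enumerate(steps):
        if i == 0 or immutable:
            result = func(*preset)          # like add.si(...): preset args only
        else:
            result = func(result, *preset)  # like add.s(...): result prepended
    return result

# add(2, 2) returns 4; the immutable step ignores it and runs add(10, 10)
print(run_chain([(add, (2, 2), False), (add, (10, 10), True)]))  # → 20
```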
Automatic Upgrades to Chords
The _chain implementation in celery/canvas.py includes logic to automatically optimize workflows. If a group is followed by another task in a chain, the implementation "upgrades" this sequence into a chord.
This transformation happens inside _chain.prepare_steps. For example:
from celery import group
# This chain:
workflow = group(add.s(i, i) for i in range(5)) | add.s(10)
# Is automatically treated as a chord:
# chord([add(0,0), add(1,1), ...], add.s(10))
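The upgrade rule itself can be sketched as a single pass over the chain's steps. This is a simplified model, not Celery's actual `prepare_steps` code; groups are represented as plain lists and tasks as tuples.

```python
def upgrade_groups(steps):
    """When a group (modeled here as a list of signatures) is
    immediately followed by another task, fuse the pair into a
    chord(header, body)."""
    out = []
    for step in steps:
        if out and isinstance(out[-1], list):
            header = out.pop()                   # the preceding group
            out.append(("chord", header, step))  # fuse group + callback
        else:
            out.append(step)
    return out

# group(add.s(0,0), add.s(1,1)) | add.s(10) becomes a single chord
steps = [[("add", 0, 0), ("add", 1, 1)], ("add", 10)]
print(upgrade_groups(steps))
```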
The prepare_steps method is responsible for unpacking these nested structures. It iterates through the tasks in reverse order, setting up the necessary links and identifying where groups need to be converted to chords to ensure synchronization.
Result Tracking and Parents
When you execute a chain, it returns an AsyncResult for the last task in the sequence. To access the results of intermediate tasks, you can navigate the parent attribute of the result object.
# t/unit/tasks/test_canvas.py example
c = add.s(2, 2) | add.s(4) | add.s(8)
result = c.apply_async()
# result is the AsyncResult for add(..., 8)
# result.parent is the AsyncResult for add(..., 4)
# result.parent.parent is the AsyncResult for add(2, 2)
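The parent linkage is just a singly linked list of result objects, which a minimal stand-in class makes concrete. `Result` here is a hypothetical toy, not the real AsyncResult.

```python
class Result:
    """Minimal stand-in for AsyncResult's parent linking."""
    def __init__(self, value, parent=None):
        self.value = value
        self.parent = parent

r1 = Result(4)              # add(2, 2)
r2 = Result(8, parent=r1)   # add(4, 4)
r3 = Result(16, parent=r2)  # add(8, 8)

# The chain hands back the last result; walk parents for the rest
values = []
node = r3
while node is not None:
    values.append(node.value)
    node = node.parent
print(values)  # → [16, 8, 4]
```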
Internal Execution (Protocol 2)
The codebase handles chains differently depending on the configured task_protocol.
- Protocol 1: Uses the link(callback) field. Each task contains a reference to the next task to be called upon success. This can lead to deep recursion issues for very long chains.
- Protocol 2 (Default): Uses a chain list in the message header. Instead of nesting callbacks, the worker pops the next task from this list and dispatches it. This avoids the recursion limits associated with Protocol 1.
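The Protocol 2 idea can be sketched as a loop over a list of remaining steps rather than nested callbacks. This is an illustrative model, not the actual wire format; `execute` and the step tuples are inventions for this example.

```python
def add(x, y):
    return x + y

def execute(func, args, chain):
    """`chain` holds the remaining steps, last step first, so the
    worker can pop() the next one off the end instead of recursing
    into nested callbacks."""
    result = func(*args)
    while chain:
        next_func, preset = chain.pop()
        result = next_func(result, *preset)
    return result

# add.s(2, 2) | add.s(4) | add.s(8), remaining steps stored reversed
print(execute(add, (2, 2), [(add, (8,)), (add, (4,))]))  # → 16
```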
The _chain.run method manages this logic, calling prepare_steps to flatten the chain and ensure that root_id and parent_id are correctly propagated across all tasks in the sequence.
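The id propagation described above can be sketched as a single forward pass. This is an illustrative model only (not Celery's implementation): the first task's id becomes every task's root_id, and each task's parent_id is the id of the task before it.

```python
import uuid

def set_ids(steps):
    """Assign id, root_id, and parent_id to each step dict."""
    root_id = None
    prev_id = None
    for step in steps:
        step["id"] = str(uuid.uuid4())
        root_id = root_id or step["id"]  # first task's id is the root
        step["root_id"] = root_id
        step["parent_id"] = prev_id      # None for the first task
        prev_id = step["id"]
    return steps

steps = set_ids([{}, {}, {}])
```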
Error Handling
Error callbacks (errbacks) attached to a chain are propagated to every task within that chain. This ensures that if any task in the sequence fails, the error handler is triggered.
# From t/unit/tasks/test_canvas.py
c = (add.s(2, 2) | add.s(4)).on_error(error_handler.s())
When prepare_steps is called during execution, it iterates through the tasks and applies the link_error to each one, ensuring consistent error coverage across the entire sequential workflow.
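The propagation pass can be sketched as follows. This is a simplified model of the behavior, not the real prepare_steps code; signatures are represented as plain dicts.

```python
def attach_errback(steps, errback):
    """Link the same errback to every signature in the chain, so a
    failure at any step triggers the handler."""
    for sig in steps:
        sig.setdefault("link_error", []).append(errback)
    return steps

# Models (add.s(2, 2) | add.s(4)).on_error(error_handler.s())
steps = [{"task": "add", "args": (2, 2)}, {"task": "add", "args": (4,)}]
attach_errback(steps, "error_handler")
print([s["link_error"] for s in steps])  # → [['error_handler'], ['error_handler']]
```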