Chords: Barrier Synchronization
To execute a callback task only after a group of parallel tasks has successfully completed, use a chord. A chord acts as a barrier synchronization primitive, collecting the results of multiple "header" tasks and passing them as a list to a single "body" task.
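As a rough plain-Python analogy, the barrier semantics can be sketched with `concurrent.futures`. This is not Celery's implementation, and the helper name `run_chord` is hypothetical. Every header callable runs in parallel, the body waits for all of them, and the body then receives their results as one list in header order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence


def run_chord(header: Sequence[Callable[[], Any]], body: Callable[[list], Any]) -> Any:
    """Hypothetical helper: run header callables in parallel, then the body once.

    This only mimics the barrier semantics of a chord inside one process;
    there is no broker, no result backend, and no retry logic.
    """
    if not header:
        # Nothing to wait for: call the body right away with an empty list.
        return body([])
    with ThreadPoolExecutor(max_workers=len(header)) as pool:
        futures = [pool.submit(fn) for fn in header]
        # The barrier: .result() blocks until each header call has finished
        # and re-raises if one of them failed, so the body is never reached.
        results = [f.result() for f in futures]
    return body(results)


def add(x: int, y: int) -> int:
    return x + y


total = run_chord([lambda: add(2, 2), lambda: add(4, 4)], sum)
print(total)  # 12
```

Because `f.result()` re-raises a header exception, a failing header call means the body is skipped. An empty header calls the body with `[]`.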
Basic Chord Implementation
You can create a chord by passing a list of task signatures (the header) to the `chord` class and then calling the resulting object with a callback signature (the body).
```python
from celery import chord
from tasks import add, tsum

# Define the header (parallel tasks) and the body (callback)
header = [add.s(2, 2), add.s(4, 4)]
callback = tsum.s()

# Execute the chord
result = chord(header)(callback)

# The body 'tsum' receives [4, 8] and returns 12
print(result.get())
```
Using the Pipe Operator
A more common and concise way to create a chord is by chaining a group with a single task using the pipe (|) operator. Celery automatically upgrades this pattern to a _chord object.
```python
from celery import group
from tasks import add, tsum

# Chaining a group to a task creates a chord automatically
workflow = group(add.s(1, 1), add.s(2, 2)) | tsum.s()

# Execute the workflow; the body 'tsum' receives [2, 4] and returns 6
result = workflow.apply_async()
print(result.get())
```
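The upgrade relies on Python operator overloading. The toy classes below show how `group | task` can return a chord object that holds the group as its header and the task as its body. `Sig`, `Group`, and `Chord` are hypothetical stand-ins and are far simpler than `celery.canvas`. For brevity, the header runs sequentially here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class Sig:
    """Hypothetical stand-in for a task signature: a function plus bound args."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()

    def __call__(self, *partial: Any) -> Any:
        # Incoming arguments are placed in front of the bound args.
        return self.func(*partial, *self.args)


@dataclass
class Chord:
    header: "Group"
    body: Sig

    def __call__(self) -> Any:
        # Run every header signature, then hand all results to the body at once.
        return self.body([sig() for sig in self.header.tasks])


class Group:
    def __init__(self, *tasks: Sig) -> None:
        self.tasks = list(tasks)

    def __or__(self, other: Sig) -> Chord:
        # "group | task" is upgraded to a chord: the group becomes the header
        # and the task becomes the body.
        return Chord(header=self, body=other)


def add(x: int, y: int) -> int:
    return x + y


workflow = Group(Sig(add, (1, 1)), Sig(add, (2, 2))) | Sig(sum)
print(type(workflow).__name__, workflow())  # Chord 6
```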
Components of a Chord
The `_chord` class in `celery.canvas` manages two primary components:
- Header (`tasks`): A `group` of tasks that execute in parallel. The chord waits for every task in this group to complete successfully before triggering the body.
- Body (`body`): A single `Signature` that is executed once the header is finished. It receives the return values of all header tasks as a single list argument.
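The following sketch shows how extra positional arguments reach a signature. It assumes the merge rule is the following: incoming arguments are prepended to the signature's own bound arguments, unless the signature is immutable (created with `.si()`). The `Sig` class is a hypothetical stand-in, not `celery.canvas.Signature`. Under this rule, the body receives the whole list of header results as its first argument. It also covers header tasks when a chord is called with arguments.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Sig:
    """Hypothetical stand-in for a signature (not celery.canvas.Signature)."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    immutable: bool = False

    def merged_args(self, partial: Tuple[Any, ...]) -> Tuple[Any, ...]:
        # Mutable signatures get incoming args prepended to their own args;
        # immutable ones (like .si() in Celery) keep only their own args.
        if partial and not self.immutable:
            return tuple(partial) + self.args
        return self.args

    def run(self, *partial: Any) -> Any:
        return self.func(*self.merged_args(partial))


def scale(values: list, factor: int) -> list:
    return [v * factor for v in values]


def add(x: int, y: int) -> int:
    return x + y


# Body: the list of header results arrives as ONE positional argument,
# placed in front of the body's own bound args.
body = Sig(scale, (10,))
print(body.run([4, 8]))  # [40, 80]

# Header tasks when the chord is called with an extra argument (here 100):
print(Sig(add, (1,)).run(100))                    # 101, like add.s(1)
print(Sig(add, (1, 2), immutable=True).run(100))  # 3, like add.si(1, 2)
```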
Dynamically Adding Tasks to a Chord
If you are inside a task that is already part of a chord, you can dynamically add more tasks to that same chord using `self.add_to_chord()`. This is currently only supported by the Redis result backend.
```python
from celery import shared_task


@shared_task(bind=True)
def dynamic_task(self, x, y):
    from tasks import add

    # Add a new task to the current chord header
    self.add_to_chord(add.s(x, y))
    return x + y
```
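It can help to picture the barrier as a count of expected header results. Under this view, a dynamic addition must raise that count before the body fires, which is why backend support matters. The class below is a hypothetical single-process model of that idea, not the Redis backend's code. `DynamicBarrier`, `add_part`, and `part_done` are invented names, and results are kept in completion order for simplicity.

```python
from typing import Any, Callable, List, Optional


class DynamicBarrier:
    """Hypothetical model of a chord barrier whose header can grow at runtime."""

    def __init__(self, expected: int, body: Callable[[List[Any]], Any]) -> None:
        self.expected = expected        # header results the body must wait for
        self.results: List[Any] = []    # results collected so far
        self.body = body
        self.body_result: Optional[Any] = None
        self.fired = False

    def add_part(self) -> None:
        # Analogous to add_to_chord(): one more result to wait for.
        if self.fired:
            raise RuntimeError("body already ran; too late to extend the header")
        self.expected += 1

    def part_done(self, value: Any) -> None:
        # Each finished header task reports its result; the last one fires the body.
        self.results.append(value)
        if len(self.results) == self.expected:
            self.fired = True
            self.body_result = self.body(self.results)


barrier = DynamicBarrier(expected=2, body=sum)
barrier.part_done(4)
barrier.add_part()          # a running header task adds one more task
barrier.part_done(8)        # 2 of 3 results: the body does not fire yet
print(barrier.fired)        # False
barrier.part_done(3)        # the dynamically added task finishes
print(barrier.body_result)  # 15
```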
Error Handling
By default, error callbacks (errbacks) linked to a chord are applied only to the body task. If a header task fails, the chord body is never executed; instead, the body's result transitions to the failure state with a `ChordError`.
To link error callbacks to the header tasks as well, enable the `task_allow_error_cb_on_chord_header` setting in your Celery configuration (for example, `app.conf.task_allow_error_cb_on_chord_header = True`).
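```python
from celery import chord
from tasks import add, tsum, on_chord_error  # on_chord_error: assumed to be your own error-handling task

# Linking an error callback
c = chord([add.s(1, 2)], body=tsum.s())
c.link_error(on_chord_error.s())
c.apply_async()
```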
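Below is a simplified, single-process model of the default failure path. A failing header task prevents the body from running, the chord's result becomes a failure carrying a `ChordError`, and errbacks linked to the body are called with that error. `ChordError` here is a local stand-in for `celery.exceptions.ChordError`, and `run_chord_with_errbacks` is a hypothetical helper. The sketch runs the header sequentially and does not model `task_allow_error_cb_on_chord_header`.

```python
from typing import Any, Callable, List, Sequence


class ChordError(Exception):
    """Local stand-in for celery.exceptions.ChordError."""


def run_chord_with_errbacks(
    header: Sequence[Callable[[], Any]],
    body: Callable[[list], Any],
    errbacks: List[Callable[[Exception], None]],
) -> Any:
    """Hypothetical helper: run the header, then the body, calling errbacks on failure."""
    results = []
    for index, task in enumerate(header):
        try:
            results.append(task())
        except Exception as exc:
            # The body is skipped; the body's errbacks receive a ChordError instead.
            error = ChordError(f"header task {index} raised {exc!r}")
            for errback in errbacks:
                errback(error)
            raise error from exc
    return body(results)


def fails() -> int:
    raise ValueError("boom")


seen: List[Exception] = []
body_calls: List[list] = []
try:
    run_chord_with_errbacks([lambda: 1, fails], body_calls.append, [seen.append])
except ChordError as err:
    print(type(err).__name__, err)
print(body_calls)  # []
```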
Troubleshooting and Constraints
- Result Backend Required: Chords rely on the result backend to track the completion of the header tasks. Ensure a backend like Redis or Memcached is configured.
- Empty Headers: If the chord header is empty, the `_chord.run()` method executes the body immediately, passing an empty list `[]` as the argument.
- Immutability: If you mark header tasks as immutable using `.si()`, they will not receive partial arguments if the chord itself is called with arguments.
- Recursive Chords: Chords can be nested. The `_chord._descend()` method is used internally to calculate the total number of tasks across nested groups or chains, so that the barrier triggers at the correct time.
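The kind of counting `_chord._descend()` performs can be illustrated with a hypothetical recursive counter over a toy representation of nested canvases. In this toy format, tuples are tagged `"group"`, `"chain"`, or `"chord"`, and plain strings stand for single tasks. The rules below are an assumption about the internals, not a documented contract:
- Every task in a group counts.
- A chain counts only its last non-empty step.
- A nested chord counts through its body.
- A single task counts as one.

```python
from typing import Tuple, Union

# Toy canvas: a task name, or a tagged tuple.
#   ("group", [c1, c2, ...])  ("chain", [c1, c2, ...])  ("chord", header, body)
Canvas = Union[str, Tuple]


def count_completions(node: Canvas) -> int:
    """Hypothetical counter: how many completions a chord header should wait for."""
    if isinstance(node, str):
        return 1  # a single task reports one result
    kind = node[0]
    if kind == "group":
        # Every member of a group reports back, so sum over all of them.
        return sum(count_completions(child) for child in node[1])
    if kind == "chain":
        # Assumed rule: only the last non-empty step of a chain reports back.
        for child in reversed(node[1]):
            size = count_completions(child)
            if size > 0:
                return size
        return 0
    if kind == "chord":
        # A nested chord reports through its body.
        return count_completions(node[2])
    raise ValueError(f"unknown canvas node: {kind!r}")


header = (
    "group",
    [
        "add",
        ("chain", ["add", ("group", ["mul", "mul"])]),
        ("chord", ("group", ["a", "b", "c"]), "tsum"),
    ],
)
print(count_completions(header))  # 4
```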