
Chords: Barrier Synchronization

To execute a callback task only after a group of parallel tasks has successfully completed, use a chord. A chord acts as a barrier synchronization primitive, collecting the results of multiple "header" tasks and passing them as a list to a single "body" task.
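To make the barrier semantics concrete, here is a minimal in-process sketch using Python's standard concurrent.futures module. The function run_chord is a hypothetical name for this illustration. It models only the idea that the body runs once, after every header callable has returned successfully; it says nothing about how Celery dispatches tasks to workers or stores results.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence


def run_chord(header: Sequence[Callable[[], Any]],
              body: Callable[[list], Any]) -> Any:
    """Toy chord: run header callables in parallel, then call body once.

    This is a local model of the barrier semantics only, not Celery code.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn) for fn in header]
        # result() blocks until each callable finishes (and re-raises its
        # exception if it failed, so the body is never reached in that case).
        # Collecting in submission order keeps results in header order.
        results = [f.result() for f in futures]
    # Barrier passed: every header callable has returned.
    return body(results)


print(run_chord([lambda: 2 + 2, lambda: 4 + 4], sum))  # prints 12
print(run_chord([], list))  # prints [] (empty header: body gets an empty list)
```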

Basic Chord Implementation

You can create a chord by passing a list of task signatures (the header) to the chord class and then calling the resulting object with a callback signature (the body).

from celery import chord
from tasks import add, tsum

# Define the header (parallel tasks) and the body (callback)
header = [add.s(2, 2), add.s(4, 4)]
callback = tsum.s()

# Execute the chord
result = chord(header)(callback)

# The body 'tsum' receives [4, 8] and returns 12
print(result.get())

Using the Pipe Operator

A more common and concise way to create a chord is by chaining a group with a single task using the pipe (|) operator. Celery automatically upgrades this pattern to a _chord object.

from celery import group
from tasks import add, tsum

# Chaining a group to a task creates a chord automatically
workflow = group(add.s(1, 1), add.s(2, 2)) | tsum.s()

# Execute the workflow
result = workflow.apply_async()
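The upgrade from group | task to a chord is ordinary operator overloading. The toy classes below (Sig, Group, Chain, Chord) are hypothetical stand-ins, not Celery's classes. They only show how an __or__ method on the group side can return a chord-like object whose header is the group and whose body is the right-hand operand, while sig | sig yields a plain chain.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Sig:
    """Hypothetical stand-in for a task signature."""
    name: str
    args: tuple = ()

    def __or__(self, other: "Sig") -> "Chain":
        # signature | signature stays an ordinary chain
        return Chain([self, other])


@dataclass
class Chain:
    steps: List[object]


@dataclass
class Chord:
    header: List[Sig]
    body: Sig


@dataclass
class Group:
    tasks: List[Sig] = field(default_factory=list)

    def __or__(self, other: Sig) -> Chord:
        # group | signature is "upgraded": the group becomes the header
        # and the right-hand signature becomes the body.
        return Chord(header=list(self.tasks), body=other)


workflow = Group([Sig("add", (1, 1)), Sig("add", (2, 2))]) | Sig("tsum")
print(type(workflow).__name__)  # prints Chord
print(workflow.body.name)       # prints tsum
```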

Components of a Chord

The _chord class in celery.canvas manages two primary components:

  1. Header (tasks): A group of tasks that execute in parallel. The chord waits for every task in this group to complete successfully before triggering the body.
  2. Body (body): A single Signature that is executed once the header is finished. It receives the return values of all header tasks as a single list argument.
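The rule that the body waits for every header task can be pictured as a counting barrier. The sketch below is a single-process illustration that assumes each header task reports its result together with its position in the header. CountingBarrier and part_return are hypothetical names, and real result backends track chord state in their own way.

```python
import threading
from typing import Any, Callable, List, Optional


class CountingBarrier:
    """Toy barrier: fires `body` once `size` header results have arrived."""

    def __init__(self, size: int, body: Callable[[List[Any]], Any]) -> None:
        self.size = size
        self.body = body
        self._results: List[Optional[Any]] = [None] * size
        self._done = 0
        self._lock = threading.Lock()
        self.body_result: Any = None

    def part_return(self, index: int, value: Any) -> None:
        """Record the result of header task number `index`."""
        with self._lock:
            self._results[index] = value
            self._done += 1
            ready = self._done == self.size
        if ready:
            # Only the call that completes the count triggers the body.
            self.body_result = self.body(list(self._results))


barrier = CountingBarrier(size=2, body=sum)
barrier.part_return(1, 8)   # the second header task finishes first
print(barrier.body_result)  # prints None (still waiting)
barrier.part_return(0, 4)
print(barrier.body_result)  # prints 12
```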

Dynamically Adding Tasks to a Chord

If a task is running as part of a chord, it can dynamically add more tasks to that same chord by calling self.add_to_chord() (the task must be bound with bind=True so that self is available). Currently, this is only supported by the Redis result backend.

from celery import shared_task

@shared_task(bind=True)
def dynamic_task(self, x, y):
    # Add a new task to the current chord header
    from tasks import add
    self.add_to_chord(add.s(x, y))
    return x + y
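Adding a task to a running chord effectively raises the number of results the barrier waits for. The sketch below models this with a hypothetical GrowableBarrier class whose add_part() method plays the role of self.add_to_chord(); it is an illustration, not the Redis backend's implementation. It also suggests why dynamic_task calls add_to_chord() before returning: in this model, the extra part must be registered before the caller's own result arrives, or the barrier could fire early.

```python
import threading
from typing import Any, Callable, List


class GrowableBarrier:
    """Toy chord barrier whose expected size can grow while it is running.

    add_part() is a hypothetical analogue of self.add_to_chord(): it raises
    the number of results the barrier waits for before the body runs.
    Results are kept in arrival order to keep the sketch short.
    """

    def __init__(self, size: int, body: Callable[[List[Any]], Any]) -> None:
        self.expected = size
        self.body = body
        self.results: List[Any] = []
        self.fired = False
        self.body_result: Any = None
        self._lock = threading.Lock()

    def add_part(self) -> None:
        with self._lock:
            if self.fired:
                raise RuntimeError("chord body already ran")
            self.expected += 1

    def part_return(self, value: Any) -> None:
        with self._lock:
            self.results.append(value)
            ready = not self.fired and len(self.results) == self.expected
            if ready:
                self.fired = True
                snapshot = list(self.results)
        if ready:
            self.body_result = self.body(snapshot)


barrier = GrowableBarrier(size=2, body=sum)
barrier.add_part()          # a header task adds one more task before returning
barrier.part_return(3)      # ...then returns its own result
barrier.part_return(5)      # the other original header task
print(barrier.fired)        # prints False: still waiting for the added task
barrier.part_return(3)      # the dynamically added task
print(barrier.body_result)  # prints 11
```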

Error Handling

By default, error callbacks (errbacks) linked to a chord are only applied to the body task. If a header task fails, the chord body is never executed.

To link error callbacks to the header tasks as well, you must enable the task_allow_error_cb_on_chord_header setting in your Celery configuration.

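from celery import chord
from tasks import add, tsum, on_chord_error  # on_chord_error: an errback task assumed to be defined in tasks.py

# Link an error callback to the chord
c = chord([add.s(1, 2)], body=tsum.s())
c.link_error(on_chord_error.s())
c.apply_async()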
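The default behavior described above (the body is skipped when a header task fails, and the error is reported instead) can be modeled without Celery. In the hypothetical run_chord_with_errback helper below, the errback stands in for a callback linked with link_error(). The sketch does not model task_allow_error_cb_on_chord_header, which additionally links errbacks to the individual header tasks.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def run_chord_with_errback(header: Sequence[Callable[[], Any]],
                           body: Callable[[List[Any]], Any],
                           errback: Callable[[BaseException], Any]) -> Any:
    """Toy model: the body runs only if every header callable succeeds."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn) for fn in header]
        results = []
        for f in futures:
            exc = f.exception()  # waits for completion; None on success
            if exc is not None:
                # A failed header task means the body never runs.
                return errback(exc)
            results.append(f.result())
    return body(results)


def boom() -> int:
    raise ValueError("header task failed")


print(run_chord_with_errback([lambda: 1, lambda: 2], sum, repr))  # prints 3
print(run_chord_with_errback([lambda: 1, boom], sum, repr))
# prints ValueError('header task failed')
```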

Troubleshooting and Constraints

  • Result Backend Required: Chords rely on the result backend to track the completion of the header tasks. Ensure a backend like Redis or Memcached is configured.
  • Empty Headers: If the chord header is empty, the _chord.run() method will execute the body immediately, passing an empty list [] as the argument.
  • Immutability: If you mark header tasks as immutable using .si(), they will not receive partial arguments if the chord itself is called with arguments.
  • Recursive Chords: Chords can be nested. The _chord._descend() method is used internally to calculate the total number of tasks across nested groups or chains to ensure the barrier triggers at the correct time.
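To see why nested structures need a recursive count, consider the sketch below. It encodes tasks, groups, and chains as plain Python values (an assumption of this sketch only) and counts how many results a chord header would wait for. The rules used here, where a group contributes all of its members and a chain contributes only its last non-empty step (because only the final step's result reaches the chord), are a simplified model and not a copy of _chord._descend().

```python
from typing import List, Tuple, Union

# Hypothetical encoding for this sketch only:
#   str   -> a single task signature
#   list  -> a group (all members run in parallel)
#   tuple -> a chain (steps run one after another)
Node = Union[str, List["Node"], Tuple["Node", ...]]


def count_completions(node: Node) -> int:
    """Count how many results a chord header must wait for."""
    if isinstance(node, str):
        return 1  # a plain task reports exactly one result
    if isinstance(node, list):
        # every member of a group reports to the chord
        return sum(count_completions(child) for child in node)
    if isinstance(node, tuple):
        # only the last non-empty step of a chain reports to the chord
        for step in reversed(node):
            n = count_completions(step)
            if n > 0:
                return n
        return 0
    raise TypeError(f"unsupported node: {node!r}")


header = ["a", ("b", "c"), ["d", "e"], ("f", ["g", "h", "i"])]
print(count_completions(header))  # prints 7
```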