
Groups: Parallel Task Execution

The group primitive in celery.canvas is the fundamental building block for parallel task execution. It allows a collection of tasks to be dispatched simultaneously, returning a single result object that tracks the state of the entire set.

The Group Signature

A group is a subclass of Signature defined in celery/canvas.py. Like other canvas primitives, it is lazy; creating a group does not execute the tasks immediately. Instead, it creates a blueprint that can be invoked later.
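
The laziness described above can be illustrated with a small stand-alone model. This is a sketch written for this page and not Celery's Signature class; `ToySignature` and `record` are hypothetical names. It shows only the idea that building the object records what to run without running it.

```python
class ToySignature:
    """Toy stand-in for a lazy signature: stores a call, runs it on demand."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __call__(self):
        # Execution happens only here, not at construction time.
        return self.func(*self.args)


calls = []


def record(x):
    """Hypothetical task body that logs each invocation."""
    calls.append(x)
    return x * 2


sig = ToySignature(record, 21)   # blueprint created, nothing executed yet
calls_before = list(calls)       # still empty at this point
value = sig()                    # invoking the blueprint runs the function
```

In this model `calls_before` is empty because the constructor never touches `record`; the call is made only when the signature itself is invoked.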

Construction Patterns

The group class supports two primary ways of defining its task set:

  1. Explicit List: Passing a list of task signatures.
  2. Generator Expressions: Passing a generator, which is useful for dynamic task sets where the number of tasks is determined at runtime.

In examples/eventlet/webcrawler.py, a generator is used to crawl multiple URLs in parallel:

# From examples/eventlet/webcrawler.py
subtasks = group(crawl.s(url, seen) for url in wanted_urls)
subtasks.delay()

Tasks can also be passed as individual positional arguments rather than as a single iterable, as seen in examples/quorum-queues/myapp.py:

# From examples/quorum-queues/myapp.py
result = group(
    add.s(*tasks[0]),
    add.s(*tasks[1]),
    add.s(*tasks[2]),
).apply_async(queue=queue)

Execution and Result Management

When a group is executed via apply_async() or by calling it directly, it returns a GroupResult.

GroupResult and Synchronization

The GroupResult (instantiated via self.app.GroupResult in celery/canvas.py) acts as a container for the AsyncResult objects of all tasks in the group.

Internally, group.apply_async uses a barrier object from the vine library to synchronize results. Each task's result is registered with the barrier, and each completion triggers a callback on it. Once the barrier has been finalized and every registered result has reported in, the GroupResult is known to be ready without having to poll each task individually.

# Internal logic in celery/canvas.py:apply_async
p = barrier()
results = list(self._apply_tasks(tasks, producer, app, p,
                                 args=args, kwargs=kwargs, **options))
result = self.app.GroupResult(group_id, results, ready_barrier=p)
p.finalize()
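
The barrier idea can be illustrated with a small stand-alone model. This is a simplified sketch written for this page, not the library's implementation; `CountingBarrier`, `add`, `finalize`, and `on_ready` are hypothetical names chosen to mirror the flow above.

```python
class CountingBarrier:
    """Toy barrier: fires a callback once every registered item has reported in."""

    def __init__(self, on_ready):
        self._on_ready = on_ready
        self._expected = 0
        self._completed = 0
        self._finalized = False
        self.ready = False

    def add(self):
        """Register one more item that must complete."""
        if self._finalized:
            raise ValueError("barrier already finalized")
        self._expected += 1

    def finalize(self):
        """Declare that no more items will be registered."""
        self._finalized = True
        self._maybe_fire()

    def __call__(self):
        """Report completion of one item."""
        self._completed += 1
        self._maybe_fire()

    def _maybe_fire(self):
        # Fire exactly once, and only after finalize() has fixed the count.
        if self._finalized and not self.ready and self._completed >= self._expected:
            self.ready = True
            self._on_ready()


events = []
toy_barrier = CountingBarrier(on_ready=lambda: events.append("group ready"))
for _ in range(3):
    toy_barrier.add()      # one registration per task in the group
toy_barrier()              # a task may finish before the barrier is finalized
toy_barrier.finalize()
toy_barrier()
toy_barrier()              # the last completion triggers the callback
```

Finalizing separately from registering is what lets completions arrive early without the callback firing before the full task count is known.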

Eager Execution

If the Celery application is configured with task_always_eager=True, apply_async() delegates to the apply() method, which runs each task locally and synchronously in the calling process. It still returns a GroupResult, so calling code does not need to change.

Composition and Chords

Groups are frequently combined with other canvas primitives. A common pattern is piping a group into a task, which Celery automatically upgrades to a chord.

Automatic Chord Upgrade

The group class implements the __or__ operator to handle pipes. When a group is followed by another task in a chain (e.g., group(...) | task), it returns a chord where the group is the header and the task is the callback.

# From celery/canvas.py
def __or__(self, other):
    # group() | task -> chord
    return chord(self, body=other, app=self._app)
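
The operator overloading behind this upgrade can be sketched with plain dataclasses. `ToyTask`, `ToyGroup`, and `ToyChord` are hypothetical stand-ins written for this page; they model only the shape of the result, not Celery's dispatch logic.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    name: str


@dataclass
class ToyChord:
    header: "ToyGroup"   # the parallel part
    body: ToyTask        # the callback that runs after the header


@dataclass
class ToyGroup:
    tasks: list

    def __or__(self, other):
        # group | task -> chord(header=group, body=task)
        return ToyChord(header=self, body=other)


workflow = ToyGroup([ToyTask("fetch_a"), ToyTask("fetch_b")]) | ToyTask("summarize")
header_names = [task.name for task in workflow.header.tasks]
```

Because `|` is resolved by the left operand's `__or__`, the group decides what the combined object becomes, which is why the upgrade needs no explicit chord call from the user.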

A group can also appear as the final stage of a chain, in which case the result of the preceding task is passed as an argument to every member of the group. t/integration/test_canvas.py exercises this in a more complex workflow:

# From t/integration/test_canvas.py
g = group(add.s(i) for i in range(4))
c = (
    add.s(2, 2) | (
        add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
    ) | g
)
res = c()
# res.get() returns [64, 65, 66, 67]

Error Handling and Callbacks

Handling callbacks in groups requires care because of their parallel nature.

When you apply an error callback to a group using link_error, the group implementation clones that signature and applies it to every individual task within the group. This ensures that if any task fails, the error handler is triggered.

# From celery/canvas.py
def link_error(self, sig):
    sig = maybe_signature(sig)
    return tuple(child_task.link_error(sig.clone(immutable=True))
                 for child_task in self.tasks)
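
The clone-per-child behaviour can be modelled in a few lines. This is a toy sketch, not Celery's classes: `ToySignature` and `ToyGroup` are hypothetical, and `clone` here copies only the fields this example needs.

```python
class ToySignature:
    """Toy signature holding a name, an immutability flag, and error links."""

    def __init__(self, name, immutable=False):
        self.name = name
        self.immutable = immutable
        self.error_links = []

    def clone(self, immutable=None):
        # Return an independent copy, optionally overriding immutability.
        flag = self.immutable if immutable is None else immutable
        return ToySignature(self.name, immutable=flag)

    def link_error(self, errback):
        self.error_links.append(errback)
        return errback


class ToyGroup:
    def __init__(self, tasks):
        self.tasks = list(tasks)

    def link_error(self, errback):
        # Each child receives its own immutable copy of the error handler.
        return tuple(task.link_error(errback.clone(immutable=True))
                     for task in self.tasks)


children = [ToySignature(f"task-{i}") for i in range(3)]
handler = ToySignature("on_error")
linked = ToyGroup(children).link_error(handler)
```

Giving each child a separate copy means that state attached to one child's handler cannot leak into another's, and the original `handler` object is left unmodified.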

Directly adding a success link to a group via apply_async is explicitly forbidden and will raise a TypeError. This is because a group has multiple return values, and a standard link wouldn't know which result to follow or whether to wait for all of them. For this behavior, a chord should be used instead.

# From celery/canvas.py:apply_async
if link is not None:
    raise TypeError('Cannot add link to group: use a chord')

Internal Mechanics: Unrolling

The group primitive supports nesting (groups within groups). To handle this, the _prepared method recursively "unrolls" the group into a flat generator of tasks before execution. This ensures that even complex, nested structures are dispatched as a flat set of parallel operations to the broker.

# From celery/canvas.py
def _prepared(self, tasks, partial_args, group_id, root_id, app, ...):
    for index, task in enumerate(tasks):
        # ...
        if isinstance(task, group):
            unroll = task._prepared(task.tasks, partial_args, group_id, root_id, app)
            yield from unroll
        else:
            yield task, task.freeze(...), group_id

This unrolling process also handles "freezing" the tasks, which involves assigning them a group_id and root_id so their results can be tracked collectively by the backend.
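
The recursive unrolling can be shown in isolation with a toy model. `ToyGroup` and `unroll` are hypothetical names written for this page, and plain strings stand in for task signatures. Freezing and ID assignment are deliberately left out.

```python
from typing import Iterator


class ToyGroup:
    """Toy container that may hold tasks or other ToyGroup instances."""

    def __init__(self, *tasks):
        self.tasks = list(tasks)


def unroll(tasks) -> Iterator[str]:
    """Yield leaf tasks depth-first, flattening any nested groups."""
    for task in tasks:
        if isinstance(task, ToyGroup):
            # Recurse into the nested group and re-yield its leaves.
            yield from unroll(task.tasks)
        else:
            yield task


nested = ToyGroup("a", ToyGroup("b", ToyGroup("c", "d")), "e")
flat = list(unroll(nested.tasks))
```

Using a generator with `yield from` keeps the flattening lazy, so leaves are produced one at a time rather than collected into an intermediate list at each nesting level.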