Groups: Parallel Task Execution
The group primitive in celery.canvas is the fundamental building block for parallel task execution. It dispatches a collection of tasks together so that workers can run them concurrently, and it returns a single result object that tracks the state of the entire set.
The Group Signature
A group is a subclass of Signature defined in celery/canvas.py. Like other canvas primitives, it is lazy; creating a group does not execute the tasks immediately. Instead, it creates a blueprint that can be invoked later.
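The lazy "blueprint" idea can be sketched without Celery at all. The snippet below is a minimal stdlib-only model, not Celery's implementation: ToySignature and ToyGroup are hypothetical stand-ins, and the only point is that building the group records what to run without running it.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToySignature:
    """Hypothetical stand-in for a task signature: a function plus its arguments."""
    func: Callable[..., Any]
    args: tuple = ()

    def run(self) -> Any:
        return self.func(*self.args)


@dataclass
class ToyGroup:
    """Hypothetical stand-in for a group: stores signatures, runs nothing on creation."""
    tasks: list = field(default_factory=list)

    def apply(self) -> list:
        # Only at this point are the stored signatures actually invoked.
        return [task.run() for task in self.tasks]


calls = []


def record(x):
    calls.append(x)
    return x * 2


blueprint = ToyGroup([ToySignature(record, (n,)) for n in range(3)])
created_without_running = (calls == [])  # True: nothing has executed yet
results = blueprint.apply()              # execution happens on demand
```

In this model, as with the real primitive, the same blueprint can be stored, passed around, or combined with other pieces before anything is executed.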
Construction Patterns
The group class accepts its task set in three forms:
- Explicit List: Passing a list of task signatures.
- Generator Expressions: Passing a generator, which is useful for dynamic task sets where the number of tasks is determined at runtime.
- Positional Arguments: Passing each task signature as a separate argument.
In examples/eventlet/webcrawler.py, a generator is used to crawl multiple URLs in parallel:
# From examples/eventlet/webcrawler.py
subtasks = group(crawl.s(url, seen) for url in wanted_urls)
subtasks.delay()
Alternatively, tasks can be passed as individual arguments, as seen in examples/quorum-queues/myapp.py:
# From examples/quorum-queues/myapp.py
result = group(
    add.s(*tasks[0]),
    add.s(*tasks[1]),
    add.s(*tasks[2]),
).apply_async(queue=queue)
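One way a constructor could accept all of these call styles is to normalize them into a single list up front. The helper below is a hypothetical, stdlib-only sketch of that idea (normalize_tasks is not a Celery function, and plain strings stand in for task signatures); it is not how celery/canvas.py is written.

```python
from collections.abc import Iterable


def normalize_tasks(*tasks):
    """Return a flat list of tasks from a list, a generator, or positional arguments."""
    # A single iterable argument (list, tuple, generator) is taken to be the task set.
    # Strings, bytes, and dicts are excluded so that one such value counts as one task.
    if (
        len(tasks) == 1
        and isinstance(tasks[0], Iterable)
        and not isinstance(tasks[0], (str, bytes, dict))
    ):
        return list(tasks[0])
    # Otherwise every positional argument is its own task.
    return list(tasks)


from_list = normalize_tasks(["fetch", "parse"])
from_generator = normalize_tasks(name for name in ("fetch", "parse"))
from_arguments = normalize_tasks("fetch", "parse")
```

All three calls produce the same list, which is why the call styles shown above are interchangeable from the caller's point of view.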
Execution and Result Management
When a group is executed via apply_async() or by calling it directly, it returns a GroupResult.
GroupResult and Synchronization
The GroupResult (instantiated via self.app.GroupResult in celery/canvas.py) acts as a container for the AsyncResult objects of all tasks in the group.
Internally, group.apply_async uses a barrier object from the vine library (imported in celery/canvas.py as "from vine import barrier") to synchronize results. Each task's result is registered with the barrier, and the barrier is notified as each result becomes ready. This mechanism allows the GroupResult to efficiently determine when the entire collection is ready.
# Internal logic in celery/canvas.py:apply_async
p = barrier()
results = list(self._apply_tasks(tasks, producer, app, p,
                                 args=args, kwargs=kwargs, **options))
result = self.app.GroupResult(group_id, results, ready_barrier=p)
p.finalize()
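The barrier pattern itself is small enough to model in plain Python. The class below is a hypothetical counting barrier written for illustration only; it is not vine's implementation, and names such as ToyBarrier, add, and task_completed are assumptions. It shows why a finalize step is useful: the barrier cannot fire until every task has been registered.

```python
class ToyBarrier:
    """Hypothetical counting barrier: fires a callback once all registered tasks are done."""

    def __init__(self, on_ready):
        self.size = 0           # number of tasks registered so far
        self.done = 0           # number of tasks that have completed
        self.finalized = False  # True once no more tasks will be registered
        self.ready = False
        self.on_ready = on_ready

    def add(self):
        self.size += 1

    def finalize(self):
        # Registration is over; fire immediately if everything already finished.
        self.finalized = True
        self._maybe_fire()

    def task_completed(self):
        self.done += 1
        self._maybe_fire()

    def _maybe_fire(self):
        if self.finalized and not self.ready and self.done >= self.size:
            self.ready = True
            self.on_ready()


events = []
toy = ToyBarrier(on_ready=lambda: events.append("group ready"))
for _ in range(3):
    toy.add()
toy.finalize()

toy.task_completed()
toy.task_completed()
ready_after_two = toy.ready  # still False: one task is outstanding
toy.task_completed()         # the third completion fires the callback
```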
Eager Execution
If the Celery application is configured with task_always_eager=True, the group will execute tasks locally and synchronously within the apply() method, still returning a GroupResult for consistency.
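The shape of that branch can be illustrated with a toy dispatcher. This is a stdlib-only sketch under stated assumptions: ToyGroupResult, dispatch, and the outbox list are hypothetical names, and the code only models the idea that both paths return the same kind of result object.

```python
from dataclasses import dataclass


@dataclass
class ToyGroupResult:
    """Hypothetical result container returned by both dispatch paths."""
    values: list
    ready: bool


def dispatch(tasks, always_eager, outbox):
    """Run tasks locally when eager, otherwise queue them for later."""
    if always_eager:
        # Eager path: call each function right here, in the current process.
        return ToyGroupResult([func(*args) for func, args in tasks], ready=True)
    # Normal path: hand the tasks off; results would arrive later.
    outbox.extend(tasks)
    return ToyGroupResult([], ready=False)


tasks = [(pow, (2, 3)), (max, (4, 9))]
outbox = []
eager = dispatch(tasks, always_eager=True, outbox=outbox)
deferred = dispatch(tasks, always_eager=False, outbox=outbox)
```

Because the caller receives the same type either way, code written against the result object does not need to know which path was taken.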
Composition and Chords
Groups are frequently used in combination with other canvas primitives. A common pattern is piping a group into a follow-up task, which Celery automatically upgrades to a chord.
Automatic Chord Upgrade
The group class implements the __or__ operator to handle pipes. When a group is followed by another task in a chain (e.g., group(...) | task), it returns a chord where the group is the header and the task is the callback.
# From celery/canvas.py
def __or__(self, other):
    # group() | task -> chord
    return chord(self, body=other, app=self._app)
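The operator-overloading trick behind this upgrade can be shown with two small classes. MiniGroup and MiniChord below are hypothetical stand-ins that run plain callables in-process; they illustrate the header/callback relationship only and are not Celery's classes.

```python
class MiniChord:
    """Hypothetical chord: run the header tasks, then pass their results to the body."""

    def __init__(self, header, body):
        self.header = header
        self.body = body

    def run(self):
        return self.body(self.header.run())


class MiniGroup:
    """Hypothetical group of zero-argument callables."""

    def __init__(self, *tasks):
        self.tasks = list(tasks)

    def run(self):
        return [task() for task in self.tasks]

    def __or__(self, other):
        # group | callback -> chord, with this group as the header.
        return MiniChord(header=self, body=other)


workflow = MiniGroup(lambda: 1, lambda: 2, lambda: 3) | sum
total = workflow.run()  # the body (sum) receives the list of header results
```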
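The reverse composition is also supported. In t/integration/test_canvas.py a group serves as the final stage of a chain, so the chain's final value is passed to every task in the group. Nothing follows the group in this case, so the result is a list of values rather than a chord callback result: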
# From t/integration/test_canvas.py
g = group(add.s(i) for i in range(4))
c = (
    add.s(2, 2) | (
        add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
    ) | g
)
res = c()
# res.get() returns [64, 65, 66, 67]
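The expected list can be checked by hand with ordinary functions. The sketch below assumes that each chained task receives the previous result as its first argument and that add_replaced behaves like add for the purpose of the final value; it reproduces the arithmetic only, not the distributed execution.

```python
def add(x, y):
    return x + y


# Chain stage: 2 + 2, then each later step adds its own argument to the running value.
value = add(2, 2)                        # 4
for step_argument in (4, 8, 16, 32):
    value = add(value, step_argument)    # 8, 16, 32, 64

# Group stage: the chain's final value is given to every task in the group.
results = [add(value, i) for i in range(4)]
```

Each step happens to double the running value, which is why the chain ends at 64 before the group adds 0 through 3.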
Error Handling and Callbacks
Handling callbacks in groups requires care because of their parallel nature.
Error Links (link_error)
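When you apply an error callback to a group using link_error, the group implementation clones that signature as immutable and applies a copy to every individual task within the group. This ensures that if any task fails, the error handler is triggered. Because each child carries its own copy, the handler can run more than once when several tasks fail.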
# From celery/canvas.py
def link_error(self, sig):
    sig = maybe_signature(sig)
    return tuple(child_task.link_error(sig.clone(immutable=True))
                 for child_task in self.tasks)
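The effect of cloning per child can be seen in a small model. ToyTask and link_error_to_all below are hypothetical, a dict stands in for the error signature, and copy.deepcopy plays the role of clone; the sketch shows only that every child receives an independent copy.

```python
import copy


class ToyTask:
    """Hypothetical task that records the error callbacks attached to it."""

    def __init__(self, name):
        self.name = name
        self.error_links = []

    def link_error(self, errback):
        self.error_links.append(errback)
        return errback


def link_error_to_all(tasks, errback):
    # Give every child its own copy so no two tasks share one callback object.
    return tuple(task.link_error(copy.deepcopy(errback)) for task in tasks)


tasks = [ToyTask("a"), ToyTask("b"), ToyTask("c")]
errback = {"task": "log_failure", "immutable": True}
linked = link_error_to_all(tasks, errback)

# Changing one child's copy leaves the others and the original untouched.
linked[0]["task"] = "changed"
```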
Success Links (link)
Directly adding a success link to a group via apply_async is explicitly forbidden and will raise a TypeError. This is because a group has multiple return values, and a standard link wouldn't know which result to follow or whether to wait for all of them. For this behavior, a chord should be used instead.
# From celery/canvas.py:apply_async
if link is not None:
    raise TypeError('Cannot add link to group: use a chord')
Internal Mechanics: Unrolling
The group primitive supports nesting (groups within groups). To handle this, the _prepared method recursively "unrolls" the group into a flat generator of tasks before execution. This ensures that even complex, nested structures are dispatched as a flat set of parallel operations to the broker.
# From celery/canvas.py
def _prepared(self, tasks, partial_args, group_id, root_id, app, ...):
    for index, task in enumerate(tasks):
        # ...
        if isinstance(task, group):
            unroll = task._prepared(task.tasks, partial_args, group_id, root_id, app)
            yield from unroll
        else:
            yield task, task.freeze(...), group_id
This unrolling process also handles "freezing" the tasks, which involves assigning them a group_id and root_id so their results can be tracked collectively by the backend.
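The recursive flattening can be reproduced with a short generator. In this stdlib-only sketch, nested Python lists stand in for nested groups and strings stand in for tasks; the unroll function and the "g1" identifier are hypothetical, and the real method does more work (cloning, freezing, argument handling) than is shown here.

```python
def unroll(tasks, group_id):
    """Yield (task, group_id) pairs from an arbitrarily nested list of tasks."""
    for task in tasks:
        if isinstance(task, list):
            # A nested group: recurse and re-yield its members at the top level.
            yield from unroll(task, group_id)
        else:
            # A leaf task: tag it with the shared group identifier.
            yield task, group_id


nested = ["a", ["b", ["c", "d"]], "e"]
flat = list(unroll(nested, group_id="g1"))
```

Every leaf ends up tagged with the same identifier regardless of how deeply it was nested, which mirrors how unrolled tasks share the outer group's group_id.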