Handling Group and Set Results
To manage and aggregate results from multiple tasks, use the ResultSet and GroupResult classes. These allow you to track the status of multiple concurrent tasks and retrieve their values as a single collection.
Aggregating Multiple Task Results
You can use ResultSet to group several AsyncResult objects and wait for all of them to complete.
from celery.result import ResultSet
from proj.tasks import add
# Create a set of results from multiple task calls
rs = ResultSet([add.delay(1, 1), add.delay(2, 2), add.delay(4, 4)])
# Wait for all tasks to finish and return their values in order
results = rs.get(timeout=10)
# [2, 4, 8]
The ResultSet.get() method (which may delegate to join_native for performance on supported backends like Redis) returns a list of return values in the same order as the results in the set.
Checking Completion Status
ResultSet provides several methods to inspect the collective state of the tasks:
# Check if all tasks have finished (success or failure)
if rs.ready():
    print("All tasks completed")
# Check if all tasks were successful
if rs.successful():
    print("All tasks finished without errors")
# Check if any task failed
if rs.failed():
    print("At least one task raised an exception")
# Get the count of successful tasks
completed = rs.completed_count()
Handling Errors in Result Sets
By default, rs.get() will re-raise the first exception encountered if any task in the set fails. To retrieve results without stopping for exceptions, set propagate=False.
from proj.tasks import add, raise_error
from celery.result import ResultSet
rs = ResultSet([add.delay(1, 1), raise_error.delay()])
# Get results without raising exceptions
values = rs.get(propagate=False)
# The list will contain the exception instance for the failed task
# values -> [2, KeyError('...')]
assert rs.results[0].successful()
assert rs.results[1].failed()
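Because failed tasks appear in the returned list as exception instances, you can partition the values after the fact. A minimal sketch of that post-processing, where the values list is an illustrative stand-in for the output of rs.get(propagate=False):

```python
# Illustrative stand-in for the list returned by rs.get(propagate=False):
# successful tasks contribute their return value, failed tasks contribute
# the exception instance that was raised.
values = [2, KeyError('missing key')]

# Partition into successes and failures by type
successes = [v for v in values if not isinstance(v, Exception)]
failures = [v for v in values if isinstance(v, Exception)]

print(successes)       # [2]
print(len(failures))   # 1
```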
Saving and Restoring Group States
GroupResult is a specialized ResultSet (returned when you apply a group canvas primitive) that can be persisted to the result backend. This lets you check the group's progress later, or from a different process, using only the group ID.
from celery import group
from celery.result import GroupResult
from proj.tasks import add
# Create and execute a group
job = group(add.s(i, i) for i in range(5))
result = job.apply_async()
# Save the group result to the backend
result.save()
# Later, or in another process, restore it using the ID
saved_id = result.id
restored_result = GroupResult.restore(saved_id)
if restored_result.ready():
    print(restored_result.get())
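Conceptually, save() persists the group's member task IDs under the group ID, and restore() reads them back. A toy round-trip of that idea, with a plain dict standing in for the real backend (FakeBackend is purely illustrative, not a Celery class):

```python
# Toy model of the save/restore round-trip. A plain dict stands in for the
# result backend; in real Celery this storage is handled by the configured
# backend, keyed on the group ID.
class FakeBackend:
    def __init__(self):
        self._groups = {}

    def save_group(self, group_id, member_ids):
        # Persist which task IDs belong to this group
        self._groups[group_id] = list(member_ids)

    def restore_group(self, group_id):
        # Fetch the member IDs back, e.g. from another process
        return self._groups.get(group_id)

backend = FakeBackend()
backend.save_group('group-123', ['task-a', 'task-b', 'task-c'])

restored = backend.restore_group('group-123')
print(restored)  # ['task-a', 'task-b', 'task-c']
```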
Managing Result Set Members
You can dynamically modify a ResultSet by adding or removing individual AsyncResult objects.
from celery.result import ResultSet
from proj.tasks import add
rs = ResultSet([])
# Add a result
res = add.delay(1, 1)
rs.add(res)
# Update from an iterable
rs.update([add.delay(2, 2), add.delay(3, 3)])
# Remove a result
rs.remove(res)
# Clear all results
rs.clear()
Performance with Native Joins
For backends that support it (like Redis, Memcached, and AMQP), ResultSet can use join_native to retrieve results more efficiently than polling each task individually.
# ResultSet.get() automatically uses join_native if supported
if rs.supports_native_join:
    results = rs.get()
Troubleshooting
Deadlocks with Synchronous Subtasks
Warning: Never call rs.get() or rs.join() inside a Celery task. This can lead to resource exhaustion and deadlocks where worker processes are waiting for results that cannot be processed because all workers are busy waiting.
If you attempt this, Celery raises a RuntimeError unless you explicitly pass disable_sync_subtasks=False to get().
Result Backend Requirements
GroupResult.save() and GroupResult.restore() require a result backend to be configured. If no backend is available, these methods will fail. Ensure your app.conf.result_backend is set (e.g., to 'redis://').
Counting Successes
Note that ResultSet.completed_count() specifically returns the number of successful tasks. If a task is finished but failed, it is not included in this count. Use len([r for r in rs.results if r.ready()]) if you need the total number of finished tasks regardless of outcome.
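The distinction can be illustrated with stand-in result objects (FakeResult below is a toy, not a Celery class): of three tasks where one succeeded, one failed, and one is still pending, only the success counts toward completed_count(), while two are finished:

```python
from dataclasses import dataclass

# Toy stand-in for AsyncResult, just enough to show the counting difference.
@dataclass
class FakeResult:
    state: str

    def ready(self):
        # Finished in any terminal state
        return self.state in ('SUCCESS', 'FAILURE')

    def successful(self):
        return self.state == 'SUCCESS'

results = [FakeResult('SUCCESS'), FakeResult('FAILURE'), FakeResult('PENDING')]

successful = sum(1 for r in results if r.successful())  # what completed_count() reports
finished = len([r for r in results if r.ready()])       # finished regardless of outcome

print(successful, finished)  # 1 2
```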