Result Backends & Persistence
Result backends in this codebase provide the infrastructure for storing and retrieving task states, return values, and exception information. When a task is executed, its outcome is persisted to a backend, allowing clients to query the result asynchronously using a task ID.
Core Architecture
The result system is built around a provider-consumer model. Workers act as providers, storing results via a backend implementation, while clients act as consumers, retrieving those results through the AsyncResult API.
The Backend Interface
All result backends inherit from the Backend class (defined in celery/backends/base.py). This base class defines the standard lifecycle for task results:
- mark_as_started: Updates the task state to STARTED.
- mark_as_done: Stores the return value and sets the state to SUCCESS.
- mark_as_failure: Stores the exception and traceback, setting the state to FAILURE.
- store_result: The underlying method used to persist state and metadata to the storage engine.
The Backend class also handles serialization of results and exceptions. It uses the kombu.serialization registry to encode data before storage and decode it upon retrieval.
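To make the lifecycle concrete, here is a minimal in-memory sketch of this store/retrieve cycle. It is a toy model, not Celery's actual implementation: it uses the stdlib json module in place of the kombu.serialization registry, and a plain dict in place of a real storage engine.

```python
import json


class DictBackend:
    """Toy result backend: stores JSON-encoded task metadata in a dict."""

    def __init__(self):
        self._store = {}

    def store_result(self, task_id, result, state, traceback=None):
        # Encode before storage, mirroring the encode step the real
        # base Backend delegates to kombu.serialization.
        meta = {'task_id': task_id, 'status': state,
                'result': result, 'traceback': traceback}
        self._store[task_id] = json.dumps(meta)

    def mark_as_started(self, task_id):
        self.store_result(task_id, None, 'STARTED')

    def mark_as_done(self, task_id, result):
        self.store_result(task_id, result, 'SUCCESS')

    def mark_as_failure(self, task_id, exc, traceback=None):
        self.store_result(task_id, repr(exc), 'FAILURE', traceback)

    def get_task_meta(self, task_id):
        # Decode on retrieval; unknown IDs are reported as PENDING.
        raw = self._store.get(task_id)
        return json.loads(raw) if raw else {'status': 'PENDING', 'result': None}


backend = DictBackend()
backend.mark_as_started('abc123')
backend.mark_as_done('abc123', 42)
print(backend.get_task_meta('abc123')['status'])  # SUCCESS
```

Note that a task ID that was never stored decodes to a PENDING state rather than an error; real backends behave the same way, which is why querying a mistyped task ID silently reports PENDING.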
AsyncResult: The User API
The AsyncResult class in celery/result.py is the primary interface for interacting with task outcomes. When you call a task using .delay() or .apply_async(), an AsyncResult instance is returned.
```python
from celery.result import AsyncResult

# result is an AsyncResult instance
result = my_task.delay(1, 2)

# Check if the task has finished
if result.ready():
    print(f"Task state: {result.state}")
    print(f"Task value: {result.get()}")
```
Key methods in AsyncResult include:
- get(timeout=None, propagate=True): Blocks until the result is available. If propagate is True (the default), it re-raises any exception that occurred during task execution.
- forget(): Removes the result from the backend to free up resources.
- revoke(): Sends a signal to workers to terminate or ignore the task.
- ready(): Returns True if the task has reached a final state (SUCCESS, FAILURE, REVOKED).
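The propagate flag is the detail that most often surprises users: with propagate=False, a failed task's get() returns the exception object instead of raising it. The following toy stand-in (not the real AsyncResult, which talks to a backend) models just that behavior:

```python
class ToyResult:
    """Toy stand-in for AsyncResult, modeling propagate semantics only."""

    def __init__(self, state, value):
        self.state = state      # e.g. 'SUCCESS' or 'FAILURE'
        self._value = value     # return value, or exception instance on failure

    def ready(self):
        return self.state in ('SUCCESS', 'FAILURE', 'REVOKED')

    def get(self, propagate=True):
        if self.state == 'FAILURE' and propagate:
            raise self._value   # re-raise the task's exception in the caller
        # With propagate=False, a failure returns the exception object itself.
        return self._value


ok = ToyResult('SUCCESS', 42)
failed = ToyResult('FAILURE', ValueError('bad input'))

print(ok.get())                      # 42
print(failed.get(propagate=False))   # bad input
```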
Backend Implementations
This project supports several backend types, each optimized for different use cases.
Redis Backend
The RedisBackend (celery/backends/redis.py) is a high-performance option that supports native join operations and real-time result notifications.
Unlike most backends that require polling, the Redis backend uses a ResultConsumer that leverages Redis Pub/Sub. When a result is stored, the backend publishes a message to a channel named after the task ID. The AsyncResult.get() method then waits on this Pub/Sub channel, providing near-instantaneous result retrieval without the overhead of repeated network requests.
```python
# Internal implementation of result storage in RedisBackend
def _set(self, key, value):
    with self.client.pipeline() as pipe:
        if self.expires:
            pipe.setex(key, self.expires, value)
        else:
            pipe.set(key, value)
        # Notify any waiting clients via Pub/Sub
        pipe.publish(key, value)
        pipe.execute()
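The store-then-publish pattern above can be modeled with stdlib threading, with a threading.Event standing in for the Pub/Sub channel. This is an illustrative sketch, not Celery code: the point is that the client blocks on a notification instead of re-querying the key in a polling loop.

```python
import threading
import time

results = {}
ready = threading.Event()   # stands in for the per-task Pub/Sub channel


def worker(task_id):
    time.sleep(0.1)          # simulate task execution
    results[task_id] = 42    # store the result (the "SET" in Redis)
    ready.set()              # notify waiters (the "PUBLISH" in Redis)


threading.Thread(target=worker, args=('abc123',)).start()

# Client side: block until notified rather than polling on an interval.
ready.wait(timeout=5)
print(results['abc123'])  # 42
```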
Database Backend
The DatabaseBackend (celery/backends/database/__init__.py) uses SQLAlchemy to store results in relational databases. This is ideal for long-term persistence and environments where a database is already available.
It uses two primary models defined in celery/backends/database/models.py:
- Task: Stores individual task results, including task_id, status, result, and traceback.
- TaskSet: Stores results for groups of tasks (used for chords and groups).
If result_extended is enabled, it uses the TaskExtended model to store additional metadata such as task arguments (args, kwargs), the worker hostname, and the queue name.
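For a concrete picture of what this backend persists, here is a stdlib sqlite3 sketch of an analogous table and the worker/client halves of the exchange. The column names mirror the documented Task model, but this is an illustration only; the real backend defines its schema with SQLAlchemy.

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(':memory:')
# Schema loosely mirroring the documented Task model.
conn.execute("""
    CREATE TABLE celery_taskmeta (
        id        INTEGER PRIMARY KEY,
        task_id   TEXT UNIQUE,
        status    TEXT,
        result    TEXT,
        traceback TEXT,
        date_done TEXT
    )
""")

# Worker side: persist a finished task's metadata.
conn.execute(
    "INSERT INTO celery_taskmeta (task_id, status, result, traceback, date_done)"
    " VALUES (?, ?, ?, ?, ?)",
    ('abc123', 'SUCCESS', json.dumps(42), None,
     datetime.now(timezone.utc).isoformat()),
)

# Client side: an AsyncResult-style lookup by task ID.
row = conn.execute(
    "SELECT status, result FROM celery_taskmeta WHERE task_id = ?",
    ('abc123',),
).fetchone()
print(row[0], json.loads(row[1]))  # SUCCESS 42
```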
Configuration & Persistence
Result behavior is controlled via several configuration options defined in celery/app/defaults.py:
- result_backend: The URL of the storage service (e.g., redis://localhost:6379/0 or db+sqlite:///results.db).
- result_expires: The duration (default: 1 day) for which results are kept before being automatically deleted by the backend (if supported, as in Redis) or by the celery backend_cleanup task.
- result_serializer: The serialization format for results (default: json).
- result_extended: When True, the backend stores extra metadata like task arguments and worker identity.
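Together, these settings might be applied like so (a sketch assuming a local Redis instance; adjust the URLs and the app name for your environment):

```python
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.update(
    result_backend='redis://localhost:6379/1',
    result_expires=3600,       # keep results for one hour instead of one day
    result_serializer='json',
    result_extended=True,      # also store args, kwargs, worker hostname
)
```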
Best Practices & Resource Management
Avoiding Deadlocks
A critical rule in this codebase is to never call result.get() from inside a task to wait on another task's result. Doing so can exhaust the worker pool and deadlock: every worker ends up blocked waiting on results of tasks that can never run, because no workers are free to execute them.
The AsyncResult.get() method includes a check (assert_will_not_block) that raises a RuntimeError if it detects it is being called within a task context, unless explicitly allowed via the allow_join_result context manager.
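The hazard is easy to reproduce outside Celery with a single-worker thread pool (a toy model, not Celery code): an outer task blocks on an inner task, but the inner task can never start because the only worker is busy waiting for it. This is exactly the pattern assert_will_not_block is designed to catch.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

pool = ThreadPoolExecutor(max_workers=1)  # a "worker pool" of one


def inner():
    return 42


def outer():
    # In Celery terms: calling .get() on a subtask from inside a task.
    # The timeout is only here so the demo eventually unblocks.
    return pool.submit(inner).result(timeout=2)


future = pool.submit(outer)
try:
    future.result(timeout=1)
    deadlocked = False
except TimeoutError:
    deadlocked = True  # the pool made no progress: classic deadlock

print('deadlocked:', deadlocked)
```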
Resource Cleanup
Backends do not automatically know when you are finished with a result. To prevent resource leakage (especially in Redis or memory-heavy backends), you should eventually call .forget() or ensure result_expires is configured appropriately.
```python
result = my_task.delay()
try:
    value = result.get(timeout=10)
finally:
    # Ensure the backend entry is removed
    result.forget()
```
Handling Collections
For workflows involving multiple tasks, ResultSet and GroupResult (in celery/result.py) provide a way to manage collections of AsyncResult objects. They allow you to check the status of an entire group or wait for all results in the set simultaneously using join().
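Conceptually, join() gathers each member's value in submission order, blocking until all are available. A toy model with stdlib futures (not Celery code) captures the shape of the API:

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square, n) for n in range(5)]  # the "group"
    values = [f.result() for f in futures]                # the "join()"

print(values)  # [0, 1, 4, 9, 16]
```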