NoSQL and Distributed Key-Value Stores
This codebase provides a suite of specialized backends for high-scale result storage, leveraging NoSQL databases and distributed key-value stores. These implementations are designed to handle high-throughput task results while offering features like automatic expiration, optimistic concurrency control, and cloud-native integrations.
MongoDB Result Storage
The MongoBackend (found in celery/backends/mongodb.py) provides a flexible storage solution that supports MongoDB's advanced features like replica sets and sharding.
Connection and Configuration
The backend can be configured using a standard MongoDB URI. The implementation ensures URI compliance via _ensure_mongodb_uri_compliance, automatically prefixing the scheme if it is missing.
```python
# Example URI for a replica set
uri = 'mongodb://user:pass@node1:27017,node2:27017/celery_db?replicaSet=rs0'
backend = MongoBackend(app=app, url=uri)
```
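The scheme prefixing can be sketched as follows. This is a minimal illustration of the behavior described above, not the actual _ensure_mongodb_uri_compliance implementation:

```python
# Hypothetical sketch of the URI-compliance step: if the URL lacks a
# MongoDB scheme, prefix one so pymongo can parse it. Both mongodb:// and
# mongodb+srv:// URLs pass through untouched.
def ensure_mongodb_uri_compliance(url: str) -> str:
    if not url.startswith('mongodb'):
        url = 'mongodb://' + url
    return url

print(ensure_mongodb_uri_compliance('localhost:27017/celery_db'))
# mongodb://localhost:27017/celery_db
```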
Performance and Indexing
To ensure efficient cleanup of expired results, the MongoBackend automatically creates a background index on the date_done field. This is handled in the collection and group_collection properties:
```python
@cached_property
def collection(self):
    """Get the meta-data task collection."""
    collection = self.database[self.taskmeta_collection]
    # Ensure an index on date_done is there in the background
    collection.create_index('date_done', background=True)
    return collection
```
Cassandra and AstraDB
The CassandraBackend (in celery/backends/cassandra.py) supports both traditional Apache Cassandra clusters and DataStax AstraDB via secure connect bundles.
Consistency and Connectivity
The backend allows fine-grained control over consistency levels for both reads and writes, defaulting to LOCAL_QUORUM. It distinguishes between standard server connections and AstraDB cloud connections:
```python
if self.servers:
    self._cluster = cassandra.cluster.Cluster(
        self.servers, port=self.port,
        auth_provider=self.auth_provider,
        **self.cassandra_options)
else:
    # AstraDB 'bundle_path' usage
    self._cluster = cassandra.cluster.Cluster(
        cloud={'secure_connect_bundle': self.bundle_path},
        auth_provider=self.auth_provider,
        **self.cassandra_options)
```
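In practice these knobs are driven by the Celery configuration; a sketch of the relevant settings (key names as documented in Celery's configuration reference) might look like:

```python
# Hedged sketch: configuration keys controlling the Cassandra backend.
# Values shown are illustrative; pass them via app.conf.update(**settings).
cassandra_settings = {
    'result_backend': 'cassandra://',
    'cassandra_servers': ['node1', 'node2'],
    'cassandra_port': 9042,
    'cassandra_keyspace': 'celery',
    'cassandra_table': 'task_results',
    # Both read and write consistency default to LOCAL_QUORUM.
    'cassandra_read_consistency': 'LOCAL_QUORUM',
    'cassandra_write_consistency': 'LOCAL_QUORUM',
}
```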
Table Management
To prevent race conditions, the backend restricts table creation to "writers" (typically workers). When _get_connection(write=True) is called, the worker attempts to execute Q_CREATE_RESULT_TABLE, which defines a schema optimized for result retrieval using date_done as a clustering column.
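The shape of that schema can be sketched roughly as below; this is a hedged reconstruction, so consult the actual Q_CREATE_RESULT_TABLE constant in celery/backends/cassandra.py for the authoritative CQL:

```python
# Hedged reconstruction of what Q_CREATE_RESULT_TABLE expresses: task_id as
# the partition key and date_done as a clustering column, so the newest
# result for a task sorts first.
Q_CREATE_RESULT_TABLE = """\
CREATE TABLE {table} (
    task_id   text,
    status    text,
    result    blob,
    date_done timestamp,
    traceback blob,
    children  blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
"""
```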
AWS DynamoDB
The DynamoDBBackend (in celery/backends/dynamodb.py) is a cloud-native implementation that utilizes AWS DynamoDB's native TTL and atomic increment features.
Native TTL Support
Unlike backends that require manual cleanup, DynamoDB handles expiration natively. The backend calculates the TTL timestamp during the put_item operation:
```python
if self._has_ttl():
    put_request['Item'].update({
        self._ttl_field.name: {
            self._ttl_field.data_type:
                str(int(timestamp + self.time_to_live_seconds))
        }
    })
```
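Concretely, the resulting item carries an absolute epoch timestamp in a Number attribute, which AWS uses to (eventually) delete the item. The attribute name 'ttl' and table name below are illustrative; the backend derives both from its configuration:

```python
import time

# Hedged sketch of a put_item request with a TTL attribute attached, in
# DynamoDB's low-level item format.
time_to_live_seconds = 3600
timestamp = int(time.time())

put_request = {
    'TableName': 'celery',
    'Item': {
        'id': {'S': 'celery-task-meta-example'},
        'status': {'S': 'SUCCESS'},
        'timestamp': {'N': str(timestamp)},
    },
}
# DynamoDB TTL expects a Number holding the expiry moment as epoch seconds.
put_request['Item']['ttl'] = {'N': str(timestamp + time_to_live_seconds)}
```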
Atomic Increments for Chords
The DynamoDBBackend implements incr, allowing it to support Celery chords efficiently by using DynamoDB's UpdateExpression for atomic counter increments:
```python
def incr(self, key: bytes) -> int:
    """Atomically increase the chord_count and return the new count."""
    request_parameters = self._prepare_inc_count_request(str(key))
    item_response = self.client.update_item(**request_parameters)
    new_count = item_response["Attributes"][
        self._count_filed.name][self._count_filed.data_type]
    return int(new_count)
```
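The request that _prepare_inc_count_request builds might look roughly like the following sketch; the table name, key attribute, and exact expression are assumptions for illustration, not the backend's literal output:

```python
# Hedged sketch of an UpdateItem request performing a server-side atomic
# increment via an UpdateExpression.
def prepare_inc_count_request(key: str) -> dict:
    return {
        'TableName': 'celery',
        'Key': {'id': {'S': key}},
        # Add 1 to chord_count atomically on the DynamoDB side.
        'UpdateExpression': 'SET chord_count = chord_count + :num',
        'ExpressionAttributeValues': {':num': {'N': '1'}},
        # Return only the updated attribute, i.e. the new count.
        'ReturnValues': 'UPDATED_NEW',
    }

params = prepare_inc_count_request('chord-key-1')
```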
Elasticsearch Conflict Resolution
The ElasticsearchBackend (in celery/backends/elasticsearch.py) implements optimistic concurrency control to prevent task state corruption.
State Protection Logic
The _update method ensures that a task's state cannot be regressed. Specifically, it prevents an UNREADY state (like a retry) from overriding a READY state (like success or failure):
```python
if meta_present_on_backend['status'] == states.SUCCESS:
    # if stored state is already in success, do nothing
    return {'result': 'noop'}
elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES:
    # if stored state is in ready state and current not, do nothing
    return {'result': 'noop'}
```
It uses Elasticsearch's if_primary_term and if_seq_no parameters to ensure that updates only succeed if the document has not been modified since it was last read.
Distributed Key-Value Stores
Consul
The ConsulBackend (in celery/backends/consul.py) leverages Consul's session mechanism for auto-expiry. When a result is stored, a session is created with a TTL. The key is then "acquired" by that session; when the session expires, Consul automatically deletes the associated key.
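The session mechanism can be modeled as follows. This is a toy in-memory simulation of the behavior described above, not the real Consul client calls the backend makes:

```python
import time

# Toy model of Consul's session TTL with behavior='delete': keys acquired by
# a session vanish once the session expires.
class ConsulModel:
    def __init__(self):
        self.sessions = {}  # session_id -> expiry time
        self.kv = {}        # key -> (value, session_id)

    def create_session(self, ttl):
        session_id = f'session-{len(self.sessions)}'
        self.sessions[session_id] = time.monotonic() + ttl
        return session_id

    def acquire(self, key, value, session_id):
        self.kv[key] = (value, session_id)

    def get(self, key):
        entry = self.kv.get(key)
        if entry is None:
            return None
        value, session_id = entry
        # An expired session takes its acquired keys with it.
        if time.monotonic() > self.sessions[session_id]:
            del self.kv[key]
            return None
        return value

c = ConsulModel()
sid = c.create_session(ttl=0.05)
c.acquire('celery/task-meta-1', 'SUCCESS', sid)
before = c.get('celery/task-meta-1')   # still present
time.sleep(0.1)
after = c.get('celery/task-meta-1')    # session expired, key gone
```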
Azure CosmosDB
The CosmosDBSQLBackend (in celery/backends/cosmosdbsql.py) provides an implementation for the CosmosDB SQL API. It manages resource lifecycle by automatically creating the database and collection (partitioned by id) if they do not exist during the first client access.
Comparison of Auto-Expiration Mechanisms
The implementation of result expiration varies significantly across these backends:
| Backend | Mechanism | Implementation Detail |
|---|---|---|
| Cassandra | Native TTL | Uses USING TTL in the CQL INSERT statement. |
| DynamoDB | Native TTL | Sets a ttl attribute on the item; AWS handles deletion. |
| Consul | Session TTL | Creates a Consul session with behavior='delete'. |
| MongoDB | Background Cleanup | The cleanup() method deletes documents where date_done is older than the expiry. |
| Elasticsearch | Manual/External | Does not implement native auto-expiry within the backend class. |
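For the MongoDB row above, the cleanup reduces to a single bounded delete; a sketch of the filter such a cleanup() step would pass to delete_many() (pymongo-style, expiry value illustrative):

```python
from datetime import datetime, timedelta, timezone

# Hedged sketch: delete result documents whose date_done is older than the
# configured expiry. The date_done field matches the background index the
# backend creates.
expires = timedelta(days=1)
cutoff = datetime.now(timezone.utc) - expires
query = {'date_done': {'$lt': cutoff}}
# self.collection.delete_many(query)
```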