
NoSQL and Distributed Key-Value Stores

This codebase provides a suite of specialized backends for high-scale result storage, leveraging NoSQL databases and distributed key-value stores. These implementations are designed to handle high-throughput task results while offering features like automatic expiration, optimistic concurrency control, and cloud-native integrations.

MongoDB Result Storage

The MongoBackend (in celery/backends/mongodb.py) provides a flexible storage solution that works with MongoDB deployment features such as replica sets and sharding.

Connection and Configuration

The backend can be configured using a standard MongoDB URI. The implementation normalizes the URI via _ensure_mongodb_uri_compliance: a scheme that does not already start with mongodb (for example srv://) receives a mongodb+ prefix, and a bare mongodb:// falls back to localhost.

from celery import Celery
from celery.backends.mongodb import MongoBackend
app = Celery('proj')
# Example URI for a replica set
uri = 'mongodb://user:pass@node1:27017,node2:27017/celery_db?replicaSet=rs0'
backend = MongoBackend(app=app, url=uri)
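The normalization step can be sketched as a standalone function. This sketch assumes the following rule: a scheme that does not start with mongodb receives a mongodb+ prefix, and a bare mongodb:// falls back to localhost. The name ensure_mongodb_uri_compliance is a hypothetical module-level stand-in for the private helper. It is not something to import from Celery.

```python
from urllib.parse import urlparse


def ensure_mongodb_uri_compliance(url: str) -> str:
    """Sketch (hypothetical helper): normalise a backend URL into a MongoDB URI."""
    parsed = urlparse(url)
    # Assumed rule: a scheme that does not already start with "mongodb"
    # (e.g. "srv") is prefixed, so "srv://..." becomes "mongodb+srv://...".
    if not parsed.scheme.startswith('mongodb'):
        url = f'mongodb+{url}'
    # A bare scheme with no host falls back to a local server.
    if url == 'mongodb://':
        url += 'localhost'
    return url


print(ensure_mongodb_uri_compliance('srv://user:pass@cluster0.example.net/celery_db'))
# → mongodb+srv://user:pass@cluster0.example.net/celery_db
```

URIs that already use mongodb:// or mongodb+srv:// pass through unchanged, because their parsed scheme already starts with mongodb.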

Performance and Indexing

To keep cleanup of expired results efficient, the MongoBackend creates an index on the date_done field, built in the background, the first time each collection is accessed. This is handled in the cached collection and group_collection properties:

@cached_property
def collection(self):
    """Get the meta-data task collection."""
    collection = self.database[self.taskmeta_collection]
    # Ensure an index on date_done is there in the background
    collection.create_index('date_done', background=True)
    return collection
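The index matters because expiry cleanup is a range query on date_done. The following is a minimal sketch of the filter such a cleanup could pass to pymongo's delete_many. It assumes results are kept for an expires window expressed as a timedelta. The helper name expired_results_filter is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def expired_results_filter(now: datetime, expires: timedelta) -> dict:
    """Build a MongoDB query matching results older than the expiry window."""
    cutoff = now - expires
    # "$lt" on date_done is a range scan; an index on date_done lets the
    # server answer it without walking the whole collection.
    return {'date_done': {'$lt': cutoff}}


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
query = expired_results_filter(now, timedelta(days=1))
print(query['date_done']['$lt'])  # 2024-01-01 12:00:00+00:00
# With pymongo, this filter would be passed to collection.delete_many(query)
# for both the task and the group collections.
```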

Cassandra and AstraDB

The CassandraBackend (in celery/backends/cassandra.py) supports both traditional Apache Cassandra clusters and DataStax AstraDB via secure connect bundles.

Consistency and Connectivity

The backend allows fine-grained control over consistency levels for both reads and writes, defaulting to LOCAL_QUORUM. It distinguishes between standard server connections and AstraDB cloud connections:

if self.servers:
    self._cluster = cassandra.cluster.Cluster(
        self.servers, port=self.port,
        auth_provider=self.auth_provider,
        **self.cassandra_options)
else:
    # AstraDB 'bundle_path' usage
    self._cluster = cassandra.cluster.Cluster(
        cloud={'secure_connect_bundle': self.bundle_path},
        auth_provider=self.auth_provider,
        **self.cassandra_options)
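The branching can be modelled without the Cassandra driver installed. The sketch below builds the keyword arguments that would be passed to cassandra.cluster.Cluster in each mode. It also applies the LOCAL_QUORUM default to both consistency levels. The following are all assumptions made for illustration:

- the function names;
- their parameters;
- the ValueError raised when neither servers nor a bundle is configured.

```python
from typing import Any, Optional


def build_cluster_kwargs(servers: Optional[list], port: int = 9042,
                         bundle_path: Optional[str] = None,
                         auth_provider: Any = None,
                         options: Optional[dict] = None) -> dict:
    """Sketch: choose between a server list and an AstraDB secure bundle."""
    extra = dict(options or {})
    if servers:
        # Self-managed cluster: contact points plus the native-protocol port.
        return {'contact_points': servers, 'port': port,
                'auth_provider': auth_provider, **extra}
    if bundle_path:
        # AstraDB: the secure connect bundle carries host and TLS details.
        return {'cloud': {'secure_connect_bundle': bundle_path},
                'auth_provider': auth_provider, **extra}
    raise ValueError('configure either servers or a secure connect bundle')


def resolve_consistency(read: Optional[str] = None,
                        write: Optional[str] = None) -> tuple:
    """Fall back to LOCAL_QUORUM when a level is not configured."""
    return (read or 'LOCAL_QUORUM', write or 'LOCAL_QUORUM')
```

With the driver installed, the result would be used as cassandra.cluster.Cluster(**build_cluster_kwargs(...)). Both modes accept the same auth provider and extra options; only the way hosts are discovered differs.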

Table Management

To avoid conflicting schema writes, the backend restricts table creation to "writers" (typically workers). When _get_connection(write=True) is called, the worker executes Q_CREATE_RESULT_TABLE and ignores the error raised if the table already exists. The schema uses task_id as the partition key and date_done as a clustering column, so a task's result is retrieved with a single-partition lookup.
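The writer-only pattern can be sketched with an injected execute callable, so it runs without a cluster. The CQL below is an illustrative schema, not a verbatim copy of Q_CREATE_RESULT_TABLE:

- task_id is the partition key;
- date_done is a descending clustering column.

AlreadyExists here is a local stand-in for the driver's cassandra.AlreadyExists exception.

```python
from typing import Callable

# Illustrative schema (assumption): one partition per task id.
CREATE_RESULT_TABLE = """
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
"""


class AlreadyExists(Exception):
    """Local stand-in for cassandra.AlreadyExists."""


def ensure_result_table(execute: Callable[[str], object], table: str,
                        write: bool) -> bool:
    """Issue CREATE TABLE only from writers; tolerate a concurrent create."""
    if not write:
        # Readers (e.g. clients polling for results) never touch the schema.
        return False
    try:
        execute(CREATE_RESULT_TABLE.format(table=table))
    except AlreadyExists:
        # Another worker created the table first; nothing else to do.
        pass
    return True
```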

AWS DynamoDB

The DynamoDBBackend (in celery/backends/dynamodb.py) is a cloud-native implementation that utilizes AWS DynamoDB's native TTL and atomic increment features.

Native TTL Support

Unlike backends that require manual cleanup, DynamoDB handles expiration natively. When the backend builds the put_item request, it adds the expiry as an epoch timestamp:

if self._has_ttl():
    put_request['Item'].update({
        self._ttl_field.name: {
            self._ttl_field.data_type:
                str(int(timestamp + self.time_to_live_seconds))
        }
    })
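Here is a self-contained sketch of assembling such a put_item request in DynamoDB's low-level attribute-value format. The attribute names id, result, timestamp and ttl are assumptions chosen to match the description above. DynamoDB's TTL feature only acts on a Number attribute that holds a Unix epoch time in seconds. That is why the expiry is truncated to an integer and sent as a string under the N type.

```python
import time
from typing import Optional


def build_put_request(table: str, key: str, value: bytes,
                      ttl_seconds: Optional[int] = None,
                      timestamp: Optional[float] = None) -> dict:
    """Sketch: a put_item payload with an optional TTL attribute."""
    timestamp = time.time() if timestamp is None else timestamp
    request = {
        'TableName': table,
        'Item': {
            'id': {'S': key},              # assumed key attribute
            'result': {'B': value},        # assumed payload attribute
            'timestamp': {'N': str(timestamp)},
        },
    }
    if ttl_seconds is not None:
        # Epoch seconds after which DynamoDB may delete the item.
        request['Item']['ttl'] = {'N': str(int(timestamp + ttl_seconds))}
    return request
```

With boto3, the payload would be sent as boto3.client('dynamodb').put_item(**build_put_request(...)). TTL must also be enabled on the table for that attribute. Deletion then happens in the background, so it is not immediate.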

Atomic Increments for Chords

The DynamoDBBackend implements incr, allowing it to support Celery chords efficiently by using DynamoDB's UpdateExpression for atomic counter increments:

def incr(self, key: bytes) -> int:
    """Atomically increase the chord_count and return the new count"""
    request_parameters = self._prepare_inc_count_request(str(key))
    item_response = self.client.update_item(**request_parameters)
    new_count = item_response["Attributes"][self._count_filed.name][self._count_filed.data_type]
    return int(new_count)
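The body of _prepare_inc_count_request is not shown above, so the following is an assumed equivalent, not the backend's exact payload. It uses DynamoDB's ADD action, which treats a missing number attribute as 0 before adding. It also sets ReturnValues='UPDATED_NEW', so the response's Attributes hold the post-increment count. The key attribute name id is an assumption.

```python
def build_incr_request(table: str, key: str, counter: str = 'chord_count') -> dict:
    """Sketch: an update_item payload that atomically adds 1 to a counter."""
    return {
        'TableName': table,
        'Key': {'id': {'S': key}},  # assumed key attribute
        # ADD creates the number attribute at 0 if missing, then adds :one.
        'UpdateExpression': 'ADD #count :one',
        'ExpressionAttributeNames': {'#count': counter},
        'ExpressionAttributeValues': {':one': {'N': '1'}},
        # Return only the updated attributes, with their new values.
        'ReturnValues': 'UPDATED_NEW',
    }


def parse_new_count(response: dict, counter: str = 'chord_count') -> int:
    """Numbers come back as strings under the 'N' type descriptor."""
    return int(response['Attributes'][counter]['N'])
```

DynamoDB applies the increment server-side, so concurrent chord members cannot lose updates. Each caller receives a distinct count, which the chord logic can compare with the chord size to detect the final member.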

Elasticsearch Conflict Resolution

The ElasticsearchBackend (in celery/backends/elasticsearch.py) implements optimistic concurrency control to prevent task state corruption.

State Protection Logic

The _update method ensures that a task's state cannot be regressed. Specifically, it prevents an UNREADY state (like a retry) from overriding a READY state (like success or failure):

if meta_present_on_backend['status'] == states.SUCCESS:
    # if stored state is already in success, do nothing
    return {'result': 'noop'}
elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES:
    # if stored state is in ready state and current not, do nothing
    return {'result': 'noop'}

It uses Elasticsearch's if_primary_term and if_seq_no parameters to ensure that updates only succeed if the document has not been modified since it was last read.
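The state guard and the sequence-number check can be modelled together in memory. VersionedStore below is a hypothetical stand-in for an Elasticsearch index:

- every write bumps a sequence number;
- a conditional write fails with ConflictError when the caller's if_seq_no is stale;
- a single counter plays the role of the _seq_no/_primary_term pair.

The state sets match Celery's READY_STATES and UNREADY_STATES. The bounded re-read-and-retry loop is an assumption.

```python
READY_STATES = frozenset({'SUCCESS', 'FAILURE', 'REVOKED'})
UNREADY_STATES = frozenset({'PENDING', 'RECEIVED', 'STARTED', 'REJECTED', 'RETRY'})


class ConflictError(Exception):
    """Raised when a conditional write sees a newer version (HTTP 409)."""


class VersionedStore:
    """Toy document store with per-document sequence numbers."""

    def __init__(self):
        self._docs = {}  # key -> (seq_no, status)

    def get(self, key):
        return self._docs.get(key)

    def put(self, key, status, if_seq_no=None):
        current = self._docs.get(key)
        # if_seq_no=None means "create only"; otherwise it must match.
        if current is not None and current[0] != if_seq_no:
            raise ConflictError(key)
        seq_no = 0 if current is None else current[0] + 1
        self._docs[key] = (seq_no, status)


def should_skip(stored, new):
    """Never regress: SUCCESS is final, and READY beats UNREADY."""
    if stored == 'SUCCESS':
        return True
    return stored in READY_STATES and new in UNREADY_STATES


def update_state(store, key, new_status, retries=3):
    for _ in range(retries):
        current = store.get(key)
        if current is not None and should_skip(current[1], new_status):
            return 'noop'
        try:
            store.put(key, new_status,
                      if_seq_no=None if current is None else current[0])
            return 'updated'
        except ConflictError:
            continue  # someone wrote in between; re-read and re-check
    raise ConflictError(key)
```

The guard is evaluated against the freshly read document on every attempt. A concurrent SUCCESS that lands between the read and the write is therefore detected on the retry rather than overwritten.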

Distributed Key-Value Stores

Consul

The ConsulBackend (in celery/backends/consul.py) leverages Consul's session mechanism for auto-expiry. When a result is stored, a session is created with a TTL. The key is then "acquired" by that session; when the session expires, Consul automatically deletes the associated key.
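The session-based expiry can be illustrated with a toy model driven by a manual clock. FakeConsul is hypothetical and models only the behaviour described above:

1. A session is created with behavior='delete' and a TTL.
2. A key is written with acquire set to that session's ID.
3. The key is deleted once the session lapses.

With the python-consul client, the corresponding calls would be session.create(behavior='delete', ttl=...) followed by kv.put(..., acquire=session_id).

```python
import itertools


class FakeConsul:
    """Toy model of Consul sessions with behavior='delete' and a TTL."""

    def __init__(self):
        self.now = 0.0
        self.kv = {}          # key -> (value, session id)
        self._sessions = {}   # session id -> expiry time
        self._ids = itertools.count(1)

    def session_create(self, ttl: float, behavior: str = 'delete') -> str:
        if behavior != 'delete':
            raise NotImplementedError('only the delete behaviour is modelled')
        session_id = f'session-{next(self._ids)}'
        self._sessions[session_id] = self.now + ttl
        return session_id

    def kv_put(self, key: str, value: bytes, acquire: str) -> bool:
        # The key is locked by, and its lifetime tied to, the session.
        self.kv[key] = (value, acquire)
        return True

    def advance(self, seconds: float) -> None:
        """Move the clock forward and invalidate expired sessions."""
        self.now += seconds
        expired = {s for s, exp in self._sessions.items() if exp <= self.now}
        for s in expired:
            del self._sessions[s]
        # behavior='delete': keys held by an invalidated session are removed.
        self.kv = {k: v for k, v in self.kv.items() if v[1] not in expired}


consul = FakeConsul()
sid = consul.session_create(ttl=60)
consul.kv_put('results/abc', b'{"status": "SUCCESS"}', acquire=sid)
consul.advance(61)
print('results/abc' in consul.kv)  # False
```

On a real agent, the TTL is a lower bound: Consul will not invalidate the session earlier, but it may remove the key somewhat later.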

Azure CosmosDB

The CosmosDBSQLBackend (in celery/backends/cosmosdbsql.py) provides an implementation for the CosmosDB SQL API. It manages the resource lifecycle by creating the database and collection (partitioned by id) on first client access, if they do not already exist.
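The create-on-first-access pattern can be sketched with functools.cached_property and an injected client, so it runs without the Azure SDK. The following are assumptions for illustration:

- FakeCosmosClient and its create_database and create_collection methods;
- the ResourceExists error carrying HTTP status 409;
- the database and collection names.

The essential idea is that "already exists" is treated as success. The link is then cached, so later accesses skip the round-trip. The '/id' partition path reflects the partitioning by id described above.

```python
from functools import cached_property


class ResourceExists(Exception):
    status_code = 409  # HTTP Conflict: the resource is already there


class FakeCosmosClient:
    """Hypothetical client that remembers which resources exist."""

    def __init__(self):
        self.created = []
        self.partitions = {}

    def _create(self, link):
        if link in self.created:
            raise ResourceExists(link)
        self.created.append(link)

    def create_database(self, db_id):
        self._create(f'dbs/{db_id}')

    def create_collection(self, db_link, coll_id, partition_paths):
        link = f'{db_link}/colls/{coll_id}'
        self._create(link)
        self.partitions[link] = partition_paths


class ResultStore:
    def __init__(self, client, database='results_db', collection='results'):
        self._client = client
        self._database = database
        self._collection = collection

    @cached_property
    def database_link(self):
        try:
            self._client.create_database(self._database)
        except ResourceExists:
            pass  # created earlier, possibly by another process
        return f'dbs/{self._database}'

    @cached_property
    def collection_link(self):
        try:
            # Partition on the document id.
            self._client.create_collection(self.database_link, self._collection,
                                           partition_paths=['/id'])
        except ResourceExists:
            pass
        return f'{self.database_link}/colls/{self._collection}'
```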

Comparison of Auto-Expiration Mechanisms

The implementation of result expiration varies significantly across these backends:

| Backend | Mechanism | Implementation detail |
| --- | --- | --- |
| Cassandra | Native TTL | Uses USING TTL in the CQL INSERT statement. |
| DynamoDB | Native TTL | Sets a ttl attribute on the item; AWS handles deletion. |
| Consul | Session TTL | Creates a Consul session with behavior='delete'. |
| MongoDB | Periodic cleanup | The cleanup() method deletes documents whose date_done is older than the expiry. It is run by the built-in celery.backend_cleanup task, which celery beat schedules when result_expires is set. |
| Elasticsearch | Manual/external | Does not implement native auto-expiry within the backend class. |
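To make the Cassandra row concrete, here is a sketch that appends a USING TTL clause to a parameterised CQL INSERT only when an expiry is configured. The statement text and the helper name build_insert are illustrative assumptions. CQL TTL values are whole seconds, so a timedelta expiry is converted first.

```python
from datetime import timedelta
from typing import Optional

# Illustrative statement; %s placeholders are bound by the driver.
INSERT_RESULT = (
    'INSERT INTO {table} (task_id, status, result, date_done, traceback, children) '
    'VALUES (%s, %s, %s, %s, %s, %s){expires};'
)


def build_insert(table: str, expires: Optional[timedelta] = None) -> str:
    """Sketch: add USING TTL <seconds> when results should expire."""
    clause = ''
    if expires is not None:
        clause = f' USING TTL {int(expires.total_seconds())}'
    return INSERT_RESULT.format(table=table, expires=clause)


print(build_insert('task_results', timedelta(days=1)))
```

Because Cassandra expires the row itself, no cleanup task is needed. This contrasts with MongoDB's cleanup() approach.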