Using SQL Databases for Persistence

The SQLAlchemy-based result backend provides a robust way to store task and group results in a relational database. This implementation, centered around the DatabaseBackend class, leverages SQLAlchemy to support a wide variety of database dialects while providing features like extended metadata storage and automatic table management.

Core Architecture

The persistence layer is built on three primary components:

  1. DatabaseBackend: The main interface that Celery uses to store and retrieve results. It manages the high-level logic for result serialization, retries, and cleanup.
  2. SQLAlchemy Models: Defined in celery.backends.database.models, these classes (Task, TaskExtended, TaskSet) map Celery results to database tables.
  3. SessionManager: Located in celery.backends.database.session, this component handles the lifecycle of SQLAlchemy engines and sessions, ensuring thread safety and fork safety.
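To use this backend, point result_backend at a SQLAlchemy connection URL prefixed with db+. A minimal sketch (the broker URL, credentials, and database name below are placeholders):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')  # broker is illustrative

# Any SQLAlchemy-supported dialect works: sqlite, postgresql, mysql, ...
app.conf.result_backend = 'db+postgresql://scott:tiger@localhost/celerydb'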

Data Flow for Result Storage

When a task finishes, Celery calls the DatabaseBackend._store_result method, which follows this internal flow (a condensed sketch follows the list):

  1. Obtains a session from the SessionManager via ResultSession().
  2. Uses the session_cleanup context manager to ensure the session is closed or rolled back on error.
  3. Queries for an existing Task record by task_id. If it doesn't exist, it creates a new instance of self.task_cls.
  4. Calls _update_result to map the result metadata (status, result, traceback, etc.) to the model's columns.
  5. Commits the transaction to the database.
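In code, the flow condenses to roughly the following. This is a paraphrase of celery.backends.database, not the verbatim source; the retry decorator and result-encoding details are omitted:

def _store_result(self, task_id, result, state,
                  traceback=None, request=None, **kwargs):
    session = self.ResultSession()                 # 1. obtain a session
    with session_cleanup(session):                 # 2. close/rollback on exit
        task = session.query(self.task_cls).filter(
            self.task_cls.task_id == task_id).first()
        if task is None:                           # 3. create the record
            task = self.task_cls(task_id)
            session.add(task)
            session.flush()
        self._update_result(task, result, state,   # 4. map metadata to columns
                            traceback=traceback, request=request)
        session.commit()                           # 5. persist the transaction
    return result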

Schema Management

The backend uses two main tables by default (an abridged sketch of the Task model follows this list):

  • celery_taskmeta: Stores individual task results.
  • celery_tasksetmeta: Stores results for groups (tasksets).
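The model behind celery_taskmeta looks roughly like this; treat it as an abridged sketch, with the authoritative definitions in celery.backends.database.models:

from datetime import datetime

import sqlalchemy as sa

from celery import states
from celery.backends.database.session import ResultModelBase

class Task(ResultModelBase):
    """Task result/status (abridged sketch)."""
    __tablename__ = 'celery_taskmeta'
    __table_args__ = {'sqlite_autoincrement': True}

    id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'),
                   primary_key=True, autoincrement=True)
    task_id = sa.Column(sa.String(155), unique=True)
    status = sa.Column(sa.String(50), default=states.PENDING)
    result = sa.Column(sa.PickleType, nullable=True)
    date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
                          onupdate=datetime.utcnow, nullable=True)
    traceback = sa.Column(sa.Text, nullable=True)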

Customizing Tables and Schemas

You can customize the table names and schemas through configuration. The DatabaseBackend.__init__ method applies these settings to the models using their configure class method:

# Example configuration via Celery app
app.conf.database_table_names = {
    'task': 'my_custom_task_table',
    'group': 'my_custom_group_table',
}
app.conf.database_table_schemas = {
    'task': 'results_schema',
    'group': 'results_schema',
}

The Task.configure and TaskSet.configure methods dynamically update the SQLAlchemy __table__.name and __table__.schema attributes before the tables are created.
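An abridged sketch of that classmethod (the real version in celery.backends.database.models also adjusts the schema of the id sequence):

@classmethod
def configure(cls, schema=None, name=None):
    # Rebind the mapped table before metadata.create_all() runs,
    # falling back to the class-level defaults.
    cls.__table__.schema = schema
    cls.__table__.name = name or cls.__tablename__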

Automatic Table Creation

By default, the backend attempts to create the necessary tables during initialization if database_create_tables_at_setup is True. The SessionManager.prepare_models method handles this, including a retry mechanism with exponential backoff to mitigate race conditions when multiple workers start simultaneously.
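The retry loop looks roughly like this (paraphrased from celery.backends.database.session; treat the details as a sketch):

from time import sleep

from sqlalchemy.exc import DatabaseError

from celery.backends.database.session import ResultModelBase

PREPARE_MODELS_MAX_RETRIES = 10

def prepare_models(self, engine):
    if not self.prepared:
        # create_all() checks table existence before creating, which is
        # racy when several workers boot at once; back off and retry so
        # the loser of the race succeeds on a later pass.
        for i in range(PREPARE_MODELS_MAX_RETRIES + 1):
            try:
                ResultModelBase.metadata.create_all(engine)
                break
            except DatabaseError:
                if i < PREPARE_MODELS_MAX_RETRIES:
                    sleep(0.1 * 2 ** i)  # exponential backoff
                else:
                    raise
        self.prepared = True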

Extended Result Metadata

Standard task results include the status, return value, and traceback. When result_extended = True is set, the backend switches from the Task model to the TaskExtended model.

TaskExtended inherits from Task and adds several columns to capture more context from the task execution:

  • name: The name of the task.
  • args / kwargs: The arguments the task was called with (stored as LargeBinary).
  • worker: The name of the worker that executed the task.
  • retries: The number of times the task was retried.
  • queue: The queue the task was delivered to.

In DatabaseBackend.__init__, the model selection is handled as follows:

if self.extended_result:
    self.task_cls = TaskExtended
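From the application side you only set the flag; the extra columns then surface as attributes on AsyncResult. A usage sketch (the add task is illustrative):

app.conf.result_extended = True

@app.task
def add(x, y):
    return x + y

res = add.delay(2, 2)
res.get(timeout=10)
# Populated from the TaskExtended columns:
print(res.name, res.args, res.kwargs, res.worker, res.retries, res.queue)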

Session and Connection Management

Managing database connections in a distributed environment requires careful handling of process forking and connection pooling.

Fork Safety

The SessionManager is designed to be fork-safe. It uses kombu.utils.compat.register_after_fork to detect when a worker process has forked. When get_engine is called in a child process, it ensures that a new SQLAlchemy engine is created rather than sharing the parent's connection pool, which would lead to corrupted socket communication.
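Condensed from celery.backends.database.session, the hook looks roughly like this:

from kombu.utils.compat import register_after_fork

def _after_fork_cleanup_session(session):
    session._after_fork()

class SessionManager:
    def __init__(self):
        self._engines = {}
        self._sessions = {}
        self.forked = False
        self.prepared = False
        if register_after_fork is not None:
            register_after_fork(self, _after_fork_cleanup_session)

    def _after_fork(self):
        # Runs in the child: mark the manager as forked so the next
        # get_engine() call builds a fresh engine instead of reusing
        # the parent's connection pool.
        self.forked = True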

Session Lifecycle

The session_cleanup context manager is used throughout DatabaseBackend to ensure resources are released:

from contextlib import contextmanager

@contextmanager
def session_cleanup(session):
    try:
        yield
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

If your application requires extremely short-lived connections (e.g., to avoid holding idle connections in a large worker fleet), you can enable database_short_lived_sessions. This forces the SessionManager to create a new sessionmaker for every request instead of caching it.
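Enabling it is a single setting:

# New sessionmaker per request; trades some throughput for fewer idle connections.
app.conf.database_short_lived_sessions = True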

Engine Customization

For low-level tuning, database_engine_options allows passing a dictionary of arguments directly to SQLAlchemy's create_engine. By default, the backend uses:

  • pool_pre_ping=True: To verify connections before use.
  • pool_recycle=3600: To rotate connections every hour.

Additionally, database_engine_callback can be used to provide a callable that receives the engine object immediately after creation, allowing for the registration of SQLAlchemy event listeners.
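For example, to tune the pool or log generated SQL (the values here are illustrative, not recommendations):

app.conf.database_engine_options = {
    'pool_recycle': 1800,   # rotate connections every 30 minutes
    'pool_pre_ping': True,  # liveness-check connections before checkout
    'echo': False,          # set True to log all generated SQL
}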

Maintenance and Cleanup

Results in the database are not permanent by default. The DatabaseBackend.cleanup() method is responsible for removing expired records based on the expires setting (configured via result_expires).

def cleanup(self):
    """Delete expired meta-data."""
    session = self.ResultSession()
    expires = self.expires
    now = self.app.now()
    with session_cleanup(session):
        session.query(self.task_cls).filter(
            self.task_cls.date_done < (now - expires)).delete()
        session.query(self.taskset_cls).filter(
            self.taskset_cls.date_done < (now - expires)).delete()
        session.commit()

This method targets both the celery_taskmeta and celery_tasksetmeta tables, deleting rows whose date_done timestamp is older than the expiration threshold.
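Note that cleanup() only runs when something invokes it: with celery beat running, the built-in celery.backend_cleanup periodic task calls it daily. The retention window itself comes from result_expires:

from datetime import timedelta

# Keep results for three days instead of the default one day.
app.conf.result_expires = timedelta(days=3)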