Skip to main content

Extending the Application

To customize the core behavior of your Celery application, you can subclass the main application class or implement a custom loader to control configuration and lifecycle events.

Subclassing the Application

You can extend the Celery class to add custom initialization logic or change the default component classes used by the app.

from celery import Celery

class MyCeleryApp(Celery):

# Override default component classes
log_cls = 'myapp.log:MyLogging'

def on_init(self):
"""Optional callback called at the end of __init__."""
self.custom_metadata = {'version': '1.0'}
print(f"App {self.main} initialized.")

app = MyCeleryApp('myapp', broker='redis://localhost')

Customizing Component Classes

The Celery class allows you to specify custom classes for its internal components either by overriding class attributes in a subclass or by passing them to the constructor.

Key component attributes in celery.app.base.Celery:

  • amqp_cls: Handles AMQP related functionality (default: 'celery.app.amqp:AMQP').
  • log_cls: Handles logging setup (default: 'celery.app.log:Logging').
  • loader_cls: Handles configuration loading.
  • task_cls: The base class for all tasks (default: 'celery.app.task:Task').
# Passing custom classes via constructor
app = Celery('myapp', task_cls='myapp.tasks:BaseTask')

Implementing Custom Loaders

Loaders are responsible for sourcing configuration and defining behavior during worker and task initialization. To create a custom loader, inherit from celery.loaders.base.BaseLoader.

from celery import Celery
from celery.loaders.base import BaseLoader

class MyCustomLoader(BaseLoader):
def read_configuration(self):
"""Define how configuration is loaded."""
return {
'task_serializer': 'json',
'result_backend': 'redis://localhost',
'imports': ('myapp.tasks',),
}

def on_worker_init(self):
"""Called when the worker starts."""
print("Worker is preparing to start...")

def on_task_init(self, task_id, task):
"""Called before every task execution."""
print(f"Starting task: {task.name}[{task_id}]")

# Use the custom loader
app = Celery(loader=MyCustomLoader)

Configuring the Loader via Environment

You can also specify a custom loader without modifying the Celery instantiation by setting the CELERY_LOADER environment variable:

export CELERY_LOADER="myapp.loaders:MyCustomLoader"
celery worker -A myapp

Managing Shared State with class_property

When extending components like the logging system, you may need properties that are accessible and modifiable at both the class and instance level. The celery.local.class_property utility facilitates this.

from celery.local import class_property

class MyLoggingExtension:
_setup_done = False

@class_property
def already_setup(cls):
return cls._setup_done

@already_setup.setter
def already_setup(cls, value):
cls._setup_done = value

# Usage
MyLoggingExtension.already_setup = True
assert MyLoggingExtension().already_setup is True

Troubleshooting and Best Practices

Pickling and Subclasses

If you subclass Celery and define a custom __reduce_args__ method, Celery will revert to an older pickling strategy (v1). For modern extensions, it is recommended to use __reduce_keys__ if you need to customize how the application is reconstructed across process boundaries.

Loader Exception Handling

Exceptions raised within BaseLoader.import_default_modules occur before the logging system is fully initialized. If your custom loader fails during module imports, the errors may be silenced or difficult to trace. Ensure that your read_configuration and import_task_module implementations handle critical errors explicitly or provide fallback mechanisms.

App Finalization

The Celery.task decorator returns a PromiseProxy if the application has not been finalized. If your extension logic depends on inspecting task attributes immediately after registration, ensure you call app.finalize() or access app.tasks (which triggers auto-finalization).

app = Celery()

@app.task
def add(x, y): return x + y

# Accessing app.tasks ensures all tasks are bound and finalized
print(app.tasks['myapp.tasks.add'])