Task Discovery and Loaders

Loaders in Celery act as the interface between the Celery framework and the environment in which it runs. They are responsible for reading configuration, discovering task modules, and providing hooks for worker and task lifecycle events.

The Loader Hierarchy

The loader system is built around a base class that defines the interface, with specific implementations for different application styles.

BaseLoader

The celery.loaders.base.BaseLoader is the abstract foundation. It provides the core logic for:

  • Configuration Management: Methods like read_configuration and config_from_object.
  • Task Discovery: The autodiscover_tasks method and import_default_modules.
  • Lifecycle Hooks: Placeholders for worker and task events (e.g., on_worker_init, on_task_init).
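The shape of this interface can be sketched as a plain Python class. The following is a simplified illustration of the responsibilities listed above, not Celery's actual BaseLoader; the method bodies and attribute names here are assumptions for demonstration:

```python
class SketchLoader:
    """Simplified sketch of the loader interface (illustration only,
    not celery.loaders.base.BaseLoader itself)."""

    def __init__(self):
        self.configured = False
        self.task_modules = set()

    # --- Configuration management ---
    def read_configuration(self, fail_silently=True):
        """Return a dict of settings; subclasses decide where from."""
        return {}

    def config_from_object(self, obj):
        """Accept a dict, object, or dotted-path string of settings."""
        self.configured = True

    # --- Task discovery ---
    def autodiscover_tasks(self, packages, related_name='tasks'):
        """Record the task modules expected inside the given packages."""
        self.task_modules.update(f'{p}.{related_name}' for p in packages)

    # --- Lifecycle hooks (no-ops by default) ---
    def on_worker_init(self): pass
    def on_worker_process_init(self): pass
    def on_task_init(self, task_id, task): pass
    def on_process_cleanup(self): pass


loader = SketchLoader()
loader.autodiscover_tasks(['app_a', 'app_b'])
```

Concrete loaders like AppLoader and the default Loader specialize pieces of this surface rather than reinventing it.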

AppLoader

The celery.loaders.app.AppLoader is the default loader used when you create an explicit Celery application instance:

from celery import Celery
app = Celery('my_project') # Uses AppLoader by default

Default Loader

The celery.loaders.default.Loader is used when no custom app is specified. It is designed to look for a configuration module named celeryconfig.py by default.

# celery/loaders/default.py

class Loader(BaseLoader):
    """The loader used by the default app."""

    def read_configuration(self, fail_silently=True):
        """Read configuration from :file:`celeryconfig.py`."""
        configname = os.environ.get('CELERY_CONFIG_MODULE',
                                    DEFAULT_CONFIG_MODULE)
        try:
            usercfg = self._import_config_module(configname)
        except ImportError:
            # ... error handling ...
            return self.setup_settings({})
        else:
            self.configured = True
            return self.setup_settings(usercfg)
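For the default loader to find anything, a celeryconfig.py module must sit on the Python path. A minimal example might look like the following; the broker and backend URLs are placeholder assumptions, and the setting names are Celery's lowercase settings:

```python
# celeryconfig.py -- found automatically by the default loader
broker_url = 'redis://localhost:6379/0'      # assumed broker; any URL works
result_backend = 'redis://localhost:6379/1'  # assumed result backend
imports = ('myproj.tasks',)                  # modules to import at startup
```

Every module-level attribute becomes a configuration key, which is why the loader can simply hand the imported module to setup_settings.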

Configuration Loading

Loaders provide multiple ways to ingest configuration settings into the Celery app.

Environment-Based Configuration

The read_configuration method checks the CELERY_CONFIG_MODULE environment variable. If set, it imports that module and uses its attributes as configuration.
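The lookup itself can be approximated with a few lines of stdlib code. This is a hedged sketch of the mechanism only; the function name read_config is ours, not Celery's:

```python
import importlib
import os

def read_config(default_module='celeryconfig'):
    """Mimic the CELERY_CONFIG_MODULE lookup: import the named module
    and treat its public attributes as settings, or fall back to an
    empty dict if the module cannot be imported."""
    name = os.environ.get('CELERY_CONFIG_MODULE', default_module)
    try:
        mod = importlib.import_module(name)
    except ImportError:
        return {}
    return {k: v for k, v in vars(mod).items() if not k.startswith('_')}
```

Pointing the function at any importable module (for instance a stdlib module) shows how module attributes become a flat settings dict.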

Object-Based Configuration

The config_from_object method allows passing a Python object, a dictionary, or a string path to a module. It uses _smart_import to resolve string paths:

# celery/loaders/base.py

def _smart_import(self, path, imp=None):
    imp = self.import_module if imp is None else imp
    if ':' in path:
        # Path includes attribute (e.g., ``os.path:abspath``).
        return symbol_by_name(path, imp=imp)

    try:
        return imp(path)
    except ImportError:
        # Not a module name, so try module + attribute.
        return symbol_by_name(path, imp=imp)
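The resolution strategy can be reproduced with importlib alone. The following is a minimal stand-in for the symbol_by_name behaviour, written by us for illustration and deliberately simpler than the real helper:

```python
import importlib

def smart_import(path):
    """Resolve 'pkg.mod', 'pkg.mod:attr', or 'pkg.mod.attr' to an object."""
    if ':' in path:                                # explicit module:attribute
        module, attr = path.split(':', 1)
        return getattr(importlib.import_module(module), attr)
    try:
        return importlib.import_module(path)       # plain module path
    except ImportError:
        module, attr = path.rsplit('.', 1)         # module + attribute form
        return getattr(importlib.import_module(module), attr)
```

All three spellings of the same symbol resolve to the same object, e.g. `smart_import('os.path:abspath')` and `smart_import('os.path.abspath')`.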

Task Discovery

One of the most critical roles of the loader is finding tasks within a project structure.

Automatic Discovery

The autodiscover_tasks method iterates through a list of packages, looking in each for a module with a given related_name (tasks by default).

# celery/loaders/base.py

def autodiscover_tasks(self, packages, related_name='tasks'):
    self.task_modules.update(
        mod.__name__ for mod in autodiscover_tasks(packages or (),
                                                   related_name) if mod)

The underlying find_related_module function handles the logic of locating these modules. If a package name is provided (like a Django app name), it attempts to import {package}.tasks.
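That behaviour can be approximated with a simplified stand-in (ours, not Celery's find_related_module, which also distinguishes where an ImportError originated):

```python
import importlib

def find_related(package, related_name='tasks'):
    """Try to import ``{package}.{related_name}``; return None if absent."""
    try:
        return importlib.import_module(f'{package}.{related_name}')
    except ImportError:
        return None
```

The stdlib package layout makes a convenient demonstration: `find_related('email', 'mime')` imports email.mime, while a package without the related module simply yields None instead of raising.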

Django Integration

In Django projects, autodiscover_tasks is typically called without arguments. In this case, Celery uses "fixups" (specifically celery.fixups.django.DjangoFixup) to automatically provide the list of installed apps as the packages argument.

# Example from examples/django/proj/celery.py
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')

# Automatically finds tasks.py in all INSTALLED_APPS
app.autodiscover_tasks()

Worker Lifecycle Hooks

Loaders allow the application to respond to various stages of the worker's lifecycle. These methods are called by the worker at specific times:

  • on_worker_init: Called when the celery worker process starts.
  • on_worker_process_init: Called when a child pool process starts.
  • on_task_init: Called immediately before a task is executed.
  • on_process_cleanup: Called after a task is executed to clean up resources.
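A custom loader overrides these otherwise no-op hooks. The sequence described above can be modelled with a toy class; this is a simulation of the ordering, not code the worker itself runs:

```python
class RecordingLoader:
    """Toy loader that records which lifecycle hooks fire, and in what order."""

    def __init__(self):
        self.calls = []

    def on_worker_init(self):
        self.calls.append('worker_init')           # worker process starts

    def on_worker_process_init(self):
        self.calls.append('worker_process_init')   # pool child process starts

    def on_task_init(self, task_id, task):
        self.calls.append('task_init')             # right before a task runs

    def on_process_cleanup(self):
        self.calls.append('process_cleanup')       # right after a task runs


# Simulate one worker handling a single task.
loader = RecordingLoader()
loader.on_worker_init()
loader.on_worker_process_init()
loader.on_task_init('id-1', 'my_task')
loader.on_process_cleanup()
```

In practice these hooks are where applications reset per-process resources such as database connections that must not be shared across forked pool children.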

Safe Module Imports

When the worker initializes, it calls import_default_modules. The BaseLoader implementation ensures that exceptions during this phase are caught and re-raised manually, as the logging system may not be fully operational yet:

# celery/loaders/base.py

def import_default_modules(self):
    responses = signals.import_modules.send(sender=self.app)
    for _, response in responses:
        if isinstance(response, Exception):
            raise response
    return [self.import_task_module(m) for m in self.default_modules]

This ensures that configuration or import errors in the user's task modules are visible and not silently ignored during worker startup.
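The re-raise pattern itself is easy to isolate. Here is a generic version with our own helper name (not a Celery API), operating on the (receiver, response) pairs that a signal send returns:

```python
def raise_first_error(responses):
    """Given (receiver, response) pairs from a signal dispatch, re-raise
    the first response that is an exception instance; otherwise return
    the responses themselves."""
    for _, response in responses:
        if isinstance(response, Exception):
            raise response
    return [r for _, r in responses]


# A healthy dispatch passes straight through.
ok = raise_first_error([('recv_a', 'module_a'), ('recv_b', 'module_b')])
```

Feeding it a pair whose response is, say, `ImportError('boom')` raises that exact exception, surfacing the original failure instead of a silently ignored return value.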