Task Discovery and Loaders
Loaders in Celery act as the interface between the Celery framework and the environment in which it runs. They are responsible for reading configuration, discovering task modules, and providing hooks for worker and task lifecycle events.
The Loader Hierarchy
The loader system is built around a base class that defines the interface, with specific implementations for different application styles.
BaseLoader
The celery.loaders.base.BaseLoader is the abstract foundation. It provides the core logic for:
- Configuration Management: Methods like read_configuration and config_from_object.
- Task Discovery: The autodiscover_tasks method and import_default_modules.
- Lifecycle Hooks: Placeholders for worker and task events (e.g., on_worker_init, on_task_init).
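As an illustration only (not Celery's source), the shape of this interface can be sketched as a minimal class. The method names mirror BaseLoader; the bodies are placeholder assumptions:

```python
# Illustrative sketch of the loader interface (not Celery's actual code).
# Method names mirror celery.loaders.base.BaseLoader; bodies are placeholders.


class SketchLoader:
    def __init__(self, app=None):
        self.app = app
        self.configured = False
        self.task_modules = set()

    # -- Configuration management --------------------------------------
    def read_configuration(self):
        """Return a settings mapping (e.g. read from celeryconfig.py)."""
        return {}

    def config_from_object(self, obj):
        """Accept a dict, a module object, or a 'dotted.path' string."""
        self.configured = True

    # -- Task discovery ------------------------------------------------
    def autodiscover_tasks(self, packages, related_name='tasks'):
        """Record '<package>.<related_name>' modules for later import."""
        self.task_modules.update(f'{p}.{related_name}' for p in packages)

    # -- Lifecycle hooks (no-ops by default) ---------------------------
    def on_worker_init(self): ...
    def on_worker_process_init(self): ...
    def on_task_init(self, task_id, task): ...
    def on_process_cleanup(self): ...
```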
AppLoader
The celery.loaders.app.AppLoader is the default loader used when you create an explicit Celery application instance:
from celery import Celery
app = Celery('my_project') # Uses AppLoader by default
Default Loader
The celery.loaders.default.Loader is used when no custom app is specified. It is designed to look for a configuration module named celeryconfig.py by default.
# celery/loaders/default.py
class Loader(BaseLoader):
    """The loader used by the default app."""

    def read_configuration(self, fail_silently=True):
        """Read configuration from :file:`celeryconfig.py`."""
        configname = os.environ.get('CELERY_CONFIG_MODULE',
                                    DEFAULT_CONFIG_MODULE)
        try:
            usercfg = self._import_config_module(configname)
        except ImportError:
            # ... error handling ...
            return self.setup_settings({})
        else:
            self.configured = True
            return self.setup_settings(usercfg)
Configuration Loading
Loaders provide multiple ways to ingest configuration settings into the Celery app.
Environment-Based Configuration
The read_configuration method checks the CELERY_CONFIG_MODULE environment variable. If set, it imports that module and uses its attributes as configuration.
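A self-contained sketch of this pattern, using only the standard library (the module name `myconfig` and the settings in it are invented for the demo; the real method lives on the loader):

```python
import importlib
import os
import sys
import types

DEFAULT_CONFIG_MODULE = 'celeryconfig'


def read_configuration():
    """Import the module named by CELERY_CONFIG_MODULE and use its
    public attributes as the settings mapping (illustrative sketch)."""
    configname = os.environ.get('CELERY_CONFIG_MODULE', DEFAULT_CONFIG_MODULE)
    mod = importlib.import_module(configname)
    return {k: v for k, v in vars(mod).items() if not k.startswith('_')}


# Demo: register a fake config module so the import succeeds.
fake = types.ModuleType('myconfig')
fake.broker_url = 'redis://localhost:6379/0'
sys.modules['myconfig'] = fake

os.environ['CELERY_CONFIG_MODULE'] = 'myconfig'
print(read_configuration()['broker_url'])  # redis://localhost:6379/0
```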
Object-Based Configuration
The config_from_object method allows passing a Python object, a dictionary, or a string path to a module. It uses _smart_import to resolve string paths:
# celery/loaders/base.py
def _smart_import(self, path, imp=None):
    imp = self.import_module if imp is None else imp
    if ':' in path:
        # Path includes attribute (e.g., ``os.path:abspath``).
        return symbol_by_name(path, imp=imp)
    try:
        return imp(path)
    except ImportError:
        # Not a module name, so try module + attribute.
        return symbol_by_name(path, imp=imp)
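The same resolution logic can be sketched with the standard library alone. This re-implements the idea for illustration; `symbol_by_name` is the Celery/kombu helper that does the real work:

```python
import importlib


def smart_import(path):
    """Resolve 'pkg.mod', 'pkg.mod:attr', or 'pkg.mod.attr' to an object."""
    if ':' in path:
        # Explicit attribute, e.g. 'os.path:abspath'.
        modname, attr = path.split(':', 1)
        return getattr(importlib.import_module(modname), attr)
    try:
        return importlib.import_module(path)
    except ImportError:
        # Not a module name, so try module + trailing attribute.
        modname, _, attr = path.rpartition('.')
        return getattr(importlib.import_module(modname), attr)


import os.path

print(smart_import('os.path:abspath') is os.path.abspath)  # True
print(smart_import('os.path.join') is os.path.join)        # True
```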
Task Discovery
One of the most critical roles of the loader is finding tasks within a project structure.
Automatic Discovery
The autodiscover_tasks method iterates through a list of packages and looks for a specific module (defaulting to tasks.py).
# celery/loaders/base.py
def autodiscover_tasks(self, packages, related_name='tasks'):
    self.task_modules.update(
        mod.__name__ for mod in autodiscover_tasks(packages or (),
                                                   related_name)
        if mod)
The underlying find_related_module function handles the logic of locating these modules. If a package name is provided (like a Django app name), it attempts to import {package}.tasks.
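A minimal re-implementation of that lookup, using only the standard library (the real find_related_module handles more edge cases, such as packages without the attribute):

```python
import importlib


def find_related_module(package, related_name='tasks'):
    """Return the '<package>.<related_name>' module if importable, else None."""
    try:
        return importlib.import_module(f'{package}.{related_name}')
    except ModuleNotFoundError:
        return None


# 'json.tasks' does not exist, so discovery quietly skips it ...
print(find_related_module('json'))  # None
# ... while an existing submodule is found and imported.
print(find_related_module('unittest', 'mock') is not None)  # True
```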
Django Integration
In Django projects, autodiscover_tasks is typically called without arguments. In this case, Celery uses "fixups" (specifically celery.fixups.django.DjangoFixup) to automatically provide the list of installed apps as the packages argument.
# Example from examples/django/proj/celery.py
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
# Automatically finds tasks.py in all INSTALLED_APPS
app.autodiscover_tasks()
Worker Lifecycle Hooks
Loaders allow the application to respond to various stages of the worker's lifecycle. These methods are called by the worker at specific times:
- on_worker_init: Called when the celery worker process starts.
- on_worker_process_init: Called when a child pool process starts.
- on_task_init: Called immediately before a task is executed.
- on_process_cleanup: Called after a task is executed to clean up resources.
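To make the ordering concrete, here is a framework-free sketch: the hook names match the loader API, but the RecordingLoader class and the run_worker driver are purely illustrative stand-ins for the real worker:

```python
class RecordingLoader:
    """Stands in for a loader subclass overriding the lifecycle hooks."""

    def __init__(self):
        self.calls = []

    def on_worker_init(self):
        self.calls.append('worker_init')

    def on_worker_process_init(self):
        self.calls.append('process_init')

    def on_task_init(self, task_id):
        self.calls.append(f'task_init:{task_id}')

    def on_process_cleanup(self):
        self.calls.append('cleanup')


def run_worker(loader, task_ids):
    # Illustrative driver: the real worker calls these at the same points.
    loader.on_worker_init()            # `celery worker` process starts
    loader.on_worker_process_init()    # child pool process starts
    for task_id in task_ids:
        loader.on_task_init(task_id)   # right before the task runs
        loader.on_process_cleanup()    # right after, to release resources


loader = RecordingLoader()
run_worker(loader, ['t1'])
print(loader.calls)
# ['worker_init', 'process_init', 'task_init:t1', 'cleanup']
```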
Safe Module Imports
When the worker initializes, it calls import_default_modules. The BaseLoader implementation ensures that exceptions during this phase are caught and re-raised manually, as the logging system may not be fully operational yet:
# celery/loaders/base.py
def import_default_modules(self):
    responses = signals.import_modules.send(sender=self.app)
    for _, response in responses:
        if isinstance(response, Exception):
            raise response
    return [self.import_task_module(m) for m in self.default_modules]
This ensures that configuration or import errors in the user's task modules are visible and not silently ignored during worker startup.
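The send-then-re-raise pattern can be sketched without Celery's signal machinery. The code above relies on `send` returning `(receiver, response)` pairs where a failing receiver's exception comes back as its response; the MiniSignal class below is an invented stand-in that mimics that contract:

```python
class MiniSignal:
    """Tiny stand-in for a Celery-style signal (illustrative only)."""

    def __init__(self):
        self.receivers = []

    def connect(self, fn):
        self.receivers.append(fn)
        return fn

    def send(self, sender=None):
        # Collect responses instead of letting exceptions propagate,
        # so the caller can decide how to surface failures.
        responses = []
        for fn in self.receivers:
            try:
                responses.append((fn, fn(sender=sender)))
            except Exception as exc:
                responses.append((fn, exc))
        return responses


import_modules = MiniSignal()


@import_modules.connect
def bad_receiver(sender=None):
    raise ImportError('tasks module is broken')


def import_default_modules():
    # Re-raise any exception a receiver returned, so startup errors
    # stay loud even if logging is not fully operational yet.
    for _, response in import_modules.send(sender='app'):
        if isinstance(response, Exception):
            raise response


try:
    import_default_modules()
except ImportError as exc:
    print(exc)  # tasks module is broken
```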