Defining and Registering Tasks
In this codebase, tasks are the fundamental units of work. They are represented by the Task class (found in celery.app.task) and managed by the TaskRegistry (found in celery.app.registry). This implementation allows for flexible task definition, lazy configuration, and centralized discovery.
The Task Base Class
The Task class serves as the blueprint for all executable units. While it provides the infrastructure for retries, shadowing, and result handling, its primary requirement is the implementation of the run() method.
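To make the run() requirement concrete, here is a deliberately simplified, stdlib-only model. It is not Celery's code: MiniTask and AddTask are hypothetical names. It only mirrors the idea that the base class supplies the calling machinery while each subclass supplies run().

```python
class MiniTask:
    """Hypothetical stand-in for celery.app.task.Task (not the real class)."""

    name = None  # subclasses (or a decorator) are expected to set this

    def run(self, *args, **kwargs):
        # The base class has no work of its own to do.
        raise NotImplementedError('Tasks must define the run method.')

    def __call__(self, *args, **kwargs):
        # Calling the task object directly executes its body in-process.
        return self.run(*args, **kwargs)


class AddTask(MiniTask):
    name = 'example.add'

    def run(self, x, y):
        return x + y


print(AddTask()(2, 3))  # prints 5
```

In Celery itself, calling a task object also pushes a request context around the call to run(); the model leaves that out to keep the focus on run().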
Defining Tasks
There are two primary ways to define tasks in this project: using the @app.task decorator or subclassing the Task class directly.
Function-based Tasks (Decorator)
The most common approach is to use the task decorator on a Celery application instance. Internally, the decorator calls _task_from_fun in celery/app/base.py, which wraps the function in a Task instance.
```python
from celery import Celery

app = Celery('proj')  # example app name

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    # bind=True passes the Task instance in as 'self'
    print(f'Request: {self.request!r}')
```
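The decorator path can be modeled in a few lines. The sketch below is a hypothetical simplification in the spirit of _task_from_fun: it builds a task subclass with type(), stores the function as run (as a staticmethod unless bind=True, so that only a bound task receives the instance as self), instantiates the class, and records the instance in a plain dict standing in for the registry. MiniTask, task, and registry are illustrative names, not Celery APIs.

```python
class MiniTask:
    """Hypothetical stand-in for celery.app.task.Task."""

    name = None

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


registry = {}  # stands in for app.tasks


def task(bind=False, name=None):
    """Hypothetical decorator factory modeled loosely on @app.task(...)."""

    def decorator(fun):
        task_name = name or f'{fun.__module__}.{fun.__name__}'
        # bind=True: run is a plain function, so it becomes a method receiving self.
        # bind=False: run is a staticmethod, so the function sees only its own args.
        run = fun if bind else staticmethod(fun)
        task_cls = type(fun.__name__, (MiniTask,), {
            'name': task_name,
            'run': run,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
        })
        instance = task_cls()  # the decorated name now refers to a task instance
        registry[task_name] = instance
        return instance

    return decorator


@task(bind=True)
def whoami(self):
    return self.name


@task()
def add(x, y):
    return x + y
```

Unlike this sketch, Celery also accepts the bare @app.task form without parentheses, and it may defer creating the task until the app is finalized (see Lazy Binding).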
Class-based Tasks
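For more complex logic, you can subclass Task directly and implement the run() method. Give the class a name attribute: TaskRegistry.register() raises InvalidTaskError for a task whose name is None. Since Celery 4.0, class-based tasks are no longer registered automatically, so register an instance explicitly, for example with app.register_task(MyTask()).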
```python
from celery.app.task import Task


class MyTask(Task):
    name = 'myapp.custom_task'

    def run(self, x, y):
        return x + y
```
Task Configuration and Attributes
The Task class defines several attributes that control its behavior. These can be set directly on the class or passed as arguments to the @app.task decorator.
- serializer: The serialization method (e.g., 'json', 'pickle'). Defaults to the task_serializer app setting.
- rate_limit: Limits how many tasks of this type can be executed per unit of time (e.g., '10/s'). The limit applies per worker instance, not globally across all workers.
- max_retries: The maximum number of times a task will be retried before giving up (default: 3).
- acks_late: If True, the message is acknowledged after the task executes rather than just before. If the worker crashes mid-execution, the message is redelivered and the task runs again, so such tasks may execute more than once and should be idempotent.
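Rate limits such as '10/s' have the form count/unit. The helper below is a hypothetical sketch of how such a string can be normalized to tasks per second, assuming the units s, m, and h and treating a bare number as per second. Celery performs its own conversion internally; this sketch does not reproduce it.

```python
def tasks_per_second(rate_limit):
    """Hypothetical helper: convert a rate-limit string to tasks per second.

    Accepts None (no limit), a bare number ('100', read as per second), or
    '<count>/<unit>' where unit is 's', 'm', or 'h'.
    """
    if rate_limit is None:
        return None  # None means "not rate limited"
    count, _, unit = str(rate_limit).partition('/')
    seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600}
    if unit and unit not in seconds_per_unit:
        raise ValueError(f'unknown rate unit: {unit!r}')
    return float(count) / seconds_per_unit[unit or 's']
```

For example, tasks_per_second('120/m') returns 2.0: at most two tasks of that type per second on each worker.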
Lazy Binding
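Tasks in this codebase use a "lazy binding" pattern: defining a task does not fully configure it right away. If the decorator is applied before the application is finalized, it returns a proxy (celery.local.PromiseProxy); the actual Task is created, and Task.bind(app) is called, when the proxy is first used or when the application is finalized. bind() maps app-wide configuration onto task attributes using the Task.from_config mapping, filling in only the attributes that are still None: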
```python
# From celery/app/task.py
from_config = (
    ('serializer', 'task_serializer'),
    ('rate_limit', 'task_default_rate_limit'),
    ('priority', 'task_default_priority'),
    # ...
)
```
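The effect of from_config can be reproduced with a small stdlib-only model. MiniTask, ReportTask, and the conf dict are hypothetical; the real Task.bind is also a classmethod but receives an app object rather than a plain dict. The point to notice is that a value set explicitly on the task class survives binding, while attributes left as None pick up the app-wide defaults.

```python
class MiniTask:
    """Hypothetical stand-in for celery.app.task.Task (simplified)."""

    # (task attribute, app setting) pairs, modeled on Task.from_config
    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
        ('priority', 'task_default_priority'),
    )
    serializer = None
    rate_limit = None
    priority = None

    @classmethod
    def bind(cls, conf):
        """Copy app-wide defaults onto attributes the task left as None."""
        for attr_name, setting in cls.from_config:
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, conf[setting])
        return cls


class ReportTask(MiniTask):
    serializer = 'pickle'  # explicit value: bind() leaves it alone


conf = {  # hypothetical app configuration
    'task_serializer': 'json',
    'task_default_rate_limit': None,
    'task_default_priority': 5,
}
ReportTask.bind(conf)
print(ReportTask.serializer, ReportTask.priority)  # prints: pickle 5
```

Because setattr targets the subclass, binding ReportTask leaves the MiniTask base class unchanged.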
The Task Registry
The TaskRegistry is a specialized dictionary that stores all tasks registered with a Celery application. It is accessible via app.tasks.
Registration Process
When a task is registered via TaskRegistry.register(task), the registry first checks that the task has a name (raising InvalidTaskError if it does not), instantiates it if a class was passed, attaches any autoretry behaviour, and stores the instance under its name.
```python
# From celery/app/registry.py
def register(self, task):
    if task.name is None:
        raise InvalidTaskError(
            'Task class {!r} must specify .name attribute'.format(
                type(task).__name__))
    task = inspect.isclass(task) and task() or task
    add_autoretry_behaviour(task)
    self[task.name] = task
```
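The same registration logic fits in a self-contained model. MiniRegistry and the local InvalidTaskError class are hypothetical stand-ins; the autoretry step is omitted. The sketch uses an explicit if instead of the and/or idiom from the excerpt, which is easier to read and does not depend on the instance being truthy. For convenience it also returns the stored instance, which the excerpt does not.

```python
import inspect


class InvalidTaskError(Exception):
    """Hypothetical stand-in for celery.exceptions.InvalidTaskError."""


class MiniRegistry(dict):
    """Hypothetical, simplified model of TaskRegistry: a dict keyed by task name."""

    def register(self, task):
        if task.name is None:
            label = task.__name__ if inspect.isclass(task) else type(task).__name__
            raise InvalidTaskError(f'Task class {label!r} must specify .name attribute')
        # Accept a class or an instance, but always store an instance.
        if inspect.isclass(task):
            task = task()
        self[task.name] = task
        return task


class AddTask:
    name = 'example.add'

    def run(self, x, y):
        return x + y


registry = MiniRegistry()
add = registry.register(AddTask)  # the class is instantiated on registration
print(type(add).__name__, add.run(2, 3))  # prints: AddTask 5
```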
Task Discovery
The registry is populated through several mechanisms:
- Manual registration: calling app.tasks.register(MyTask()).
- Decorator registration: the @app.task decorator automatically adds the created task to the registry of the associated app.
- Autodiscovery: the app.autodiscover_tasks() method (in celery/app/base.py) searches the specified packages for a tasks.py module and imports each one, which triggers the decorators and registers the tasks.
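The core of autodiscovery is "try to import package.tasks for each package". The function below is a hypothetical, stdlib-only sketch of that idea, not Celery's implementation; its related_name parameter mirrors the one accepted by app.autodiscover_tasks(). It skips packages that simply have no such submodule but re-raises any other import failure, so a bug inside an existing tasks module is not silently hidden.

```python
import importlib


def find_task_modules(packages, related_name='tasks'):
    """Hypothetical sketch of autodiscovery: import <package>.<related_name>.

    Packages without that submodule are skipped; any other import error
    (including a missing parent package) is re-raised.
    """
    imported = []
    for package in packages:
        module_name = f'{package}.{related_name}'
        try:
            importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Only swallow the error if the missing module is the one we probed for.
            if exc.name != module_name:
                raise
            continue
        imported.append(module_name)
    return imported
```

Importing a module is what executes its @app.task decorators, which is why importing alone is enough to populate the registry. For a quick check without a Celery project, find_task_modules(['json', 'email'], related_name='decoder') returns ['json.decoder'], because only the json package has a decoder submodule.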
Name Uniqueness
Task names must be unique within the registry. Because the registry is a dictionary keyed by name, registering a second task under an existing name replaces the first. By default, the name is generated from the function's module and name (for example, a function add defined in proj/tasks.py is named proj.tasks.add), but it can be overridden:
```python
@app.task(name='custom_unique_name')
def my_task():
    pass
```
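Default naming and the overwrite behaviour can both be shown with plain Python. default_task_name is a hypothetical helper: it joins module and function names and, as an assumption modeled on Celery's behavior, substitutes the app's main name for '__main__' when one is given. The dict at the end stands in for the registry.

```python
def default_task_name(module_name, func_name, main_name=None):
    """Hypothetical sketch of default naming: '<module>.<function>'.

    A function defined in a script run directly lives in '__main__'; here
    the app's main name (if given) is used in its place.
    """
    if module_name == '__main__' and main_name:
        module_name = main_name
    return f'{module_name}.{func_name}'


registry = {}  # stands in for app.tasks
registry[default_task_name('proj.tasks', 'add')] = 'first add'
registry[default_task_name('proj.tasks', 'add')] = 'second add'  # same key
print(registry)  # prints: {'proj.tasks.add': 'second add'}
```

The last lines show why duplicate names are dangerous: the dict keeps only the most recent entry, so the task registered first becomes unreachable by name.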
Task Execution Context
When a task is executed, it has access to a "request" context via self.request. This context contains metadata about the current execution, such as the task ID, retries, and delivery information.
The Task class manages this using a request_stack (a celery.utils.threads.LocalStack), ensuring that even in multi-threaded environments, the correct request context is retrieved for the current task execution.
```python
# Accessing request context in a bound task
@app.task(bind=True)
def check_context(self):
    print(self.request.id)       # Unique ID of the task
    print(self.request.retries)  # Current retry count
```
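The thread isolation that request_stack provides can be illustrated with a stdlib-only model. MiniLocalStack is hypothetical and far simpler than celery.utils.threads.LocalStack. It is a stack rather than a single slot because a task may call another task directly in-process, which pushes a nested request on top of the current one. In the demo, a barrier makes all four threads push their requests before any of them reads top, so a shared (non-thread-local) stack would hand every thread the same request.

```python
import threading


class MiniLocalStack:
    """Hypothetical, simplified model of a thread-local stack."""

    def __init__(self):
        self._local = threading.local()

    def _stack(self):
        # Each thread lazily gets its own list.
        if not hasattr(self._local, 'stack'):
            self._local.stack = []
        return self._local.stack

    def push(self, item):
        self._stack().append(item)

    def pop(self):
        stack = self._stack()
        return stack.pop() if stack else None

    @property
    def top(self):
        stack = self._stack()
        return stack[-1] if stack else None


request_stack = MiniLocalStack()
barrier = threading.Barrier(4)
seen = {}


def execute(task_id):
    # Push a request context, read it back, then pop it, as a task call would.
    request_stack.push({'id': task_id})
    try:
        barrier.wait()  # all four requests are pushed before anyone reads
        seen[task_id] = request_stack.top['id']
    finally:
        request_stack.pop()


threads = [threading.Thread(target=execute, args=(f'task-{n}',)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the threads finish, each entry in seen maps a task ID to itself, and the main thread's stack is still empty.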