Defining and Registering Tasks

Tasks are the fundamental units of work in this codebase. Each task is represented by the Task class (found in celery.app.task) and tracked by the TaskRegistry (found in celery.app.registry). Together they let tasks be defined as functions or classes, configured lazily from app settings, and looked up by name in one place.

The Task Base Class

The Task class serves as the blueprint for all executable units. It provides the infrastructure for retries, shadowing, and result handling, but leaves the actual work to the run() method: the base implementation raises NotImplementedError, so every concrete task must supply its own run().

Defining Tasks

There are two primary ways to define tasks in this project: using the @app.task decorator or subclassing the Task class directly.

Function-based Tasks (Decorator)

The most common approach is to apply the task decorator of a Celery application instance. Internally, the decorator calls _task_from_fun in celery/app/base.py, which builds a Task subclass whose run() is the decorated function and returns an instance of it.

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    # 'bind=True' allows access to 'self', which is the Task instance
    print(f'Request: {self.request!r}')
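
To make the wrapping step concrete, the following is a minimal, self-contained sketch of how a function can be turned into an instance of a dynamically created class. MiniTask and task_from_fun are hypothetical names invented for this illustration; the sketch mirrors the idea behind _task_from_fun but is not Celery's code and needs no Celery installation.

```python
class MiniTask:
    """Hypothetical stand-in for a task base class (not Celery's Task)."""
    name = None

    def run(self, *args, **kwargs):
        raise NotImplementedError('Tasks must define the run method.')

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


def task_from_fun(fun, bind=False, **options):
    """Build a MiniTask subclass whose run() is `fun` and return an instance."""
    # With bind=True the function stays a normal method and receives `self`.
    # Otherwise it is stored as a staticmethod and is called without `self`.
    run = fun if bind else staticmethod(fun)
    attrs = {
        'name': options.pop('name', f'{fun.__module__}.{fun.__name__}'),
        'run': run,
        '__doc__': fun.__doc__,
    }
    attrs.update(options)  # remaining options become class attributes
    return type(fun.__name__, (MiniTask,), attrs)()


def add(x, y):
    return x + y


def whoami(self):
    return self.name


add_task = task_from_fun(add, name='demo.add')
whoami_task = task_from_fun(whoami, bind=True, name='demo.whoami',
                            ignore_result=True)

print(add_task(2, 3))             # 5
print(whoami_task())              # demo.whoami
print(whoami_task.ignore_result)  # True
```

The bind flag only decides whether the function is stored as a method or as a staticmethod, which is why a bound task sees the task instance as its first argument.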

Class-based Tasks

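For more complex logic, you can subclass Task directly. Define the name attribute (TaskRegistry.register rejects a task whose name is None) and implement the run() method. A class-based task is not added to the registry by the class definition alone; register it explicitly, for example with app.tasks.register(MyTask()).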

from celery.app.task import Task

class MyTask(Task):
    name = 'myapp.custom_task'

    def run(self, x, y):
        return x + y

Task Configuration and Attributes

The Task class defines several attributes that control its behavior. These can be set directly on the class or passed as arguments to the @app.task decorator.

  • serializer: The serialization method (e.g., 'json', 'pickle'). Defaults to the task_serializer app setting.
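  • rate_limit: Caps how many tasks of this type may be started per unit of time (e.g., '10/s'). The limit is enforced per worker instance, not across all workers.
  • max_retries: The maximum number of times a task will be retried before giving up (default is 3). Setting it to None removes the limit.
  • acks_late: If True, the message is acknowledged after the task has run rather than just before. The broker can then redeliver the message if the worker is lost mid-run, so the task may execute more than once and should be idempotent.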

Lazy Binding

Tasks in this codebase use a "lazy binding" pattern. A task is not fully configured at definition time. Instead, Task.bind(app) is called when the task is first used or when the application is finalized. bind walks the Task.from_config mapping, a tuple of (task attribute, app setting) pairs, and copies each app-wide setting onto the task attribute that is still None, so values set explicitly on the task take precedence:

# From celery/app/task.py
from_config = (
    ('serializer', 'task_serializer'),
    ('rate_limit', 'task_default_rate_limit'),
    ('priority', 'task_default_priority'),
    # ...
)
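
The effect of this mapping can be modelled in a few lines of plain Python. The sketch below is a simplified stand-in, not the code in celery/app/task.py: MiniApp, MiniTask, and ReportTask are hypothetical names, and the configuration is an ordinary dict. It assumes, as described above, that binding fills only attributes that are still None.

```python
class MiniApp:
    """Hypothetical app object that only carries a configuration dict."""

    def __init__(self, **conf):
        self.conf = conf


class MiniTask:
    """Hypothetical task base; None means 'not set, use the app default'."""
    serializer = None
    rate_limit = None
    priority = None

    # (task attribute, app setting) pairs, as in the excerpt above.
    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
        ('priority', 'task_default_priority'),
    )

    @classmethod
    def bind(cls, app):
        cls._app = app
        for attr_name, config_name in cls.from_config:
            # Fill only the attributes the task did not set itself.
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, app.conf[config_name])
        return app


class ReportTask(MiniTask):
    serializer = 'pickle'  # explicit value, must survive binding


app = MiniApp(
    task_serializer='json',
    task_default_rate_limit='10/s',
    task_default_priority=5,
)
ReportTask.bind(app)

print(ReportTask.serializer)  # pickle (explicit value kept)
print(ReportTask.rate_limit)  # 10/s   (taken from the app config)
print(ReportTask.priority)    # 5      (taken from the app config)
```

Because the configuration is read only at bind time, the app settings can still change between defining the task and first using it.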

The Task Registry

The TaskRegistry is a specialized dictionary that stores all tasks registered with a Celery application. It is accessible via app.tasks.

Registration Process

When a task is registered via TaskRegistry.register(task), the registry first rejects a task whose name is None, then instantiates the task if a class was passed, applies the autoretry behaviour, and finally stores the instance under task.name.

# From celery/app/registry.py
def register(self, task):
    if task.name is None:
        raise InvalidTaskError(
            'Task class {!r} must specify .name attribute'.format(
                type(task).__name__))
    task = inspect.isclass(task) and task() or task
    add_autoretry_behaviour(task)
    self[task.name] = task
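
These registration rules can be tried without a broker or a Celery installation. The sketch below re-creates them on a plain dict subclass; MiniRegistry, AddTask, Unnamed, and the local InvalidTaskError are hypothetical stand-ins, and the autoretry step is left out for brevity.

```python
import inspect


class InvalidTaskError(Exception):
    """Local stand-in for the exception raised in the excerpt above."""


class MiniRegistry(dict):
    """Hypothetical registry: a dict mapping task names to task instances."""

    def register(self, task):
        if task.name is None:
            raise InvalidTaskError(
                f'Task class {type(task).__name__!r} must specify .name attribute')
        # Accept either a class or an instance; always store an instance.
        task = task() if inspect.isclass(task) else task
        self[task.name] = task


class AddTask:
    name = 'demo.add'

    def run(self, x, y):
        return x + y


class Unnamed:
    name = None


registry = MiniRegistry()
registry.register(AddTask)  # a class is instantiated on registration

print(type(registry['demo.add']).__name__)  # AddTask
print(registry['demo.add'].run(2, 3))       # 5

try:
    registry.register(Unnamed())
except InvalidTaskError as exc:
    print('rejected:', exc)
```

Checking the name before instantiating means a misconfigured task fails fast, before any constructor side effects run.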

Task Discovery

The registry is populated through several mechanisms:

  1. Manual Registration: Calling app.tasks.register(MyTask()).
  2. Decorator Registration: The @app.task decorator automatically adds the created task to the registry of the associated app.
  3. Autodiscovery: The app.autodiscover_tasks() method (in celery/app/base.py) searches each of the given packages for a module named tasks (that is, tasks.py) and imports it. Importing the module runs its @app.task decorators, which register the tasks.

Name Uniqueness

Task names must be unique within the registry. Because the registry is keyed by name, registering a second task under an existing name silently replaces the first. By default, the name is generated from the function's module and function name (for example, a function add defined in proj/tasks.py is named proj.tasks.add), but it can be overridden:

@app.task(name='custom_unique_name')
def my_task():
    pass
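
The overwrite behaviour follows directly from the registry being keyed by name. The sketch below shows it with an ordinary dict; register and default_name are hypothetical helpers, and the '<module>.<function>' scheme is a simplification of the default naming described above.

```python
def default_name(fun):
    """Simplified default naming for this sketch: '<module>.<function>'."""
    return f'{fun.__module__}.{fun.__name__}'


registry = {}


def register(fun, name=None):
    """Store `fun` under an explicit name, or under its default name."""
    key = name or default_name(fun)
    registry[key] = fun
    return key


def resize():
    return 'v1'


def resize_v2():
    return 'v2'


# Two different functions registered under the same explicit name.
register(resize, name='images.resize')
register(resize_v2, name='images.resize')

print(len(registry))                 # 1
print(registry['images.resize']())   # v2 (the second registration won)

# Without an explicit name, the key is derived from module and function name.
print(register(resize))
```

Nothing warns about the collision, so explicit names are worth choosing carefully when several modules define similarly named functions.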

Task Execution Context

When a task is executed, it has access to a "request" context via self.request. This context contains metadata about the current execution, such as the task ID, retries, and delivery information.

The Task class manages this using a request_stack (a celery.utils.threads.LocalStack), ensuring that even in multi-threaded environments, the correct request context is retrieved for the current task execution.

# Accessing request context in a bound task
@app.task(bind=True)
def check_context(self):
    print(self.request.id)  # Unique ID of the task
    print(self.request.retries)  # Current retry count
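
The per-thread isolation that the request stack provides can be illustrated with the standard library alone. MiniLocalStack below is a hypothetical, much-simplified stand-in for a thread-local stack and is not Celery's LocalStack; the barrier forces all three threads to have a request pushed at the same moment, so any leakage between threads would show up in the result.

```python
import threading


class MiniLocalStack:
    """Hypothetical per-thread stack built on threading.local."""

    def __init__(self):
        self._local = threading.local()

    def _stack(self):
        # Each thread lazily gets its own private list.
        if not hasattr(self._local, 'items'):
            self._local.items = []
        return self._local.items

    def push(self, obj):
        self._stack().append(obj)

    def pop(self):
        stack = self._stack()
        return stack.pop() if stack else None

    @property
    def top(self):
        stack = self._stack()
        return stack[-1] if stack else None


request_stack = MiniLocalStack()
barrier = threading.Barrier(3)
seen = {}


def execute(task_id):
    # Push this execution's request, wait until every thread has pushed,
    # then read the top of the stack as the running task would.
    request_stack.push({'id': task_id, 'retries': 0})
    try:
        barrier.wait(timeout=5)
        seen[threading.current_thread().name] = request_stack.top['id']
    finally:
        request_stack.pop()


threads = [
    threading.Thread(target=execute, args=(f'task-{i}',), name=f'worker-{i}')
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(seen.items()))  # each worker-i saw only task-i
print(request_stack.top)     # None: the main thread never pushed anything
```

Using a stack rather than a single slot also allows one task to run another synchronously in the same thread: the inner request is pushed on top and popped when the inner call returns.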