Task Definition & Routing
Tasks in this codebase are the fundamental units of work, defined as subclasses of the Task class and managed through a central registry. Routing logic decouples task definitions from the messaging topology, allowing developers to distribute tasks across different queues and exchanges without modifying the task code itself.
Task Definition
Tasks are typically created using the @app.task decorator, which is implemented as the Celery.task method in celery/app/base.py. The decorator turns a plain Python function into a Task instance.
The Task Class
The Task base class, located in celery/app/task.py, provides the core functionality for asynchronous execution. When a function is decorated with @app.task, the application creates a new class that inherits from Task (or a custom base class if specified) and overrides the run method with the original function.
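The class-per-function mechanism can be illustrated with Python's built-in type() constructor. The sketch below is a simplified, standard-library-only model and not Celery's source. MiniTask and task_from_fun are hypothetical names, and the real decorator also handles the registry, options, and binding.

```python
class MiniTask:
    """Hypothetical stand-in for the Task base class."""
    name = None

    def run(self, *args, **kwargs):
        raise NotImplementedError('Tasks must define the run method.')

    def __call__(self, *args, **kwargs):
        # Calling the task instance directly executes run() in-process.
        return self.run(*args, **kwargs)


def task_from_fun(fun, name, base=MiniTask):
    """Create a subclass of `base` whose run() is `fun`, then instantiate it."""
    cls = type(fun.__name__, (base,), {
        'name': name,
        # staticmethod stops `fun` from receiving the task instance as `self`.
        'run': staticmethod(fun),
        '__doc__': fun.__doc__,
        '__module__': fun.__module__,
    })
    return cls()


def add(x, y):
    return x + y


add_task = task_from_fun(add, 'tasks.add')
print(isinstance(add_task, MiniTask), add_task.name, add_task(2, 3))  # True tasks.add 5
```

Wrapping the function in staticmethod corresponds to the default, unbound case. With bind=True, Celery keeps the function as a regular method so that run receives the task instance as self.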
Key attributes of the Task class include:
- name: The unique identifier for the task in the registry.
- acks_late: If True, the message is acknowledged after execution instead of before.
- ignore_result: If True, the worker will not store the task's return value or state.
- max_retries: The maximum number of times a task can be retried before failing.
Lazy Binding
Tasks are bound to the application lazily. A task exists as soon as it is defined, but its configuration is not fully initialized until the task is first used or the application is "finalized." The Task.bind(app) method in celery/app/task.py handles this process. It pulls settings from the application's configuration (e.g., task_serializer, task_acks_late) and applies them to the task wherever the task has not set its own value.
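The deferred configuration lookup can be modeled with a table of (attribute, setting) pairs that bind() walks. The model follows the idea of Celery's from_config table, but it is a hypothetical simplification: LazyTask and AddTask are illustrative names, and bind() here takes a plain dict instead of an app instance.

```python
class LazyTask:
    """Hypothetical task base whose defaults are filled in from app config."""

    # (attribute name, configuration key) pairs consulted by bind().
    from_config = (
        ('serializer', 'task_serializer'),
        ('acks_late', 'task_acks_late'),
    )
    serializer = None
    acks_late = None
    _bound = False

    @classmethod
    def bind(cls, conf):
        """Copy config values onto the class for attributes still left as None."""
        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:
                setattr(cls, attr_name, conf[config_name])
        cls._bound = True


class AddTask(LazyTask):
    acks_late = False  # set explicitly, so the app-wide default is ignored


conf = {'task_serializer': 'json', 'task_acks_late': True}
print(AddTask._bound)  # False: no configuration has been read yet
AddTask.bind(conf)     # per the text, this happens on first use or at finalization
print(AddTask.serializer, AddTask.acks_late)  # json False
```

Because the None check runs at bind time, configuration changes made after a task is defined but before it is bound still take effect. Values set explicitly on a task always override the app-wide default.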
# Example of task definition from examples/app/myapp.py
from celery import Celery
app = Celery('myapp', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
    return x + y
Task Registry
The TaskRegistry, defined in celery/app/registry.py, is a specialized dictionary that maps task names to their corresponding Task instances.
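A dict subclass is a natural fit for this design, because overriding __missing__ changes what happens on a failed lookup. The following is a simplified model and not Celery's class. The NotRegistered exception borrows its name from celery.exceptions, but here it is defined locally for illustration.

```python
class NotRegistered(KeyError):
    """Raised when a task name is not in the registry (local, illustrative)."""


class TaskRegistry(dict):
    """Simplified model of a task registry: a dict of name -> task instance."""

    def __missing__(self, key):
        # dict calls __missing__ on failed [] lookups; raise a clearer error.
        raise NotRegistered(key)

    def register(self, task):
        # Tasks are indexed by their own name attribute.
        self[task.name] = task

    def unregister(self, name):
        # Accept either a task instance or a task name.
        key = getattr(name, 'name', name)
        try:
            self.pop(key)
        except KeyError:
            raise NotRegistered(key) from None


class DemoTask:
    name = 'tasks.add'


registry = TaskRegistry()
registry.register(DemoTask())
print(sorted(registry))  # ['tasks.add']
```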
Naming Convention
By default, task names are generated automatically from the function name and the module in which it is defined. This is handled by Celery.gen_task_name in celery/app/base.py. For example, a function add defined in the module tasks (tasks.py) is named tasks.add.
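The naming convention comes down to joining the module path and the function name. The standalone function below is a sketch and not the signature of Celery.gen_task_name. The special case for scripts run as __main__, where an app main name replaces the module name, is an assumption about how such tasks get a stable name.

```python
def gen_task_name(name, module, main=None):
    """Build a task name as '<module>.<function>'.

    Assumption: for code run as a script (module '__main__'), a supplied
    app main name is used instead, so the name stays stable across workers.
    """
    if module == '__main__' and main:
        module = main
    return '.'.join([module, name])


print(gen_task_name('add', 'tasks'))                   # tasks.add
print(gen_task_name('add', 'proj.tasks'))              # proj.tasks.add
print(gen_task_name('add', '__main__', main='myapp'))  # myapp.add
```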
Registration Process
When the @app.task decorator is invoked, it calls _task_from_fun in celery/app/base.py. This method:
- Generates the task name.
- Creates the task class.
- Adds the task instance to app._tasks (the TaskRegistry).
- Binds the task to the app if the app is already finalized.
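The four registration steps above can be traced in a small hypothetical app class. MiniApp and its methods are illustrative names, a plain dict stands in for the TaskRegistry, and returning an already-registered task for a repeated name is an assumption made to keep registration idempotent.

```python
class MiniApp:
    """Hypothetical app that mirrors the four registration steps."""

    def __init__(self, main, finalized=True):
        self.main = main
        self.finalized = finalized
        self._tasks = {}  # stands in for the TaskRegistry

    def gen_task_name(self, name, module):
        return f'{module}.{name}'

    def task_from_fun(self, fun):
        # 1. Generate the task name.
        name = self.gen_task_name(fun.__name__, fun.__module__)
        if name in self._tasks:
            return self._tasks[name]  # already registered: reuse it
        # 2. Create a task class whose run() is the original function.
        cls = type(fun.__name__, (object,), {
            'name': name,
            'app': None,
            'run': staticmethod(fun),
        })
        task = cls()
        # 3. Add the instance to the registry.
        self._tasks[name] = task
        # 4. Bind to the app only if the app is already finalized.
        if self.finalized:
            task.app = self
        return task

    def task(self, fun):
        # Decorator entry point: replaces the function with a task instance.
        return self.task_from_fun(fun)


app = MiniApp('myapp')


@app.task
def add(x, y):
    return x + y


print(add.name, add.run(2, 3), add.app is app)
```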
Task Routing
Routing is the process of determining which queue, exchange, and routing key a task message should be sent to. This logic is encapsulated in the Router class found in celery/app/routes.py.
The Routing Table
The application maintains a routing table prepared from the task_routes configuration setting. The prepare function in celery/app/routes.py converts these configurations into MapRoute objects, which support:
- Direct Mapping: Exact task name matches.
- Glob Patterns: Using * to match multiple tasks (e.g., feed.tasks.*).
- Regular Expressions: Compiled regex objects for complex matching.
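The three matching modes can be modeled as a callable route table that checks exact names first and then tries patterns in insertion order. This is a sketch and not Celery's MapRoute. The glob translation (escape everything, then let * match any run of characters, including dots) is an assumption about the exact semantics.

```python
import re


class MapRouteSketch:
    """Hypothetical, simplified model of a mapping-based route table."""

    def __init__(self, table):
        self.exact = {}
        self.patterns = []  # (compiled regex, route) in insertion order
        for key, route in table.items():
            if isinstance(key, re.Pattern):
                self.patterns.append((key, route))  # regex key
            elif '*' in key:
                # Glob: escape everything, then let '*' match any characters.
                regex = re.escape(key).replace(r'\*', '.*')
                self.patterns.append((re.compile(regex), route))
            else:
                self.exact[key] = route  # direct mapping

    def __call__(self, name):
        # Exact names win; patterns are tried in the order they were given.
        if name in self.exact:
            return dict(self.exact[name])
        for regex, route in self.patterns:
            if regex.match(name):
                return dict(route)
        return None


routes = MapRouteSketch({
    'tasks.add': {'queue': 'math'},
    'feed.tasks.*': {'queue': 'feeds'},
    re.compile(r'(video|image)\.tasks\..*'): {'queue': 'media'},
})
print(routes('tasks.add'))               # {'queue': 'math'}
print(routes('feed.tasks.import_feed'))  # {'queue': 'feeds'}
print(routes('image.tasks.resize'))      # {'queue': 'media'}
print(routes('other.task'))              # None
```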
Routing Logic
When a task is called via apply_async, the application uses app.amqp.router.route to find the destination. The Router.route method follows this priority:
- Explicit Options: Arguments passed directly to apply_async (e.g., add.apply_async((2, 2), queue='priority')).
- Task Routes: Matches found in the task_routes configuration.
- Default Queue: The task_default_queue setting (defaults to celery).
# Example routing configuration
app.conf.task_routes = {
'feed.tasks.*': {'queue': 'feeds'},
'web.tasks.*': {'queue': 'web'},
}
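The priority order can be expressed as a small merge function. This is a hypothetical model and not Router.route itself. Routers are plain callables that map a task name to an options dict or None, explicit options that are not None override values from a matched route, and the default queue is filled in only when nothing else set one. For brevity the example uses an exact-name dict lookup in place of glob matching.

```python
def route(options, name, routers, default_queue='celery'):
    """Hypothetical model of routing priority.

    options: explicit apply_async options (highest priority)
    routers: callables mapping a task name to a dict of options or None
    """
    options = dict(options)
    for router in routers:
        matched = router(name)
        if matched:
            # Route values fill in; explicit non-None options take precedence.
            merged = dict(matched)
            merged.update({k: v for k, v in options.items() if v is not None})
            return merged
    if 'queue' not in options:
        options['queue'] = default_queue
    return options


task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
routers = [task_routes.get]
print(route({}, 'feed.tasks.import_feed', routers))                     # {'queue': 'feeds'}
print(route({'queue': 'priority'}, 'feed.tasks.import_feed', routers))  # {'queue': 'priority'}
print(route({}, 'tasks.add', routers))                                  # {'queue': 'celery'}
```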
Task Request Context
During execution, a task has access to its "request context" via self.request. This is an instance of the Context class from celery/app/task.py.
The context contains metadata about the current execution, including:
- id: The unique task UUID.
- args and kwargs: The arguments the task was called with.
- retries: The current retry count.
- delivery_info: Information about the message delivery (queue, exchange, routing key).
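A request context is essentially an attribute bag with sensible defaults for the fields listed above. The sketch below is a hypothetical stand-in and not Celery's Context class. The delivery_info values are illustrative only, since real values come from the broker message.

```python
import uuid


class RequestContext:
    """Hypothetical, simplified stand-in for a task request context."""

    # Class-level defaults are used for any field not set on the instance.
    id = None
    args = None
    kwargs = None
    retries = 0
    delivery_info = None

    def __init__(self, **fields):
        self.__dict__.update(fields)

    def get(self, key, default=None):
        return getattr(self, key, default)


ctx = RequestContext(
    id=str(uuid.uuid4()),
    args=(2, 3),
    kwargs={},
    # Illustrative values only; real delivery info comes from the message.
    delivery_info={'exchange': '', 'routing_key': 'celery'},
)
print(ctx.args, ctx.retries, ctx.get('delivery_info')['routing_key'])  # (2, 3) 0 celery
```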
To access the context, the task must be "bound" by passing bind=True to the decorator:
@app.task(bind=True)
def dump_context(self, x, y):
print(f'Executing task {self.request.id} with args {self.request.args}')
The Context object is managed using a stack (request_stack in Task), ensuring that nested task calls or concurrent executions in the same process do not leak state between requests.
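The stack discipline can be sketched with threading.local. In this model, every call pushes a fresh context and always pops it, nested calls see their own context, and other threads see an independent stack. RequestStack and TaskWithRequest are hypothetical names, and the per-thread behavior is an assumption of this sketch, not a claim about Celery's internals.

```python
import threading


class RequestStack(threading.local):
    """Hypothetical per-thread stack of request contexts."""

    def __init__(self):
        # threading.local runs __init__ again in each new thread,
        # so every thread gets its own independent list.
        self.stack = []

    def push(self, request):
        self.stack.append(request)

    def pop(self):
        return self.stack.pop()

    @property
    def top(self):
        return self.stack[-1] if self.stack else None


class TaskWithRequest:
    def __init__(self, fun):
        self.fun = fun
        self.request_stack = RequestStack()

    @property
    def request(self):
        return self.request_stack.top

    def __call__(self, *args, **kwargs):
        # Push a fresh context for this call and always pop it afterwards.
        self.request_stack.push({'args': args, 'kwargs': kwargs})
        try:
            return self.fun(self, *args, **kwargs)
        finally:
            self.request_stack.pop()


def show(task, x):
    if x > 0:
        inner = task(x - 1)  # the nested call gets its own context
        return (task.request['args'], inner)
    return (task.request['args'],)


t = TaskWithRequest(show)
print(t(1))       # ((1,), ((0,),))
print(t.request)  # None: nothing leaks once the call returns
```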