Dynamic Configuration with Annotations

To override task settings such as rate limits or queues, or to wrap task methods, without modifying the original source code, use the Celery annotation system. This is particularly useful for "monkey-patching" tasks via configuration.

Configure Task Overrides with Dictionaries

The most common way to use annotations is to assign a dictionary to the task_annotations setting. Internally, Celery wraps the dictionary in a MapAnnotation, which maps task names to the attributes to override.

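from celery import Celery

app = Celery('tasks')

def global_failure_handler(self, exc, task_id, args, kwargs, einfo):
    # Replaces Task.on_failure, so it takes the same arguments
    print(f'Task {task_id} failed: {exc!r}')

app.conf.task_annotations = {
    'tasks.add': {'rate_limit': '10/m'},
    'tasks.process_video': {'queue': 'video_queue'},
    # Pass the callable itself: values are assigned to the task as-is
    '*': {'on_failure': global_failure_handler},
}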

In this example:

  • tasks.add will have its rate_limit attribute set to '10/m'.
  • tasks.process_video will be routed to the 'video_queue'.
  • The * key acts as a wildcard, applying the on_failure handler to every task in the application.
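To make the lookup concrete, here is a minimal pure-Python sketch of the behavior described above. SketchMapAnnotation is a hypothetical stand-in written for illustration, not Celery's MapAnnotation class, and the SimpleNamespace objects are placeholders for real tasks (only their name attribute matters here). The 'acks_late' entry is just an example override.

```python
from types import SimpleNamespace


class SketchMapAnnotation(dict):
    """Hypothetical stand-in: maps task names to attribute overrides."""

    def annotate(self, task):
        # Overrides registered under the task's exact name, or None
        attrs = self.get(task.name)
        return dict(attrs) if attrs is not None else None

    def annotate_any(self):
        # Overrides registered under the '*' wildcard, or None
        attrs = self.get('*')
        return dict(attrs) if attrs is not None else None


annotations = SketchMapAnnotation({
    'tasks.add': {'rate_limit': '10/m'},
    '*': {'acks_late': True},
})

# Placeholder task objects; a real task exposes the same `name` attribute
add_task = SimpleNamespace(name='tasks.add')
other_task = SimpleNamespace(name='tasks.other')

add_overrides = annotations.annotate(add_task)      # exact-name match
other_overrides = annotations.annotate(other_task)  # no entry for this name
wildcard_overrides = annotations.annotate_any()     # '*' entry
```

In this sketch a task without its own entry gets nothing from annotate() and relies on the wildcard entry alone.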

Implement Dynamic Logic with Annotation Classes

For more complex scenarios where you need to decide overrides at runtime based on task properties, you can implement a custom annotation class. Your class should implement annotate(task) for task-specific logic and/or annotate_any() for global logic. Each method returns a dictionary of overrides, or None when it has nothing to apply.

class PriorityAnnotation:
    def annotate(self, task):
        # Dynamically assign high priority to tasks in the 'urgent' namespace
        if task.name.startswith('urgent.'):
            return {'priority': 10}
        # Default priority for every other task
        return {'priority': 5}

    # annotate_any() is optional. Its result is applied after annotate()'s,
    # so returning {'priority': 5} there would overwrite the value above.

# Register the class by its dotted path, or pass an instance such as PriorityAnnotation()
app.conf.task_annotations = ['my_module.PriorityAnnotation']

The celery.app.annotations.prepare function normalizes the setting: a dictionary becomes a MapAnnotation, a string path is wrapped with mlazy and instantiate so that the class is imported and instantiated only when first needed, and any other object is used as given.
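The normalization step can be pictured with a small pure-Python sketch. This is not Celery's code: sketch_prepare and LazyInstance are hypothetical names, the lazy loading is approximated with importlib, and 'collections.OrderedDict' merely stands in for the dotted path of an annotation class. Unlike Celery, the sketch leaves dictionaries unwrapped.

```python
from importlib import import_module


class LazyInstance:
    """Hypothetical helper: imports and instantiates a class on first use."""

    def __init__(self, dotted_path):
        self.dotted_path = dotted_path
        self._instance = None

    def get(self):
        if self._instance is None:
            module_name, _, class_name = self.dotted_path.rpartition('.')
            cls = getattr(import_module(module_name), class_name)
            self._instance = cls()
        return self._instance


def sketch_prepare(annotations):
    """Normalize a task_annotations-style value into a list of entries."""
    if annotations is None:
        return []
    if not isinstance(annotations, (list, tuple)):
        annotations = [annotations]  # a single entry is accepted too
    prepared = []
    for entry in annotations:
        if isinstance(entry, str):
            prepared.append(LazyInstance(entry))  # resolved later, on demand
        else:
            prepared.append(entry)  # kept as given in this sketch
    return prepared


entries = sketch_prepare(['collections.OrderedDict', {'*': {'rate_limit': '10/m'}}])
lazy_entry, dict_entry = entries
loaded = lazy_entry.get()  # the import and instantiation happen only here
```

Deferring the import this way means a configuration module can name an annotation class without importing its module at configuration time.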

Wrap Task Methods Using the @ Prefix

The annotation system allows you to wrap existing task methods (like run or __call__) using "around" decorators. By prefixing an attribute name with @, Celery will pass the original method to your decorator instead of replacing it.

def log_task_execution(fun):
    def _inner(self, *args, **kwargs):
        print(f"Executing task: {self.name}")
        return fun(self, *args, **kwargs)
    return _inner

app.conf.task_annotations = {
    'tasks.add': {'@__call__': log_task_execution}
}

When Task.annotate() is called during task binding, it detects the @ prefix and uses Task.add_around(attr, around) to wrap the method. This ensures the original method is preserved and accessible via the __wrapped__ attribute.
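The wrapping step can be illustrated without Celery. The sketch below is a simplified model of the behavior described above, not the library's implementation: SketchTask is a hypothetical stand-in for a task class, and record_calls is an illustrative decorator.

```python
class SketchTask:
    """Hypothetical stand-in for a task class."""
    name = 'tasks.add'

    def __call__(self, x, y):
        return x + y

    @classmethod
    def add_around(cls, attr, around):
        original = getattr(cls, attr)
        wrapper = around(original)      # the decorator receives the original
        wrapper.__wrapped__ = original  # keep a handle on the original method
        setattr(cls, attr, wrapper)


calls = []


def record_calls(fun):
    def _inner(self, *args, **kwargs):
        calls.append(self.name)  # side effect added by the wrapper
        return fun(self, *args, **kwargs)
    return _inner


original_call = SketchTask.__call__
SketchTask.add_around('__call__', record_calls)
result = SketchTask()(2, 3)
```

After the call, the wrapper has recorded the task name, the original return value is unchanged, and SketchTask.__call__.__wrapped__ still points at the undecorated method.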

How Annotations are Resolved

Annotations are resolved and applied when a task is bound to an app. The celery.app.task.Task.annotate method iterates through all configured annotations using resolve_all (aliased as resolve_all_annotations in celery.app.task).

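  1. Resolution: resolve_all yields up to two dictionaries: the first result of annotate(task) that is not None among the items in task_annotations, followed by the first such result of annotate_any(). Because the annotate_any() result is applied last, it wins when both dictionaries set the same key.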
  2. Application: For each key-value pair found:
    • If the key starts with @, it wraps the method using add_around.
    • Otherwise, it uses setattr(task_class, key, value) to override the attribute.
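The two steps above can be sketched as a single loop. This is a simplified illustration rather than Celery's source: apply_annotations, SketchTask, and add_one are hypothetical names, and the list passed in plays the role of the dictionaries produced by the resolution step.

```python
def apply_annotations(task_cls, resolved):
    """Apply each resolved dict of overrides to a task class, in order."""
    for overrides in resolved:
        for key, value in overrides.items():
            if key.startswith('@'):
                # '@name': wrap the existing attribute with the decorator
                original = getattr(task_cls, key[1:])
                wrapper = value(original)
                wrapper.__wrapped__ = original
                setattr(task_cls, key[1:], wrapper)
            else:
                # Plain key: overwrite the attribute
                setattr(task_cls, key, value)


class SketchTask:
    """Hypothetical stand-in for a task class."""
    rate_limit = None

    def run(self, x):
        return x * 2


def add_one(fun):
    def _inner(self, *args, **kwargs):
        return fun(self, *args, **kwargs) + 1
    return _inner


apply_annotations(SketchTask, [
    {'rate_limit': '10/m', '@run': add_one},  # first resolved dictionary
    {'rate_limit': '5/m'},                    # second resolved dictionary
])
outcome = SketchTask().run(10)
```

In this sketch the second dictionary overwrites rate_limit from the first, while run stays wrapped because only the first dictionary touches it.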

Troubleshooting and Gotchas

  • Binding Timing: Annotations are applied during Task.bind(app). If you modify task_annotations after tasks have already been bound, you may need to manually call task.annotate() to refresh the attributes.
  • Dictionary Copies: MapAnnotation.annotate and MapAnnotation.annotate_any return a dict() copy of the stored attributes. Modifying the returned dictionary will not affect the original annotation map.
  • Method Wrapping: When using the @ prefix, ensure your decorator correctly handles the self argument if wrapping instance methods like __call__ or run. In the log_task_execution example above, _inner accepts self because it wraps Task.__call__.