Implementing Task Retries
In distributed systems, transient failures like network timeouts or temporary service unavailability are common. This tutorial walks you through implementing robust task retries using the Task class to ensure your background jobs are resilient.
What You Will Build
You will create a task that simulates a network request to an external API. You will implement:
- Manual Retries: Explicitly catching exceptions and re-queueing the task.
- Automatic Retries: Using decorator configurations to handle specific errors.
- Exponential Backoff: Increasing the delay between retries to avoid overwhelming failing services.
Prerequisites
You must have a Celery application instance configured. For this tutorial, we assume an app instance is available.
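If you do not have one yet, a minimal instance looks like the following sketch. The module name and the Redis broker URL are assumptions for this tutorial; substitute your own broker (RabbitMQ, Redis, etc.).

```python
from celery import Celery

# Hypothetical broker URL -- replace with your own deployment's broker.
app = Celery('tutorial', broker='redis://localhost:6379/0')
```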
Step 1: Implementing Manual Retries
When you need fine-grained control over when a task should be retried, use the self.retry method. To access this method, you must set bind=True in your task decorator.
Create a task that retries when a specific exception occurs:
from celery import shared_task
class APIError(Exception):
    """Custom exception for API failures."""
@shared_task(bind=True)
def send_notification(self, user_id, message):
    try:
        # Simulate a failing network request
        print(f"Sending notification to {user_id}...")
        raise APIError("Connection timed out")
    except APIError as exc:
        # Retry in 60 seconds
        raise self.retry(exc=exc, countdown=60)
In this example, bind=True makes the task instance available as the first argument (self). The self.retry method re-queues the task. We use raise self.retry(...) to ensure that the current task execution stops immediately and the worker is notified of the retry state.
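You can see why the raise matters without running a worker by mimicking the flow in plain Python. `Retry` and `FakeTask` below are simplified stand-ins for Celery's internals (the real `Retry` exception lives in `celery.exceptions`), not its actual implementation:

```python
class Retry(Exception):
    """Simplified stand-in for celery.exceptions.Retry."""

class FakeTask:
    """Simplified stand-in for a bound task instance (the `self` argument)."""
    def retry(self, exc=None, countdown=0):
        # The real method re-queues the message, then raises Retry so that
        # execution of the current run stops here.
        raise Retry(f"retrying in {countdown}s")

def send_notification(self, user_id, message):
    try:
        raise ConnectionError("Connection timed out")
    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)

outcome = None
try:
    send_notification(FakeTask(), 42, "hi")
except Retry as e:
    outcome = str(e)
print(outcome)  # retrying in 60s
```

Because `retry()` always raises, no code after the `raise self.retry(...)` line ever runs.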
Step 2: Using Automatic Retries
For common transient failures, you can use the autoretry_for argument in the task decorator. This reduces boilerplate code by automatically catching specified exceptions.
@shared_task(
    autoretry_for=(APIError, ConnectionError),
    retry_kwargs={'max_retries': 5},
    default_retry_delay=30
)
def sync_user_data(user_id):
    # If APIError or ConnectionError is raised, Celery automatically
    # retries up to 5 times with a 30-second delay between attempts.
    print(f"Syncing data for {user_id}...")
    raise APIError("Service Unavailable")
The autoretry_for parameter takes a tuple of exception classes. If any of these are raised during task execution, Celery will catch them and call self.retry for you.
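The mechanism can be sketched as an ordinary decorator. This is a simplified illustration of the idea, not Celery's real autoretry code (which re-queues through the broker instead of looping in-process):

```python
import functools
import time

def autoretry_for(exceptions, max_retries=5, delay=0):
    """Simplified sketch of autoretry: catch listed exceptions and re-run."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise  # limit reached: propagate the failure
                    time.sleep(delay)  # analogue of default_retry_delay
        return wrapper
    return decorator

calls = []

@autoretry_for((ConnectionError,), max_retries=5, delay=0)
def sync_user_data(user_id):
    calls.append(user_id)
    raise ConnectionError("Service Unavailable")

try:
    sync_user_data(7)
except ConnectionError:
    pass
print(len(calls))  # 6: the first attempt plus 5 retries
```

Note that exceptions outside the tuple (a `ValueError` here, say) are not caught and fail the task immediately.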
Step 3: Configuring Exponential Backoff and Jitter
To prevent a "thundering herd" problem where many tasks retry at the exact same time, you should use exponential backoff and jitter.
- Exponential Backoff: Increases the delay after each failure (e.g., 1s, 2s, 4s, 8s).
- Jitter: Adds a random element to the delay to spread out the load.
@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=5,        # initial delay factor in seconds
    retry_backoff_max=600,  # maximum delay (10 minutes)
    retry_jitter=True,      # randomize the delay
    retry_kwargs={'max_retries': 3}
)
def error_backoff_test(self):
    print("Attempting operation...")
    raise Exception("Transient failure")
In this configuration:
- retry_backoff=5 means the first retry happens after 5s, the second after 10s, the third after 20s, and so on, capped by retry_backoff_max.
- retry_jitter=True (the default) replaces each computed delay with a random value between zero and that delay, so workers don't synchronize their retry attempts.
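The delay computation can be sketched in a few lines. This is modeled on Celery's `get_exponential_backoff_interval` helper (in `celery.utils.time`), simplified for illustration:

```python
import random

def backoff_interval(factor, retries, maximum, jitter):
    """Delay before retry number `retries` (0-based), in seconds."""
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random delay between 0 and the computed value.
        countdown = random.randrange(countdown + 1)
    return max(0, countdown)

print([backoff_interval(5, n, 600, jitter=False) for n in range(4)])
# [5, 10, 20, 40]
```

With jitter enabled, the third retry would wait anywhere from 0 to 20 seconds rather than exactly 20.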
Step 4: Handling Maximum Retry Limits
By default, Task.max_retries is set to 3. When a task exceeds this limit, it raises a MaxRetriesExceededError. You can handle this by overriding the limit or catching the error.
from celery.exceptions import MaxRetriesExceededError
@shared_task(bind=True, max_retries=2)
def critical_task(self):
    try:
        raise RuntimeError("Critical failure")
    except RuntimeError:
        try:
            # Note: if you pass exc= to retry(), Celery re-raises that
            # exception (not MaxRetriesExceededError) once the limit is
            # exceeded, so we omit it here.
            raise self.retry()
        except MaxRetriesExceededError:
            print("Max retries reached. Logging permanent failure.")
            # Perform cleanup or notify administrators
            raise
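The control flow above can be simulated without a broker. `Retry`, `MaxRetriesExceededError`, and `FakeTask` below are simplified stand-ins for Celery's machinery, and the loop plays the role of a worker re-running the task after each retry:

```python
class Retry(Exception):
    """Simplified stand-in for celery.exceptions.Retry."""

class MaxRetriesExceededError(Exception):
    """Simplified stand-in for celery.exceptions.MaxRetriesExceededError."""

class FakeTask:
    def __init__(self, max_retries=2):
        self.max_retries = max_retries
        self.retries = 0

    def retry(self):
        # Simplified: raise MaxRetriesExceededError once the limit is hit.
        if self.retries >= self.max_retries:
            raise MaxRetriesExceededError("max retries exceeded")
        self.retries += 1
        raise Retry(f"attempt {self.retries}")

def critical_task(self):
    try:
        raise RuntimeError("Critical failure")
    except RuntimeError:
        try:
            raise self.retry()
        except MaxRetriesExceededError:
            print("Max retries reached. Logging permanent failure.")
            raise

# Drive the task the way a worker would: re-run it after every Retry.
task = FakeTask(max_retries=2)
attempts = 0
while True:
    attempts += 1
    try:
        critical_task(task)
    except Retry:
        continue  # a real worker would re-queue and wait here
    except MaxRetriesExceededError:
        break
print(attempts)  # 3: the initial run plus two retries
```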
Complete Result
Combining these strategies results in a highly resilient task:
@shared_task(
    bind=True,
    autoretry_for=(APIError,),
    retry_backoff=True,  # uses the default factor of 1 second
    retry_backoff_max=300,
    retry_kwargs={'max_retries': 10}
)
def resilient_api_call(self, data):
    """
    A task that retries up to 10 times with exponential backoff
    when an APIError occurs.
    """
    # Task logic here...
Next Steps
- Explore task acknowledgment settings like acks_late in celery/app/task.py to ensure tasks are only removed from the queue after successful execution.
- Use the on_retry and on_failure handlers in your Task class to implement custom logging for every retry attempt.