Implementing Task Retries

In distributed systems, transient failures like network timeouts or temporary service unavailability are common. This tutorial walks you through implementing robust task retries using the Task class to ensure your background jobs are resilient.

What You Will Build

You will create a task that simulates a network request to an external API. You will implement:

  1. Manual Retries: Explicitly catching exceptions and re-queueing the task.
  2. Automatic Retries: Using decorator configurations to handle specific errors.
  3. Exponential Backoff: Increasing the delay between retries to avoid overwhelming failing services.

Prerequisites

You must have a Celery application instance configured. For this tutorial, we assume an app instance is available.

Step 1: Implementing Manual Retries

When you need fine-grained control over when a task should be retried, use the self.retry method. To access this method, you must set bind=True in your task decorator.

Create a task that retries when a specific exception occurs:

from celery import shared_task

class APIError(Exception):
    """Custom exception for API failures."""

@shared_task(bind=True)
def send_notification(self, user_id, message):
    try:
        # Simulate a failing network request
        print(f"Sending notification to {user_id}...")
        raise APIError("Connection timed out")
    except APIError as exc:
        # Retry in 60 seconds
        raise self.retry(exc=exc, countdown=60)

In this example, bind=True makes the task instance available as the first argument (self). The self.retry method sends a new message for the task and raises a Retry exception, which tells the worker that the task is being retried rather than having failed. Writing raise self.retry(...) makes it explicit that execution of the current attempt stops at that line.

Step 2: Using Automatic Retries

For common transient failures, you can use the autoretry_for argument in the task decorator. This reduces boilerplate code by automatically catching specified exceptions.

@shared_task(
    autoretry_for=(APIError, ConnectionError),
    retry_kwargs={'max_retries': 5},
    default_retry_delay=30,
)
def sync_user_data(user_id):
    # If APIError or ConnectionError is raised,
    # Celery automatically retries up to 5 times
    # with a 30-second delay between attempts.
    print(f"Syncing data for {user_id}...")
    raise APIError("Service Unavailable")

The autoretry_for parameter takes a tuple of exception classes. If any of these are raised during task execution, Celery will catch them and call self.retry for you.
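The catch-and-retry behavior described above can be modeled in plain Python. The sketch below is a simplified, synchronous stand-in, not Celery's implementation: the autoretry decorator and the flaky_sync function are hypothetical names, and a real worker re-queues the task with a delay instead of looping in-process.

```python
import functools


class APIError(Exception):
    """Stand-in for the tutorial's custom exception."""


def autoretry(autoretry_for, max_retries):
    """Hypothetical decorator: re-run func when a listed exception is raised."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except autoretry_for:
                    if retries >= max_retries:
                        raise  # retries exhausted: let the last error propagate
                    retries += 1
        return wrapper
    return decorator


calls = []  # records every attempt so the retry count can be inspected


@autoretry(autoretry_for=(APIError, ConnectionError), max_retries=5)
def flaky_sync(user_id):
    calls.append(user_id)
    if len(calls) < 3:
        # The first two attempts fail with a retryable error.
        raise APIError("Service Unavailable")
    return f"synced {user_id}"


result = flaky_sync(42)
```

Here the third attempt succeeds, so the caller never sees the two APIError failures. An exception that is not listed in autoretry_for would propagate on the first attempt.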

Step 3: Configuring Exponential Backoff and Jitter

To prevent a "thundering herd" problem where many tasks retry at the exact same time, you should use exponential backoff and jitter.

  • Exponential Backoff: Increases the delay after each failure (e.g., 1s, 2s, 4s, 8s).
  • Jitter: Adds a random element to the delay to spread out the load.

@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=5,  # Initial delay factor in seconds
    retry_backoff_max=600,  # Maximum delay (10 minutes)
    retry_jitter=True,  # Add randomization to the delay
    retry_kwargs={'max_retries': 3},
)
def error_backoff_test(self):
    print("Attempting operation...")
    raise Exception("Transient failure")

In this configuration:

  • retry_backoff=5 sets the backoff factor: the computed delay is 5s before the first retry, 10s before the second, 20s before the third, and so on, capped at retry_backoff_max.
  • retry_jitter=True (the default) treats each computed delay as a maximum and picks the actual delay at random between zero and that maximum, so workers don't synchronize their retry attempts.
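The arithmetic behind these delays can be sketched in a few lines. This is a standalone model based on the behavior Celery documents for retry_backoff, retry_backoff_max, and retry_jitter, not code taken from the library; the function name backoff_delay is hypothetical. It assumes that with jitter enabled the computed delay is an upper bound and the actual delay is drawn at random between zero and that bound.

```python
import random


def backoff_delay(factor, retries, maximum=600, jitter=True, rng=random):
    """Return the delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a whole number of seconds in [0, delay].
        return rng.randrange(delay + 1)
    return delay


# Without jitter the delays double until they reach the cap.
plain = [backoff_delay(5, n, jitter=False) for n in range(9)]

# With jitter each delay falls somewhere between 0 and the value above.
jittered = [backoff_delay(5, n) for n in range(9)]

print(plain)
print(jittered)
```

With a factor of 5 and a cap of 600, the un-jittered delays reach the cap on the eighth retry and stay there.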

Step 4: Handling Maximum Retry Limits

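By default, Task.max_retries is set to 3. Once a task has been retried that many times, the next call to self.retry does not re-queue it. Instead, it re-raises the exception passed as exc, or raises MaxRetriesExceededError if no exc was given. You can change the limit with the max_retries option and catch the error to run cleanup logic.

from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=2)
def critical_task(self):
    try:
        raise RuntimeError("Critical failure")
    except RuntimeError:
        try:
            # exc is omitted on purpose: when the limit is reached,
            # self.retry() then raises MaxRetriesExceededError
            # instead of re-raising the RuntimeError.
            raise self.retry()
        except MaxRetriesExceededError:
            print("Max retries reached. Logging permanent failure.")
            # Perform cleanup or notify administrators
            raise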

Complete Result

Combining these strategies results in a highly resilient task:

@shared_task(
    bind=True,
    autoretry_for=(APIError,),
    retry_backoff=True,  # Uses default factor of 1
    retry_backoff_max=300,
    retry_kwargs={'max_retries': 10},
)
def resilient_api_call(self, data):
    """
    A task that retries up to 10 times with exponential backoff
    when an APIError occurs.
    """
    # Task logic here...
    pass
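When choosing max_retries and retry_backoff_max, it helps to know how long a task may keep retrying in the worst case. The sketch below totals the delay ceilings for the configuration above. retry_schedule is a hypothetical helper, and it assumes the ceiling before the n-th retry is min(retry_backoff_max, factor * 2**n), with jitter only ever shortening a delay.

```python
def retry_schedule(factor, max_retries, maximum):
    """List the delay ceiling (seconds) before each retry, capped at maximum."""
    return [min(maximum, factor * (2 ** n)) for n in range(max_retries)]


# retry_backoff=True is treated as a factor of 1 second.
schedule = retry_schedule(factor=1, max_retries=10, maximum=300)
total_wait = sum(schedule)

print(schedule)    # [1, 2, 4, 8, 16, 32, 64, 128, 256, 300]
print(total_wait)  # 811
```

Under these assumptions the task spends at most about 13.5 minutes waiting between attempts before it fails permanently, not counting execution and queueing time.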

Next Steps

  • Explore task acknowledgment settings such as acks_late (see celery/app/task.py), which makes the worker acknowledge a message after the task has run instead of just before, so a task interrupted by a worker crash can be redelivered.
  • Use on_retry and on_failure handlers in your Task class to implement custom logging for every retry attempt.