Tracking Task State with AsyncResult

In this tutorial, you will learn how to use the AsyncResult class to track the lifecycle of a Celery task. You will build a script that launches a task, monitors its progress without blocking, retrieves the final value, and handles potential errors.

Prerequisites

To follow this guide, you must have a Celery application configured with a Result Backend (such as Redis, RabbitMQ/RPC, or a database). Without a backend, task states cannot be persisted or retrieved.

from celery import Celery

# Example configuration using RPC as a backend
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend='rpc://')

@app.task
def add(x, y):
    return x + y

Step 1: Launching a Task and Obtaining a Result Handle

When you call a task using .delay() or .apply_async(), Celery does not return the actual result of the function. Instead, it returns an AsyncResult instance. This object acts as a "promise" or a handle to the future result.

from celery.result import AsyncResult

# Launch the task
result = add.delay(4, 4)

# The result object contains the unique Task ID
print(f"Task ID: {result.id}")
print(f"Type: {type(result)}")

The AsyncResult object (defined in celery/result.py) is initialized with the task's UUID and the application's backend. It allows you to check the status of the task at any time using that ID.

Step 2: Checking Task Status Without Blocking

You can check if a task has finished without stopping your program's execution. This is useful for building progress bars or UI updates.

import time

while not result.ready():
    print(f"Current State: {result.state}")
    time.sleep(1)

if result.successful():
    print("Task completed successfully!")
elif result.failed():
    print("Task failed.")
  • result.ready(): Returns True if the task has executed (reached a final state like SUCCESS or FAILURE).
  • result.state: Returns the current string status (e.g., PENDING, STARTED, SUCCESS).
  • result.successful(): Specifically checks if the state is SUCCESS.

Step 3: Retrieving the Result with a Timeout

To get the actual return value of the task, use the .get() method. In production, you should always use a timeout to prevent your application from hanging indefinitely if a worker fails.

from celery.exceptions import TimeoutError

try:
    # Wait up to 10 seconds for the result
    value = result.get(timeout=10)
    print(f"The result is: {value}")
except TimeoutError:
    print("The task took too long to complete.")

The get() method blocks the current thread until the task is ready. On timeout it raises celery.exceptions.TimeoutError, which is not the same class as Python's builtin TimeoutError, so make sure you import and catch the Celery one.

Warning: Never call result.get() inside a Celery task. This can lead to deadlocks where tasks wait on each other while consuming all available worker slots. The AsyncResult.get implementation in Celery includes a disable_sync_subtasks check (enabled by default) to help prevent this.

Step 4: Handling Task Failures and Tracebacks

If a task raises an exception, result.get() will re-raise that exception in your local process by default. You can inspect the AsyncResult object to get details about what went wrong.

@app.task
def division_error():
    return 1 / 0

result = division_error.delay()

try:
    result.get(propagate=True)
except ZeroDivisionError as exc:
    print(f"Caught expected error: {exc}")
    print(f"Remote Traceback:\n{result.traceback}")
  • propagate=True: (Default) Re-raises the remote exception locally.
  • result.result: Contains the exception instance if the task failed.
  • result.traceback: Contains the stack trace from the worker where the failure occurred.

Step 5: Cleaning Up and Revoking Tasks

Result backends consume resources. Once you are finished with a result, you should tell Celery to clean it up. Additionally, if a task is no longer needed, you can attempt to cancel it.

# 1. Revoke a task that hasn't started yet (or terminate it if it has)
result.revoke(terminate=True, signal='SIGKILL')

# 2. Remove the result from the backend to free up space
result.forget()

# 3. Note: AsyncResult has no existence check. Once a result is
# forgotten (or if the ID was never real), the backend simply
# reports the default PENDING state for that ID
print(f"State after forget: {result.state}")
  • revoke(): Sends a signal to workers to ignore the task. If terminate=True is passed, it will also attempt to kill the process currently executing the task.
  • forget(): Removes the task's metadata and result from the backend. You should call this (or get()) on every task to avoid resource leaks in your backend.
  • A note on PENDING: Celery does not distinguish between a task that is waiting to run and a task ID the backend has never seen; both report PENDING. If you need to tell them apart, track dispatched IDs yourself (for example, in your database).

Complete Working Example

Here is how these pieces fit together in a single workflow:

from proj.tasks import add

def process_task(x, y):
    # Dispatch
    res = add.delay(x, y)

    try:
        # Monitor
        print(f"Processing task {res.id}...")
        final_val = res.get(timeout=5)
        print(f"Success: {final_val}")
    except Exception as e:
        print(f"Task {res.id} failed: {e}")
    finally:
        # Always clean up
        res.forget()

if __name__ == "__main__":
    process_task(10, 20)

Next Steps

  • Explore ResultSet and GroupResult to manage multiple AsyncResult objects at once.
  • Use result.collect() for complex workflows where tasks return other tasks (chains or groups), allowing you to iterate through the entire result tree as it completes.