Tracking Task State with AsyncResult
In this tutorial, you will learn how to use the AsyncResult class to track the lifecycle of a Celery task. You will build a script that launches a task, monitors its progress without blocking, retrieves the final value, and handles potential errors.
Prerequisites
To follow this guide, you must have a Celery application configured with a Result Backend (such as Redis, RabbitMQ/RPC, or a database). Without a backend, task states cannot be persisted or retrieved.
from celery import Celery
# Example configuration using RPC as a backend
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend='rpc://')
@app.task
def add(x, y):
    return x + y
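The RPC backend above stores results as transient AMQP messages; for results that must survive client restarts, Redis is a common choice. A sketch of an alternative configuration (the connection URLs are examples; adjust host, port, and database numbers for your environment):

```python
from celery import Celery

# Example: Redis as both broker and result backend (URLs are placeholders).
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)
```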
Step 1: Launching a Task and Obtaining a Result Handle
When you call a task using .delay() or .apply_async(), Celery does not return the actual result of the function. Instead, it returns an AsyncResult instance. This object acts as a "promise" or a handle to the future result.
from celery.result import AsyncResult
# Launch the task
result = add.delay(4, 4)
# The result object contains the unique Task ID
print(f"Task ID: {result.id}")
print(f"Type: {type(result)}")
The AsyncResult object (defined in celery/result.py) is initialized with the task's UUID and the application's backend. It allows you to check the status of the task at any time using that ID.
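The handle-to-a-future-value pattern is not unique to Celery; the standard library's `concurrent.futures` has the same shape, which can help build intuition. A minimal stdlib analogy (not Celery code):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    time.sleep(0.1)  # simulate work happening elsewhere
    return x + y

with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(add, 4, 4)    # a Future: a promise, not the value
    print(type(handle).__name__)       # Future
    print(handle.result(timeout=5))    # blocks until the value is ready: 8
```

Like an `AsyncResult`, the `Future` is returned immediately; the actual value only materializes when you ask for it.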
Step 2: Checking Task Status Without Blocking
You can check if a task has finished without stopping your program's execution. This is useful for building progress bars or UI updates.
import time
while not result.ready():
    print(f"Current State: {result.state}")
    time.sleep(1)

if result.successful():
    print("Task completed successfully!")
elif result.failed():
    print("Task failed.")
- result.ready(): Returns True if the task has executed (reached a final state such as SUCCESS or FAILURE).
- result.state: Returns the current status string (e.g., PENDING, STARTED, SUCCESS).
- result.successful(): Specifically checks whether the state is SUCCESS.
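Polling every second is fine for a demo, but a fixed interval either wastes requests or reacts slowly. A common refinement is exponential backoff. The sketch below shows the same non-blocking poll loop using a stdlib `Future` stand-in (not Celery code); the pattern transfers directly to `result.ready()`:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_add(x, y):
    time.sleep(0.3)
    return x + y

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(slow_add, 2, 3)
    delay = 0.05
    while not fut.done():            # non-blocking check, like result.ready()
        time.sleep(delay)
        delay = min(delay * 2, 1.0)  # back off to reduce polling load
    answer = fut.result()

print(answer)  # 5
```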
Step 3: Retrieving the Result with a Timeout
To get the actual return value of the task, use the .get() method. In production, you should always use a timeout to prevent your application from hanging indefinitely if a worker fails.
from celery.exceptions import TimeoutError

try:
    # Wait up to 10 seconds for the result
    value = result.get(timeout=10)
    print(f"The result is: {value}")
except TimeoutError:
    # Note: result.get() raises celery.exceptions.TimeoutError,
    # not Python's built-in TimeoutError, so import it explicitly.
    print("The task took too long to complete.")
The get() method blocks the current thread until the task is ready.
Warning: Never call result.get() inside a Celery task. Doing so can lead to deadlocks where tasks wait on each other while consuming all available worker slots. The AsyncResult.get implementation in this codebase includes a disable_sync_subtasks check (enabled by default) to help prevent this.
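For local experiments without a running broker or worker, Celery's eager mode runs tasks synchronously in the calling process; `.delay()` then returns an `EagerResult` that supports the same interface. A testing-only configuration sketch (assumes the `app` defined earlier; do not enable this in production):

```python
# Testing-only settings, applied to the `app` instance from earlier steps.
app.conf.task_always_eager = True      # run tasks inline, no broker needed
app.conf.task_eager_propagates = True  # re-raise task exceptions immediately
```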
Step 4: Handling Task Failures and Tracebacks
If a task raises an exception, result.get() will re-raise that exception in your local process by default. You can inspect the AsyncResult object to get details about what went wrong.
@app.task
def division_error():
    return 1 / 0
result = division_error.delay()
try:
    result.get(propagate=True)
except ZeroDivisionError as exc:
    print(f"Caught expected error: {exc}")
    print(f"Remote Traceback:\n{result.traceback}")
- propagate=True: (Default) Re-raises the remote exception locally.
- result.result: Contains the exception instance if the task failed.
- result.traceback: Contains the stack trace from the worker where the failure occurred.
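This propagation behavior mirrors how stdlib futures re-raise a worker's exception in the caller. The analogy below (not Celery code) also shows how a traceback can be captured as text, similar in spirit to result.traceback:

```python
import traceback
from concurrent.futures import ThreadPoolExecutor

def division_error():
    return 1 / 0

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(division_error)
    try:
        fut.result()  # re-raises the worker's ZeroDivisionError here
    except ZeroDivisionError as exc:
        caught = exc
        # Render the stack trace as text, like a stored remote traceback.
        tb_text = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )

print(type(caught).__name__)           # ZeroDivisionError
print("ZeroDivisionError" in tb_text)  # True
```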
Step 5: Cleaning Up and Revoking Tasks
Result backends consume resources. Once you are finished with a result, you should tell Celery to clean it up. Additionally, if a task is no longer needed, you can attempt to cancel it.
# 1. Revoke a task that hasn't started yet (or terminate it if it has)
result.revoke(terminate=True, signal='SIGKILL')
# 2. Remove the result from the backend to free up space
result.forget()
# 3. Verify if the result still exists in the backend
exists = result.exists()
print(f"Does result exist in backend? {exists}")
- revoke(): Sends a signal to workers to ignore the task. If terminate=True is passed, it will also attempt to kill the process currently executing the task.
- forget(): Removes the task's metadata and result from the backend. You should call this (or get()) on every task to avoid resource leaks in your backend.
- exists(): A utility method to check if the backend actually has a record for this task ID, helping distinguish between a "Pending" task and one that never existed.
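Instead of calling forget() on every handle, you can also let the backend expire results automatically. Celery's result_expires setting (default: one day) controls this; for Redis it translates to a key TTL, while database backends rely on the periodic backend-cleanup task. A configuration sketch (assumes the `app` defined earlier):

```python
# Applied to the `app` instance from earlier steps:
# expire stored results one hour after they are written.
app.conf.result_expires = 3600  # seconds
```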
Complete Working Example
Here is how these pieces fit together in a single workflow:
from celery.result import AsyncResult
from proj.celery import app
from proj.tasks import add
def process_task(x, y):
    # Dispatch
    res = add.delay(x, y)
    try:
        # Monitor
        print(f"Processing task {res.id}...")
        final_val = res.get(timeout=5)
        print(f"Success: {final_val}")
    except Exception as e:
        print(f"Task {res.id} failed: {e}")
    finally:
        # Always cleanup
        res.forget()

if __name__ == "__main__":
    process_task(10, 20)
Next Steps
- Explore ResultSet and GroupResult to manage multiple AsyncResult objects at once.
- Use result.collect() for complex workflows where tasks return other tasks (chains or groups), allowing you to iterate through the entire result tree as it completes.