Asynchronous Task Execution
Asynchronous task execution in this project is primarily handled through the Task class, which provides two main methods for triggering tasks: delay and apply_async. While delay is a convenient shortcut for simple calls, apply_async offers granular control over how and when a task is executed.
Basic Execution with delay and apply_async
To trigger a task asynchronously with positional and keyword arguments, use the delay method. This is the most common way to offload work to a worker.
from proj.celery import app
@app.task
def add(x, y):
return x + y
# Using delay for simple execution
result = add.delay(16, 16)
print(result.get()) # 32
If you need to pass execution options (like a countdown or a specific queue), use apply_async. Note that apply_async requires arguments to be passed as a tuple (args) and keyword arguments as a dictionary (kwargs).
# Using apply_async with arguments and options
result = add.apply_async(args=(16, 16), countdown=5)
Scheduling Tasks with Countdowns and ETAs
You can delay the execution of a task by providing either a relative countdown (in seconds) or an absolute eta (Estimated Time of Arrival).
from datetime import datetime, timedelta, timezone
# Execute in 3 seconds
add.apply_async((1, 1), countdown=3)
# Execute at a specific time in the future
scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
add.apply_async((2, 2), eta=scheduled_time)
Setting Task Expiration
The expires option prevents a task from being executed if it has been sitting in the queue for too long. This is useful for time-sensitive operations that become irrelevant after a certain period.
# Task expires in 10 seconds if not picked up by a worker
add.apply_async((1, 1), expires=10)
# Task expires at a specific datetime
expiry_time = datetime.now(timezone.utc) + timedelta(minutes=1)
add.apply_async((1, 1), expires=expiry_time)
If a task expires before a worker starts it, the task will be marked as REVOKED.
Using Custom Task IDs
By default, Celery generates a unique UUID4 for every task. You can override this by providing your own task_id. This is helpful for tracking tasks across different systems or ensuring idempotency.
# Trigger a task with a pre-defined ID
result = add.apply_async((1, 1), task_id='unique-external-id-123')
assert result.id == 'unique-external-id-123'
Execution Limits and Priorities
You can control the execution environment of a task by setting time limits and priorities.
- Time Limits:
time_limit(hard limit) andsoft_time_limit(allows the task to catch an exception and clean up). - Priority: A number between 0 and 9. The behavior depends on the broker (e.g., RabbitMQ uses higher numbers for higher priority, while Redis uses 0 as the highest).
# Execute with a 30-second hard limit and 20-second soft limit
add.apply_async((1, 1), time_limit=30, soft_time_limit=20)
# Execute with high priority
add.apply_async((1, 1), priority=9)
Accessing Execution Options in the Task
Inside the task body, you can access the execution options and metadata via self.request (if the task is bound).
@app.task(bind=True)
def check_context(self):
print(f"Task ID: {self.request.id}")
print(f"Time Limit: {self.request.time_limit}")
print(f"Is Eager: {self.request.is_eager}")
Troubleshooting and Gotchas
- Time Limit Validation: If you provide both
soft_time_limitandtime_limit, the soft limit must be less than or equal to the hard limit. Otherwise,apply_asyncwill raise aValueError. - Eager Execution: If
task_always_eageris enabled in your configuration,apply_asyncwill execute the task locally and block until it returns, rather than sending it to a worker. - Retries and Exceptions: When calling
self.retry(), it raises acelery.exceptions.Retryexception. Any code following theraise self.retry(...)statement will not be executed. - Task ID Collisions: When manually setting
task_id, ensure the IDs are unique to avoid overwriting results or interfering with other tasks in the backend.