Skip to main content

Asynchronous Task Execution

Asynchronous task execution in this project is primarily handled through the Task class, which provides two main methods for triggering tasks: delay and apply_async. While delay is a convenient shortcut for simple calls, apply_async offers granular control over how and when a task is executed.

Basic Execution with delay and apply_async

To trigger a task asynchronously with positional and keyword arguments, use the delay method. This is the most common way to offload work to a worker.

from proj.celery import app

@app.task
def add(x, y):
return x + y

# Using delay for simple execution
result = add.delay(16, 16)
print(result.get()) # 32

If you need to pass execution options (like a countdown or a specific queue), use apply_async. Note that apply_async requires arguments to be passed as a tuple (args) and keyword arguments as a dictionary (kwargs).

# Using apply_async with arguments and options
result = add.apply_async(args=(16, 16), countdown=5)

Scheduling Tasks with Countdowns and ETAs

You can delay the execution of a task by providing either a relative countdown (in seconds) or an absolute eta (Estimated Time of Arrival).

from datetime import datetime, timedelta, timezone

# Execute in 3 seconds
add.apply_async((1, 1), countdown=3)

# Execute at a specific time in the future
scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
add.apply_async((2, 2), eta=scheduled_time)

Setting Task Expiration

The expires option prevents a task from being executed if it has been sitting in the queue for too long. This is useful for time-sensitive operations that become irrelevant after a certain period.

# Task expires in 10 seconds if not picked up by a worker
add.apply_async((1, 1), expires=10)

# Task expires at a specific datetime
expiry_time = datetime.now(timezone.utc) + timedelta(minutes=1)
add.apply_async((1, 1), expires=expiry_time)

If a task expires before a worker starts it, the task will be marked as REVOKED.

Using Custom Task IDs

By default, Celery generates a unique UUID4 for every task. You can override this by providing your own task_id. This is helpful for tracking tasks across different systems or ensuring idempotency.

# Trigger a task with a pre-defined ID
result = add.apply_async((1, 1), task_id='unique-external-id-123')
assert result.id == 'unique-external-id-123'

Execution Limits and Priorities

You can control the execution environment of a task by setting time limits and priorities.

  • Time Limits: time_limit (hard limit) and soft_time_limit (allows the task to catch an exception and clean up).
  • Priority: A number between 0 and 9. The behavior depends on the broker (e.g., RabbitMQ uses higher numbers for higher priority, while Redis uses 0 as the highest).
# Execute with a 30-second hard limit and 20-second soft limit
add.apply_async((1, 1), time_limit=30, soft_time_limit=20)

# Execute with high priority
add.apply_async((1, 1), priority=9)

Accessing Execution Options in the Task

Inside the task body, you can access the execution options and metadata via self.request (if the task is bound).

@app.task(bind=True)
def check_context(self):
print(f"Task ID: {self.request.id}")
print(f"Time Limit: {self.request.time_limit}")
print(f"Is Eager: {self.request.is_eager}")

Troubleshooting and Gotchas

  • Time Limit Validation: If you provide both soft_time_limit and time_limit, the soft limit must be less than or equal to the hard limit. Otherwise, apply_async will raise a ValueError.
  • Eager Execution: If task_always_eager is enabled in your configuration, apply_async will execute the task locally and block until it returns, rather than sending it to a worker.
  • Retries and Exceptions: When calling self.retry(), it raises a celery.exceptions.Retry exception. Any code following the raise self.retry(...) statement will not be executed.
  • Task ID Collisions: When manually setting task_id, ensure the IDs are unique to avoid overwriting results or interfering with other tasks in the backend.