Skip to main content

The Task Request Lifecycle

The task request lifecycle in Celery is managed by the celery.worker.request.Request class, which acts as a state machine for an individual task's execution. This lifecycle begins when a message is received from the broker and ends when the task is either acknowledged, rejected, or retried.

Message Reception and Strategy

When a task message arrives at the worker, it is first handled by a "strategy" defined in celery.worker.strategy. The default strategy (celery.worker.strategy.default) is responsible for converting the raw message into a Request object.

Protocol Conversion

Celery supports multiple message protocols. The strategy handles the conversion of older protocol versions (v1 or hybrid) to the internal protocol v2 format using proto1_to_proto2 or hybrid_to_proto2. This ensures that the Request object receives a consistent set of headers and body structure.

Request Instantiation

The strategy creates a specialized Request class for the specific task using create_request_cls. This class is then instantiated with the message, acknowledgment callbacks, and task metadata:

# From celery/worker/strategy.py
req = Req(
message,
on_ack=ack, on_reject=reject, app=app, hostname=hostname,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)

Pre-Execution Checks

Before a task is sent to the execution pool, the worker performs several checks to determine if the task should proceed.

Revocation and Expiration

The Request.revoked() method checks if a task has been cancelled or if it has expired. Revocation can happen by task ID or via "stamped headers," which allow for revoking groups of tasks.

# From celery/worker/request.py
def revoked(self):
# ...
if self.expires:
expired = self.maybe_expire()
revoked_by_id = self.id in revoked_tasks
# ... check stamped headers ...
if any((expired, revoked_by_id, revoked_by_header)):
self._announce_revoked(...)
return True
return False

If a task is revoked, it is discarded immediately, and an acknowledgment is sent to the broker to remove it from the queue.

ETA and Rate Limiting

If a task has an eta (Estimated Time of Arrival) or if rate limits are enabled, the strategy schedules the task using the consumer's timer (consumer.timer.call_at) or places it in a bucket for rate limiting instead of executing it immediately.

Execution via the Pool

Once a task is ready for execution, the worker calls Request.execute_using_pool(pool). This method dispatches the task to the concurrency layer (e.g., billiard processes or threads).

# From celery/worker/request.py
def execute_using_pool(self, pool: BasePool, **kwargs):
# ...
result = pool.apply_async(
trace,
args=(self._type, task_id, self._request_dict, self._body,
self._content_type, self._content_encoding),
accept_callback=self.on_accepted,
timeout_callback=self.on_timeout,
callback=self.on_success,
error_callback=self.on_failure,
# ...
)
self._apply_result = maybe(ref, result)
return result

The Request object provides several callbacks that the pool invokes at different stages of execution:

  • on_accepted: Called when a worker process/thread picks up the task. It records the worker_pid and time_start.
  • on_success: Called when the task completes without raising an unhandled exception.
  • on_failure: Called when the task raises an exception or the worker process crashes.
  • on_timeout: Called if the task exceeds its time_limit or soft_time_limit.

Acknowledgment Lifecycle

Acknowledgment is the mechanism by which Celery tells the broker that a message has been processed and can be deleted. The timing of this depends on the acks_late setting.

Immediate Acknowledgment (Default)

By default, tasks are acknowledged as soon as they are accepted by the worker pool in on_accepted. This prevents the task from being redelivered if the worker crashes during execution.

# From celery/worker/request.py
def on_accepted(self, pid, time_accepted):
self.worker_pid = pid
self.time_start = time() - (monotonic() - time_accepted)
if not self.task.acks_late:
self.acknowledge()
self.send_event('task-started')

Late Acknowledgment (acks_late=True)

If acks_late is enabled, the task is only acknowledged after it has successfully completed (on_success) or failed in a way that doesn't require a retry (on_failure). This provides higher reliability at the cost of potential duplicate executions if a worker crashes.

Termination and Cleanup

The Request class handles various termination scenarios to ensure the worker remains stable.

Timeouts

When a task times out, on_failure or on_timeout is triggered. A critical part of the on_timeout implementation is memory management. To prevent memory leaks caused by reference cycles in tracebacks, the code explicitly clears frame locals:

# From celery/worker/request.py
def on_timeout(self, soft, timeout):
# ...
try:
# ...
traceback_clear(exc)
finally:
if einfo is not None:
del einfo
exc.__traceback__ = None

Cancellation

If the worker needs to shut down or a connection is lost, Request.cancel or Request.terminate may be called. These methods interact with the execution pool to stop the specific job and optionally mark the task for retry in the backend.