Task Tracing and Error Handling
Task execution in Celery is managed through a tracing mechanism that wraps the user-defined task function. This system, primarily implemented in `celery/app/trace.py`, is responsible for state transitions, result persistence, signal dispatching, and robust error handling.
The Execution Loop
The core of task execution is the "tracer" function, generated by `build_tracer`. This factory function creates a highly optimized closure, `trace_task`, which encapsulates the entire lifecycle of a task execution.
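The factory-closure pattern can be sketched generically. All names below are illustrative stand-ins, not Celery's actual internals:

```python
# Sketch of the factory-closure pattern used by a build_tracer-style API.
# Names here are illustrative, not Celery's real signatures.

def build_tracer_sketch(fun, on_success, on_error):
    """Bind the task body and its handlers into a closure once, so the
    per-call trace function avoids repeated attribute lookups."""
    def trace_task(uuid, args, kwargs):
        try:
            retval = fun(*args, **kwargs)
        except Exception as exc:
            return on_error(uuid, exc)
        return on_success(uuid, retval)
    return trace_task

# Build once per (worker, task) pair; call once per incoming message.
tracer = build_tracer_sketch(
    fun=lambda x, y: x + y,
    on_success=lambda uuid, r: ("SUCCESS", r),
    on_error=lambda uuid, e: ("FAILURE", repr(e)),
)
```

Binding the handlers at build time keeps the hot path of `trace_task` free of per-call configuration lookups.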
The tracer manages several critical responsibilities:
- Request Context: It pushes the task request onto the `request_stack` and ensures it is popped in a `finally` block.
- Lifecycle Signals: It triggers `task_prerun`, `task_postrun`, and `task_success` signals.
- State Transitions: It catches exceptions and maps them to Celery states (e.g., `SUCCESS`, `FAILURE`, `RETRY`).
- Result Persistence: It interacts with the result backend to store the task's outcome.
```python
# Simplified view of the tracing logic in celery/app/trace.py
try:
    # -*- TRACE -*-
    try:
        if task_before_start:
            task_before_start(uuid, args, kwargs)
        R = retval = fun(*args, **kwargs)
        state = SUCCESS
    except Reject as exc:
        I, R = Info(REJECTED, exc), ExceptionInfo(internal=True)
        # ... handle rejection
    except Exception as exc:
        I, R, state, retval = on_error(task_request, exc)
        # ... handle failure
```
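The push/pop discipline around the task body can be sketched in plain Python. `RequestStack` and `run_with_request` are hypothetical names for illustration, not Celery objects:

```python
# Minimal sketch of the push-then-pop-in-finally discipline described
# above. All names are illustrative, not Celery APIs.

class RequestStack:
    """A simple LIFO stack of task requests (hypothetical)."""
    def __init__(self):
        self._items = []

    def push(self, request):
        self._items.append(request)

    def pop(self):
        return self._items.pop()

    @property
    def top(self):
        return self._items[-1] if self._items else None

request_stack = RequestStack()

def run_with_request(request, fun, *args, **kwargs):
    # Push before running; pop in finally so the stack stays balanced
    # even when the task body raises.
    request_stack.push(request)
    try:
        return fun(*args, **kwargs)
    finally:
        request_stack.pop()
```

The `finally` block guarantees the request is removed whether the body returns, raises, or is interrupted by a handled signal.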
State Management with TraceInfo
The `TraceInfo` class (aliased as `Info` within `build_tracer`) acts as a container for the task's execution state and return value. It provides specialized handlers for different terminal and semi-terminal states.
Handling Failures
When a task raises an unhandled exception, `TraceInfo.handle_failure` is invoked. This method performs several critical safety operations:

- Pickleability: It uses `get_pickleable_exception` to ensure the exception can be safely serialized and sent back to the result backend or parent process.
- Backend Update: It calls `task.backend.mark_as_failure`, which persists the exception and traceback.
- Hooks and Signals: It triggers the `on_failure` task hook and sends the `task_failure` signal.
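The pickleability concern can be illustrated with a small sketch. The fallback behavior here is simplified for illustration; Celery's real helper does more than a round-trip check:

```python
import pickle

def get_pickleable_exception_sketch(exc):
    """If the exception survives a pickle round-trip, keep it; otherwise
    fall back to a plain Exception carrying its repr. Illustrative only:
    Celery's actual get_pickleable_exception performs more work."""
    try:
        pickle.loads(pickle.dumps(exc))
        return exc
    except Exception:
        return Exception(repr(exc))
```

Exceptions constructed with unpicklable arguments (open file handles, lambdas, sockets) would otherwise fail at the backend boundary rather than at the raise site, which is much harder to debug.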
Handling Retries
Retries are handled via `TraceInfo.handle_retry`. Unlike failures, retries are often expected transitions. The handler uses `task.backend.mark_as_retry` and triggers the `on_retry` hook. A key design detail is that the `Retry` exception itself contains the original exception, which is extracted and logged.
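The wrap-and-extract pattern can be sketched with a stand-in retry type. `RetrySketch` and `describe_retry` are illustrative names; the real class is `celery.exceptions.Retry`:

```python
# Sketch of the wrap-and-extract pattern described above.
# RetrySketch is a stand-in, not celery.exceptions.Retry.

class RetrySketch(Exception):
    """A retry signal that carries the exception which triggered it."""
    def __init__(self, message="", exc=None):
        super().__init__(message)
        self.exc = exc  # the original error, preserved for logging

def describe_retry(reason):
    # Extract the original exception so the log shows the real cause,
    # not just the retry signal.
    original = reason.exc if reason.exc is not None else reason
    return f"Task retry requested, original error: {original!r}"

try:
    try:
        raise ConnectionError("broker unreachable")
    except ConnectionError as exc:
        raise RetrySketch("retry in 5s", exc=exc)
except RetrySketch as r:
    message = describe_retry(r)
```

Preserving the original exception on the retry signal means operators see the root cause in the logs instead of an opaque "retry" entry.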
Memory Management and Tracebacks
A significant challenge in Python's exception handling is memory retention caused by traceback objects. Tracebacks hold references to frame objects, which in turn hold references to all local variables in the scope where the exception occurred.
To prevent memory leaks (specifically addressed in Issue #8882), this project utilizes `traceback_clear`. This utility is called in both `TraceInfo` handlers and the main `trace_task` loop to break reference cycles:
```python
# Example from TraceInfo.handle_failure in celery/app/trace.py
try:
    # ... logic to mark failure ...
    return einfo
finally:
    # MEMORY LEAK FIX: Clean up any direct traceback references
    if tb_ref is not None:
        del tb_ref
    traceback_clear(exc)
```
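The retention problem itself is easy to reproduce in plain Python. The helper below is an illustrative sketch of the idea, not Celery's actual `traceback_clear`:

```python
import weakref

def traceback_clear_sketch(exc):
    """Walk the exception's traceback and clear each finished frame's
    locals, mirroring the idea behind traceback_clear (sketch only)."""
    tb = getattr(exc, "__traceback__", None)
    while tb is not None:
        try:
            tb.tb_frame.clear()  # drop references to the frame's locals
        except RuntimeError:
            pass                 # frame is still executing; skip it
        tb = tb.tb_next

class Payload:
    """Stands in for a large local object (e.g. a big task argument)."""

def failing_task():
    big_local = Payload()
    probe = weakref.ref(big_local)  # lets us observe collection
    raise RuntimeError(probe)

try:
    failing_task()
except RuntimeError as exc:
    probe = exc.args[0]
    alive_before = probe() is not None  # traceback frame keeps it alive
    traceback_clear_sketch(exc)
    alive_after = probe() is not None   # cleared frame released it
```

On CPython, clearing the frame drops the last reference to `big_local`, so the weak reference goes dead immediately; without the clear, the object lives as long as the exception (and anything that stored it) does.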
Deduplication and Late Acknowledgment
For tasks configured with late acknowledgment (`task_acks_late`), there is a risk of duplicate execution if a worker crashes after completing a task but before acknowledging the message.

The tracer implements a deduplication strategy using the `worker_deduplicate_successful_tasks` configuration. Before executing a redelivered task, the tracer checks whether the task ID is already marked as `SUCCESS` in the result backend. If it is, the tracer skips execution and re-dispatches any associated chains or callbacks, so the workflow continues without re-running a potentially non-idempotent task body.
```python
# Deduplication logic in build_tracer
if deduplicate_successful_tasks and redelivered:
    if task_request.id in successful_requests:
        return trace_ok_t(R, I, T, Rstr)
    r = AsyncResult(task_request.id, app=app)
    if r.state == SUCCESS:
        # ... skip execution and dispatch callbacks ...
```
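The two-tier check (local cache first, then the backend) can be sketched as follows. `successful_requests` and `backend_state` here are illustrative stand-ins for Celery's in-memory set and result backend:

```python
# Sketch of the redelivery dedup check described above.
# `successful_requests` and `backend_state` are illustrative stand-ins.
SUCCESS = "SUCCESS"

successful_requests = {"task-1"}                       # local cache
backend_state = {"task-2": SUCCESS, "task-3": "FAILURE"}  # result backend

def should_skip(task_id, redelivered, deduplicate=True):
    """Return True when a redelivered task is already known to have
    succeeded, so the (possibly non-idempotent) body is not re-run."""
    if not (deduplicate and redelivered):
        return False
    if task_id in successful_requests:          # fast path: local cache
        return True
    return backend_state.get(task_id) == SUCCESS  # slow path: backend
```

Only redelivered messages pay the backend lookup; a first delivery always runs, which keeps the common path cheap.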
Internal Error Handling
Errors that occur outside the task body (e.g., during result encoding or signal dispatch) are treated as "internal errors." These are handled by `_signal_internal_error` and `report_internal_error`. These functions ensure that even if the tracing infrastructure itself encounters an issue, the error is logged, and the worker remains stable, typically by reporting a `FAILURE` state with `internal=True` metadata.
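The containment idea can be sketched generically. The function and helper names below are hypothetical; only the shape of the guard (catch, log, report `internal=True`) comes from the description above:

```python
# Sketch of internal-error containment: failures of the tracing
# machinery itself are logged and reported, never allowed to crash
# the worker. Names are illustrative, not Celery's.
import json
import logging

logger = logging.getLogger("sketch.trace")

def encode_result(value):
    # Stand-in for result serialization; rejects non-JSON-able values.
    return json.dumps(value)

def trace_with_internal_guard(fun, *args, **kwargs):
    """Run the task body, then the tracing-side work (here, result
    encoding). Any exception from either step is converted into a
    FAILURE report flagged internal=True."""
    try:
        retval = fun(*args, **kwargs)
        encoded = encode_result(retval)  # tracing-side step that may fail
        return {"state": "SUCCESS", "result": encoded, "internal": False}
    except Exception as exc:
        logger.exception("Internal error while tracing task")
        return {"state": "FAILURE", "result": repr(exc), "internal": True}
```

The key property is that the guard always returns a report: a broken serializer degrades one task's result to a flagged failure rather than taking down the worker process.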