
Log Auditing and Task Analysis

To audit Celery worker logs and analyze task execution history, use the celery logtool utility. This tool parses log files to track task lifecycle events, identify failures, and generate summary reports.

Generate Task Statistics

To get a high-level summary of task execution, including total counts, success rates, and errors per task type, use the stats command.

celery logtool stats worker.log

This command uses the Audit class to scan the log files and produces a report formatted by REPORT_FORMAT in celery/bin/logtool.py:

Report
======
Task total: 100
Task errors: 5
Task success: 95
Task completed: 100
Tasks
=====
tasks.add: 50
tasks.mul: 50

The Audit.report() method generates this data and uses the _task_counts helper class to format the per-task breakdown.
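The formatting step can be sketched without Celery installed. The snippet below is a minimal stand-alone illustration, not the code from celery/bin/logtool.py: the names TaskCounts and TEMPLATE and the sample counts are hypothetical. It shows how a str.format template can index into a nested dictionary and read a property from a list subclass to render a per-task breakdown.

```python
from collections import Counter


class TaskCounts(list):
    """Hypothetical helper: a list of (name, count) pairs that can render itself."""

    @property
    def format(self):
        # One "name: count" line per task type.
        return "\n".join(f"{name}: {count}" for name, count in self)


# Hypothetical template. str.format supports {key[index]} and {obj.attr} lookups,
# so {task[types].format} reads the property defined above.
TEMPLATE = """Task total: {task[total]}
Task errors: {task[errors]}
Tasks
=====
{task[types].format}"""

counts = Counter({"tasks.add": 50, "tasks.mul": 50})  # made-up sample data
report = {
    "task": {
        "total": sum(counts.values()),
        "errors": 5,
        "types": TaskCounts(sorted(counts.items())),
    }
}
rendered = TEMPLATE.format(**report)
print(rendered)
```

Keeping the rendering logic on the list subclass lets the template stay a plain string, with no loop needed at the call site.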

Identify Incomplete Tasks

If you suspect tasks are hanging or workers are crashing before completion, use the incomplete command to find tasks that were "Received" but never reached a "Ready" state.

celery logtool incomplete worker.log

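Internally, this calls Audit.incomplete_tasks(), which returns the symmetric difference of the received task IDs and the completed task IDs. Because the difference is symmetric, the output can also include IDs that completed without a matching "Received" line, for example when the log file begins partway through a worker's run: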

# From celery/bin/logtool.py
def incomplete_tasks(self):
    return self.ids ^ self.ready
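The set semantics of the ^ operator can be checked with plain Python sets. In the sketch below the task IDs are made up and the variable names are illustrative. It contrasts the symmetric difference with a plain difference, which would report only tasks that were received and never finished.

```python
# Hypothetical task IDs, used only to illustrate set semantics.
received = {"a1", "b2", "c3"}  # IDs seen in "Received" lines
ready = {"a1", "b2", "z9"}     # IDs seen in completion lines

symmetric = received ^ ready       # IDs present in exactly one of the two sets
never_finished = received - ready  # IDs received but never completed

print(sorted(symmetric))
print(sorted(never_finished))
```

Here "z9" appears in the symmetric difference even though it was never received, so an unexpected ID in the incomplete output may point to a truncated or rotated log rather than a hung task.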

Extract Errors and Tracebacks

To quickly isolate failures without manually searching through large log files, use the errors and traces commands.

Extracting error lines:

celery logtool errors worker.log

Extracting full tracebacks:

celery logtool traces worker.log

The traces command identifies tracebacks by looking for lines that do not match the standard log start pattern (RE_LOG_START) following a valid log entry.
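The detection rule can be sketched in a few lines. This is a stand-alone illustration under stated assumptions: LOG_START is a simplified pattern modeled on a "[YYYY-MM-DD " line prefix, group_traces is a hypothetical helper that is not part of Celery, and the sample lines are made up. It attaches every non-matching line to the log entry that precedes it.

```python
import re

# Assumed pattern: a line starts a new log entry if it begins with "[YYYY-MM-DD ".
LOG_START = re.compile(r"^\[\d{4}-\d{2}-\d{2} ")


def group_traces(lines):
    """Return (entry, continuation_lines) pairs for entries followed by non-log lines."""
    groups = []
    prev_entry = None
    current = None
    for line in lines:
        if LOG_START.match(line):
            # A new log entry closes any trace that was being collected.
            prev_entry, current = line, None
        else:
            if current is None:
                current = []
                groups.append((prev_entry, current))
            current.append(line)
    return groups


# Made-up sample lines for illustration only.
sample = [
    "[2024-01-01 10:00:00] Received task: tasks.add[a1]",
    "[2024-01-01 10:00:01] Task tasks.add[a1] raised ValueError",
    "Traceback (most recent call last):",
    '  File "tasks.py", line 3, in add',
    "ValueError: x",
    "[2024-01-01 10:00:02] Received task: tasks.mul[b2]",
]
traces = group_traces(sample)
print(len(traces), len(traces[0][1]))
```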

Programmatic Log Auditing

For custom analysis, you can use the Audit class directly in Python. It supports a callback system to handle specific log events as they are encountered.

from celery.bin.logtool import Audit

def my_error_handler(line, task_name, task_id, result):
    print(f"Custom Alert: Task {task_name}[{task_id}] failed with {result}")

# Initialize Audit with a custom error callback
audit = Audit(on_task_error=my_error_handler)

# Process log files
audit.run(['worker.log', 'worker.log.1'])

# Access collected data
print(f"Total tasks received: {len(audit.ids)}")
print(f"Task types encountered: {audit.task_types}")

Custom Event Callbacks

The Audit class provides three primary hooks for custom processing:

  1. on_task_error(line, task_name, task_id, result): Triggered when a task result does not contain the string "succeeded".
  2. on_trace(line): Triggered when a line does not match the log start pattern and is therefore treated as part of a traceback.
  3. on_debug(line): Triggered for log lines that match the start pattern but are not "Received" or "Ready" events.
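The way the three hooks divide up log lines can be summarized with a small classifier. The patterns and the function name which_hook below are simplified assumptions made for illustration, based only on the behavior described in this section; the actual regular expressions in celery/bin/logtool.py may differ, and the sample lines are made up.

```python
import re

# Simplified, assumed patterns; the real ones may be stricter or looser.
LOG_START = re.compile(r"^\[\d{4}-\d{2}-\d{2} ")
RECEIVED = re.compile(r".+?\] Received")
READY = re.compile(r".+?\] Task")


def which_hook(line):
    """Name the hook that would fire for a line, or None if no hook applies."""
    if not LOG_START.match(line):
        return "on_trace"
    if RECEIVED.match(line):
        return None  # bookkeeping only, no hook
    if READY.match(line):
        # Simplification: this checks the whole line for "succeeded",
        # whereas the text above describes checking the task result.
        return None if "succeeded" in line else "on_task_error"
    return "on_debug"


for text in [
    "[2024-01-01 10:00:00] Received task: tasks.add[a1]",
    "[2024-01-01 10:00:01] Task tasks.add[a1] succeeded in 0.1s: 3",
    "[2024-01-01 10:00:02] Task tasks.mul[b2] raised ValueError",
    "Traceback (most recent call last):",
    "[2024-01-01 10:00:03] worker ready.",
]:
    print(which_hook(text), "<-", text)
```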

Troubleshooting and Limitations

  • Log Format Dependency: The Audit class relies on specific regex patterns (e.g., RE_LOG_START, RE_TASK_RECEIVED) defined in celery/bin/logtool.py. If you use a custom log format that changes the timestamp or the "Received"/"Task" prefixes, the tool will not capture events correctly.
  • Memory Usage: The Audit instance stores all task IDs in self.ids and results in self.results. When auditing extremely large log files (millions of tasks), ensure the machine has sufficient RAM to hold these sets and dictionaries.
  • Traceback Detection: Tracebacks are captured by checking whether a line fails to match RE_LOG_START (which expects the line to begin with "[YYYY-MM-DD "). If a traceback line or a continuation line of a multi-line message happens to start with a date in that form, it is treated as a new log entry instead of as part of the trace.