
Persistent State Management

To persist worker state such as revoked task IDs and the logical clock across restarts, use the StateDB component by providing a path to a local database file.

Enabling Persistent State

You can enable state persistence by using the --statedb command-line argument or the worker_state_db configuration setting.

Via Command Line

When starting the worker, specify the path to the state file. The %n placeholder expands to the worker's node name, which lets several workers on the same machine each keep their own state file.

celery -A proj worker --statedb=/var/run/celery/worker-%n.state
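As an illustration of why %n matters, the sketch below mimics the expansion for two workers on one host. It assumes node names of the usual name@host form and that %n becomes the part before the @. The function expand_statedb_path is hypothetical and far simpler than Celery's own formatter, which handles more placeholders.

```python
def expand_statedb_path(template: str, nodename: str) -> str:
    """Hypothetical, simplified version of the %n expansion.

    Assumes a node name of the form "name@host" and replaces %n with the
    part before the "@". Celery's own formatter supports more placeholders.
    """
    shortname, _sep, _host = nodename.partition("@")
    return template.replace("%n", shortname)


# Two workers on one host end up with separate state files.
for node in ("w1@example.com", "w2@example.com"):
    print(expand_statedb_path("/var/run/celery/worker-%n.state", node))
# /var/run/celery/worker-w1.state
# /var/run/celery/worker-w2.state
```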

Via Configuration

Alternatively, set the worker_state_db option in your Celery application configuration:

from celery import Celery

app = Celery('tasks')
app.conf.update(
    worker_state_db='/var/run/celery/worker.state',
)

How it Works

The persistence mechanism is managed by two main components:

  1. StateDB (celery.worker.components): A worker bootstep that is enabled only when a state database path is provided. Its create() method sets up the persistence layer and registers the persistence object's save() method with atexit, so state is written to disk when the worker process exits normally.
  2. Persistent (celery.worker.state): The class responsible for the actual storage logic. It stores data with Python's shelve module and uses zlib to compress the pickled set of revoked task IDs.

When the worker starts, the StateDB.create() method instantiates Persistent, which immediately calls merge() to load existing state from the disk into the worker's memory.

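# celery/worker/components.py (excerpt)
import atexit

from celery import bootsteps


class StateDB(bootsteps.Step):
    """Bootstep that sets up between-restart state database file."""

    def __init__(self, w, **kwargs):
        # Enabled only when a state DB path was given (--statedb / worker_state_db).
        self.enabled = w.statedb
        w._persistence = None
        super().__init__(w, **kwargs)

    def create(self, w):
        # Persistent loads saved state immediately; save() runs at interpreter exit.
        w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock)
        atexit.register(w._persistence.save)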
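The following self-contained sketch illustrates the same lifecycle using only the standard library: state is loaded from disk as soon as the object is built, and save() is hooked into atexit. MiniPersistent and create_persistence are hypothetical stand-ins, not Celery APIs. To keep the sketch short, they store a plain, uncompressed set and open the shelf on each call. The real Persistent class uses a LimitedSet, compresses it, keeps the shelf open, and also saves the clock.

```python
import atexit
import os
import shelve
import tempfile


class MiniPersistent:
    """Hypothetical, simplified stand-in for celery.worker.state.Persistent.

    Stores a plain set of revoked IDs, uncompressed, under a "revoked" key.
    The real class uses a LimitedSet, compresses it, and also saves the clock.
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename
        self.revoked: set[str] = set()
        self.merge()  # load state left by a previous run straight away

    def merge(self) -> None:
        """Union the IDs saved on disk into the in-memory set."""
        with shelve.open(self.filename) as db:
            self.revoked |= db.get("revoked", set())

    def save(self) -> None:
        """Write the in-memory set back to disk."""
        with shelve.open(self.filename) as db:
            db["revoked"] = self.revoked


def create_persistence(filename: str) -> MiniPersistent:
    """Hypothetical analogue of StateDB.create(): construct, then hook save()."""
    persistence = MiniPersistent(filename)
    atexit.register(persistence.save)
    return persistence


# Simulate two worker runs in one process.
path = os.path.join(tempfile.mkdtemp(), "worker.state")
first = create_persistence(path)
first.revoked.add("task-1")
first.save()  # what the atexit hook would do on a clean shutdown
second = MiniPersistent(path)  # the "restarted" worker
print(second.revoked)  # {'task-1'}
```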

Data Persisted

The Persistent class specifically tracks two pieces of information:

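  • Revoked Tasks: celery.worker.state.revoked, a LimitedSet containing the IDs of tasks that have been revoked and must not be executed.
  • Logical Clock: The value of app.clock. On save, the clock is advanced and its value is stored. On load, the worker adjusts its clock past the stored value, so the Lamport logical clock keeps increasing across restarts instead of starting again from zero.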
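The clock handling can be illustrated with a minimal clock that mirrors the forward()/adjust() behaviour of kombu.clocks.LamportClock, without its lock. SimpleLamportClock is a hypothetical name used only for this sketch. It shows why a restarted worker's clock ends up ahead of the value saved before shutdown.

```python
class SimpleLamportClock:
    """Hypothetical stand-in mirroring LamportClock's forward()/adjust()."""

    def __init__(self, initial_value: int = 0) -> None:
        self.value = initial_value

    def forward(self) -> int:
        """Advance for a local event; this is the value that gets saved."""
        self.value += 1
        return self.value

    def adjust(self, other: int) -> int:
        """Merge an external value: max(local, other) + 1."""
        self.value = max(self.value, other) + 1
        return self.value


before = SimpleLamportClock(41)
saved = before.forward()                 # value written to the state file
after_restart = SimpleLamportClock()     # a fresh process starts at 0
restored = after_restart.adjust(saved)   # what loading the saved value does
print(saved, restored)  # 42 43
```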

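The revoked set is pickled, compressed with zlib, and stored under the key zrevoked. The clock value is stored under clock, and a __proto__ key records the storage format version (3 in the excerpt below).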

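# celery/worker/state.py (method of the Persistent class)

def _sync_with(self, d):
    # Drop expired entries first so they are not written to disk.
    self._revoked_tasks.purge()
    d.update({
        '__proto__': 3,  # storage format version
        'zrevoked': self.compress(self._dumps(self._revoked_tasks)),  # pickle, then zlib
        'clock': self.clock.forward() if self.clock else 0,  # advance, then store
    })
    return d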
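To make the on-disk layout concrete, the sketch below writes and reads a file that uses the same three keys. encode_revoked, decode_revoked, and inspect_statedb are hypothetical helpers, and the demo uses a plain set where Celery stores a LimitedSet. Reading a real worker's file this way would additionally require Celery to be importable, so that the LimitedSet can be unpickled. It would also need the same dbm backend, and is best done while the worker is stopped.

```python
import os
import pickle
import shelve
import tempfile
import zlib


def encode_revoked(revoked) -> bytes:
    """Pickle, then zlib-compress, as _sync_with does for 'zrevoked'.

    Celery passes its own pickle protocol; the default one is used here.
    """
    return zlib.compress(pickle.dumps(revoked))


def decode_revoked(zrevoked: bytes):
    """Reverse the transformation: decompress, then unpickle."""
    return pickle.loads(zlib.decompress(zrevoked))


def inspect_statedb(path: str) -> dict:
    """Hypothetical helper that reads a state file without modifying it."""
    with shelve.open(path, flag="r") as db:
        zrevoked = db.get("zrevoked")
        return {
            "proto": db.get("__proto__"),
            "clock": db.get("clock"),
            "revoked": decode_revoked(zrevoked) if zrevoked else None,
        }


# Write a file with the same three keys, then read it back.
path = os.path.join(tempfile.mkdtemp(), "worker.state")
with shelve.open(path) as db:
    db.update({"__proto__": 3, "zrevoked": encode_revoked({"task-1", "task-2"}), "clock": 7})
info = inspect_statedb(path)
print(info["proto"], info["clock"], sorted(info["revoked"]))  # 3 7 ['task-1', 'task-2']
```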

Configuring Revocation Limits

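Because the revoked set, and with it the state file, would otherwise grow without bound, Celery caps both the number of revoked IDs and how long each one is kept. The limits are read from environment variables when the celery.worker.state module is imported, so they must be set in the worker's environment before it starts: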

  • CELERY_WORKER_REVOKES_MAX: Maximum number of revoked task IDs to keep (default: 50000).
  • CELERY_WORKER_REVOKE_EXPIRES: How many seconds a revoke remains active before it expires (default: 10800, i.e. 3 hours).

Expired entries are automatically purged during the sync() and merge() operations.
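The interplay of the two limits can be sketched with a small bounded, expiring set. BoundedExpiringSet is a hypothetical illustration, not Celery's LimitedSet. It models only a size cap (like CELERY_WORKER_REVOKES_MAX) and a lifetime in seconds (like CELERY_WORKER_REVOKE_EXPIRES), evicting the oldest entries first. A fake clock keeps the example deterministic.

```python
import time
from collections import OrderedDict
from typing import Callable


class BoundedExpiringSet:
    """Hypothetical illustration of a size cap plus per-entry expiry.

    Not Celery's LimitedSet: it models only maxlen and expires, evicting
    the oldest entries first.
    """

    def __init__(self, maxlen: int, expires: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.maxlen = maxlen
        self.expires = expires
        self._clock = clock
        # Insertion order == timestamp order, because add() re-inserts at the end.
        self._items: "OrderedDict[str, float]" = OrderedDict()

    def add(self, item: str) -> None:
        self._items.pop(item, None)  # re-adding refreshes the entry's timestamp
        self._items[item] = self._clock()
        self.purge()

    def purge(self) -> None:
        now = self._clock()
        # Drop entries older than `expires`, starting from the oldest.
        while self._items and now - next(iter(self._items.values())) > self.expires:
            self._items.popitem(last=False)
        # Then enforce the size cap by evicting the oldest remaining entries.
        while len(self._items) > self.maxlen:
            self._items.popitem(last=False)

    def __contains__(self, item: object) -> bool:
        return item in self._items

    def __len__(self) -> int:
        return len(self._items)


# Drive the sketch with a fake clock so the example is deterministic.
now = [0.0]
revoked = BoundedExpiringSet(maxlen=2, expires=10800, clock=lambda: now[0])
for task_id in ("a", "b", "c"):
    revoked.add(task_id)
print("a" in revoked, len(revoked))  # False 2  ("a" was evicted by the cap)
now[0] = 10801.0
revoked.purge()
print(len(revoked))  # 0  (both remaining entries are older than 3 hours)
```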

Troubleshooting and Limitations

  • Forceful Shutdowns: save() is registered with atexit, so it runs only when the worker's Python process exits normally. If the process is killed forcefully (e.g., with SIGKILL), save() is not called and any state changes made since the file was last written are lost.
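  • Limited Scope: Only revoked task IDs and the logical clock are persisted. Active tasks, reserved tasks, and the task queues themselves are not stored in the statedb file. Queued messages live on the message broker, and whether an interrupted task is redelivered depends on when its message was acknowledged (see the task_acks_late setting).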
  • File Format: The file is written with Python's shelve module, which uses whichever dbm backend is available to the interpreter, and the revoked set inside it is pickled. A state file may therefore be unreadable by a worker running a different Python version or build. Do not share state files between such workers.
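  • Permissions: The worker process needs read and write permission on the directory that contains the statedb file, not only on the file itself. Depending on the dbm backend, shelve may create several files next to the given path (dbm.dumb, for example, writes .dat, .dir and .bak files).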
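A quick pre-flight check can catch a missing or unwritable directory before the worker starts. check_statedb_location is a hypothetical helper, not part of Celery. It checks only the parent directory and does not inspect the ownership of state files that already exist. It should be run as the same user the worker runs as, because os.access checks the real user's permissions.

```python
import os
import tempfile


def check_statedb_location(path: str) -> list[str]:
    """Hypothetical pre-flight check for a --statedb / worker_state_db path.

    Returns a list of problems found (empty if none). It checks the parent
    directory because, depending on the dbm backend, shelve may create
    several files next to `path`. Existing files' ownership is not checked.
    """
    problems: list[str] = []
    directory = os.path.dirname(os.path.abspath(path))
    if not os.path.isdir(directory):
        problems.append(f"directory does not exist: {directory}")
    elif not os.access(directory, os.R_OK | os.W_OK | os.X_OK):
        problems.append(f"directory is not readable and writable: {directory}")
    return problems


# Run this as the same user the worker will run as.
print(check_statedb_location(os.path.join(tempfile.mkdtemp(), "worker.state")))  # []
print(check_statedb_location("/no/such/dir/for/statedb/worker.state"))
```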