Persistent State Management
To persist worker state such as revoked task IDs and the logical clock across restarts, use the StateDB component by providing a path to a local database file.
Enabling Persistent State
You can enable state persistence by using the --statedb command-line argument or the worker_state_db configuration setting.
Via Command Line
When starting the worker, specify the path to the state file. You can use the %n placeholder to include the node name in the filename, which is useful when running multiple workers on the same machine.
```shell
celery -A proj worker --statedb=/var/run/celery/worker-%n.state
```
Via Configuration
Alternatively, set the worker_state_db option in your Celery application configuration:
```python
from celery import Celery

app = Celery('tasks')
app.conf.update(
    worker_state_db='/var/run/celery/worker.state',
)
```
How it Works
The persistence mechanism is managed by two main components:
- `StateDB` (`celery.worker.components`): A worker bootstep that initializes the persistence layer if a database path is provided. It registers an `atexit` handler to ensure state is saved when the worker shuts down normally.
- `Persistent` (`celery.worker.state`): The class responsible for the actual storage logic. It uses Python's `shelve` module to store data and `zlib` to compress the revoked task list.
When the worker starts, the `StateDB.create()` method instantiates `Persistent`, which immediately calls `merge()` to load any existing state from disk into the worker's memory.
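The load-on-start, save-on-exit lifecycle can be sketched with a minimal stand-in class. This is an illustrative simplification, not Celery's actual `Persistent` implementation (which uses a `LimitedSet` and zlib compression); the `MiniPersistent` name and `revoked` key are hypothetical:

```python
import os
import shelve
import tempfile


class MiniPersistent:
    """Simplified sketch of the load-then-save lifecycle (not Celery's code)."""

    def __init__(self, path):
        self.path = path
        self.revoked = set()
        self.merge()  # load existing state from disk on startup

    def merge(self):
        # Union the on-disk set into memory, mirroring Persistent.merge().
        with shelve.open(self.path) as db:
            self.revoked |= db.get('revoked', set())

    def save(self):
        # Write the in-memory set back to disk (registered via atexit in Celery).
        with shelve.open(self.path) as db:
            db['revoked'] = self.revoked


path = os.path.join(tempfile.mkdtemp(), 'worker.state')
p1 = MiniPersistent(path)
p1.revoked.add('task-id-1')
p1.save()

p2 = MiniPersistent(path)          # a "restarted" worker picks up prior state
print('task-id-1' in p2.revoked)   # True
```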
```python
# celery/worker/components.py

class StateDB(bootsteps.Step):
    """Bootstep that sets up between-restart state database file."""

    def __init__(self, w, **kwargs):
        self.enabled = w.statedb
        w._persistence = None
        super().__init__(w, **kwargs)

    def create(self, w):
        w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock)
        atexit.register(w._persistence.save)
```
Data Persisted
The Persistent class specifically tracks two pieces of information:
- Revoked Tasks: The `celery.worker.state.revoked` set, which contains IDs of tasks that should not be executed.
- Logical Clock: The `app.clock` value, ensuring that the Lamport logical clock remains consistent across restarts.
The revoked tasks are stored under the key `zrevoked` in the database, compressed using `zlib`.
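The compress-on-write, decompress-on-read round trip can be demonstrated with the standard library alone. This is a simplified illustration: Celery serializes a `LimitedSet` via its own `_dumps` helper, whereas here a plain `set` is pickled directly:

```python
import pickle
import zlib

revoked = {'id-1', 'id-2', 'id-3'}

# On sync(): pickle the revoked set, then compress it for the 'zrevoked' key.
zrevoked = zlib.compress(pickle.dumps(revoked))

# On merge(): decompress and unpickle to recover the set.
restored = pickle.loads(zlib.decompress(zrevoked))
print(restored == revoked)  # True
```

For large revoked sets of UUID strings, the compression step noticeably shrinks the on-disk file, which is why Celery stores the compressed form rather than the raw pickle.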
```python
# celery/worker/state.py

def _sync_with(self, d):
    self._revoked_tasks.purge()
    d.update({
        '__proto__': 3,
        'zrevoked': self.compress(self._dumps(self._revoked_tasks)),
        'clock': self.clock.forward() if self.clock else 0,
    })
    return d
```
Configuring Revocation Limits
Because the state database can grow over time, Celery provides environment variables to control the size and expiration of the revoked task set. These are defined in celery/worker/state.py:
- `CELERY_WORKER_REVOKES_MAX`: Maximum number of revoked task IDs to keep (default: `50000`).
- `CELERY_WORKER_REVOKE_EXPIRES`: How many seconds a revoke remains active before expiring (default: `10800`, or 3 hours).
Expired entries are automatically purged during the sync() and merge() operations.
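The expiry behavior can be sketched with a timestamp-based purge. This is a simplified stand-in for Celery's `LimitedSet.purge()`, using a plain dict mapping task ID to insertion time; the `purge` function and `entries` structure here are illustrative, not Celery's API:

```python
import time

REVOKE_EXPIRES = 10_800  # seconds; the CELERY_WORKER_REVOKE_EXPIRES default


def purge(entries, now=None):
    """Drop revoked-task entries older than REVOKE_EXPIRES.

    ``entries`` maps task id -> insertion timestamp; a simplified
    stand-in for Celery's LimitedSet.
    """
    now = now if now is not None else time.monotonic()
    return {tid: ts for tid, ts in entries.items()
            if now - ts < REVOKE_EXPIRES}


now = time.monotonic()
entries = {'old-task': now - 11_000, 'fresh-task': now - 60}
print(sorted(purge(entries, now)))  # ['fresh-task']
```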
Troubleshooting and Limitations
- Forceful Shutdowns: State is saved using `atexit.register`. If the worker process is killed forcefully (e.g., `SIGKILL`), the `save()` method will not be called, and any state changes since the last sync will be lost.
- Limited Scope: Only revoked task IDs and the logical clock are persisted. Active task states, reserved tasks, and the actual task queues are not stored in the `statedb` file; these are managed by the message broker.
- File Format: The database uses Python's `shelve` and `pickle` modules. This makes the state file specific to the Python version used by the worker. Do not share state files between workers running different Python versions.
- Permissions: Ensure the worker process has read and write permissions for the directory where the `statedb` file is located.
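When debugging, it can help to inspect a state file offline. The helper below is a sketch based on the key layout described above (`clock` and `zrevoked`); the `read_statedb` name is hypothetical. Open the file only while the worker is stopped, and use the same Python environment as the worker, since unpickling the revoked set may require Celery's classes to be importable:

```python
import pickle
import shelve
import zlib


def read_statedb(path):
    """Return (clock, revoked_ids) from a worker state file.

    Sketch only: assumes the 'clock' and 'zrevoked' keys described above.
    """
    with shelve.open(path) as db:
        clock = db.get('clock', 0)
        zrevoked = db.get('zrevoked')
        # The revoked set is stored pickled and zlib-compressed.
        revoked = pickle.loads(zlib.decompress(zrevoked)) if zrevoked else set()
    return clock, set(revoked)
```

A quick `read_statedb('/var/run/celery/worker.state')` after a clean shutdown is a simple way to confirm that revokes and the clock actually survived the restart.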