Skip to main content

Scheduler Persistence and State

In a distributed task queue, ensuring that periodic tasks run exactly when intended—even across service restarts—is a critical requirement. This project implements this through a combination of in-memory heap management and a file-based persistence layer. The core logic resides in the celery.beat module, primarily within the Scheduler and PersistentScheduler classes.

The Scheduling Heartbeat

The scheduling process is orchestrated by the Service class, which runs a continuous loop. In its start method, it repeatedly calls self.scheduler.tick() to determine the next action:

# celery/beat.py:644
while not self._is_shutdown.is_set():
interval = self.scheduler.tick()
if interval and interval > 0.0:
debug('beat: Waking up %s.',
humanize_seconds(interval, prefix='in '))
time.sleep(interval)
if self.scheduler.should_sync():
self.scheduler._do_sync()

The tick() method is the engine of the scheduler. It uses a min-heap (self._heap) to efficiently track which task is due next. When a task is due, the scheduler "reserves" it, updates its internal state, and dispatches it via a producer.

State Tracking via ScheduleEntry

Every task in the schedule is represented by a ScheduleEntry object. This class is responsible for tracking the execution state of a specific task:

  • last_run_at: The timestamp of the last time the task was scheduled.
  • total_run_count: A counter of how many times the task has been executed.

When a task is executed, the reserve method creates a new version of the entry with updated timestamps and counters:

# celery/beat.py:134
def _next_instance(self, last_run_at=None):
"""Return new instance, with date and count fields updated."""
return self.__class__(**dict(
self,
last_run_at=last_run_at or self.default_now(),
total_run_count=self.total_run_count + 1,
))

This state is what allows the scheduler to calculate the next run time accurately using the task's specific schedule (e.g., crontab or solar).

Persistent Storage with Shelve

While the base Scheduler keeps this state in memory, the PersistentScheduler subclass extends this by backing the schedule with a shelve database. This ensures that if the celery beat process is restarted, it can resume from where it left off rather than resetting all task timers.

The PersistentScheduler manages a local file (defaulting to celerybeat-schedule) and handles various database formats through the known_suffixes attribute:

# celery/beat.py:505
class PersistentScheduler(Scheduler):
"""Scheduler backed by :mod:`shelve` database."""
persistence = shelve
known_suffixes = ('', '.db', '.dat', '.bak', '.dir')

The Sync Strategy

Writing to disk on every single task execution would be a significant performance bottleneck. Instead, the project implements a buffered sync strategy. The Scheduler.should_sync() method determines if it is time to flush the in-memory state to the shelve database based on two criteria:

  1. Time-based: Controlled by sync_every (defaulting to 3 minutes).
  2. Task-based: Controlled by sync_every_tasks (if configured).
# celery/beat.py:381
def should_sync(self):
return (
(not self._last_sync or
(time.monotonic() - self._last_sync) > self.sync_every) or
(self.sync_every_tasks and
self._tasks_since_sync >= self.sync_every_tasks)
)

Reliability and Consistency

The implementation includes several safeguards to maintain the integrity of the schedule state.

Timezone and UTC Resets

If the global configuration for timezone or enable_utc changes, the PersistentScheduler will proactively clear the existing database. This is a deliberate design choice to prevent tasks from firing at incorrect times due to stale timezone offsets stored in the persistent file.

# celery/beat.py:545
tz = self.app.conf.timezone
stored_tz = self._store.get('tz')
if stored_tz is not None and stored_tz != tz:
warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
self._store.clear() # Timezone changed, reset db!

Corruption Recovery

Local database files can become corrupted due to unexpected shutdowns or filesystem issues. PersistentScheduler is designed to detect these failures during the setup_schedule phase. If it encounters errors like UnpicklingError or dbm.error, it automatically removes the corrupted file and starts fresh:

# celery/beat.py:525
def _destroy_open_corrupted_schedule(self, exc):
error('Removing corrupted schedule file %r: %r',
self.schedule_filename, exc, exc_info=True)
self._remove_db()
return self._open_schedule()

Drift Adjustment

To account for the precision limits of time.sleep(), the scheduler applies a small negative drift (-0.010s) in the adjust method. This ensures the process wakes up slightly before the task is due, minimizing the risk of missing a scheduling window.