Custom Scheduler Implementations
Custom scheduler implementations in Celery Beat allow for dynamic task scheduling, enabling backends like databases or distributed stores to manage periodic tasks instead of relying on static configuration files. This is achieved by extending the Scheduler and ScheduleEntry classes found in celery/beat.py.
Core Architecture
The scheduling system is built around two primary entities:
- ScheduleEntry: Encapsulates the state and configuration of a single periodic task. It tracks execution metadata such as last_run_at and total_run_count, and holds the schedule object (e.g., a crontab or interval schedule) that determines when the task is due.
- Scheduler: The engine that manages a collection of entries. It maintains an internal heap (_heap) of entries ordered by their next scheduled run time to ensure efficient "ticking."
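The heap holds event_t tuples of (time, priority, entry), so the standard library's heapq keeps the soonest entry at index 0. A minimal sketch of that ordering, using plain strings in place of real ScheduleEntry objects:

```python
import heapq
from collections import namedtuple

# Mirrors celery.beat.event_t: (next run time, priority, entry).
event_t = namedtuple('event_t', ('time', 'priority', 'entry'))

heap = []
heapq.heappush(heap, event_t(30.0, 5, 'cleanup'))
heapq.heappush(heap, event_t(10.0, 5, 'heartbeat'))
heapq.heappush(heap, event_t(10.0, 1, 'metrics'))  # lower priority wins the tie

# heap[0] is always the entry due soonest; ties fall back to priority.
assert heap[0] == event_t(10.0, 1, 'metrics')
```

Because tuples compare lexicographically, no custom comparison logic is needed: time first, then priority.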
The Ticking Mechanism
The Service.start loop in celery/beat.py repeatedly calls Scheduler.tick(). This method is the heartbeat of the scheduler:
def tick(self, event_t=event_t, min=min, heappop=heapq.heappop,
         heappush=heapq.heappush):
    # ... (heap initialization)
    H = self._heap
    if not H:
        return max_interval
    event = H[0]
    entry = event[2]
    is_due, next_time_to_run = self.is_due(entry)
    if is_due:
        verify = heappop(H)
        if verify is event:
            next_entry = self.reserve(entry)
            self.apply_entry(entry, producer=self.producer)
            heappush(H, event_t(self._when(next_entry, next_time_to_run),
                                event[1], next_entry))
            return 0
    # ...
When a task is due, the scheduler "reserves" it (updating its run count and timestamp) and dispatches it via apply_entry. The return value of tick() tells the service how long to sleep before the next check.
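The interplay between tick() and the service loop can be sketched with a toy stand-in (not Celery's actual classes; names here are illustrative): fire anything due and return 0 to re-check immediately, otherwise return how long to sleep.

```python
import heapq

class MiniScheduler:
    """Toy stand-in for Scheduler.tick(): fires due events, otherwise
    returns how long the caller should sleep (capped at max_interval)."""
    max_interval = 300.0

    def __init__(self, events):          # events: [(due_time, name), ...]
        self._heap = list(events)
        heapq.heapify(self._heap)
        self.fired = []

    def tick(self, now):
        if not self._heap:
            return self.max_interval
        due_time, name = self._heap[0]
        if due_time <= now:              # "is_due": pop, dispatch, return 0
            heapq.heappop(self._heap)
            self.fired.append(name)
            return 0
        return min(due_time - now, self.max_interval)

s = MiniScheduler([(1.0, 'a'), (5.0, 'b')])
assert s.tick(now=0.0) == 1.0      # nothing due yet: sleep until 'a'
assert s.tick(now=1.0) == 0        # 'a' fired; re-check immediately
assert s.tick(now=1.0) == 4.0      # next sleep: until 'b' at t=5
assert s.fired == ['a']
```

Returning 0 after a dispatch is what lets the real service drain several simultaneously due tasks before sleeping again.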
Extending ScheduleEntry
Custom backends often require ScheduleEntry to carry additional metadata, such as a database primary key, or to override its update logic.
The ScheduleEntry uses a functional update pattern via its next() method (an alias for _next_instance). When a task is reserved, a new instance of the entry is created with updated timestamps:
def _next_instance(self, last_run_at=None):
    """Return new instance, with date and count fields updated."""
    return self.__class__(**dict(
        self,
        last_run_at=last_run_at or self.default_now(),
        total_run_count=self.total_run_count + 1,
    ))
When subclassing, ensure that __init__ can handle the dictionary unpacking used in _next_instance and that any custom persistence fields are preserved during this transition.
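To make the preservation requirement concrete, here is a toy stand-in (not the real Celery class; DatabaseEntry and db_id are hypothetical) showing how a custom field survives the functional update only because __iter__ yields it:

```python
from datetime import datetime, timezone

class DatabaseEntry:
    """Toy stand-in for a ScheduleEntry subclass carrying a
    hypothetical database primary key (db_id)."""

    def __init__(self, name=None, last_run_at=None,
                 total_run_count=0, db_id=None):
        self.name = name
        self.last_run_at = last_run_at
        self.total_run_count = total_run_count
        self.db_id = db_id            # custom persistence field

    def __iter__(self):
        # dict(self) inside _next_instance consumes (key, value) pairs,
        # so custom fields must be yielded here to survive the update.
        return iter(vars(self).items())

    def _next_instance(self, last_run_at=None):
        return self.__class__(**dict(
            self,
            last_run_at=last_run_at or datetime.now(timezone.utc),
            total_run_count=self.total_run_count + 1,
        ))

entry = DatabaseEntry(name='report', db_id=42)
nxt = entry._next_instance()
assert nxt.total_run_count == 1
assert nxt.db_id == 42    # custom field preserved across the update
```

If __init__ rejected the db_id keyword, or __iter__ omitted it, the field would silently vanish on the first reservation.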
Implementing a Custom Scheduler
To create a custom scheduler, you typically subclass Scheduler or PersistentScheduler.
Lifecycle Methods
- setup_schedule(): Called during initialization. This is where you should load your initial schedule from the backend. The base Scheduler uses this to merge the static beat_schedule from configuration.
- sync(): Called periodically (controlled by sync_every and sync_every_tasks) and at shutdown. This is the primary hook for persisting the state of entries (like last_run_at) back to your storage.
- schedule property: You must provide a mapping-like object (usually via a property) that the base class can use to access entries.
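A minimal skeleton showing how the three hooks fit together. The base class here is a stub standing in for celery.beat.Scheduler (which calls setup_schedule() from __init__ unless lazy is set), and the dict "backend" is a placeholder for a real database:

```python
class SchedulerBase:
    """Stub standing in for celery.beat.Scheduler's lifecycle: the real
    base class calls setup_schedule() during __init__ unless lazy=True,
    and calls sync() periodically and at shutdown."""
    def __init__(self, lazy=False):
        if not lazy:
            self.setup_schedule()

class DictBackendScheduler(SchedulerBase):
    """Sketch of a custom backend scheduler; a real implementation
    would query a database instead of copying a dict."""
    def __init__(self, backend, **kwargs):
        self._backend = backend        # hypothetical storage
        self._schedule = {}
        super().__init__(**kwargs)

    def setup_schedule(self):
        # Load the initial schedule from the backend.
        self._schedule = dict(self._backend)

    def sync(self):
        # Persist entry state (e.g. last_run_at) back to storage.
        self._backend.update(self._schedule)

    @property
    def schedule(self):
        # Mapping-like view the base class uses to build its heap.
        return self._schedule

backend = {'heartbeat': 'every 30s'}
sched = DictBackendScheduler(backend)
assert sched.schedule == {'heartbeat': 'every 30s'}
```

Keeping all backend I/O inside setup_schedule() and sync() keeps construction cheap and makes the lazy path (below) work for free.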
Example: Persistent Backend Pattern
The PersistentScheduler class provides a blueprint for file-based persistence using the shelve module. It demonstrates how to handle environment changes:
class PersistentScheduler(Scheduler):
    def setup_schedule(self):
        self._store = self._open_schedule()
        # Timezone/UTC change detection
        tz = self.app.conf.timezone
        stored_tz = self._store.get('tz')
        if stored_tz is not None and stored_tz != tz:
            warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
            self._store.clear()  # Reset DB to prevent corruption
        self.merge_inplace(self.app.conf.beat_schedule)
        self.install_default_entries(self.schedule)
        self.sync()

    def sync(self):
        if self._store is not None:
            self._store.sync()
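The reset-on-mismatch pattern can be exercised directly with the standard library's shelve module (paths and keys in this sketch are illustrative, not Celery's actual store layout):

```python
import os
import shelve
import tempfile

# Simulate PersistentScheduler's timezone-change reset against a real
# shelve store.
path = os.path.join(tempfile.mkdtemp(), 'celerybeat-schedule')

with shelve.open(path) as store:
    store['tz'] = 'UTC'
    store['entries'] = {'heartbeat': 'state...'}

new_tz = 'Europe/Berlin'
with shelve.open(path) as store:
    if store.get('tz') is not None and store.get('tz') != new_tz:
        store.clear()              # stored timestamps are untrustworthy now
        store['tz'] = new_tz
    assert 'entries' not in store  # schedule state was wiped
```

Wiping the store is deliberately crude: rebuilding from beat_schedule is always safe, whereas migrating naive timestamps across timezones is not.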
Custom Task Dispatching
If you need to change how tasks are sent to the broker, override send_task. This is common in testing environments, as seen in t/unit/app/test_beat.py:
class mScheduler(beat.Scheduler):
    def __init__(self, *args, **kwargs):
        self.sent = []
        super().__init__(*args, **kwargs)

    def send_task(self, name=None, args=None, kwargs=None, **options):
        self.sent.append({'name': name, 'args': args, 'kwargs': kwargs})
        return self.app.AsyncResult(uuid())
Design Tradeoffs and Constraints
Heap Management vs. Dynamic Updates
The Scheduler uses a heapq for $O(1)$ peek at the next due task (with $O(\log n)$ pushes and pops). However, this means that if you modify the underlying schedule data (e.g., adding a task to a database), the heap becomes stale. The base Scheduler detects changes in the schedule dict during tick() by comparing it to old_schedulers:
if (self._heap is None or
        not self.schedules_equal(self.old_schedulers, self.schedule)):
    self.old_schedulers = copy.copy(self.schedule)
    self.populate_heap()
For high-frequency dynamic updates, rebuilding the entire heap via populate_heap() can become a performance bottleneck.
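One common mitigation is to have the backend signal changes explicitly instead of deep-comparing schedules on every tick (django-celery-beat does this with a last-change timestamp; the flag-based version below is a simplified sketch with hypothetical names):

```python
import heapq

class DirtyFlagScheduler:
    """Sketch: rebuild the heap only when the backend flags a change,
    rather than comparing the schedule dict on every tick."""
    def __init__(self):
        self._schedule = {}
        self._heap = []
        self._dirty = True           # backend write paths set this
        self.rebuilds = 0

    def update_entry(self, name, next_run):
        self._schedule[name] = next_run
        self._dirty = True

    def tick(self):
        if self._dirty:              # O(n log n) rebuild, but only when needed
            self._heap = [(t, n) for n, t in self._schedule.items()]
            heapq.heapify(self._heap)
            self._dirty = False
            self.rebuilds += 1
        return self._heap[0] if self._heap else None

s = DirtyFlagScheduler()
s.update_entry('a', 10.0)
s.tick(); s.tick(); s.tick()
assert s.rebuilds == 1            # unchanged schedule: no extra rebuilds
s.update_entry('b', 5.0)
assert s.tick() == (5.0, 'b')
assert s.rebuilds == 2
```

The tradeoff is that every write path must remember to set the flag; a missed flag means a stale heap, which is exactly the bug the base class's comparison avoids.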
Timezone Sensitivity
PersistentScheduler is designed to be defensive regarding timezones. If the timezone or enable_utc settings change, it clears the entire schedule database. This is because last_run_at timestamps are stored as naive or aware datetimes that may no longer be valid or comparable under a new timezone configuration, potentially leading to tasks running too often or not at all.
Idempotency and Lazy Loading
The Scheduler may be instantiated with lazy=True for introspection purposes (e.g., by the celery beat CLI). The base __init__ skips setup_schedule() when lazy is set, so subclasses should keep expensive work (database connections, file locks) inside setup_schedule() rather than __init__, ensuring that simple configuration checks stay cheap.