Introduction to Celery Beat

Celery Beat is the central periodic task scheduler for the Celery ecosystem. Unlike workers that execute tasks, Beat is responsible for dispatching task messages to the broker at specific intervals. In this codebase, the implementation is split between the application layer that manages the process and the service layer that handles the scheduling logic.

The Beat Application

The Beat class in celery.apps.beat serves as the entry point for the standalone scheduler process. It handles the high-level orchestration required to run Beat as a reliable service, including:

  • Initialization: Setting up the loader via self.app.loader.init_worker() and finalizing the Celery app.
  • Process Management: Creating PID files and setting the process title (e.g., celery beat).
  • Logging: Configuring the logging system based on user-provided log levels and files.
  • Signal Handling: Installing handlers for SIGTERM and SIGINT via install_sync_handler to ensure the schedule is synchronized to disk before the process exits.

When Beat.run() is called, it triggers start_scheduler(), which instantiates the underlying Service engine:

# celery/apps/beat.py

def start_scheduler(self) -> None:
    if self.pidfile:
        platforms.create_pidlock(self.pidfile)
    service = self.Service(
        app=self.app,
        max_interval=self.max_interval,
        scheduler_cls=self.scheduler_cls,
        schedule_filename=self.schedule,
    )
    # ... logging and signal setup ...
    try:
        self.install_sync_handler(service)
        service.start()
    except Exception as exc:
        # ... error handling ...

The Service Engine

The Service class in celery.beat is the runtime engine of the scheduler. Its primary responsibility is to maintain a "heartbeat" loop that repeatedly queries the scheduler for the next task's due time.

The core of this engine is the start() method, which runs a while loop until the service is shut down:

# celery/beat.py

def start(self, embedded_process=False):
    # ... initialization and signals ...
    try:
        while not self._is_shutdown.is_set():
            interval = self.scheduler.tick()
            if interval and interval > 0.0:
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                time.sleep(interval)
                if self.scheduler.should_sync():
                    self.scheduler._do_sync()
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()

In every iteration, self.scheduler.tick() determines how long the service should sleep before the next task is due.
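
The control flow of this loop can be tried out in isolation. The following is a minimal sketch rather than Celery's own code: FakeScheduler, run_loop, and the max_ticks stop condition are hypothetical names introduced for illustration, and the scheduler simply replays canned intervals.

```python
import threading
import time


class FakeScheduler:
    """Hypothetical stand-in for a scheduler: replays canned sleep intervals."""

    def __init__(self, intervals):
        self._intervals = list(intervals)
        self.ticks = 0
        self.synced = False

    def tick(self):
        self.ticks += 1
        # Return 0.0 (no sleep needed) once the canned intervals run out.
        return self._intervals.pop(0) if self._intervals else 0.0

    def sync(self):
        self.synced = True


def run_loop(scheduler, shutdown, max_ticks):
    """Tick, sleep for the returned interval, and always sync on the way out."""
    slept = []
    try:
        while not shutdown.is_set():
            interval = scheduler.tick()
            if interval and interval > 0.0:
                time.sleep(interval)
                slept.append(interval)
            if scheduler.ticks >= max_ticks:
                shutdown.set()  # illustrative stop condition only
    except (KeyboardInterrupt, SystemExit):
        shutdown.set()
    finally:
        scheduler.sync()
    return slept


scheduler = FakeScheduler([0.01, 0.0, 0.02])
shutdown = threading.Event()
slept = run_loop(scheduler, shutdown, max_ticks=3)
```

A threading.Event is used as the shutdown flag so that another thread or a signal handler can end the loop without interrupting a tick halfway through.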

Scheduling Logic and Drift

The Scheduler class manages the schedule entries and a heap of upcoming events. The tick() method is the most critical part of the scheduling logic:

  1. It checks the heap for the next scheduled event.
  2. If an entry is due, it calls apply_entry(), which dispatches the task via apply_async().
  3. It returns the number of seconds to wait until the next task is ready.
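
The three steps above can be modelled with the standard heapq module. This is a simplified sketch and not Celery's implementation: the tuple layout, the dispatch callback, and the choice to drain every due entry in a single call are all assumptions made for illustration.

```python
import heapq


def tick(heap, now, dispatch, max_interval=300.0):
    """Dispatch every entry due at `now`; return seconds until the next one.

    `heap` holds (due_time, name, period) tuples ordered by due_time.
    """
    while heap and heap[0][0] <= now:
        due, name, period = heapq.heappop(heap)
        dispatch(name)
        # Reschedule the entry one period after the time it was due.
        heapq.heappush(heap, (due + period, name, period))
    if not heap:
        return max_interval
    # Never ask the caller to sleep longer than max_interval.
    return min(heap[0][0] - now, max_interval)


sent = []
heap = [(10.0, "report", 60.0), (12.0, "cleanup", 30.0)]
heapq.heapify(heap)

wait_before = tick(heap, now=9.0, dispatch=sent.append)   # nothing due yet
wait_after = tick(heap, now=10.0, dispatch=sent.append)   # "report" is due
```

Keeping entries in a heap means the next due entry is always at index 0, so each call only inspects the front of the queue.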

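The scheduler also adjusts the computed wait time. The adjust() method adds a small negative drift (default -0.010 seconds) to any positive wait, which shortens the sleep by about 10 ms so that Beat wakes slightly before, rather than after, the computed due time. Zero, negative, and None values are returned unchanged: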

# celery/beat.py

def adjust(self, n, drift=-0.010):
    if n and n > 0:
        return n + drift
    return n
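
As a worked example, the same logic can be written as a free function and applied to a few inputs. This is a standalone copy for illustration only; the sample values are arbitrary.

```python
def adjust(n, drift=-0.010):
    """Shorten a positive wait by `drift`; pass every other value through."""
    if n and n > 0:
        return n + drift
    return n


waits = [
    adjust(5.0),   # positive: shortened to roughly 4.99 seconds
    adjust(0),     # zero is falsy, so it is returned unchanged
    adjust(None),  # None is returned unchanged
    adjust(-1.0),  # negative values are returned unchanged
]
```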

Persistence with PersistentScheduler

By default, Celery Beat uses the PersistentScheduler, which subclasses Scheduler to store the last run times of tasks in a local database file (typically a shelve file named celerybeat-schedule).
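
To show what storing last run times in a shelve file looks like, here is a minimal sketch using only the standard library. The record_run and last_run helpers and the exact layout of the stored mapping are assumptions for illustration, not Celery's API.

```python
import os
import shelve
import tempfile


def record_run(path, task_name, timestamp):
    """Store the last run time of `task_name` in the shelve file at `path`."""
    with shelve.open(path) as store:
        entries = store.get("entries", {})
        entries[task_name] = timestamp
        # Reassign the key so the change is written back to disk.
        store["entries"] = entries


def last_run(path, task_name):
    """Return the stored last run time, or None if the task never ran."""
    with shelve.open(path) as store:
        return store.get("entries", {}).get(task_name)


path = os.path.join(tempfile.mkdtemp(), "celerybeat-schedule")
record_run(path, "report", 1700000000.0)
restored = last_run(path, "report")    # value survives closing the file
missing = last_run(path, "cleanup")    # unknown task
```

Because the values live on disk, a restarted scheduler can tell which entries already ran instead of treating every entry as new.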

Handling Configuration Changes

The PersistentScheduler is sensitive to changes in timezones or UTC settings. If it detects that the timezone or enable_utc configuration has changed since the last run, it clears the local database to prevent scheduling inconsistencies:

# celery/beat.py in PersistentScheduler.setup_schedule

tz = self.app.conf.timezone
stored_tz = self._store.get('tz')
if stored_tz is not None and stored_tz != tz:
    warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
    self._store.clear()  # Timezone changed, reset db!
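
The reset rule can be exercised with a plain dict standing in for the store. This is a hedged sketch: reset_if_config_changed is a hypothetical helper, and the utc_enabled key name is an assumption that mirrors the tz key shown in the excerpt.

```python
def reset_if_config_changed(store, timezone, enable_utc):
    """Clear `store` if the saved timezone or UTC flag differs.

    Returns True when the store was reset.
    """
    stored_tz = store.get("tz")
    stored_utc = store.get("utc_enabled")
    changed = (
        (stored_tz is not None and stored_tz != timezone)
        or (stored_utc is not None and stored_utc != enable_utc)
    )
    if changed:
        store.clear()
    # Record the current settings for comparison at the next start-up.
    store.update(tz=timezone, utc_enabled=enable_utc)
    return changed


store = {"tz": "UTC", "utc_enabled": True, "entries": {"report": 1700000000.0}}

unchanged = reset_if_config_changed(store, "UTC", True)
entries_kept = "entries" in store

changed = reset_if_config_changed(store, "Europe/Oslo", True)
entries_dropped = "entries" not in store
```

The `is not None` checks matter: a brand-new store has no saved settings, and that case should not count as a change.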

Database Integrity

The scheduler also includes logic to detect and recover from corrupted schedule files. If the shelve database fails to open or retrieve keys, _destroy_open_corrupted_schedule is called to remove the corrupted files and start fresh.
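
The recovery idea can be sketched with the standard library alone. This is not Celery's code: open_or_recreate is a hypothetical helper, and the suffix list is an assumption about which files the common dbm backends may create.

```python
import os
import shelve
import tempfile

# Assumed file suffixes that dbm backends may create next to the base name.
KNOWN_SUFFIXES = ("", ".db", ".dat", ".bak", ".dir")


def open_or_recreate(path):
    """Open a shelve store; if it cannot be read, delete it and start fresh.

    Returns (store, recreated).
    """
    try:
        store = shelve.open(path)
        list(store.keys())  # force a read so corruption surfaces immediately
        return store, False
    except Exception:
        for suffix in KNOWN_SUFFIXES:
            try:
                os.remove(path + suffix)
            except FileNotFoundError:
                pass
        return shelve.open(path), True


path = os.path.join(tempfile.mkdtemp(), "schedule")
with open(path, "wb") as fh:
    fh.write(b"this is not a valid database file")

store, recreated = open_or_recreate(path)
store["entries"] = {}
store.close()
```

Discarding the file loses the stored last run times, so entries are scheduled as if they had never run; that is the trade-off for being able to start at all.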

Execution Modes

There are two primary ways to run the Beat service in this codebase:

  1. Standalone Process: Using the celery beat command, which invokes celery.apps.beat.Beat. This is the recommended approach for production.
  2. Embedded Service: Using the -B or --beat flag with the celery worker command. This starts an EmbeddedService (defined in celery.beat) as a child thread or process of the worker.

Restrictions

The embedded service is managed by the Beat bootstep in celery.worker.components. Notably, this codebase explicitly blocks embedded Beat when the worker uses a greenlet-based pool such as eventlet or gevent; with those pools, Beat has to run as a standalone process instead:

# celery/worker/components.py

def create(self, w):
    from celery.beat import EmbeddedService
    pool_module = w.pool_cls if isinstance(w.pool_cls, str) else w.pool_cls.__module__
    if pool_module.endswith(('gevent', 'eventlet')):
        raise ImproperlyConfigured(ERR_B_GREEN)
    # ...
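
The pool check relies on str.endswith accepting a tuple of suffixes. The sketch below isolates that test; is_green_pool and FakeEventletPool are hypothetical names, and the string inputs are assumed to be dotted module paths.

```python
GREEN_POOL_SUFFIXES = ("gevent", "eventlet")


def is_green_pool(pool_cls):
    """Return True if `pool_cls` (a class or a module path) is greenlet-based."""
    pool_module = pool_cls if isinstance(pool_cls, str) else pool_cls.__module__
    # endswith() with a tuple matches if any one of the suffixes matches.
    return pool_module.endswith(GREEN_POOL_SUFFIXES)


class FakeEventletPool:
    """Hypothetical pool class that pretends to live in an eventlet module."""

    __module__ = "celery.concurrency.eventlet"


checks = [
    is_green_pool("celery.concurrency.gevent"),
    is_green_pool("celery.concurrency.prefork"),
    is_green_pool(FakeEventletPool),
]
```

Matching on the module name means the check works whether the pool is given as a class or as a string, without importing eventlet or gevent.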