Running the Beat Service

To run the Celery Beat service for periodic task scheduling, you can use the Beat application class or embed the service directly into your own processes using threaded or process-based wrappers.

Starting the Beat Service Programmatically

You can start the Beat service from your code by instantiating celery.apps.beat.Beat and calling its run() method. This is the same mechanism used by the celery beat command-line tool.

from celery import Celery
from celery.apps.beat import Beat

app = Celery('myapp')

# Initialize the Beat service
beat_service = Beat(
    app=app,
    loglevel='INFO',
    logfile='beat.log',
    pidfile='celerybeat.pid',
    schedule='celerybeat-schedule',
)

# Start the scheduler loop
beat_service.run()

Configuring Logging and PID Files

The Beat class manages logging setup and process synchronization through several initialization parameters:

  • loglevel: Sets the logging threshold (e.g., 'DEBUG', 'INFO', 'WARNING').
  • logfile: Path to the file where logs should be written. If None, logs are sent to stderr.
  • pidfile: Path to a file that will store the process ID. Beat.start_scheduler() uses platforms.create_pidlock(self.pidfile) to ensure only one instance runs.
  • schedule: The filename for the persistent schedule database (defaults to celerybeat-schedule).
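The single-instance guarantee provided by the pidfile can be pictured with a small standard-library sketch. This is not Celery's implementation of create_pidlock; the names acquire_pidfile, release_pidfile, and PidLockError are hypothetical and only illustrate the general idea of exclusive file creation.

```python
import os


class PidLockError(RuntimeError):
    """Raised when another process already holds the pidfile."""


def acquire_pidfile(path: str) -> None:
    """Create `path` exclusively and write the current PID into it."""
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except FileExistsError:
        raise PidLockError(f"pidfile {path!r} already exists") from None
    with os.fdopen(fd, "w") as handle:
        handle.write(f"{os.getpid()}\n")


def release_pidfile(path: str) -> None:
    """Remove the pidfile; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
```

A second call to acquire_pidfile() with the same path raises PidLockError until release_pidfile() has run, which is the behaviour you would expect from two Beat instances pointed at the same pidfile.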

When run() is called, it invokes start_scheduler(), which in turn calls setup_logging() to configure the application logger with the level and file you specified:

# Internal logging setup in celery.apps.beat.Beat
def setup_logging(self, colorize: bool | None = None) -> None:
    if colorize is None and self.no_color is not None:
        colorize = not self.no_color
    self.app.log.setup(self.loglevel, self.logfile,
                       self.redirect_stdouts, self.redirect_stdouts_level,
                       colorize=colorize)

Running Beat as an Embedded Service

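If you need to run the scheduler within an existing process (such as a worker), use the _Threaded or _Process classes from celery.beat. Both are underscore-prefixed, so treat them as private; celery.beat.EmbeddedService(app, thread=...) is the factory function that returns one of them.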

Using a Threaded Service

The _Threaded class runs the Beat service in a background thread. This is useful for lightweight embedding where shared memory is preferred.

from celery.beat import _Threaded

# Create and start the threaded service (app is the Celery instance defined earlier)
threaded_beat = _Threaded(app, max_interval=60)
threaded_beat.start()

# To stop the service gracefully
threaded_beat.stop()
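The start/stop pattern above can be illustrated without Celery. The sketch below is a standard-library model of a daemon thread wrapping a service loop; TickService and ThreadedService are hypothetical names, and the loop body only counts ticks instead of dispatching tasks.

```python
import threading


class TickService:
    """Hypothetical stand-in for a scheduler service with start()/stop()."""

    def __init__(self, max_interval: float = 0.01) -> None:
        self.max_interval = max_interval
        self.ticks = 0
        self._shutdown = threading.Event()
        self._stopped = threading.Event()

    def start(self) -> None:
        # Loop until stop() is requested, waking at most every max_interval.
        try:
            while not self._shutdown.is_set():
                self.ticks += 1
                self._shutdown.wait(self.max_interval)
        finally:
            self._stopped.set()

    def stop(self, wait: bool = False) -> None:
        self._shutdown.set()
        if wait:
            self._stopped.wait()


class ThreadedService(threading.Thread):
    """Run a service in a daemon thread, as an embedded scheduler would."""

    def __init__(self, service: TickService) -> None:
        super().__init__(name="Beat", daemon=True)
        self.service = service

    def run(self) -> None:
        self.service.start()

    def stop(self) -> None:
        # Block until the service loop has actually exited.
        self.service.stop(wait=True)
```

Because the thread is a daemon, it does not keep the host process alive on its own; a short max_interval keeps stop() responsive, since the loop only notices the shutdown request when it wakes.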

Using a Process-based Service

The _Process class runs the service in a separate process using multiprocessing. In the child process it resets signal handlers and closes inherited file descriptors before starting the scheduler.

from celery.beat import _Process

# Create and start the process-based service (app is the Celery instance defined earlier)
process_beat = _Process(app, max_interval=60)
process_beat.start()

# To stop the service
process_beat.stop()

Signal Handling and Persistence

The Beat service ensures that the schedule is synchronized to disk before the process exits. It installs signal handlers for SIGTERM and SIGINT that trigger a sync of the Service instance.

# Signal handler installation in celery.apps.beat.Beat
def install_sync_handler(self, service: beat.Service) -> None:
    """Install a `SIGTERM` + `SIGINT` handler saving the schedule."""
    def _sync(signum: Signals, frame: FrameType) -> None:
        service.sync()
        raise SystemExit()
    platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)
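The same sync-then-exit pattern can be tried in isolation with the standard signal module. The sketch below is an illustration under stated assumptions, not Celery code: JsonSchedule is a hypothetical stand-in for the service, and it persists to a JSON file rather than a schedule database.

```python
import json
import signal


class JsonSchedule:
    """Hypothetical in-memory schedule that can be flushed to a JSON file."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.entries = {}

    def sync(self) -> None:
        # Persist the current entries so they survive a restart.
        with open(self.path, "w") as handle:
            json.dump(self.entries, handle)


def install_sync_handler(schedule: JsonSchedule):
    """Flush `schedule` on SIGTERM/SIGINT, then exit via SystemExit."""

    def _sync(signum, frame):
        schedule.sync()
        raise SystemExit()

    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGTERM, _sync)
    signal.signal(signal.SIGINT, _sync)
    return _sync
```

Raising SystemExit from the handler lets the interpreter unwind normally, so finally blocks and atexit callbacks still run. A process killed with SIGKILL never reaches the handler, so nothing is synced in that case.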

Troubleshooting and Gotchas

  • Schedule Corruption: If the schedule database (e.g., celerybeat-schedule.db) cannot be opened, the PersistentScheduler removes and recreates it. If you change the timezone or enable_utc setting in your app, it clears the stored entries to prevent inconsistencies. In both cases the recorded last-run times are lost.
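  • Stale PID Files: If the process is killed without a clean exit (for example by SIGKILL), the pidfile may be left behind. If Beat then refuses to start, check that no Beat process is still running and remove the pidfile manually before restarting.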
  • Embedded Process FDs: When using _Process, the child calls platforms.close_open_fds() with stdin, stdout, stderr, and the open logger file descriptors as the list to keep, so every other inherited descriptor is closed. Do not rely on files or sockets opened in the parent before start() being usable inside the embedded process.
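For the stale pidfile case above, a quick check before deleting the file by hand is to see whether the recorded PID still belongs to a live process. The following is a POSIX-only sketch using the standard library; read_pid, pid_is_running, and remove_if_stale are hypothetical helpers written for this illustration, not part of Celery's API.

```python
import os


def read_pid(path: str):
    """Return the PID stored in `path`, or None if missing or unreadable."""
    try:
        with open(path) as handle:
            return int(handle.read().strip())
    except (FileNotFoundError, ValueError):
        return None


def pid_is_running(pid: int) -> bool:
    """POSIX-only check: signal 0 probes a process without affecting it."""
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


def remove_if_stale(path: str) -> bool:
    """Delete the pidfile when its PID is absent; return True if removed."""
    pid = read_pid(path)
    if pid is not None and pid_is_running(pid):
        return False
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    return True
```

Note that a live PID does not prove the process is Beat, since PIDs can be reused by unrelated processes. Treat a False result as a prompt to inspect the process rather than as confirmation.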