Running the Beat Service
To run the Celery Beat service for periodic task scheduling, you can use the Beat application class or embed the service directly into your own processes using threaded or process-based wrappers.
Starting the Beat Service Programmatically
You can start the Beat service from your code by instantiating celery.apps.beat.Beat and calling its run() method. This is the same mechanism used by the celery beat command-line tool.
```python
from celery import Celery
from celery.apps.beat import Beat

app = Celery('myapp')

# Initialize the Beat service
beat_service = Beat(
    app=app,
    loglevel='INFO',
    logfile='beat.log',
    pidfile='celerybeat.pid',
    schedule='celerybeat-schedule',
)

# Start the scheduler loop
beat_service.run()
```
Configuring Logging and PID Files
The Beat class configures logging and single-instance locking through several initialization parameters:
- loglevel: Sets the logging threshold (e.g., 'DEBUG', 'INFO', 'WARNING').
- logfile: Path to the file where logs should be written. If None, logs are sent to stderr.
- pidfile: Path to a file that stores the process ID. Beat.start_scheduler() calls platforms.create_pidlock(self.pidfile) so that only one instance can run with that pidfile.
- schedule: The filename for the persistent schedule database (defaults to celerybeat-schedule).
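The single-instance guarantee can be illustrated with a minimal, standard-library-only sketch of a pidfile lock. The names acquire_pidlock, release_pidlock, and PidLockError are hypothetical, and this is not Celery's Pidfile implementation. The staleness rule is an assumption of the sketch: a pidfile counts as stale if its contents are unreadable or the recorded PID no longer exists. The sketch is POSIX-only because it relies on os.kill(pid, 0).

```python
import os
from typing import Optional


class PidLockError(RuntimeError):
    """Raised when a live process already owns the pidfile (hypothetical)."""


def _pid_is_alive(pid: int) -> bool:
    # Signal 0 only checks that the process exists and may be signalled (POSIX).
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def _read_pid(path: str) -> Optional[int]:
    try:
        with open(path) as fh:
            return int(fh.read().strip())
    except ValueError:
        return None  # unreadable contents are treated as stale


def acquire_pidlock(path: str) -> None:
    """Hypothetical helper: write our PID to `path` unless a live process holds it."""
    if os.path.exists(path):
        pid = _read_pid(path)
        if pid is not None and pid > 0 and _pid_is_alive(pid):
            raise PidLockError(f"pidfile {path!r} is held by running PID {pid}")
        os.remove(path)  # stale pidfile, e.g. left behind by a crash
    # O_EXCL makes creation fail if another process created the file first.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    with os.fdopen(fd, "w") as fh:
        fh.write(f"{os.getpid()}\n")


def release_pidlock(path: str) -> None:
    """Remove the pidfile on clean shutdown (for example, registered with atexit)."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
```

Creating the file with O_EXCL closes the race between two processes that both find no pidfile and try to create one.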
When run() is called, it calls start_scheduler(), which in turn calls setup_logging() to configure the application logger with your specified level and file:
```python
# Internal logging setup in celery.apps.beat.Beat
def setup_logging(self, colorize: bool | None = None) -> None:
    if colorize is None and self.no_color is not None:
        colorize = not self.no_color
    self.app.log.setup(self.loglevel, self.logfile,
                       self.redirect_stdouts, self.redirect_stdouts_level,
                       colorize=colorize)
```
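The standard logging module can roughly approximate the level-plus-destination part of that call. In this sketch, setup_beat_logging and the "beat-sketch" logger name are hypothetical. The real app.log.setup() also handles stdout redirection, colorized output, and Celery's own formatters, and the sketch omits all of that.

```python
import logging
import sys
from typing import Optional


def setup_beat_logging(loglevel: str, logfile: Optional[str]) -> logging.Logger:
    """Hypothetical, simplified stand-in: set a level and log to a file or stderr."""
    logger = logging.getLogger("beat-sketch")
    logger.setLevel(loglevel.upper())  # accepts names such as "INFO" or "WARNING"
    # Drop existing handlers so repeated calls do not duplicate output.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()
    # Mirror the documented rule: a file if one is given, otherwise stderr.
    handler = logging.FileHandler(logfile) if logfile else logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("[%(asctime)s: %(levelname)s] %(message)s"))
    logger.addHandler(handler)
    return logger
```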
Running Beat as an Embedded Service
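If you need to run the scheduler within an existing process (such as a worker), use the _Threaded or _Process classes from celery.beat. Both wrap a celery.beat.Service. The leading underscore marks them as internal, so their behavior may change between Celery releases.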
Using a Threaded Service
The _Threaded class runs the beat Service in a daemon thread inside the current process. This is useful for lightweight embedding, because the scheduler shares the host process's memory and no extra process is needed.
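```python
from celery import Celery
from celery.beat import _Threaded

app = Celery('myapp')

# Create and start the threaded service (runs as a daemon thread)
threaded_beat = _Threaded(app, max_interval=60)
threaded_beat.start()

# Stop gracefully: blocks until the scheduler loop exits and syncs the schedule,
# which can take up to max_interval seconds
threaded_beat.stop()
```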
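The embedded-thread pattern itself is small. The following is a hypothetical stand-in, not _Threaded. ThreadedTicker and its tick callable, which returns the seconds until the next due task, are assumptions of the sketch. It runs a daemon thread that loops until a shutdown event is set, and stop(wait=True) blocks until the loop has exited. As a design choice, the sketch waits on the shutdown event instead of sleeping, so stop() returns promptly rather than after the current pause.

```python
import threading
from typing import Callable


class ThreadedTicker(threading.Thread):
    """Hypothetical embedded scheduler thread (illustration only)."""

    def __init__(self, tick: Callable[[], float], max_interval: float = 60.0) -> None:
        # daemon=True: the thread will not keep the interpreter alive on exit.
        super().__init__(name="Beat", daemon=True)
        self._tick = tick
        self._max_interval = max_interval
        self._is_shutdown = threading.Event()
        self._is_stopped = threading.Event()

    def run(self) -> None:
        try:
            while not self._is_shutdown.is_set():
                # tick() reports seconds until the next due task; cap at max_interval.
                interval = min(self._tick(), self._max_interval)
                # Waiting on the event (not time.sleep) lets stop() cut the pause short.
                self._is_shutdown.wait(max(interval, 0.0))
        finally:
            self._is_stopped.set()  # a real scheduler would sync its schedule here

    def stop(self, wait: bool = True) -> None:
        self._is_shutdown.set()
        if wait:
            self._is_stopped.wait()
```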
Using a Process-based Service
The _Process class runs the service in a separate process using multiprocessing. In the child, its run() method resets signal handlers and closes inherited file descriptors, keeping only stdin, stdout, stderr, and the descriptors used by open log handlers.
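```python
from celery import Celery
from celery.beat import _Process

app = Celery('myapp')

if __name__ == '__main__':
    # Create and start the process-based service
    process_beat = _Process(app, max_interval=60)
    process_beat.start()

    # Stop the service: stops the Service, then terminates the child process
    process_beat.stop()
```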
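The following is a hypothetical, POSIX-only sketch of the process-based variant. ProcessTicker and _scheduler_loop are illustrative names. The fork start method, the shared tick counter, and the child's signal setup are assumptions of the sketch, not what _Process does internally. The sketch's stop() follows a terminate-then-join shape: it sends SIGTERM to the child, waits for it, and reports the exit code.

```python
import multiprocessing
import signal
import time


def _scheduler_loop(ticks, interval: float) -> None:
    """Hypothetical child-process body standing in for a scheduler loop."""
    # Start from known signal dispositions instead of whatever the parent installed.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGINT, signal.SIG_IGN)  # let the parent own Ctrl-C
    while True:
        ticks.value += 1  # only the child writes, so no lock is needed
        time.sleep(interval)


class ProcessTicker:
    """Hypothetical process-based embedded service with start()/stop()."""

    def __init__(self, interval: float = 1.0) -> None:
        ctx = multiprocessing.get_context("fork")  # POSIX-only start method
        # Shared int visible to both processes; lock=False returns a raw ctypes value.
        self.ticks = ctx.Value("i", 0, lock=False)
        self._proc = ctx.Process(target=_scheduler_loop, args=(self.ticks, interval),
                                 name="Beat", daemon=True)

    def start(self) -> None:
        self._proc.start()

    def stop(self, timeout: float = 5.0):
        self._proc.terminate()  # sends SIGTERM on POSIX
        self._proc.join(timeout)
        return self._proc.exitcode  # negative signal number if killed by a signal
```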
Signal Handling and Persistence
When the service is started through celery.apps.beat.Beat, it synchronizes the schedule to disk before the process exits. install_sync_handler() registers handlers for SIGTERM and SIGINT that call sync() on the Service instance and then raise SystemExit.
```python
# Signal handler installation in celery.apps.beat.Beat
def install_sync_handler(self, service: beat.Service) -> None:
    """Install a `SIGTERM` + `SIGINT` handler saving the schedule."""
    def _sync(signum: Signals, frame: FrameType) -> None:
        service.sync()
        raise SystemExit()
    platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)
```
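The same sync-then-exit pattern can be reproduced with the standard library. In this sketch, ShelveSchedule and install_schedule_sync_handlers are hypothetical names, and shelve stands in for the scheduler's persistent store. Note that signal.signal() may only be called from the main thread.

```python
import shelve
import signal
from types import FrameType
from typing import Optional


class ShelveSchedule:
    """Hypothetical persistent schedule store backed by shelve."""

    def __init__(self, filename: str) -> None:
        # writeback=True keeps entries cached in memory until sync() or close().
        self._store = shelve.open(filename, writeback=True)
        self._store.setdefault("entries", {})

    def record_run(self, name: str, when: float) -> None:
        self._store["entries"][name] = when

    def sync(self) -> None:
        self._store.sync()  # flush cached entries to disk

    def close(self) -> None:
        self._store.close()


def install_schedule_sync_handlers(schedule: ShelveSchedule) -> None:
    """Register SIGTERM/SIGINT handlers that persist the schedule, then exit."""

    def _sync(signum: int, frame: Optional[FrameType]) -> None:
        schedule.sync()
        raise SystemExit()

    signal.signal(signal.SIGTERM, _sync)
    signal.signal(signal.SIGINT, _sync)
```

Raising SystemExit, rather than calling os._exit(), lets finally blocks and atexit hooks run after the schedule has been flushed.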
Troubleshooting and Gotchas
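- Schedule Corruption: If the schedule database (e.g., celerybeat-schedule.db) becomes corrupted, the PersistentScheduler removes it and starts with a fresh schedule. It also clears the stored schedule, logging a warning, when the timezone or enable_utc setting differs from the values recorded in the database. This prevents the scheduler from reusing due times that were computed under the old settings.
- Zombie PID Files: The pidfile is normally removed when the process exits. If a crash leaves it behind, Celery treats it as stale and removes it on the next start as long as the recorded PID no longer belongs to a running process. You only need to remove it manually if that PID has since been reused by another live process.
- Embedded Process FDs: When using _Process, the child calls platforms.close_open_fds() with a keep-list of stdin, stdout, stderr, and the descriptors of open log handlers. Every other inherited descriptor (sockets, database connections, other open files) is closed, so reopen such resources inside the child instead of relying on ones created in the parent.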
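The reset behavior described in the first bullet can be sketched with shelve. The following parts are assumptions of this sketch, not PersistentScheduler's code: the open_schedule function, the "tz" and "utc_enabled" keys, and the list of backend file suffixes. The sketch only illustrates the idea of discarding an unreadable database and clearing stored state when the timezone or UTC setting changes.

```python
import dbm
import os
import shelve

# Files that different dbm backends may create for one logical database (assumption).
_SUFFIXES = ("", ".db", ".dat", ".dir", ".bak", ".pag")


def _remove_db(path: str) -> None:
    for suffix in _SUFFIXES:
        try:
            os.remove(path + suffix)
        except FileNotFoundError:
            pass


def open_schedule(path: str, timezone: str, enable_utc: bool) -> shelve.Shelf:
    """Hypothetical: open a schedule store, resetting it on corruption or tz/UTC change."""
    try:
        store = shelve.open(path, writeback=True)
    except dbm.error:  # dbm.error is a tuple of backend exception classes
        _remove_db(path)  # unreadable database: start from an empty schedule
        store = shelve.open(path, flag="n", writeback=True)
    stored_tz = store.get("tz")
    stored_utc = store.get("utc_enabled")
    tz_changed = stored_tz is not None and stored_tz != timezone
    utc_changed = stored_utc is not None and stored_utc != enable_utc
    if tz_changed or utc_changed:
        store.clear()  # due times computed under the old settings are no longer valid
    store["tz"] = timezone
    store["utc_enabled"] = enable_utc
    store.setdefault("entries", {})
    return store
```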