Monitoring with Heartbeats

The heartbeat mechanism in this codebase provides real-time visibility into worker health, resource utilization, and software status. It is implemented primarily through the Heart class and integrated into the worker lifecycle via a dedicated bootstep.

Core Concepts

The heartbeat system is an event-based monitoring tool. It is distinct from AMQP protocol-level heartbeats (configured via the broker_heartbeat setting), which the client and the broker exchange to detect dead TCP connections. The worker heartbeats described here are instead application-level events that carry metadata about the worker's internal state.

The Heart Class

The Heart class (found in celery/worker/heartbeat.py) is a timer-driven service. It uses a kombu.asynchronous.timer.Timer to schedule periodic execution of its _send method.

class Heart:
    def __init__(self, timer, eventer, interval=None):
        self.timer = timer
        self.eventer = eventer
        self.interval = float(interval or 2.0)
        self.tref = None

        # Make event dispatcher start/stop us when enabled/disabled.
        self.eventer.on_enabled.add(self.start)
        self.eventer.on_disabled.add(self.stop)

The Heart instance is tightly coupled with the eventer (celery.events.EventDispatcher). It only sends heartbeats if the event dispatcher is enabled, and it automatically starts or stops in response to the dispatcher's state changes.
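The callback wiring can be illustrated with a minimal, self-contained sketch. ToyDispatcher and ToyService are hypothetical stand-ins, not Celery classes. The sketch only assumes, as the snippet above suggests, that on_enabled and on_disabled are sets of zero-argument callables invoked when the dispatcher changes state.

```python
class ToyDispatcher:
    """Hypothetical stand-in for an event dispatcher with enable/disable hooks."""

    def __init__(self):
        self.enabled = False
        self.on_enabled = set()
        self.on_disabled = set()

    def enable(self):
        self.enabled = True
        for callback in self.on_enabled:
            callback()

    def disable(self):
        if self.enabled:
            self.enabled = False
            for callback in self.on_disabled:
                callback()


class ToyService:
    """Registers its own start/stop with the dispatcher, as Heart does."""

    def __init__(self, dispatcher):
        self.running = False
        self.log = []
        dispatcher.on_enabled.add(self.start)
        dispatcher.on_disabled.add(self.stop)

    def start(self):
        self.running = True
        self.log.append("started")

    def stop(self):
        self.running = False
        self.log.append("stopped")


dispatcher = ToyDispatcher()
service = ToyService(dispatcher)
dispatcher.enable()   # the service starts without being called directly
dispatcher.disable()  # ...and stops the same way
print(service.log)
```

The service never polls the dispatcher; it only reacts to state changes, which is why toggling events at runtime is enough to start or stop it.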

Data Payload and System Monitoring

Every heartbeat event carries a snapshot of the worker's current performance and environment. This data is collected from several internal modules:

  1. Task Statistics:
    • active: The number of currently executing tasks, derived from len(active_requests) in celery.worker.state.
    • processed: The total number of tasks processed by the worker since startup, tracked in all_total_count[0] in celery.worker.state.
  2. System Load:
    • loadavg: The system load average (1, 5, and 15-minute intervals) retrieved via load_average() in celery.utils.sysinfo. On Windows, this defaults to (0.0, 0.0, 0.0).
  3. Software Information:
    • Includes the software identifier (py-celery), version, and operating system platform, defined in SOFTWARE_INFO within celery.worker.state.

The _send method assembles this data:

def _send(self, event, retry=True):
    if self._send_sent_signal is not None:
        self._send_sent_signal(sender=self)
    return self.eventer.send(event, freq=self.interval,
                             active=len(active_requests),
                             processed=all_total_count[0],
                             loadavg=load_average(),
                             retry=retry,
                             **SOFTWARE_INFO)
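To make the shape of the payload concrete, here is a rough standalone sketch that builds the same set of fields without importing Celery. The helper build_heartbeat_fields is hypothetical, the version string is a placeholder, and the sw_ident, sw_ver and sw_sys key names are assumptions about how SOFTWARE_INFO is laid out; the real worker reads these values from celery.worker.state and celery.utils.sysinfo.

```python
import os
import platform


def load_average():
    """Return (1, 5, 15)-minute load averages, or zeros where unavailable."""
    if hasattr(os, "getloadavg"):
        try:
            return tuple(round(value, 2) for value in os.getloadavg())
        except OSError:
            pass
    # Platforms without os.getloadavg (for example Windows) land here.
    return (0.0, 0.0, 0.0)


# Illustrative values only; key names are assumed, version is a placeholder.
SOFTWARE_INFO = {
    "sw_ident": "py-celery",
    "sw_ver": "0.0.0",
    "sw_sys": platform.system(),
}


def build_heartbeat_fields(active_requests, processed_total, interval=2.0):
    """Hypothetical helper: assemble the fields a heartbeat event carries."""
    return {
        "freq": interval,
        "active": len(active_requests),
        "processed": processed_total,
        "loadavg": load_average(),
        **SOFTWARE_INFO,
    }


fields = build_heartbeat_fields({"task-1", "task-2"}, processed_total=41)
print(sorted(fields))
```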

Lifecycle Events

The heartbeat mechanism manages three distinct event types to signal worker transitions:

  • worker-online: Sent immediately when Heart.start() is called. This signals to the cluster that a new worker is available.
  • worker-heartbeat: Sent repeatedly at the configured interval (defaulting to 2 seconds). This confirms the worker is still alive; the event's freq field tells consumers how often to expect it.
  • worker-offline: Sent when Heart.stop() is called during a graceful shutdown. Notably, this event is sent with retry=False to ensure the shutdown process is not delayed by network retries if the broker is unreachable.
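The three transitions can be traced with a simplified sketch. FakeTimer, FakeEventer and SketchHeart are hypothetical stand-ins written for this example; the start and stop bodies are an assumption reconstructed from the descriptions above rather than a copy of the Celery source, and the timer is assumed to offer call_repeatedly and cancel.

```python
class FakeTimer:
    """Stand-in timer: records repeating calls instead of scheduling them."""

    def __init__(self):
        self.entries = []

    def call_repeatedly(self, secs, fun, args=()):
        entry = (secs, fun, args)
        self.entries.append(entry)
        return entry

    def cancel(self, tref):
        self.entries.remove(tref)


class FakeEventer:
    """Stand-in dispatcher: records (event type, retry) pairs."""

    enabled = True

    def __init__(self):
        self.sent = []

    def send(self, type, retry=True, **fields):
        self.sent.append((type, retry))


class SketchHeart:
    def __init__(self, timer, eventer, interval=None):
        self.timer = timer
        self.eventer = eventer
        self.interval = float(interval or 2.0)
        self.tref = None

    def _send(self, event, retry=True):
        return self.eventer.send(event, freq=self.interval, retry=retry)

    def start(self):
        if self.eventer.enabled:
            self._send("worker-online")
            self.tref = self.timer.call_repeatedly(
                self.interval, self._send, ("worker-heartbeat",))

    def stop(self):
        if self.tref is not None:
            self.timer.cancel(self.tref)
            self.tref = None
        if self.eventer.enabled:
            # No retries: shutdown should not wait on an unreachable broker.
            self._send("worker-offline", retry=False)


timer, eventer = FakeTimer(), FakeEventer()
heart = SketchHeart(timer, eventer)
heart.start()
secs, fun, args = timer.entries[0]
fun(*args)  # simulate a single timer tick
heart.stop()
print(eventer.sent)
# [('worker-online', True), ('worker-heartbeat', True), ('worker-offline', False)]
```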

Integration via Bootsteps

The heartbeat service is integrated into the worker's consumer via the Heart bootstep in celery/worker/consumer/heart.py. This bootstep ensures that the heartbeat service is initialized with the correct timer and event dispatcher from the consumer context.

class Heart(bootsteps.StartStopStep):
    requires = (Events,)

    def start(self, c):
        c.heart = heartbeat.Heart(
            c.timer, c.event_dispatcher, self.heartbeat_interval,
        )
        c.heart.start()

The bootstep lets the heartbeat be disabled for a worker by setting without_heartbeat, or its interval changed via heartbeat_interval. On the command line these correspond to the celery worker options --without-heartbeat and --heartbeat-interval.

Local Hooks: The heartbeat_sent Signal

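For local monitoring or custom logging within the worker process, the Heart class triggers the heartbeat_sent signal (from celery.signals) from _send, just before the event is handed to the dispatcher. Because worker-online, worker-heartbeat, and worker-offline all go through _send, the signal fires for each of them.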

# Only send heartbeat_sent signal if it has receivers.
self._send_sent_signal = (
    heartbeat_sent.send if heartbeat_sent.receivers else None)

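This optimization ensures that the signal overhead is only incurred if there are listeners connected to heartbeat_sent. Note that the check is evaluated once and its result is cached on the instance, so a receiver connected after that point is not picked up; connect receivers at import time, before the worker starts. Developers can use this signal to perform side effects, such as updating a local health check file or incrementing internal metrics counters, every time the worker reports its liveness to the cluster.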
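As one possible illustration of the health check file idea, the sketch below records the time of each heartbeat in a file. HEALTH_FILE, touch_health_file and seconds_since_last_heartbeat are hypothetical names chosen for this example; only heartbeat_sent and its connect method come from Celery. The import is guarded so the handler can also be exercised without Celery installed.

```python
import tempfile
import time
from pathlib import Path

# Hypothetical location; pick a path your health checker can read.
HEALTH_FILE = Path(tempfile.gettempdir()) / "celery_worker_heartbeat"


def touch_health_file(sender=None, **kwargs):
    """Signal receiver: record the time of the most recent heartbeat."""
    HEALTH_FILE.write_text(str(time.time()))


def seconds_since_last_heartbeat():
    """Return the age of the last recorded heartbeat, or None if unknown."""
    try:
        return time.time() - float(HEALTH_FILE.read_text())
    except (FileNotFoundError, ValueError):
        return None


try:
    from celery.signals import heartbeat_sent
except ImportError:
    heartbeat_sent = None  # Celery not installed; call the handler directly.

if heartbeat_sent is not None:
    # Connect at import time so the receiver exists before the worker starts.
    heartbeat_sent.connect(touch_health_file)
```

An external probe (for example a container liveness check) could then call seconds_since_last_heartbeat() and treat a missing or old timestamp as unhealthy. The threshold is a deployment choice; a few multiples of the heartbeat interval is a common starting point.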