Coordinating with Leader Election

Leader election in this codebase is built on the Gossip protocol. It lets workers coordinate so that a specific action, most commonly the execution of a task, is performed by exactly one worker in the cluster. The approach relies on logical clocks to give every node the same deterministic ordering of events.

Core Concepts

The Gossip Bootstep

The Gossip class (found in celery/worker/consumer/gossip.py) is a worker bootstep that facilitates worker-to-worker communication. It is responsible for:

  • Maintaining a Lamport logical clock (self.clock) to order events.
  • Tracking the state of other workers (join, leave, and lost events) via self.state.
  • Handling election-related events (worker.elect and worker.elect.ack).
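
The clock behaviour in the first bullet can be illustrated with a minimal Lamport clock. This is a self-contained sketch, not the class the worker actually uses; ToyLamportClock and its method names are chosen here for illustration only.

```python
import threading


class ToyLamportClock:
    """Illustrative Lamport clock (hypothetical, not Celery's own class)."""

    def __init__(self, initial=0):
        self.value = initial
        self._lock = threading.Lock()

    def forward(self):
        """Tick before a local event, such as sending a message."""
        with self._lock:
            self.value += 1
            return self.value

    def adjust(self, other):
        """Merge a clock value received from another node."""
        with self._lock:
            # Jump past whichever clock is further ahead.
            self.value = max(self.value, other) + 1
            return self.value


a, b = ToyLamportClock(), ToyLamportClock()
sent = a.forward()         # a stamps an outgoing message with 1
received = b.adjust(sent)  # b moves to 2, past the sender's stamp
```

Because a receiver always moves past the stamp it sees, an event that causally follows another always carries a higher clock value, which is what makes the ordering usable for elections.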

Logical Clocks and Determinism

Leader election in this project is not "first come, first served" in real time. It is decided by the logical clock value at the time the election was initiated. When multiple workers participate in an election, their clock values are compared and the worker with the lowest value is chosen as the leader. If clock values are identical, the worker's hostname and PID (combined as full_hostname) serve as the tie-breaker.
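
The selection rule above can be sketched with plain tuple comparison. This is an illustration of the rule only; pick_leader is a hypothetical helper and the hostnames are made up.

```python
def pick_leader(candidates):
    """Return the winning (clock, full_hostname) pair.

    Tuples compare element by element, so the lowest clock wins and
    the full_hostname string breaks ties.
    """
    return min(candidates)


candidates = [
    (12, 'worker2@example.com.4021'),
    (7, 'worker3@example.com.1188'),
    (7, 'worker1@example.com.9310'),
]
leader = pick_leader(candidates)
print(leader)  # (7, 'worker1@example.com.9310')
```

Every worker that sees the same set of candidates reaches the same answer without any further communication, because the comparison depends only on the data in the tuples.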

The Election Lifecycle

The election process follows a structured sequence of broadcasts and acknowledgments to reach consensus.

1. Initiation

An election can be triggered in two primary ways:

  • Control API: Calling app.control.election(id, topic, action) broadcasts an "election" control command to all workers.
  • Task Signatures: The Signature.election() method in celery/canvas.py uses the control API to initiate an election for a specific task.

# Example from celery/canvas.py (simplified)
def election(self):
    app = self.type.app
    tid = self.options.get('task_id') or uuid()
    with app.producer_or_acquire(None) as producer:
        app.control.election(tid, 'task', self.clone(task_id=tid),
                             connection=producer.connection)
        return self.type.AsyncResult(tid)

2. Candidate Announcement (worker-elect)

When a worker receives the election control command, it calls Gossip.election(). This method broadcasts a worker-elect event to all other workers in the cluster.

# celery/worker/consumer/gossip.py
def election(self, id, topic, action=None):
    self.consensus_replies[id] = []
    self.dispatcher.send(
        'worker-elect',
        id=id, topic=topic, action=action, cver=1,
    )

3. Consensus Building (worker-elect-ack)

Every worker (including the initiator) receives these worker-elect events. Upon receipt, the on_elect handler:

  1. Extracts the candidate's clock and hostname.
  2. Pushes the candidate into a local min-heap (self.consensus_requests[id]).
  3. Sends a worker-elect-ack back to the cluster.

# celery/worker/consumer/gossip.py
def on_elect(self, event):
    (id_, clock, hostname, pid,
     topic, action, _) = self._cons_stamp_fields(event)
    heappush(
        self.consensus_requests[id_],
        (clock, f'{hostname}.{pid}', topic, action),
    )
    self.dispatcher.send('worker-elect-ack', id=id_)

4. Leader Determination

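Each worker that announced itself through Gossip.election() collects acknowledgments in self.consensus_replies[id]. Once it has received at least as many ACKs as there are workers currently perceived as "alive" in its local state, it determines the winner by passing the request heap to self.clock.sort_heap(), which returns the entry with the lowest clock value.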

# celery/worker/consumer/gossip.py (excerpt from the worker-elect-ack handler)
# replies and alive_workers are set up earlier in the handler.
if len(replies) >= len(alive_workers):
    _, leader, topic, action = self.clock.sort_heap(
        self.consensus_requests[id],
    )
    if leader == self.full_hostname:
        # This worker won the election
        handler = self.election_handlers[topic]
        handler(action)
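
The four steps of the lifecycle can be tied together in a small in-memory simulation. This is a sketch under simplifying assumptions, not the worker's implementation: ToyBus and ToyWorker are hypothetical, the bus delivers events to every worker in FIFO order, clock values are fixed up front, and every registered worker counts as alive.

```python
from collections import deque
from heapq import heappush


class ToyBus:
    """FIFO broadcast bus: each event reaches every worker (hypothetical)."""

    def __init__(self):
        self.workers = []
        self.queue = deque()

    def send(self, kind, **fields):
        self.queue.append((kind, fields))

    def drain(self):
        while self.queue:
            kind, fields = self.queue.popleft()
            for worker in self.workers:
                worker.handlers[kind](fields)


class ToyWorker:
    def __init__(self, bus, full_hostname, clock):
        self.bus, self.full_hostname, self.clock = bus, full_hostname, clock
        self.requests = {}  # election id -> heap of candidates
        self.replies = {}   # election id -> senders that ACKed
        self.executed = []  # actions this worker ran as leader
        self.handlers = {'worker-elect': self.on_elect,
                         'worker-elect-ack': self.on_elect_ack}
        bus.workers.append(self)

    def election(self, id, topic, action):
        # Step 2: announce candidacy to the whole cluster.
        self.replies[id] = []
        self.bus.send('worker-elect', id=id, clock=self.clock,
                      sender=self.full_hostname, topic=topic, action=action)

    def on_elect(self, event):
        # Step 3: record the candidate, then acknowledge.
        heap = self.requests.setdefault(event['id'], [])
        heappush(heap, (event['clock'], event['sender'],
                        event['topic'], event['action']))
        self.bus.send('worker-elect-ack', id=event['id'],
                      sender=self.full_hostname)

    def on_elect_ack(self, event):
        replies = self.replies.get(event['id'])
        if replies is None:
            return  # election already settled on this worker
        replies.append(event['sender'])
        if len(replies) >= len(self.bus.workers):
            # Step 4: heap[0] is the lowest (clock, hostname) entry.
            _, leader, topic, action = self.requests[event['id']][0]
            if leader == self.full_hostname:
                self.executed.append(action)
            self.requests.pop(event['id'], None)
            self.replies.pop(event['id'], None)


bus = ToyBus()
workers = [ToyWorker(bus, 'w1@host.101', clock=9),
           ToyWorker(bus, 'w2@host.102', clock=4),
           ToyWorker(bus, 'w3@host.103', clock=4)]
for w in workers:  # step 1: every worker receives the control command
    w.election('e1', 'task', 'send-report')
bus.drain()
winners = [w.full_hostname for w in workers if w.executed]
print(winners)  # ['w2@host.102']
```

Two workers share the lowest clock value here, so the hostname tie-breaker decides. Each worker evaluates the same heap independently, and only the one whose full_hostname matches the result runs the action.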

Handling Election Results

The Gossip class maintains a mapping of topics to handlers in self.election_handlers. By default, the only topic it supports is task.

The 'task' Topic

When a worker wins an election with the task topic, it executes the call_task method. This method takes the action (which is a task signature) and triggers it using apply_async().

# celery/worker/consumer/gossip.py
def call_task(self, task):
    try:
        self.app.signature(task).apply_async()
    except Exception as exc:
        logger.exception('Could not call task: %r', exc)

Because only the elected leader executes this handler, the task is dispatched to the broker only once, which prevents duplicate execution of coordinated tasks across the cluster.
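
The topic-to-handler lookup can be sketched as a plain dictionary dispatch. ToyElectionHandlers, its register method, and the 'cleanup' topic are hypothetical and exist only for this illustration; the source describes a single built-in task topic.

```python
import logging

logger = logging.getLogger(__name__)


class ToyElectionHandlers:
    """Maps election topics to callables (hypothetical helper)."""

    def __init__(self):
        self.handlers = {}

    def register(self, topic, handler):
        self.handlers[topic] = handler

    def dispatch(self, topic, action):
        """Run the handler for topic; return False if none is registered."""
        try:
            handler = self.handlers[topic]
        except KeyError:
            logger.error('Unknown election topic %r', topic)
            return False
        handler(action)
        return True


called = []
registry = ToyElectionHandlers()
registry.register('task', called.append)  # stand-in for call_task
ok = registry.dispatch('task', {'task': 'reports.send'})
missing = registry.dispatch('cleanup', None)  # no handler registered
```

Looking the handler up by topic keeps the election logic independent of what the winner actually does, so the leader check and the action stay separate concerns.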

Requirements and Constraints

  • Transport Compatibility: The Gossip protocol and leader election are only supported for amqp and redis transports. This is checked during initialization via Gossip.compatible_transport.
  • Worker Awareness: The mechanism relies on workers being aware of each other. The Mingle bootstep (which Gossip requires) handles the initial synchronization of worker states and logical clocks when a node joins the cluster.
  • Node Liveness: If a worker is perceived as alive but fails to respond with an ACK, the election may be delayed. The Gossip.periodic method regularly cleans up workers that have missed heartbeats to ensure the alive_workers set remains accurate.
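
The effect of liveness tracking on an election can be sketched as follows. The alive_workers function, the heartbeat_interval and missed_limit values, and the timestamps are assumptions made for illustration and are not taken from the worker's actual cleanup logic.

```python
def alive_workers(last_heartbeat, now, heartbeat_interval=2.0, missed_limit=2):
    """Return hostnames whose last heartbeat is recent enough.

    A worker is dropped once it has been silent for longer than
    heartbeat_interval * missed_limit seconds (assumed thresholds).
    """
    cutoff = heartbeat_interval * missed_limit
    return {host for host, seen in last_heartbeat.items()
            if now - seen <= cutoff}


# Seconds since an arbitrary start; w3 has been silent for 11 seconds.
last_heartbeat = {'w1@host': 100.0, 'w2@host': 97.5, 'w3@host': 90.0}
alive = alive_workers(last_heartbeat, now=101.0)
acks_needed = len(alive)  # the election waits for this many ACKs
```

With the silent worker pruned, an election needs two ACKs instead of three, so it can complete even though one node never replies.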