Coordinating with Leader Election
The leader election mechanism in this codebase is implemented on top of the Gossip protocol, which lets workers coordinate so that a specific action (most commonly the execution of a task) is performed by only one worker in the cluster. This consensus-based approach relies on logical clocks to provide a deterministic ordering of events across distributed nodes.
Core Concepts
The Gossip Bootstep
The Gossip class (found in celery/worker/consumer/gossip.py) is a worker bootstep that facilitates worker-to-worker communication. It is responsible for:
- Maintaining a Lamport logical clock (self.clock) to order events.
- Tracking the state of other workers (join, leave, and lost events) via self.state.
- Handling election-related events (worker.elect and worker.elect.ack).
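To make the role of the logical clock concrete, here is a minimal sketch of a Lamport clock. The class name SketchLamportClock and its methods are hypothetical names chosen for illustration; this is not the clock object that Gossip holds in self.clock, only the general technique.

```python
import threading


class SketchLamportClock:
    """Hypothetical minimal Lamport clock, for illustration only."""

    def __init__(self, initial_value=0):
        self.value = initial_value
        self._lock = threading.Lock()

    def forward(self):
        """Tick before a local event, such as sending a message."""
        with self._lock:
            self.value += 1
            return self.value

    def adjust(self, other):
        """Merge a timestamp received from another node, then tick."""
        with self._lock:
            self.value = max(self.value, other) + 1
            return self.value


a, b = SketchLamportClock(), SketchLamportClock()
sent = a.forward()         # node A stamps an outgoing event
received = b.adjust(sent)  # node B moves its clock past the stamp it saw
assert received > sent
```

The property that matters for elections is that a receiver's clock always ends up ahead of any stamp it has seen, so clock values give a consistent "happened before" ordering without relying on wall-clock time.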
Logical Clocks and Determinism
Leader election in this project is not based on "first-come, first-served" in real time, but rather on the logical clock value at the time the election was initiated. When multiple workers participate in an election, they compare their logical clocks. The worker with the lowest clock value is chosen as the leader. If clock values are identical, the worker's hostname and PID (combined as full_hostname) serve as a tie-breaker.
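The ordering rule can be sketched with plain tuples. The helper pick_leader and the example hostnames below are hypothetical, and the tie-break is assumed to be an ordinary string comparison of the combined hostname and PID.

```python
def pick_leader(candidates):
    """Return the winning (clock, full_hostname) pair.

    Tuples compare element by element, so the lowest clock wins and
    full_hostname only matters when two clocks are equal.
    """
    return min(candidates)


candidates = [
    (12, "worker2@example.com.4021"),
    (10, "worker3@example.com.77"),
    (10, "worker1@example.com.513"),
]
clock, leader = pick_leader(candidates)
print(leader)  # worker1@example.com.513
```

Because every worker applies the same comparison to the same set of candidates, each one arrives at the same leader independently.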
The Election Lifecycle
The election process follows a structured sequence of broadcasts and acknowledgments to reach consensus.
1. Initiation
An election can be triggered in two primary ways:
- Control API: Calling app.control.election(id, topic, action) broadcasts an "election" control command to all workers.
- Task Signatures: The Signature.election() method in celery/canvas.py uses the control API to initiate an election for a specific task.
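# Example from celery/canvas.py (abridged)
def election(self):
    app = self.type.app  # the app this signature's task is bound to
    # Reuse the signature's task id if it has one, otherwise create one.
    tid = self.options.get('task_id') or uuid()
    with app.producer_or_acquire(None) as producer:
        app.control.election(tid, 'task', self.clone(task_id=tid),
                             connection=producer.connection)
        return self.type.AsyncResult(tid)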
2. Candidate Announcement (worker-elect)
When a worker receives the election control command, it calls Gossip.election(). This method broadcasts a worker-elect event to all other workers in the cluster.
# celery/worker/consumer/gossip.py
def election(self, id, topic, action=None):
    self.consensus_replies[id] = []
    self.dispatcher.send(
        'worker-elect',
        id=id, topic=topic, action=action, cver=1,
    )
3. Consensus Building (worker-elect-ack)
Every worker (including the initiator) receives these worker-elect events. Upon receipt, the on_elect handler:
- Extracts the candidate's clock and hostname.
- Pushes the candidate into a local min-heap (self.consensus_requests[id]).
- Sends a worker-elect-ack back to the cluster.
# celery/worker/consumer/gossip.py
def on_elect(self, event):
    (id_, clock, hostname, pid, topic, action, _) = self._cons_stamp_fields(event)
    heappush(
        self.consensus_requests[id_],
        (clock, f'{hostname}.{pid}', topic, action),
    )
    self.dispatcher.send('worker-elect-ack', id=id_)
4. Leader Determination
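Each worker that announced itself as a candidate collects acknowledgments in self.consensus_replies[id]. Once it has received at least as many ACKs as there are workers it currently perceives as "alive" in its local state, it determines the winner from the heap of candidates via self.clock.sort_heap, which picks the entry with the lowest clock and applies the tie-breaker when clocks are equal.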
# celery/worker/consumer/gossip.py (excerpt from on_elect_ack)
if len(replies) >= len(alive_workers):
    _, leader, topic, action = self.clock.sort_heap(
        self.consensus_requests[id],
    )
    if leader == self.full_hostname:
        # This worker won the election
        handler = self.election_handlers[topic]
        handler(action)
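The four steps above can be sketched as a small in-memory simulation. Everything here is hypothetical scaffolding: SketchNode and run_election stand in for workers and the broker, messages are delivered in a fixed order, and the winner is read from the heap root instead of going through sort_heap. It is meant to show the shape of the protocol, not Celery's implementation.

```python
from heapq import heappush


class SketchNode:
    """Hypothetical stand-in for one worker's election state."""

    def __init__(self, name, clock):
        self.name, self.clock = name, clock
        self.requests = {}   # election id -> heap of candidate tuples
        self.replies = {}    # election id -> names that acknowledged
        self.executed = []   # (topic, action) pairs run as leader

    def start_election(self, id_):
        self.replies[id_] = []

    def on_elect(self, id_, clock, name, topic, action):
        heap = self.requests.setdefault(id_, [])
        heappush(heap, (clock, name, topic, action))

    def on_elect_ack(self, id_, sender, alive):
        replies = self.replies.get(id_)
        if replies is None:
            return  # already decided here, or not a candidate
        replies.append(sender)
        if len(replies) >= len(alive):
            # The heap root is the smallest (clock, name, ...) tuple.
            _, leader, topic, action = self.requests[id_][0]
            if leader == self.name:
                self.executed.append((topic, action))
            self.requests.pop(id_, None)
            self.replies.pop(id_, None)


def run_election(nodes, id_, topic, action):
    alive = {n.name for n in nodes}
    for n in nodes:
        n.start_election(id_)
    acks = []
    for candidate in nodes:      # every candidate announces itself
        for receiver in nodes:   # the broadcast reaches every node
            receiver.on_elect(id_, candidate.clock, candidate.name,
                              topic, action)
            acks.append(receiver.name)
    for sender in acks:          # acknowledgments are broadcast too
        for receiver in nodes:
            receiver.on_elect_ack(id_, sender, alive)
    return [n.name for n in nodes if n.executed]


nodes = [SketchNode("w1.100", 7), SketchNode("w2.200", 3),
         SketchNode("w3.300", 5)]
winners = run_election(nodes, "election-1", "task", "demo-action")
print(winners)  # ['w2.200']
```

Note that the simulation delivers every candidate announcement before any acknowledgment. A real cluster gives no such ordering guarantee, so this sketch says nothing about how the protocol behaves when messages interleave.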
Handling Election Results
The Gossip class maintains a mapping of topics to handlers in self.election_handlers. By default, the project supports the task topic.
The 'task' Topic
When a worker wins an election with the task topic, it executes the call_task method. This method takes the action (which is a task signature) and triggers it using apply_async().
# celery/worker/consumer/gossip.py
def call_task(self, task):
    try:
        self.app.signature(task).apply_async()
    except Exception as exc:
        logger.exception('Could not call task: %r', exc)
Because only the elected leader executes this handler, the task is dispatched to the broker only once, which prevents duplicate execution of coordinated tasks across the cluster.
Requirements and Constraints
- Transport Compatibility: The Gossip protocol and leader election are only supported for amqp and redis transports. This is checked during initialization via Gossip.compatible_transport.
- Worker Awareness: The mechanism relies on workers being aware of each other. The Mingle bootstep (which Gossip requires) handles the initial synchronization of worker states and logical clocks when a node joins the cluster.
- Node Liveness: If a worker is perceived as alive but fails to respond with an ACK, the election may be delayed. The Gossip.periodic method regularly cleans up workers that have missed heartbeats to ensure the alive_workers set remains accurate.
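The liveness constraint can be illustrated with a small heartbeat table. SketchLiveness, its expire_after parameter, and the explicit now arguments are all hypothetical; the sketch only shows why pruning silent workers matters when the number of required ACKs is derived from the alive set.

```python
class SketchLiveness:
    """Hypothetical heartbeat table, for illustration only."""

    def __init__(self, expire_after):
        self.expire_after = expire_after  # seconds without a heartbeat
        self.last_seen = {}               # worker name -> last timestamp

    def heartbeat(self, name, now):
        self.last_seen[name] = now

    def prune(self, now):
        """Drop workers whose last heartbeat is too old; return them."""
        lost = [name for name, ts in self.last_seen.items()
                if now - ts > self.expire_after]
        for name in lost:
            del self.last_seen[name]
        return lost

    def alive_workers(self):
        return set(self.last_seen)


table = SketchLiveness(expire_after=10)
table.heartbeat("w1", now=0)
table.heartbeat("w2", now=0)
table.heartbeat("w1", now=8)   # w2 stays silent
lost = table.prune(now=12)
print(lost, table.alive_workers())  # ['w2'] {'w1'}
```

Until w2 is pruned, an election in this sketch would wait for two acknowledgments. After pruning, one is enough, which is why stale entries in the alive set can delay an election.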