Worker State Synchronization

Worker state synchronization in this codebase is managed by two primary bootsteps: Mingle and Gossip. These components ensure that when a worker joins or operates within a cluster, it maintains a consistent view of the global state, specifically regarding logical clocks and revoked task lists.

Startup Synchronization (Mingle)

The Mingle bootstep (defined in celery.worker.consumer.mingle) is responsible for synchronizing a worker's state with its neighbors immediately upon startup or consumer restart. It focuses on two critical pieces of data: the logical clock and the list of revoked tasks.

Neighbor Discovery and Hello Exchange

Mingle uses the hello control command to discover other workers in the cluster. The send_hello method utilizes the application's control interface to broadcast the worker's presence and its current revoked task list.

# From celery/worker/consumer/mingle.py
def send_hello(self, c):
    inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
    our_revoked = c.controller.state.revoked
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
    replies.pop(c.hostname, None)  # delete my own response
    return replies

State Merging

When neighbors reply, the sync_with_node method processes the incoming state:

  1. Logical Clock: The worker adjusts its local clock using c.app.clock.adjust(clock). Once every reply has been processed, the new worker's clock is at least as high as the highest clock reported by any neighbor that replied.
  2. Revoked Tasks: The worker merges the neighbor's revoked task list into its own LimitedSet via c.controller.state.revoked.update(revoked).

This synchronization prevents a new worker from accidentally executing a task that was globally revoked while the worker was offline.
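The merge step can be sketched in isolation. This is a minimal illustration rather than the Mingle implementation: SketchLamportClock, sync_with_replies, and the reply shape (a dict with "clock" and "revoked" keys) are hypothetical names chosen for the example, the clock is assumed to follow the conventional Lamport receive rule (take the larger value, then add one), and a plain set stands in for the LimitedSet.

```python
import threading


class SketchLamportClock:
    """Hypothetical stand-in for the application's logical clock."""

    def __init__(self, initial_value=0):
        self.value = initial_value
        self._lock = threading.Lock()

    def adjust(self, other):
        # Assumed Lamport receive rule: jump past the larger of the two values.
        with self._lock:
            self.value = max(self.value, other) + 1
            return self.value


def sync_with_replies(clock, revoked, replies):
    """Merge every neighbor reply into the local clock and revoked set."""
    for hostname, reply in replies.items():
        if not reply:
            continue  # this neighbor returned nothing usable
        clock.adjust(reply["clock"])
        revoked.update(reply["revoked"])
    return clock.value


clock = SketchLamportClock(initial_value=3)
revoked = {"task-a"}  # plain set standing in for the LimitedSet
replies = {
    "worker2@example": {"clock": 10, "revoked": ["task-b"]},
    "worker3@example": {"clock": 7, "revoked": ["task-c"]},
}
sync_with_replies(clock, revoked, replies)
```

Because each adjustment takes the maximum before incrementing, the order in which replies arrive does not change the guarantee that the local clock ends up above every reported value.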

Runtime Coordination (Gossip)

Once the worker has started and completed Mingle, the Gossip bootstep (defined in celery.worker.consumer.gossip) takes over to maintain state during runtime. Gossip is a ConsumerStep that listens for events from other workers.

Logical Clock Maintenance

Gossip keeps the logical clock updated by observing every message received on the worker.# routing key. If a message originates from a different host, Gossip decodes the event and passes it to update_state, which also updates the clock. If the message is from the local host, Gossip only advances the clock with self.clock.forward().

# From celery/worker/consumer/gossip.py
def on_message(self, prepare, message):
    # ... (handler lookup)
    hostname = (message.headers.get('hostname') or
                message.payload['hostname'])
    if hostname != self.hostname:
        try:
            _, event = prepare(message.payload)
            self.update_state(event)  # Updates logical clock via event
        except (DecodeError, ContentDisallowed, TypeError) as exc:
            logger.error(exc)
    else:
        self.clock.forward()

Node Monitoring

Gossip tracks the liveness of other nodes using an internal state object (c.app.events.State). It registers handlers for node lifecycle events:

  • on_node_join: Triggered when a new worker is detected.
  • on_node_leave: Triggered when a worker shuts down gracefully.
  • on_node_lost: Triggered when a worker misses heartbeats, handled via a periodic timer that checks worker.alive status.
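The periodic lost-node check can be sketched as follows. This is an illustration under stated assumptions, not Gossip's code: SketchWorker, reap_lost_nodes, and the 6-second expire_window are hypothetical, and liveness is assumed to mean "a heartbeat was seen within the window".

```python
from dataclasses import dataclass


@dataclass
class SketchWorker:
    """Hypothetical record of a remote worker's last heartbeat."""
    hostname: str
    last_heartbeat: float
    expire_window: float = 6.0  # hypothetical value, in seconds

    def alive(self, now):
        # Alive if a heartbeat arrived within the expiry window.
        return (now - self.last_heartbeat) < self.expire_window


def reap_lost_nodes(workers, now, on_node_lost):
    """Drop workers that stopped sending heartbeats and notify the handler."""
    lost = [w for w in workers.values() if not w.alive(now)]
    for worker in lost:
        del workers[worker.hostname]
        on_node_lost(worker)
    return [w.hostname for w in lost]
```

A timer would call reap_lost_nodes once per interval, passing the current time and the on_node_lost handler.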

Consensus and Leader Election

Gossip provides a mechanism for cluster-wide consensus, primarily used for leader election. This is implemented through worker-elect and worker-elect-ack events.

The Election Process

An election is initiated via the election method, which broadcasts a worker-elect event containing a unique ID, a topic (e.g., 'task'), and an action.

  1. Request: Workers receiving the request push the candidate's details (logical clock and full hostname, together with the topic and action) onto a heap in consensus_requests and reply with an ACK.
  2. Consensus: Once a worker receives ACKs from all known alive workers (tracked by self.state.alive_workers()), it determines the winner.
  3. Winning Criteria: The winner is determined by sorting the heap of candidates. The sorting uses the logical clock as the primary key, with the full hostname (including PID) as a tie-breaker.

# From celery/worker/consumer/gossip.py
if len(replies) >= len(alive_workers):
    _, leader, topic, action = self.clock.sort_heap(
        self.consensus_requests[id],
    )
    if leader == self.full_hostname:
        info('I won the election %r', id)
        # Execute the election handler (e.g., call_task)
        handler(action)
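
The winner selection can be illustrated with a standalone sketch. It assumes the candidate with the lowest (clock, full hostname) pair wins, and it uses Python's heapq tuple ordering in place of the clock's sort_heap method. The names record_candidate and pick_leader are hypothetical.

```python
import heapq


def record_candidate(heap, clock, hostname, pid, topic, action):
    """Push one election request; tuples order by clock, then full hostname."""
    heapq.heappush(heap, (clock, f"{hostname}.{pid}", topic, action))


def pick_leader(heap, ack_count, alive_count):
    """Return (leader, topic, action) once every alive worker has ACKed."""
    if ack_count < alive_count or not heap:
        return None  # consensus not reached yet
    # Assumption: the smallest (clock, full_hostname) entry is the winner.
    _, leader, topic, action = heap[0]
    return leader, topic, action


requests = []
record_candidate(requests, 5, "w2@example", 200, "task", "run-report")
record_candidate(requests, 3, "w3@example", 300, "task", "run-report")
record_candidate(requests, 3, "w1@example", 100, "task", "run-report")
result = pick_leader(requests, ack_count=3, alive_count=3)
```

Every worker applies the same ordering to the same set of requests, so each one can decide locally whether it is the leader without an extra round of messages.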

Transport Compatibility and Configuration

State synchronization features are dependent on the underlying broker transport.

Feature    Compatible Transports
Mingle     amqp, redis, gcpubsub
Gossip     amqp, redis

Configuration Parameters

  • without_mingle: Disables startup synchronization.
  • without_gossip: Disables runtime event consumption and leader election.
  • interval: (Default 5.0s) The frequency at which Gossip checks for "lost" nodes that have stopped sending heartbeats.
  • heartbeat_interval: (Default 2.0s) Used as the TTL for the gossip event queue to ensure stale events are discarded.

Gossip has a strict dependency on Mingle; in the bootstep graph, Gossip.requires includes Mingle, ensuring that startup synchronization always precedes runtime gossip.
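
The effect of a requires relationship on start order can be shown with a small sketch. It uses the standard library's graphlib as a stand-in for the bootstep graph, which is an assumption made for illustration. Only the Gossip-requires-Mingle edge comes from the text above.

```python
from graphlib import TopologicalSorter

# Map each step to the steps it requires. Only the Gossip -> Mingle edge
# is taken from the documentation; the graph is otherwise left empty.
requires = {
    "Gossip": {"Mingle"},
    "Mingle": set(),
}

# A topological order starts every step after the steps it depends on.
start_order = list(TopologicalSorter(requires).static_order())
```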