Gossip

Bootstep consuming events from other workers.

This keeps the logical clock value up to date.
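
As a rough illustration of what keeping a logical clock up to date means, the sketch below models a Lamport-style counter in plain Python. The class and its method names are assumptions chosen for this example, not an API documented on this page, and the clock the worker actually uses may differ in detail.

```python
class LamportClock:
    """Minimal Lamport logical clock (illustrative only)."""

    def __init__(self, initial_value=0):
        self.value = initial_value

    def forward(self):
        # A local event happened: advance the clock by one tick.
        self.value += 1
        return self.value

    def adjust(self, other):
        # A remote timestamp arrived: move past whichever value is larger.
        self.value = max(self.value, other) + 1
        return self.value


clock = LamportClock()
clock.forward()    # local event
clock.adjust(10)   # event received from a peer whose clock read 10
print(clock.value)
```

Consuming events from other workers gives each node a steady stream of remote timestamps to adjust against, which is why a gossip step is a natural place to maintain such a clock.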

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| label | string = Gossip | Internal identifier used for logging and step tracking within the bootstep graph. |
| requires | tuple = (Mingle,) | A tuple of bootstep classes that must be initialized before the Gossip step can start. |
| compatible_transports | set = {'amqp', 'redis'} | A set of broker transport names that support the gossip protocol requirements. |

Constructor

Signature

def Gossip(
c: [Consumer](../consumer/consumer.md?sid=celery_worker_consumer_consumer_consumer),
without_gossip: boolean = False,
interval: float = 5.0,
heartbeat_interval: float = 2.0,
**kwargs: dict
) -> null

Parameters

| Name | Type | Description |
| --- | --- | --- |
| c | [Consumer](../consumer/consumer.md?sid=celery_worker_consumer_consumer_consumer) | The consumer instance associated with this bootstep. |
| without_gossip | boolean = False | Flag to manually disable the gossip service. |
| interval | float = 5.0 | The interval in seconds for periodic gossip checks. |
| heartbeat_interval | float = 2.0 | The interval in seconds for heartbeat events. |
| `**kwargs` | dict | Additional keyword arguments passed to the parent class constructor. |

Methods


compatible_transport()

def compatible_transport(
app: [Celery](../../../app/base/celery.md?sid=celery_app_base_celery)
) -> boolean

Checks if the current application transport is supported by the Gossip bootstep.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| app | [Celery](../../../app/base/celery.md?sid=celery_app_base_celery) | The Celery application instance used to inspect the connection transport |

Returns

| Type | Description |
| --- | --- |
| boolean | True if the transport driver type is 'amqp' or 'redis', otherwise False |
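
The check described above amounts to a set membership test. A minimal sketch, assuming the driver type is available as a plain string: `is_compatible` and `gossip_enabled` are hypothetical helpers, and the way `without_gossip` combines with the transport check is an assumption rather than something this page states.

```python
COMPATIBLE_TRANSPORTS = {"amqp", "redis"}


def is_compatible(driver_type):
    """Return True when the driver type is in the supported set."""
    return driver_type in COMPATIBLE_TRANSPORTS


def gossip_enabled(driver_type, without_gossip=False):
    # Assumption: gossip runs only when it was not disabled by flag
    # and the broker transport qualifies.
    return not without_gossip and is_compatible(driver_type)


print(is_compatible("amqp"))
print(gossip_enabled("redis", without_gossip=True))
```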

election()

def election(
id: string,
topic: string,
action: any
) -> null

Initiates a leader election process across the cluster for a specific topic.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| id | string | The unique identifier for this specific election request |
| topic | string | The category of the election, such as 'task', which determines the handler to execute |
| action | any | The specific payload or task signature to be executed by the election winner |

Returns

| Type | Description |
| --- | --- |
| null | |

call_task()

def call_task(
task: string|dict
) -> null

Executes a task signature asynchronously as a result of winning an election.

Parameters

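| Name | Type | Description |
| --- | --- | --- |
| task | `string\|dict` | The task signature to execute, given as a string or a dict |

Returns

| Type | Description |
| --- | --- |
| null | |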

on_elect()

def on_elect(
event: dict
) -> null

Handles incoming election requests from other nodes and sends an acknowledgment.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| event | dict | The election event payload containing clock, hostname, and election metadata |

Returns

| Type | Description |
| --- | --- |
| null | |
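
One way to picture the request handling is a per-election heap keyed by logical clock value, so the earliest request is always at the front. This is a simplified sketch, not the library's code: the module-level `consensus_requests` and `sent` containers, the `pid` field, and the `worker-elect-ack` event name are assumptions made for the example.

```python
from collections import defaultdict
from heapq import heappush

# election id -> heap of (clock, node, topic, action) candidates
consensus_requests = defaultdict(list)
sent = []  # stand-in for an event dispatcher: records what would be sent


def on_elect(event):
    """Record an election request, then acknowledge it."""
    try:
        entry = (
            event["clock"],
            f"{event['hostname']}.{event['pid']}",
            event["topic"],
            event["action"],
        )
        election_id = event["id"]
    except KeyError as exc:
        # A malformed request is reported and dropped without an ack.
        print(f"election request missing field {exc}")
        return
    heappush(consensus_requests[election_id], entry)
    sent.append(("worker-elect-ack", {"id": election_id}))


on_elect({"id": "e1", "clock": 7, "hostname": "w2@host", "pid": 11,
          "topic": "task", "action": "tasks.cleanup"})
on_elect({"id": "e1", "clock": 5, "hostname": "w1@host", "pid": 10,
          "topic": "task", "action": "tasks.cleanup"})
on_elect({"id": "e1", "clock": 9})  # malformed: no hostname, pid, topic, action

print(consensus_requests["e1"][0])
```

Because the heap orders tuples by their first element, the request with the lowest clock value sits at index 0 regardless of arrival order.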

start()

def start(
c: [Consumer](../consumer/consumer.md?sid=celery_worker_consumer_consumer_consumer)
) -> null

Starts the Gossip bootstep and initializes the event dispatcher.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| c | [Consumer](../consumer/consumer.md?sid=celery_worker_consumer_consumer_consumer) | The consumer instance providing the event dispatcher |

Returns

| Type | Description |
| --- | --- |
| null | |

on_elect_ack()

def on_elect_ack(
event: dict
) -> null

Processes election acknowledgments and determines the winner once all nodes have replied.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| event | dict | The acknowledgment event containing the responding node's hostname and election ID |

Returns

| Type | Description |
| --- | --- |
| null | |
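
The tallying step can be sketched as follows: collect acknowledgments until their count reaches the number of live workers, then pick a winner. The `ElectionTally` class and `pick_winner` helper are hypothetical, and the rule used here (lowest clock wins, ties broken by node name) is an assumption for illustration; this page does not specify the ordering.

```python
def pick_winner(requests):
    """Lowest clock wins; ties go to the smallest node name (assumed rule)."""
    return min(requests, key=lambda req: (req[0], req[1]))


class ElectionTally:
    def __init__(self, me, alive_workers):
        self.me = me
        self.alive_workers = set(alive_workers)
        self.replies = {}    # election id -> hostnames that acknowledged
        self.requests = {}   # election id -> [(clock, node, topic, action)]
        self.won = []        # (topic, action) pairs this node must run

    def on_elect_ack(self, event):
        election_id = event["id"]
        replies = self.replies.get(election_id)
        if replies is None:
            return  # not an election this node is tracking
        replies.append(event["hostname"])
        if len(replies) < len(self.alive_workers):
            return  # still waiting for other nodes
        _, leader, topic, action = pick_winner(self.requests[election_id])
        if leader == self.me:
            self.won.append((topic, action))
        # The election is decided either way, so forget its bookkeeping.
        self.requests.pop(election_id, None)
        self.replies.pop(election_id, None)


tally = ElectionTally(me="w1", alive_workers=["w1", "w2"])
tally.replies["e1"] = []
tally.requests["e1"] = [
    (7, "w2", "task", "tasks.cleanup"),
    (5, "w1", "task", "tasks.cleanup"),
]
tally.on_elect_ack({"id": "e1", "hostname": "w1"})
tally.on_elect_ack({"id": "e1", "hostname": "w2"})
print(tally.won)
```

Every node applies the same deterministic rule to the same set of requests, so all nodes agree on the winner without a further round of messages.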

on_node_join()

def on_node_join(
worker: [Worker](../../../events/state/worker.md?sid=celery_events_state_worker)
) -> null

Triggers registered callbacks when a new worker node is detected in the cluster.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| worker | [Worker](../../../events/state/worker.md?sid=celery_events_state_worker) | The worker state object representing the node that joined |

Returns

| Type | Description |
| --- | --- |
| null | |

on_node_leave()

def on_node_leave(
worker: [Worker](../../../events/state/worker.md?sid=celery_events_state_worker)
) -> null

Triggers registered callbacks when a worker node gracefully leaves the cluster.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| worker | [Worker](../../../events/state/worker.md?sid=celery_events_state_worker) | The worker state object representing the node that left |

Returns

| Type | Description |
| --- | --- |
| null | |

on_node_lost()

def on_node_lost(
worker: [Worker](../../../events/state/worker.md?sid=celery_events_state_worker)
) -> null

Triggers registered callbacks when a worker node is considered lost due to missed heartbeats.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| worker | [Worker](../../../events/state/worker.md?sid=celery_events_state_worker) | The worker state object representing the node that was lost |

Returns

| Type | Description |
| --- | --- |
| null | |
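
The three membership hooks (on_node_join, on_node_leave, on_node_lost) share one pattern: run every registered callback for the matching event. A minimal sketch of that pattern, assuming callbacks are kept in per-event sets and that a failing callback is logged and skipped; the `MembershipHooks` class and its `on` dictionary are hypothetical names for this example.

```python
class MembershipHooks:
    """Callback registry for join, leave, and lost notifications."""

    def __init__(self):
        self.on = {"node_join": set(), "node_leave": set(), "node_lost": set()}

    def _call_handlers(self, kind, worker):
        for handler in self.on[kind]:
            try:
                handler(worker)
            except Exception as exc:
                # One failing callback must not block the others.
                print(f"ignored error from handler {handler.__name__}: {exc!r}")

    def on_node_join(self, worker):
        self._call_handlers("node_join", worker)

    def on_node_leave(self, worker):
        self._call_handlers("node_leave", worker)

    def on_node_lost(self, worker):
        self._call_handlers("node_lost", worker)


seen = []


def remember(worker):
    seen.append(worker)


def broken(worker):
    raise RuntimeError("boom")


hooks = MembershipHooks()
hooks.on["node_join"].update({remember, broken})
hooks.on["node_lost"].add(remember)
hooks.on_node_join("w2@host")
hooks.on_node_lost("w3@host")
print(seen)
```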

register_timer()

def register_timer() -> null

Schedules the periodic cleanup task to run at the configured interval.

Returns

| Type | Description |
| --- | --- |
| null | |
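
Scheduling a repeating job safely usually means cancelling any earlier registration first, so that calling the method twice does not leave two schedules running. The sketch below shows that idea with a fake timer that only records entries. `FakeTimer`, `PeriodicCleanup`, and the cancel-before-reschedule behavior are assumptions for illustration, not details confirmed by this page.

```python
class FakeTimer:
    """Stand-in scheduler that records entries instead of running them."""

    def __init__(self):
        self.entries = []

    def call_repeatedly(self, secs, fun):
        entry = {"secs": secs, "fun": fun, "cancelled": False}
        self.entries.append(entry)
        return entry


class PeriodicCleanup:
    def __init__(self, timer, interval=5.0):
        self.timer = timer
        self.interval = interval
        self._tref = None  # handle of the currently active schedule

    def periodic(self):
        pass  # the cleanup itself is out of scope for this sketch

    def register_timer(self):
        # Cancel the previous schedule so only one stays active.
        if self._tref is not None:
            self._tref["cancelled"] = True
        self._tref = self.timer.call_repeatedly(self.interval, self.periodic)


timer = FakeTimer()
cleanup = PeriodicCleanup(timer)
cleanup.register_timer()
cleanup.register_timer()
print([entry["cancelled"] for entry in timer.entries])
```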

periodic()

def periodic() -> null

Inspects the state of all known workers and removes those that are no longer alive.

Returns

| Type | Description |
| --- | --- |
| null | |
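
The cleanup pass can be pictured as a two-phase loop: first collect the workers that are no longer alive, then remove them. A minimal sketch under stated assumptions: `WorkerInfo` is a hypothetical stand-in for the worker state object, and notifying a lost-node callback before removal is an assumption for this example.

```python
from dataclasses import dataclass


@dataclass
class WorkerInfo:
    hostname: str
    alive: bool


def prune_dead(workers, on_node_lost):
    """Notify about every dead worker, then drop it from the mapping."""
    # Collect first: removing entries while iterating the dict would raise.
    dead = [worker for worker in workers.values() if not worker.alive]
    for worker in dead:
        on_node_lost(worker)
        workers.pop(worker.hostname, None)
    return dead


workers = {
    "w1@host": WorkerInfo("w1@host", alive=True),
    "w2@host": WorkerInfo("w2@host", alive=False),
}
lost = []
prune_dead(workers, lost.append)
print(sorted(workers))
```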

get_consumers()

def get_consumers(
channel: Channel
) -> list

Returns a list of Kombu consumers configured to listen for worker events.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| channel | Channel | The AMQP channel used to create the consumer |

Returns

| Type | Description |
| --- | --- |
| list | A list containing a Kombu Consumer configured for worker event patterns |

on_message()

def on_message(
prepare: callable,
message: Message
) -> any

Dispatches incoming messages to specific event handlers or updates the internal worker state.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| prepare | callable | A function used to deserialize and prepare the message payload |
| message | Message | The raw message object received from the broker |

Returns

| Type | Description |
| --- | --- |
| any | The result of the specific event handler, or null if processed as a state update |
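
The dispatch described above can be sketched as a lookup table with a fallback. In this simplified model a message is a plain dict, the routing key selects the handler, and anything without a dedicated handler is decoded with `prepare` and recorded as a state update. The dict-shaped message, the `worker.elect` key, and the helper names are assumptions for this example.

```python
handled = []
state_updates = []


def handle_elect(payload):
    handled.append(payload)
    return "elect-handled"


# routing key -> dedicated handler
event_handlers = {"worker.elect": handle_elect}


def on_message(prepare, message):
    """Route to a dedicated handler, or fall back to a state update."""
    handler = event_handlers.get(message["routing_key"])
    if handler is not None:
        return handler(message["payload"])
    # No dedicated handler: decode the payload and update worker state.
    state_updates.append(prepare(message["payload"]))
    return None


def decode(payload):
    # Hypothetical "prepare" step: copy the payload into an event dict.
    return dict(payload)


first = on_message(decode, {"routing_key": "worker.elect",
                            "payload": {"id": "e1"}})
second = on_message(decode, {"routing_key": "worker.heartbeat",
                             "payload": {"hostname": "w2@host"}})
print(first, second)
```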