Skip to main content

Mingle

Bootstep syncing state with neighbor workers.

At startup, or upon consumer restart, this will:

  • Sync logical clocks.
  • Sync revoked tasks.

Attributes

AttributeTypeDescription
labelstring = MingleThe display label used to identify this bootstep in logs and monitoring tools.
requirestuple = (Events,)A tuple of bootstep classes that must be initialized and started before the Mingle step can execute.
compatible_transportsset = {"amqp", "redis", "gcpubsub"}A set of supported transport driver types that allow the worker to perform state synchronization with neighbors.

Constructor

Signature

def Mingle(
c: Any,
without_mingle: boolean = False,
**kwargs: dict
)

Parameters

NameTypeDescription
cAnyThe consumer instance.
without_mingleboolean = FalseFlag to manually disable the mingle process.
**kwargsdictAdditional keyword arguments passed to the parent class.

Methods


compatible_transport()

@classmethod
def compatible_transport(
app: Celery application instance
) - > boolean

Checks if the current application transport is supported by the Mingle bootstep.

Parameters

NameTypeDescription
appCelery application instanceThe Celery application instance used to inspect the connection transport driver.

Returns

TypeDescription
booleanTrue if the transport driver type is one of 'amqp', 'redis', or 'gcpubsub'.

start()

@classmethod
def start(
c: Consumer instance
)

Starts the Mingle bootstep by initiating the synchronization process with neighbor nodes.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance that manages the worker state.

sync()

@classmethod
def sync(
c: Consumer instance
)

Orchestrates the discovery of neighbor workers and processes their state information to synchronize the local node.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance used to send hello messages and process replies.

send_hello()

@classmethod
def send_hello(
c: Consumer instance
) - > dict

Broadcasts a hello message to other workers to retrieve their logical clock and revoked task lists.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance providing the hostname and connection for the broadcast.

Returns

TypeDescription
dictA dictionary of node names mapping to their respective state replies, excluding the local node's own response.

on_node_reply()

@classmethod
def on_node_reply(
c: Consumer instance,
nodename: string,
reply: dict
)

Processes a single neighbor's reply by attempting to synchronize the local state with the provided node data.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance where the synchronized state will be applied.
nodenamestringThe unique identifier of the neighbor worker node.
replydictThe state data returned by the neighbor, typically containing clock and revoked task information.

sync_with_node()

@classmethod
def sync_with_node(
c: Consumer instance,
clock: int,
revoked: list
)

Updates the local logical clock and revoked task list using data received from a specific neighbor node.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance to be updated.
clockintThe logical clock value from the remote node used for clock adjustment.
revokedlistA list of revoked task IDs to be merged into the local state.

on_clock_event()

@classmethod
def on_clock_event(
c: Consumer instance,
clock: int
)

Adjusts the local application's logical clock based on the clock value received from a neighbor.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance containing the application clock.
clockintThe remote clock value to synchronize against; if None, the local clock is simply forwarded.

on_revoked_received()

@classmethod
def on_revoked_received(
c: Consumer instance,
revoked: list
)

Updates the local set of revoked tasks with the list of revoked tasks provided by a neighbor.

Parameters

NameTypeDescription
cConsumer instanceThe consumer controller instance whose revoked state will be updated.
revokedlistA collection of task IDs that have been revoked on the remote node.