Bootstep syncing state with neighbor workers.
At startup, or upon consumer restart, this will:
- Sync logical clocks.
- Sync revoked tasks.
Attributes
| Attribute | Type | Description |
|---|---|---|
| label | string = "Mingle" | The display label used to identify this bootstep in logs and monitoring tools. |
| requires | tuple = (Events,) | A tuple of bootstep classes that must be initialized and started before the Mingle step can execute. |
| compatible_transports | set = {"amqp", "redis", "gcpubsub"} | A set of supported transport driver types that allow the worker to perform state synchronization with neighbors. |
Constructor
Signature
def Mingle(
c: Any,
without_mingle: boolean = False,
**kwargs: dict
)
Parameters
| Name | Type | Description |
|---|---|---|
| c | Any | The consumer instance. |
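| without_mingle | boolean = False | Set to True to disable the mingle step, so the worker skips state synchronization with its neighbors. Corresponds to the worker's --without-mingle option. |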
| **kwargs | dict | Additional keyword arguments passed to the parent class. |
Methods
compatible_transport()
def compatible_transport(
app: Celery application instance
) -> boolean
Checks if the current application transport is supported by the Mingle bootstep.
Parameters
| Name | Type | Description |
|---|---|---|
| app | Celery application instance | The Celery application instance used to inspect the connection transport driver. |
Returns
| Type | Description |
|---|---|
| boolean | True if the transport driver type is one of 'amqp', 'redis', or 'gcpubsub'. |
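The check amounts to a set-membership test on the transport's driver type. A minimal sketch, assuming the driver type string has already been read from the connection; `COMPATIBLE_TRANSPORTS` and `is_compatible` are illustrative names for this example, not part of Celery's API.

```python
# Stand-in for the compatible_transports attribute documented above.
COMPATIBLE_TRANSPORTS = {"amqp", "redis", "gcpubsub"}


def is_compatible(driver_type: str) -> bool:
    """Return True when the driver type is one that Mingle can sync over."""
    return driver_type in COMPATIBLE_TRANSPORTS


redis_ok = is_compatible("redis")  # a listed driver type
sqs_ok = is_compatible("sqs")      # a driver type that is not listed
print(redis_ok, sqs_ok)
```

Keeping the supported types in a set makes the lookup constant-time and lets a subclass widen or narrow support by overriding a single attribute.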
start()
def start(
c: Consumer instance
)
Starts the Mingle bootstep by initiating the synchronization process with neighbor nodes.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance that manages the worker state. |
sync()
def sync(
c: Consumer instance
)
Orchestrates the discovery of neighbor workers and processes their state information to synchronize the local node.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance used to send hello messages and process replies. |
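The orchestration can be pictured as "ask for replies, then process each one". The sketch below is a simplified stand-in, not Celery's implementation: the two callables replace `send_hello()` and `on_node_reply()`, and skipping empty replies is an assumption made for this example.

```python
def sync(send_hello, on_node_reply):
    """Discover neighbors and process each non-empty reply.

    Returns the number of neighbors that were synced with.
    """
    replies = send_hello()
    if not replies:
        return 0  # no neighbors answered: nothing to sync
    synced = 0
    for nodename, reply in replies.items():
        if reply:  # assumption: empty replies are skipped
            on_node_reply(nodename, reply)
            synced += 1
    return synced


# Usage with hypothetical node names and reply payloads.
seen = []
fake_replies = {
    "w2@host": {"clock": 4, "revoked": ["id-1"]},
    "w3@host": None,
}
count = sync(lambda: fake_replies, lambda name, reply: seen.append(name))
print(count, seen)
```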
send_hello()
def send_hello(
c: Consumer instance
) -> dict
Broadcasts a hello message to other workers to retrieve their logical clock and revoked task lists.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance providing the hostname and connection for the broadcast. |
Returns
| Type | Description |
|---|---|
| dict | A dictionary of node names mapping to their respective state replies, excluding the local node's own response. |
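Because a broadcast reaches the sender too, the local node's own answer has to be removed from the result. A minimal sketch of that filtering step, assuming replies arrive as a dict keyed by node name (or as None when nobody answers); `drop_own_reply` is a hypothetical helper, not a Celery function.

```python
def drop_own_reply(replies, hostname):
    """Return the replies without the entry for the local node."""
    cleaned = dict(replies or {})   # tolerate None when nobody answered
    cleaned.pop(hostname, None)     # no error if we did not answer ourselves
    return cleaned


raw = {
    "w1@host": {"clock": 10, "revoked": []},
    "w2@host": {"clock": 12, "revoked": ["id-9"]},
}
neighbors = drop_own_reply(raw, "w1@host")
print(sorted(neighbors))
```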
on_node_reply()
def on_node_reply(
c: Consumer instance,
nodename: string,
reply: dict
)
Processes a single neighbor's reply by attempting to synchronize the local state with the provided node data.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance where the synchronized state will be applied. |
| nodename | string | The unique identifier of the neighbor worker node. |
| reply | dict | The state data returned by the neighbor, typically containing clock and revoked task information. |
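"Attempting" suggests that one bad reply should not abort the whole mingle round. The sketch below illustrates that pattern under stated assumptions: the reply dict is unpacked into keyword arguments, ordinary errors are logged and swallowed, and `MemoryError` is re-raised. The function takes the sync callable as a parameter so the example is self-contained; it is not Celery's own code.

```python
import logging

logger = logging.getLogger("mingle_sketch")


def on_node_reply(sync_with_node, nodename, reply):
    """Apply one neighbor's reply; return True on success, False on failure."""
    try:
        # The reply dict is unpacked into keyword arguments (clock=..., revoked=...).
        sync_with_node(**reply)
    except MemoryError:
        raise  # assumption: out-of-memory conditions are never swallowed
    except Exception as exc:
        logger.error("sync with %s failed: %r", nodename, exc)
        return False
    return True


def fake_sync_with_node(clock=None, revoked=None):
    """Hypothetical sync function that rejects malformed clock values."""
    if not isinstance(clock, int):
        raise TypeError("clock must be an int")


ok = on_node_reply(fake_sync_with_node, "w2@host", {"clock": 7, "revoked": []})
bad = on_node_reply(fake_sync_with_node, "w3@host", {"clock": "oops"})
print(ok, bad)
```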
sync_with_node()
def sync_with_node(
c: Consumer instance,
clock: int,
revoked: list
)
Updates the local logical clock and revoked task list using data received from a specific neighbor node.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance to be updated. |
| clock | int | The logical clock value from the remote node used for clock adjustment. |
| revoked | list | A list of revoked task IDs to be merged into the local state. |
on_clock_event()
def on_clock_event(
c: Consumer instance,
clock: int
)
Adjusts the local application's logical clock based on the clock value received from a neighbor.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance containing the application clock. |
| clock | int | The remote clock value to synchronize against. If it is None (or otherwise falsy), the local clock is advanced by one tick instead of being adjusted against a remote value. |
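The adjust-or-forward behavior is easiest to see with a small Lamport-style clock. The class below is a local stand-in written for this example, assuming the usual Lamport semantics: adjusting takes the larger of the local and remote values plus one, and forwarding increments by one. It is not imported from Celery or Kombu.

```python
class LamportClock:
    """Minimal Lamport-style logical clock (illustrative stand-in)."""

    def __init__(self, initial=0):
        self.value = initial

    def adjust(self, other):
        # Jump past whichever timestamp is larger, local or remote.
        self.value = max(self.value, other) + 1
        return self.value

    def forward(self):
        # No remote information: just tick once.
        self.value += 1
        return self.value


def on_clock_event(local_clock, remote_value):
    """Adjust against a remote value, or tick forward if none was given."""
    if remote_value:
        return local_clock.adjust(remote_value)
    return local_clock.forward()


local = LamportClock(5)
after_ahead = on_clock_event(local, 12)    # remote is ahead of local
after_none = on_clock_event(local, None)   # no remote value supplied
after_behind = on_clock_event(local, 3)    # remote is behind local
print(after_ahead, after_none, after_behind)
```

Under these assumptions the local clock never moves backwards, which is what keeps event ordering consistent across workers.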
on_revoked_received()
def on_revoked_received(
c: Consumer instance,
revoked: list
)
Updates the local set of revoked tasks with the list of revoked tasks provided by a neighbor.
Parameters
| Name | Type | Description |
|---|---|---|
| c | Consumer instance | The consumer controller instance whose revoked state will be updated. |
| revoked | list | A collection of task IDs that have been revoked on the remote node. |
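Merging revoked tasks is a union of task-ID collections. A minimal sketch, assuming the local revoked state behaves like a Python set (the real container may differ) and that an empty or missing list from the neighbor is a no-op; the task IDs are made up for the example.

```python
def on_revoked_received(local_revoked, revoked):
    """Merge a neighbor's revoked task IDs into the local set."""
    if revoked:  # None or an empty list leaves the local state untouched
        local_revoked.update(revoked)
    return local_revoked


local_revoked = {"task-a"}
on_revoked_received(local_revoked, ["task-b", "task-a"])  # duplicate is absorbed
on_revoked_received(local_revoked, None)                   # nothing to merge
print(sorted(local_revoked))
```

Because the merge is a set union, applying the same neighbor reply twice has no additional effect.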