ResultConsumer
This class manages the consumption of task results from a message broker using the Kombu library. It handles the lifecycle of message consumers: establishing connections, binding to task-specific queues, and processing state changes through callbacks. It also supports event draining and reconnects automatically when the connection fails.
Attributes
| Attribute | Type | Description |
|---|---|---|
| Consumer | type = kombu.Consumer | The kombu Consumer class used to create message consumers for task results. |
Constructor
Signature
def ResultConsumer(
*args: any,
**kwargs: any
) -> null
Parameters
| Name | Type | Description |
|---|---|---|
| *args | any | Variable length argument list passed to the base class constructor. |
| **kwargs | any | Arbitrary keyword arguments passed to the base class constructor. |
Methods
start()
@classmethod
def start(
initial_task_id: string,
no_ack: boolean = True,
**kwargs: dict
)
Initializes the connection and starts the message consumer for a specific task result queue.
Parameters
| Name | Type | Description |
|---|---|---|
| initial_task_id | string | The unique identifier of the task whose result queue will be bound and consumed first. |
| no_ack | boolean = True | When true, messages are consumed in no-ack mode: the broker treats each message as acknowledged on delivery, and the consumer sends no explicit acknowledgement. |
| **kwargs | dict | Additional configuration arguments passed to the underlying connection or consumer. |
drain_events()
@classmethod
def drain_events(
timeout: float = None
) -> any
Waits for and processes incoming events from the broker connection within a specified timeframe.
Parameters
| Name | Type | Description |
|---|---|---|
| timeout | float = None | The maximum number of seconds to wait for an event before returning. |
Returns
| Type | Description |
|---|---|
| any | The result of the drained event, or null if no connection exists and the timeout expires. |
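The two return cases for drain_events() can be read as a "drain or sleep" pattern. The sketch below is one plausible interpretation, not the class's actual source. `StubConnection` and `drain_or_sleep` are hypothetical names introduced only for illustration, and the stub stands in for a real broker connection.

```python
import time


class StubConnection:
    """Hypothetical stand-in for a broker connection (not part of Kombu)."""

    def __init__(self, events):
        self._events = list(events)

    def drain_events(self, timeout=None):
        # Hand back the next queued event. A real connection would block
        # on the socket for up to `timeout` seconds instead.
        if not self._events:
            raise TimeoutError("no event arrived within the timeout")
        return self._events.pop(0)


def drain_or_sleep(connection, timeout=None):
    """Drain one event if connected; otherwise wait out the timeout."""
    if connection is not None:
        return connection.drain_events(timeout=timeout)
    if timeout:
        # Assumption: with no connection, sleeping keeps a polling
        # caller from spinning in a tight loop.
        time.sleep(timeout)
    return None
```

Under this reading, a caller can poll in a loop before any consumer has been started without special-casing the missing connection.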
stop()
@classmethod
def stop()
Cancels the active consumer and closes the underlying network connection to the broker.
on_after_fork()
@classmethod
def on_after_fork()
Cleans up the connection and consumer state after a process fork to prevent resource sharing between parent and child.
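The fork cleanup described for on_after_fork() can be illustrated with the standard library's fork hooks. This is a minimal sketch under stated assumptions: `BrokerHandles` is a hypothetical holder class, the placeholder objects stand in for a real connection and consumer, and how the real class is wired to fork events is not stated in this reference.

```python
import os


class BrokerHandles:
    """Hypothetical holder for a connection and consumer (illustration only)."""

    def __init__(self):
        self.connection = None
        self.consumer = None

    def open(self):
        # Placeholders; real code would create broker objects here.
        self.connection = object()
        self.consumer = object()

    def on_after_fork(self):
        # Assumption: the child forgets the inherited handles so that it
        # never reads from or writes to the parent's socket.
        self.connection = None
        self.consumer = None


handles = BrokerHandles()

# os.register_at_fork is only available on POSIX platforms.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=handles.on_after_fork)
```

After the reset, the child process would need to open its own connection before consuming again.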
consume_from()
@classmethod
def consume_from(
task_id: string
)
Adds a new task-specific queue to the active consumer or starts a new consumer if one does not exist.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | string | The unique identifier of the task whose result queue should be added to the consumption list. |
cancel_for()
@classmethod
def cancel_for(
task_id: string
)
Stops consuming results for a specific task by removing its associated queue from the consumer.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | string | The unique identifier of the task whose result queue should be cancelled. |
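Taken together, consume_from() and cancel_for() maintain the set of result queues the consumer listens on. The sketch below models only that bookkeeping with a plain Python set. `QueueTracker` is a hypothetical class, task IDs stand in for bound queues, and no Kombu calls are involved.

```python
class QueueTracker:
    """Hypothetical model of a consumer's queue set (not the Kombu API)."""

    def __init__(self):
        self.started = False
        self.queues = set()

    def start(self, initial_task_id):
        # First use: create the consumer with one initial queue.
        self.started = True
        self.queues = {initial_task_id}

    def consume_from(self, task_id):
        if not self.started:
            # No consumer yet, so this call doubles as start().
            self.start(task_id)
        else:
            # Sets ignore duplicates, so repeated calls are harmless.
            self.queues.add(task_id)

    def cancel_for(self, task_id):
        # discard() is a no-op when the ID was never registered.
        self.queues.discard(task_id)
```

For example, calling `consume_from("task-a")` on a fresh tracker starts it, a later `consume_from("task-b")` only adds a queue, and `cancel_for("task-a")` leaves just `"task-b"` registered.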