ResultConsumer

This class manages the consumption of task results from a message broker using the Kombu library. It handles the lifecycle of message consumers, including establishing connections, binding to task-specific queues, and processing state changes through callbacks. Additionally, it provides built-in support for event draining and automatic reconnection in the event of connection failures.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| Consumer | type = kombu.Consumer | The kombu Consumer class used to create message consumers for task results. |
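
Because `Consumer` is a class attribute, a subclass can presumably substitute a different consumer class without touching the rest of the logic. The following is a minimal sketch of that pattern using stand-in classes only; `StubConsumer`, `CountingConsumer`, `SketchResultConsumer`, and `make_consumer` are hypothetical names for illustration and are not part of kombu or of this class.

```python
class StubConsumer:
    """Stand-in for kombu.Consumer; it only records what it was given."""

    def __init__(self, channel, queues):
        self.channel = channel
        self.queues = list(queues)


class CountingConsumer(StubConsumer):
    """Hypothetical replacement that counts how many consumers were created."""

    created = 0

    def __init__(self, channel, queues):
        super().__init__(channel, queues)
        CountingConsumer.created += 1


class SketchResultConsumer:
    # Class attribute naming the consumer class to instantiate.
    Consumer = StubConsumer

    def make_consumer(self, channel, queues):
        # Looking the class up through self lets subclasses override it.
        return self.Consumer(channel, queues)


class CountingResultConsumer(SketchResultConsumer):
    Consumer = CountingConsumer


consumer = CountingResultConsumer().make_consumer("channel-0", ["queue-a"])
print(type(consumer).__name__, CountingConsumer.created)
```

The design choice shown here is the usual "class attribute as hook" idiom: the attribute is resolved at call time, so overriding it in a subclass is enough to change which consumer gets built.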

Constructor

Signature

def ResultConsumer(
    *args: any,
    **kwargs: any
) -> null

Parameters

| Name | Type | Description |
| --- | --- | --- |
| *args | any | Variable length argument list passed to the base class constructor. |
| **kwargs | any | Arbitrary keyword arguments passed to the base class constructor. |

Methods


start()

@classmethod
def start(
    initial_task_id: string,
    no_ack: boolean = True,
    **kwargs: any
)

Opens the connection and starts the message consumer on the result queue of a specific task.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| initial_task_id | string | The unique identifier of the task whose result queue will be bound and consumed first. |
| no_ack | boolean = True | If True, the broker treats each message as acknowledged as soon as it is delivered, so the consumer sends no explicit acknowledgement. |
| **kwargs | any | Additional keyword arguments passed to the underlying connection or consumer. |
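
To make the sequence concrete, here is a minimal sketch of the steps the description implies: open a connection, bind the first task's queue, create a consumer with a state-change callback, and begin consuming. It is a simplified stand-in that keeps state on an instance, not the library's implementation; `FakeConnection`, `FakeConsumer`, `SketchResultConsumer`, and the `result.<task_id>` queue naming are all assumptions made for illustration.

```python
class FakeConnection:
    """Stand-in for a broker connection (not a kombu object)."""


class FakeConsumer:
    """Stand-in consumer that records how it was configured."""

    def __init__(self, connection, queues, callbacks, no_ack):
        self.connection = connection
        self.queues = list(queues)
        self.callbacks = list(callbacks)
        self.no_ack = no_ack
        self.consuming = False

    def consume(self):
        self.consuming = True


class SketchResultConsumer:
    Consumer = FakeConsumer

    def __init__(self):
        self._connection = None
        self._consumer = None
        self.seen = []

    def on_state_change(self, meta):
        # Callback invoked for every delivered result message.
        self.seen.append(meta)

    def start(self, initial_task_id, no_ack=True, **kwargs):
        # 1. Establish the connection.
        self._connection = FakeConnection()
        # 2. Bind the queue for the first task (hypothetical naming scheme).
        initial_queue = f"result.{initial_task_id}"
        # 3. Create the consumer and begin consuming.
        self._consumer = self.Consumer(
            self._connection, [initial_queue],
            callbacks=[self.on_state_change], no_ack=no_ack,
        )
        self._consumer.consume()


rc = SketchResultConsumer()
rc.start("task-1")
print(rc._consumer.queues, rc._consumer.no_ack, rc._consumer.consuming)
```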

drain_events()

@classmethod
def drain_events(
    timeout: float = None
) -> any

Waits for and processes incoming events from the broker connection within a specified timeframe.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| timeout | float = None | The maximum number of seconds to wait for an event before returning. |

Returns

| Type | Description |
| --- | --- |
| any | The result of the drained event, or null if no connection exists and the timeout expires. |
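
The two outcomes described above can be sketched as a small function: delegate to the connection when there is one, otherwise wait out the timeout and return nothing. This is an illustration under assumptions, not the library's code; `FakeConnection` is a hypothetical stand-in, and sleeping for the timeout when no connection exists is one plausible reading of the return description.

```python
import time


class FakeConnection:
    """Stand-in connection that hands out pre-loaded events."""

    def __init__(self, events):
        self._events = list(events)

    def drain_events(self, timeout=None):
        if not self._events:
            # Stand-in behaviour: signal that nothing arrived in time.
            raise TimeoutError("no event within timeout")
        return self._events.pop(0)


def drain_events(connection, timeout=None):
    if connection is not None:
        # Normal path: let the connection wait for and deliver one event.
        return connection.drain_events(timeout=timeout)
    if timeout:
        # Assumption: with no connection, just wait for the timeout period.
        time.sleep(timeout)
    return None


print(drain_events(FakeConnection(["state-change"]), timeout=0.01))
print(drain_events(None, timeout=0.01))
```

Callers that poll in a loop benefit from the sleep in the no-connection branch, since it keeps the loop from spinning at full speed before a connection has been opened.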

stop()

@classmethod
def stop()

Cancels the active consumer and closes the underlying network connection to the broker.
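
A minimal sketch of the shutdown order described above, assuming the connection should be closed even when cancelling the consumer fails. The `try`/`finally` arrangement is a design assumption for this example, and `FakeConsumer` and `FakeConnection` are hypothetical stand-ins rather than kombu objects.

```python
class FakeConsumer:
    """Stand-in consumer whose cancel() can be made to fail."""

    def __init__(self, fail=False):
        self.fail = fail
        self.cancelled = False

    def cancel(self):
        if self.fail:
            raise RuntimeError("cancel failed")
        self.cancelled = True


class FakeConnection:
    """Stand-in connection that records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def stop(consumer, connection):
    try:
        # First stop receiving messages.
        consumer.cancel()
    finally:
        # Then release the network connection, even if cancel() raised.
        connection.close()


consumer, connection = FakeConsumer(), FakeConnection()
stop(consumer, connection)
print(consumer.cancelled, connection.closed)
```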


on_after_fork()

@classmethod
def on_after_fork()

Cleans up the connection and consumer state after a process fork to prevent resource sharing between parent and child.
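
The idea can be illustrated with a small stand-in: after a fork, the child drops the handles it inherited so that any later work opens a fresh connection of its own. This sketch simulates the fork by calling the hook directly; `SketchResultConsumer` is hypothetical, and simply discarding the references (rather than closing the shared socket) is an assumption made for the example.

```python
class SketchResultConsumer:
    def __init__(self):
        self._connection = None
        self._consumer = None

    def start(self):
        # Plain objects stand in for a real connection and consumer.
        self._connection = object()
        self._consumer = object()

    def on_after_fork(self):
        # Forget inherited handles; the parent keeps using the originals.
        self._consumer = None
        self._connection = None


rc = SketchResultConsumer()
rc.start()
inherited = rc._connection

# Simulate what a child process would do right after the fork.
rc.on_after_fork()
print(rc._connection, rc._consumer)

# A later start() builds new handles instead of reusing the inherited one.
rc.start()
print(rc._connection is not inherited)
```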


consume_from()

@classmethod
def consume_from(
    task_id: string
)

Adds a new task-specific queue to the active consumer or starts a new consumer if one does not exist.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| task_id | string | The unique identifier of the task whose result queue should be added to the consumption list. |
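
The "add or start" branching described above can be sketched as follows. This is an illustrative stand-in, not the library's code: `SketchResultConsumer`, `queue_name`, and the `result.<task_id>` naming are hypothetical, and skipping a queue that is already being consumed is an assumption the description does not state.

```python
def queue_name(task_id):
    # Hypothetical naming scheme for a task's result queue.
    return f"result.{task_id}"


class SketchResultConsumer:
    def __init__(self):
        # None means no consumer has been started yet.
        self._queues = None

    def start(self, initial_task_id):
        self._queues = [queue_name(initial_task_id)]

    def consume_from(self, task_id):
        if self._queues is None:
            # No active consumer: start one with this task's queue.
            return self.start(task_id)
        queue = queue_name(task_id)
        if queue not in self._queues:
            # Active consumer: add the queue unless it is already present.
            self._queues.append(queue)


rc = SketchResultConsumer()
rc.consume_from("task-1")  # starts the consumer
rc.consume_from("task-2")  # adds a second queue
rc.consume_from("task-2")  # already present, so nothing changes
print(rc._queues)
```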

cancel_for()

@classmethod
def cancel_for(
    task_id: string
)

Stops consuming results for a specific task by removing its associated queue from the consumer.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| task_id | string | The unique identifier of the task whose result queue should be cancelled. |
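
A minimal sketch of per-task cancellation, assuming the call is a silent no-op when no consumer is active. `FakeConsumer`, `queue_name`, and the `result.<task_id>` naming are hypothetical stand-ins; the method name `cancel_by_queue` mirrors the kombu Consumer method of the same name, but the body here is only an illustration.

```python
def queue_name(task_id):
    # Hypothetical naming scheme for a task's result queue.
    return f"result.{task_id}"


class FakeConsumer:
    """Stand-in consumer that tracks the queues it consumes from."""

    def __init__(self, queues):
        self.queues = set(queues)

    def cancel_by_queue(self, queue):
        # Stop consuming from one queue; unknown names are ignored.
        self.queues.discard(queue)


def cancel_for(consumer, task_id):
    # Assumption: with no active consumer there is nothing to cancel.
    if consumer is not None:
        consumer.cancel_by_queue(queue_name(task_id))


consumer = FakeConsumer([queue_name("task-1"), queue_name("task-2")])
cancel_for(consumer, "task-1")
cancel_for(None, "task-3")  # no consumer: returns without error
print(sorted(consumer.queues))
```

The other queues stay subscribed, so results for the remaining tasks continue to arrive on the same consumer.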