ResultConsumer

This class manages the consumption of task results using a Redis pub/sub mechanism. It handles subscribing to task-specific channels, monitoring state changes, and automatically re-establishing connections upon failure. The class also provides mechanisms to drain events and cancel subscriptions once tasks reach a ready state.

Constructor

Signature

def ResultConsumer(
*args: any,
**kwargs: any
) -> None

Parameters

| Name | Type | Description |
| --- | --- | --- |
| `*args` | any | Variable-length argument list passed to the base class constructor. |
| `**kwargs` | any | Arbitrary keyword arguments passed to the base class constructor. |

Methods


on_after_fork()

@classmethod
def on_after_fork()

Resets the Redis connection pool and closes existing pub/sub connections after a process fork to ensure child processes have clean network handles.
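
The reset described above can be sketched with in-memory stand-ins. `FakePool`, `FakePubSub`, and `ForkAwareConsumer` are hypothetical names used only for illustration; they are not part of Celery or redis-py, the sketch models the hook as an instance method, and the real method may do more than this.

```python
class FakePool:
    """Hypothetical stand-in for a connection pool."""

    def __init__(self):
        self.connections = ["conn-1", "conn-2"]

    def reset(self):
        # Drop every connection inherited from the parent process.
        self.connections = []


class FakePubSub:
    """Hypothetical stand-in for a pub/sub connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ForkAwareConsumer:
    """Hypothetical consumer holding a pool and a pub/sub connection."""

    def __init__(self):
        self.pool = FakePool()
        self.pubsub = FakePubSub()

    def on_after_fork(self):
        # Sockets must not be shared between parent and child, so the
        # child discards everything it inherited and reconnects lazily.
        self.pool.reset()
        if self.pubsub is not None:
            self.pubsub.close()


consumer = ForkAwareConsumer()
consumer.on_after_fork()
```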


on_state_change()

@classmethod
def on_state_change(
meta: dict,
message: dict
)

Processes a task state update and triggers unsubscription if the task is finished.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| meta | dict | The decoded task result and status information |
| message | dict | The raw message payload received from the pub/sub channel |
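
A minimal sketch of the "unsubscribe once finished" behaviour, assuming `meta` carries `task_id` and `status` keys. `StateTracker` is a hypothetical class written for this example, and the ready states are defined locally so the snippet runs without Celery installed.

```python
# Terminal states after which a task publishes no further updates.
# Defined locally for the sketch; treat the exact set as an assumption.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


class StateTracker:
    """Hypothetical consumer that tracks subscriptions by task id."""

    def __init__(self, task_ids):
        self.subscribed = set(task_ids)
        self.results = {}

    def cancel_for(self, task_id):
        self.subscribed.discard(task_id)

    def on_state_change(self, meta, message):
        # Record the latest metadata for the task.
        self.results[meta["task_id"]] = meta
        # A ready task will not change again, so stop listening for it.
        if meta["status"] in READY_STATES:
            self.cancel_for(meta["task_id"])


tracker = StateTracker(["a", "b"])
tracker.on_state_change({"task_id": "a", "status": "STARTED"}, None)
tracker.on_state_change({"task_id": "b", "status": "SUCCESS", "result": 4}, None)
```

After these two updates only task `a` remains subscribed, because `STARTED` is not a ready state.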

start()

@classmethod
def start(
initial_task_id: str
)

Initializes the pub/sub interface and begins monitoring the result channel for the specified task.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| initial_task_id | str | The unique identifier of the first task to begin consuming results for |

on_wait_for_pending()

@classmethod
def on_wait_for_pending(
result: [ResultSet](../../result/resultset.md?sid=celery_result_resultset)
)

Iterates through pending task metadata and updates the local state for any tasks that have already completed.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| result | [ResultSet](../../result/resultset.md?sid=celery_result_resultset) | The result object containing the tasks to be checked for updates |
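
One reason for this step is that pub/sub delivers only messages published after a subscription exists, so a task that finished earlier would otherwise never be noticed. The sketch below illustrates the idea under assumptions: the function name is borrowed from the method, but it takes a plain list of stored metadata and a handler callback rather than a real `ResultSet`.

```python
def on_wait_for_pending(stored_metas, on_state_change):
    """Replay already-stored metadata through the state-change handler."""
    for meta in stored_metas:
        # None stands for "nothing stored for this task yet" (assumption).
        if meta is not None:
            # No pub/sub message is involved, so pass None for it.
            on_state_change(meta, None)


seen = []
on_wait_for_pending(
    [{"task_id": "a", "status": "SUCCESS"}, None],
    lambda meta, message: seen.append((meta["task_id"], message)),
)
```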

stop()

@classmethod
def stop()

Closes the active pub/sub connection and releases associated network resources.


drain_events()

@classmethod
def drain_events(
timeout: float
)

Polls the Redis pub/sub channel for new messages and dispatches them to the state change handler.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| timeout | float | The maximum time in seconds to wait for a new message before returning |
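
The poll-and-dispatch step can be sketched without a Redis server. `QueuePubSub` is a hypothetical in-memory stand-in whose `get_message(timeout=...)` call and message dictionaries (`type`, `channel`, `data`) imitate the shape used by redis-py pub/sub objects; the JSON payload encoding and the channel name are assumptions made for the example.

```python
import json
from collections import deque


class QueuePubSub:
    """Hypothetical in-memory stand-in for a Redis pub/sub object."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def get_message(self, timeout=None):
        # A real client would block for up to `timeout` seconds here.
        return self._messages.popleft() if self._messages else None


def drain_events(pubsub, on_state_change, timeout=None):
    message = pubsub.get_message(timeout=timeout)
    # Skip subscribe/unsubscribe confirmations; only payloads matter.
    if message and message["type"] == "message":
        on_state_change(json.loads(message["data"]), message)


received = []
pubsub = QueuePubSub([
    {"type": "subscribe", "channel": "celery-task-meta-a", "data": 1},
    {"type": "message", "channel": "celery-task-meta-a",
     "data": json.dumps({"task_id": "a", "status": "SUCCESS"})},
])
for _ in range(3):  # the third call finds the queue empty and does nothing
    drain_events(pubsub, lambda meta, msg: received.append(meta), timeout=0.1)
```

Each call handles at most one message, so callers typically invoke it in a loop until the results they are waiting for have arrived.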

consume_from()

@classmethod
def consume_from(
task_id: str
)

Starts the consumer if it is not already running, then subscribes to the result channel for the given task.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The unique identifier of the task to start monitoring |
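
A sketch of the lazy-start pattern follows. `RecordingPubSub` and `LazyConsumer` are hypothetical classes, and the `celery-task-meta-` channel prefix is assumed for illustration; the point is that the first call creates the pub/sub object and repeated calls for the same task do not subscribe twice.

```python
class RecordingPubSub:
    """Hypothetical pub/sub stand-in that records subscribe calls."""

    def __init__(self):
        self.channels = []

    def subscribe(self, channel):
        self.channels.append(channel)


class LazyConsumer:
    KEY_PREFIX = "celery-task-meta-"  # assumed channel naming

    def __init__(self):
        self.pubsub = None
        self.subscribed_to = set()

    def start(self, initial_task_id):
        self.pubsub = RecordingPubSub()
        self._subscribe(initial_task_id)

    def consume_from(self, task_id):
        if self.pubsub is None:
            # First use: create the pub/sub object, then subscribe.
            self.start(task_id)
        else:
            self._subscribe(task_id)

    def _subscribe(self, task_id):
        key = self.KEY_PREFIX + task_id
        # Track keys locally so a task is never subscribed twice.
        if key not in self.subscribed_to:
            self.subscribed_to.add(key)
            self.pubsub.subscribe(key)


consumer = LazyConsumer()
consumer.consume_from("a")  # starts the consumer
consumer.consume_from("b")
consumer.consume_from("a")  # duplicate, ignored
```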

cancel_for()

@classmethod
def cancel_for(
task_id: str
)

Removes a task from the subscription set and unsubscribes from its Redis result channel.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The unique identifier of the task to stop monitoring |