ResultConsumer
This class manages the consumption of task results using a Redis pub/sub mechanism. It handles subscribing to task-specific channels, monitoring state changes, and automatically re-establishing connections upon failure. The class also provides mechanisms to drain events and cancel subscriptions once tasks reach a ready state.
Constructor
Signature
def ResultConsumer(
*args: any,
**kwargs: any
) - > null
Parameters
| Name | Type | Description |
|---|---|---|
| *args | any | Variable length argument list passed to the base class constructor. |
| **kwargs | any | Arbitrary keyword arguments passed to the base class constructor. |
Methods
on_after_fork()
@classmethod
def on_after_fork()
Resets the Redis connection pool and closes existing pub/sub connections after a process fork to ensure child processes have clean network handles.
on_state_change()
@classmethod
def on_state_change(
meta: dict,
message: dict
)
Processes a task state update and triggers unsubscription if the task is finished.
Parameters
| Name | Type | Description |
|---|---|---|
| meta | dict | The decoded task result and status information |
| message | dict | The raw message payload received from the pub/sub channel |
start()
@classmethod
def start(
initial_task_id: str
)
Initializes the pub/sub interface and begins monitoring the result channel for the specified task.
Parameters
| Name | Type | Description |
|---|---|---|
| initial_task_id | str | The unique identifier of the first task to begin consuming results for |
on_wait_for_pending()
@classmethod
def on_wait_for_pending(
result: [ResultSet](../../result/resultset.md?sid=celery_result_resultset)
)
Iterates through pending task metadata and updates the local state for any tasks that have already completed.
Parameters
| Name | Type | Description |
|---|---|---|
| result | [ResultSet](../../result/resultset.md?sid=celery_result_resultset) | The result object containing the tasks to be checked for updates |
stop()
@classmethod
def stop()
Closes the active pub/sub connection and releases associated network resources.
drain_events()
@classmethod
def drain_events(
timeout: float
)
Polls the Redis pub/sub channel for new messages and dispatches them to the state change handler.
Parameters
| Name | Type | Description |
|---|---|---|
| timeout | float | The maximum time in seconds to wait for a new message before returning |
consume_from()
@classmethod
def consume_from(
task_id: str
)
Ensures the consumer is active and subscribes to the result channel for a specific task.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | str | The unique identifier of the task to start monitoring |
cancel_for()
@classmethod
def cancel_for(
task_id: str
)
Removes a task from the subscription set and unsubscribes from its Redis result channel.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | str | The unique identifier of the task to stop monitoring |