Manager responsible for consuming result messages.
Constructor
Signature
def BaseResultConsumer(
backend: Any,
app: Any,
accept: Any,
pending_results: Any,
pending_messages: Any
) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| backend | Any | The result backend used for retrieving task states. |
| app | Any | The Celery application instance. |
| accept | Any | Content types to accept for message serialization. |
| pending_results | Any | A collection of results currently awaiting completion. |
| pending_messages | Any | A buffer for messages received before their corresponding result objects were registered. |
Methods
start()
@classmethod
def start(
initial_task_id: str,
**kwargs: dict
) -> None
Initializes the consumer for a specific task ID. This method must be implemented by subclasses to set up the initial consumption state.
Parameters
| Name | Type | Description |
|---|---|---|
| initial_task_id | str | The unique identifier of the first task to monitor for results. |
| **kwargs | dict | Additional backend-specific configuration parameters for starting the consumer. |
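As a minimal sketch of the subclassing contract described above, the following shows one way a subclass might override `start()`. `BaseResultConsumerStub` is a hypothetical stand-in for the real base class so that the example runs without Celery installed; `InMemoryResultConsumer`, `subscribed`, `options`, and the `prefetch` keyword are illustrative names, not part of the documented API.

```python
class BaseResultConsumerStub:
    """Hypothetical stand-in for BaseResultConsumer (not the Celery class)."""

    def __init__(self, backend, app, accept, pending_results, pending_messages):
        self.backend = backend
        self.app = app
        self.accept = accept
        self._pending_results = pending_results
        self._pending_messages = pending_messages

    def start(self, initial_task_id, **kwargs):
        # Subclasses are expected to override this method.
        raise NotImplementedError()


class InMemoryResultConsumer(BaseResultConsumerStub):
    """Illustrative subclass that tracks subscriptions in a set."""

    def start(self, initial_task_id, **kwargs):
        # Set up the initial consumption state for the first task.
        self.subscribed = {initial_task_id}
        # Keep any backend-specific options that were passed in.
        self.options = dict(kwargs)


consumer = InMemoryResultConsumer(
    backend=None,
    app=None,
    accept=["json"],
    pending_results=[{}],
    pending_messages={},
)
consumer.start("task-1", prefetch=10)
```

Calling `start()` on the stand-in base class raises `NotImplementedError`, which mirrors the requirement that subclasses supply their own implementation.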
reconnect_on_error()
@classmethod
def reconnect_on_error() -> contextmanager
Context manager that catches connection errors and reconnects. It wraps a block of code so that any exception listed in `_connection_errors` that is raised inside the block triggers a call to `_reconnect()`. If reconnection itself raises a connection error, the consumer is considered unrecoverable and a `RuntimeError` is raised to signal that the Celery application must be restarted.
Returns
| Type | Description |
|---|---|
| contextmanager | A context manager that provides error handling and reconnection logic for the wrapped block. |
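The reconnection behavior described above can be sketched with `contextlib.contextmanager`. This is an illustration of the pattern, not the library's code: `ReconnectingConsumer`, its `reconnect_fails` flag, and the choice of `ConnectionError` as the only connection error are assumptions made for the example.

```python
from contextlib import contextmanager


class ReconnectingConsumer:
    """Hypothetical consumer illustrating the reconnect-on-error pattern."""

    # Exception types treated as connection errors (an assumption here).
    _connection_errors = (ConnectionError,)

    def __init__(self, reconnect_fails=False):
        self.reconnect_calls = 0
        self.reconnect_fails = reconnect_fails

    def _reconnect(self):
        self.reconnect_calls += 1
        if self.reconnect_fails:
            raise ConnectionError("still unreachable")

    @contextmanager
    def reconnect_on_error(self):
        try:
            yield
        except self._connection_errors:
            try:
                self._reconnect()
            except self._connection_errors as exc:
                # Reconnection failed too: treat the consumer as unrecoverable.
                raise RuntimeError(
                    "consumer cannot recover; restart required"
                ) from exc


consumer = ReconnectingConsumer()
with consumer.reconnect_on_error():
    # The error is caught by the context manager, which then reconnects.
    raise ConnectionError("connection lost")
```

In this sketch a successful reconnect suppresses the original error, so code after the `with` block continues to run; a failed reconnect surfaces as `RuntimeError`.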
stop()
@classmethod
def stop() -> None
Stops the consumer and cleans up any active resources or connections.
drain_events()
@classmethod
def drain_events(
timeout: float
) -> None
Reads and processes events from the underlying transport until a message is received or the timeout expires.
Parameters
| Name | Type | Description |
|---|---|---|
| timeout | float | The maximum number of seconds to wait for an event before returning. |
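To illustrate the timeout semantics, here is a minimal sketch in which the transport is a local `queue.Queue`. `QueueDrainingConsumer`, `transport`, and `received` are hypothetical names, and signalling expiry by raising `socket.timeout` is an assumption of this sketch rather than documented behavior.

```python
import queue
import socket


class QueueDrainingConsumer:
    """Hypothetical consumer whose transport is a local queue.Queue."""

    def __init__(self):
        self.transport = queue.Queue()
        self.received = []

    def drain_events(self, timeout=None):
        try:
            # Block until one message arrives or the timeout expires.
            message = self.transport.get(timeout=timeout)
        except queue.Empty:
            # Assumption: expiry is reported by raising socket.timeout.
            raise socket.timeout("no event received before timeout") from None
        self.received.append(message)


consumer = QueueDrainingConsumer()
consumer.transport.put({"task_id": "task-1", "status": "SUCCESS"})
consumer.drain_events(timeout=0.1)
```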
consume_from()
@classmethod
def consume_from(
task_id: str
) -> None
Registers a specific task ID to be included in the current consumption stream.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | str | The unique identifier of the task whose results should be consumed. |
cancel_for()
@classmethod
def cancel_for(
task_id: str
) -> None
Unregisters a task ID so that the consumer no longer listens for its result messages.
Parameters
| Name | Type | Description |
|---|---|---|
| task_id | str | The unique identifier of the task to stop monitoring. |
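The registration and cancellation described for `consume_from()` and `cancel_for()` can be sketched as operations on a set of task IDs. `SubscriptionTrackingConsumer` and `subscribed` are hypothetical names; a real backend would subscribe to and unsubscribe from its transport instead.

```python
class SubscriptionTrackingConsumer:
    """Hypothetical consumer that records which task IDs it listens for."""

    def __init__(self):
        self.subscribed = set()

    def consume_from(self, task_id):
        # Include this task in the current consumption stream.
        self.subscribed.add(task_id)

    def cancel_for(self, task_id):
        # discard() keeps cancellation a no-op for unknown task IDs.
        self.subscribed.discard(task_id)


consumer = SubscriptionTrackingConsumer()
consumer.consume_from("task-1")
consumer.consume_from("task-2")
consumer.cancel_for("task-1")
consumer.cancel_for("never-registered")
```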
on_after_fork()
@classmethod
def on_after_fork() -> None
Hook method for subclasses to perform custom cleanup or re-initialization after a process fork.
drain_events_until()
@classmethod
def drain_events_until(
p: Callable,
timeout: float,
on_interval: Callable
) -> Any
Continuously drains events until a specific predicate is satisfied or a timeout occurs.
Parameters
| Name | Type | Description |
|---|---|---|
| p | Callable | A predicate function that returns True when the desired condition is met. |
| timeout | float | The maximum time in seconds to wait for the predicate to become true. |
| on_interval | Callable | A callback function executed periodically between event polling cycles. |
Returns
| Type | Description |
|---|---|
| Any | The result returned by the drainer's event loop. |
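The polling loop described above can be approximated as follows. This is a simplified, standalone sketch rather than the drainer's actual loop: the module-level function, its `drain_events` and `interval` parameters, and the use of `TimeoutError` on expiry are assumptions made for illustration.

```python
import time


def drain_events_until(p, drain_events, timeout=None, interval=0.01,
                       on_interval=None):
    """Simplified loop: poll until p() is true or the deadline passes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while not p():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("predicate not satisfied before timeout")
        # Wait for at most one polling interval for new events.
        drain_events(timeout=interval)
        if on_interval is not None:
            # Run the periodic callback between polling cycles.
            on_interval()


# Usage with stand-in callables.
events = []
ticks = []


def fake_drain(timeout):
    # Pretend that each polling cycle delivers exactly one event.
    events.append("event")


drain_events_until(
    p=lambda: len(events) >= 3,
    drain_events=fake_drain,
    timeout=1.0,
    on_interval=lambda: ticks.append(1),
)
```

The predicate is checked before each polling cycle, so the loop exits as soon as the condition holds, and `on_interval` runs once per cycle.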
on_wait_for_pending()
@classmethod
def on_wait_for_pending(
result: [AsyncResult](../../result/asyncresult.md?sid=celery_result_asyncresult),
timeout: float
) -> None
Hook method called before entering the wait loop for a pending result.
Parameters
| Name | Type | Description |
|---|---|---|
| result | [AsyncResult](../../result/asyncresult.md?sid=celery_result_asyncresult) | The result object that is about to be waited on. |
| timeout | float | The timeout value applied to the wait operation. |
on_out_of_band_result()
@classmethod
def on_out_of_band_result(
message: Message
) -> None
Processes a result message received outside of the standard event loop by triggering a state change.
Parameters
| Name | Type | Description |
|---|---|---|
| message | Message | The raw message object containing the out-of-band result payload. |
on_state_change()
@classmethod
def on_state_change(
meta: dict,
message: Message
) -> None
Handles updates to a task's state, updating local caches and notifying waiting buckets if the task has reached a ready state.
Parameters
| Name | Type | Description |
|---|---|---|
| meta | dict | The metadata dictionary containing task status and result data. |
| message | Message | The original message object that triggered the state change. |
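The flow described for `on_out_of_band_result()` and `on_state_change()` can be sketched with plain dictionaries. Everything here is illustrative: `StateTrackingConsumer`, `FakeResult`, the `cache` attribute, the contents of `READY_STATES`, and reading the metadata from `message.payload` are assumptions, and the real class may store pending results and buckets differently.

```python
from types import SimpleNamespace

# Assumed set of states that count as "ready" for this sketch.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


class FakeResult:
    """Hypothetical stand-in for a result object."""

    def __init__(self, task_id):
        self.id = task_id
        self.cache = None


class StateTrackingConsumer:
    """Hypothetical consumer illustrating the state-change flow."""

    def __init__(self, pending_results, pending_messages):
        self._pending_results = pending_results    # task_id -> result object
        self._pending_messages = pending_messages  # task_id -> buffered meta
        self.buckets = {}                          # result object -> waiter list

    def on_out_of_band_result(self, message):
        # Treat the message payload as the task metadata.
        self.on_state_change(message.payload, message)

    def on_state_change(self, meta, message):
        if meta["status"] not in READY_STATES:
            return
        task_id = meta["task_id"]
        result = self._pending_results.get(task_id)
        if result is None:
            # Result not registered yet: buffer the metadata for later.
            self._pending_messages[task_id] = meta
            return
        result.cache = meta  # update the local cache
        bucket = self.buckets.pop(result, None)
        if bucket is not None:
            bucket.append(result)  # notify the waiter through its bucket


result = FakeResult("task-1")
consumer = StateTrackingConsumer(
    pending_results={"task-1": result}, pending_messages={}
)
waiter_bucket = []
consumer.buckets[result] = waiter_bucket

# A ready message for a registered result updates the cache and the bucket.
consumer.on_out_of_band_result(
    SimpleNamespace(
        payload={"task_id": "task-1", "status": "SUCCESS", "result": 42}
    )
)
# A ready message for an unregistered task is buffered instead.
consumer.on_state_change(
    {"task_id": "task-2", "status": "SUCCESS", "result": 7}, message=None
)
```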