Skip to main content

BaseResultConsumer

Manager responsible for consuming result messages.

Constructor

Signature

def BaseResultConsumer(
backend: Any,
app: Any,
accept: Any,
pending_results: Any,
pending_messages: Any
) - > null

Parameters

NameTypeDescription
backendAnyThe result backend used for retrieving task states.
appAnyThe Celery application instance.
acceptAnyContent types to accept for message serialization.
pending_resultsAnyA collection of results currently awaiting completion.
pending_messagesAnyA buffer for messages received before their corresponding result objects were registered.

Methods


start()

@classmethod
def start(
initial_task_id: string,
**kwargs: dict
) - > null

Initializes the consumer for a specific task ID. This method must be implemented by subclasses to set up the initial consumption state.

Parameters

NameTypeDescription
initial_task_idstringThe unique identifier of the first task to begin monitoring for results
**kwargsdictAdditional backend-specific configuration parameters for starting the consumer

Returns

TypeDescription
null

reconnect_on_error()

@classmethod
def reconnect_on_error() - > contextmanager

Context manager that catches connection errors and reconnects. Wraps a block of code so that any :attr:_connection_errors raised inside it trigger a call to :meth:_reconnect. If reconnection itself raises a connection error the consumer is considered unrecoverable and a :exc:RuntimeError is raised to signal that the Celery application must be restarted.

Returns

TypeDescription
contextmanagerA context manager that provides error handling and reconnection logic for the wrapped block

stop()

@classmethod
def stop() - > null

Stops the consumer and cleans up any active resources or connections.

Returns

TypeDescription
null

drain_events()

@classmethod
def drain_events(
timeout: float
) - > null

Reads and processes events from the underlying transport until a message is received or the timeout expires.

Parameters

NameTypeDescription
timeoutfloatThe maximum number of seconds to wait for an event before returning

Returns

TypeDescription
null

consume_from()

@classmethod
def consume_from(
task_id: string
) - > null

Registers a specific task ID to be included in the current consumption stream.

Parameters

NameTypeDescription
task_idstringThe unique identifier of the task whose results should be consumed

Returns

TypeDescription
null

cancel_for()

@classmethod
def cancel_for(
task_id: string
) - > null

Unregisters a task ID so that the consumer no longer listens for its result messages.

Parameters

NameTypeDescription
task_idstringThe unique identifier of the task to stop monitoring

Returns

TypeDescription
null

on_after_fork()

@classmethod
def on_after_fork() - > null

Hook method for subclasses to perform custom cleanup or re-initialization after a process fork.

Returns

TypeDescription
null

drain_events_until()

@classmethod
def drain_events_until(
p: callable,
timeout: float,
on_interval: callable
) - > any

Continuously drains events until a specific predicate is satisfied or a timeout occurs.

Parameters

NameTypeDescription
pcallableA predicate function that returns True when the desired condition is met
timeoutfloatThe maximum time in seconds to wait for the predicate to become true
on_intervalcallableA callback function executed periodically between event polling cycles

Returns

TypeDescription
anyThe result returned by the drainer's event loop

on_wait_for_pending()

@classmethod
def on_wait_for_pending(
result: [AsyncResult](../../result/asyncresult.md?sid=celery_result_asyncresult),
timeout: float
) - > null

Hook method called before entering the wait loop for a pending result.

Parameters

NameTypeDescription
result[AsyncResult](../../result/asyncresult.md?sid=celery_result_asyncresult)The result object that is about to be waited on
timeoutfloatThe timeout value applied to the wait operation

Returns

TypeDescription
null

on_out_of_band_result()

@classmethod
def on_out_of_band_result(
message: Message
) - > null

Processes a result message received outside of the standard event loop by triggering a state change.

Parameters

NameTypeDescription
messageMessageThe raw message object containing the out-of-band result payload

Returns

TypeDescription
null

on_state_change()

@classmethod
def on_state_change(
meta: dict,
message: Message
) - > null

Handles updates to a task's state, updating local caches and notifying waiting buckets if the task has reached a ready state.

Parameters

NameTypeDescription
metadictThe metadata dictionary containing task status and result data
messageMessageThe original message object that triggered the state change

Returns

TypeDescription
null