Base class for the RPC result backend.
Attributes
| Attribute | Type | Description |
|---|
| Exchange | type = kombu.Exchange | The kombu.Exchange class used to define the message exchange for results. |
| Producer | type = kombu.Producer | The kombu.Producer class used to publish result messages to the broker. |
| ResultConsumer | type = ResultConsumer | The consumer class responsible for managing result message consumption. |
| BacklogLimitExceeded | type = BacklogLimitExceeded | Exception raised when there are too many messages for a task id. |
| persistent | boolean = false | Boolean flag indicating if result messages should be persistent on the broker. |
| supports_autoexpire | boolean = true | Indicates that this backend supports automatic expiration of result queues. |
| supports_native_join | boolean = true | Indicates that this backend supports native join operations for task results. |
| retry_policy | dict = {'max_retries': 20, 'interval_start': 0, 'interval_step': 1, 'interval_max': 1} | Dictionary defining the retry behavior for publishing results, including max retries and intervals. |
| poll | callable = get_task_meta | Alias for the get_task_meta method used for polling the queue for task updates. |
Constructor
Signature
def RPCBackend(
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery),
connection: kombu.Connection = None,
exchange: str = None,
exchange_type: str = None,
persistent: bool = None,
serializer: str = None,
auto_delete: bool = True,
kwargs: dict
) - > null
Parameters
| Name | Type | Description |
|---|
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) | The Celery application instance associated with this backend. |
| connection | kombu.Connection = None | The connection to the message broker. |
| exchange | str = None | The name of the exchange to use for results. |
| exchange_type | str = None | The type of exchange (e.g., 'direct', 'topic'). |
| persistent | bool = None | Whether the result messages should be persistent. |
| serializer | str = None | The default serialization method for results. |
| auto_delete | bool = True | Whether to automatically delete the result queue. |
| kwargs | dict | Additional keyword arguments passed to the base Backend class. |
Methods
ensure_chords_allowed()
@classmethod
def ensure_chords_allowed()
Validates if the backend supports chord primitives; raises NotImplementedError as RPC backends do not support chords.
on_task_call()
@classmethod
def on_task_call(
producer: kombu.Producer,
task_id: str
)
Prepares the reply queue before a task is sent, ensuring the consumer is ready to receive the result.
Parameters
| Name | Type | Description |
|---|
| producer | kombu.Producer | The producer instance used to send the task message |
| task_id | str | The unique identifier of the task being called |
destination_for()
@classmethod
def destination_for(
task_id: str,
request: celery.app.task.Context
) - > Tuple[str, str]
Get the destination for result by task id.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task |
| request | celery.app.task.Context | The task request context containing routing information |
Returns
| Type | Description |
|---|
Tuple[str, str] | A tuple containing the reply_to queue name and the correlation_id |
on_reply_declare()
@classmethod
def on_reply_declare(
task_id: str
) - > null
Returns the declaration parameters for the producer when sending a result; defaults to no-op for RPC.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task |
Returns
| Type | Description |
|---|
null | Always returns None as no explicit declaration is required during result publishing |
on_result_fulfilled()
@classmethod
def on_result_fulfilled(
result: any
)
Performs cleanup actions after a result is successfully received; no-op for RPC as queues are process-specific.
Parameters
| Name | Type | Description |
|---|
| result | any | The fulfilled result object |
as_uri()
@classmethod
def as_uri(
include_password: bool
) - > str
Returns the string representation of the backend connection URI.
Parameters
| Name | Type | Description |
|---|
| include_password | bool | Whether to include the password in the URI string |
Returns
| Type | Description |
|---|
str | The fixed URI string 'rpc://' |
store_result()
@classmethod
def store_result(
task_id: str,
result: any,
state: str,
traceback: str,
request: celery.app.task.Context
) - > any
Send task return value and state.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task |
| result | any | The return value of the task to be stored |
| state | str | The current execution state of the task |
| traceback | str | The stack trace if the task failed |
| request | celery.app.task.Context | The task request context used to determine the reply destination |
Returns
| Type | Description |
|---|
any | The original result value that was stored |
on_out_of_band_result()
@classmethod
def on_out_of_band_result(
task_id: str,
message: kombu.Message
)
Handles results received for tasks that are not currently marked as pending by buffering them for future retrieval.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task associated with the message |
| message | kombu.Message | The raw message object received from the broker |
@classmethod
def get_task_meta(
task_id: str,
backlog_limit: int
) - > dict
Fetches the current state and result for a task by polling the reply queue and checking the local buffer.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task to query |
| backlog_limit | int | The maximum number of messages to consume from the queue before raising a limit error |
Returns
| Type | Description |
|---|
dict | A dictionary containing the task metadata including status and result |
revive()
@classmethod
def revive(
channel: kombu.Channel
)
Re-establishes the backend connection state; no-op for the RPC backend.
Parameters
| Name | Type | Description |
|---|
| channel | kombu.Channel | The broker channel to use for revival |
reload_task_result()
@classmethod
def reload_task_result(
task_id: str
)
Attempts to reload a task result; raises NotImplementedError as this operation is not supported by the RPC backend.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the task |
reload_group_result()
@classmethod
def reload_group_result(
task_id: str
)
Reload group result, even if it has been previously fetched.
Parameters
| Name | Type | Description |
|---|
| task_id | str | The unique identifier of the group |
save_group()
@classmethod
def save_group(
group_id: str,
result: any
)
Attempts to save a group result; raises NotImplementedError as this operation is not supported by the RPC backend.
Parameters
| Name | Type | Description |
|---|
| group_id | str | The unique identifier of the group |
| result | any | The group result to save |
restore_group()
@classmethod
def restore_group(
group_id: str,
cache: bool
)
Attempts to restore a group result; raises NotImplementedError as this operation is not supported by the RPC backend.
Parameters
| Name | Type | Description |
|---|
| group_id | str | The unique identifier of the group |
| cache | bool | Whether to use the local cache for restoration |
delete_group()
@classmethod
def delete_group(
group_id: str
)
Attempts to delete a group result; raises NotImplementedError as this operation is not supported by the RPC backend.
Parameters
| Name | Type | Description |
|---|
| group_id | str | The unique identifier of the group to delete |
binding()
@classmethod
def binding() - > RPCBackend.Queue
Defines the queue binding configuration used for receiving results on the process-specific queue.
Returns
| Type | Description |
|---|
RPCBackend.Queue | A configured Queue object for the backend's OID |
oid()
@classmethod
def oid() - > str
Provides the unique identifier for the current application thread, used as the queue name for results.
Returns
| Type | Description |
|---|
str | The thread-specific unique identifier |