Skip to main content

RPCBackend

Base class for the RPC result backend.

Attributes

AttributeTypeDescription
Exchangetype = kombu.ExchangeThe kombu.Exchange class used to define the message exchange for results.
Producertype = kombu.ProducerThe kombu.Producer class used to publish result messages to the broker.
ResultConsumertype = ResultConsumerThe consumer class responsible for managing result message consumption.
BacklogLimitExceededtype = BacklogLimitExceededException raised when there are too many messages for a task id.
persistentboolean = falseBoolean flag indicating if result messages should be persistent on the broker.
supports_autoexpireboolean = trueIndicates that this backend supports automatic expiration of result queues.
supports_native_joinboolean = trueIndicates that this backend supports native join operations for task results.
retry_policydict = {'max_retries': 20, 'interval_start': 0, 'interval_step': 1, 'interval_max': 1}Dictionary defining the retry behavior for publishing results, including max retries and intervals.
pollcallable = get_task_metaAlias for the get_task_meta method used for polling the queue for task updates.

Constructor

Signature

def RPCBackend(
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery),
connection: kombu.Connection = None,
exchange: str = None,
exchange_type: str = None,
persistent: bool = None,
serializer: str = None,
auto_delete: bool = True,
kwargs: dict
) - > null

Parameters

NameTypeDescription
app[Celery](../../app/base/celery.md?sid=celery_app_base_celery)The Celery application instance associated with this backend.
connectionkombu.Connection = NoneThe connection to the message broker.
exchangestr = NoneThe name of the exchange to use for results.
exchange_typestr = NoneThe type of exchange (e.g., 'direct', 'topic').
persistentbool = NoneWhether the result messages should be persistent.
serializerstr = NoneThe default serialization method for results.
auto_deletebool = TrueWhether to automatically delete the result queue.
kwargsdictAdditional keyword arguments passed to the base Backend class.

Methods


ensure_chords_allowed()

@classmethod
def ensure_chords_allowed()

Validates if the backend supports chord primitives; raises NotImplementedError as RPC backends do not support chords.


on_task_call()

@classmethod
def on_task_call(
producer: kombu.Producer,
task_id: str
)

Prepares the reply queue before a task is sent, ensuring the consumer is ready to receive the result.

Parameters

NameTypeDescription
producerkombu.ProducerThe producer instance used to send the task message
task_idstrThe unique identifier of the task being called

destination_for()

@classmethod
def destination_for(
task_id: str,
request: celery.app.task.Context
) - > Tuple[str, str]

Get the destination for result by task id.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task
requestcelery.app.task.ContextThe task request context containing routing information

Returns

TypeDescription
Tuple[str, str]A tuple containing the reply_to queue name and the correlation_id

on_reply_declare()

@classmethod
def on_reply_declare(
task_id: str
) - > null

Returns the declaration parameters for the producer when sending a result; defaults to no-op for RPC.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task

Returns

TypeDescription
nullAlways returns None as no explicit declaration is required during result publishing

on_result_fulfilled()

@classmethod
def on_result_fulfilled(
result: any
)

Performs cleanup actions after a result is successfully received; no-op for RPC as queues are process-specific.

Parameters

NameTypeDescription
resultanyThe fulfilled result object

as_uri()

@classmethod
def as_uri(
include_password: bool
) - > str

Returns the string representation of the backend connection URI.

Parameters

NameTypeDescription
include_passwordboolWhether to include the password in the URI string

Returns

TypeDescription
strThe fixed URI string 'rpc://'

store_result()

@classmethod
def store_result(
task_id: str,
result: any,
state: str,
traceback: str,
request: celery.app.task.Context
) - > any

Send task return value and state.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task
resultanyThe return value of the task to be stored
statestrThe current execution state of the task
tracebackstrThe stack trace if the task failed
requestcelery.app.task.ContextThe task request context used to determine the reply destination

Returns

TypeDescription
anyThe original result value that was stored

on_out_of_band_result()

@classmethod
def on_out_of_band_result(
task_id: str,
message: kombu.Message
)

Handles results received for tasks that are not currently marked as pending by buffering them for future retrieval.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task associated with the message
messagekombu.MessageThe raw message object received from the broker

get_task_meta()

@classmethod
def get_task_meta(
task_id: str,
backlog_limit: int
) - > dict

Fetches the current state and result for a task by polling the reply queue and checking the local buffer.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task to query
backlog_limitintThe maximum number of messages to consume from the queue before raising a limit error

Returns

TypeDescription
dictA dictionary containing the task metadata including status and result

revive()

@classmethod
def revive(
channel: kombu.Channel
)

Re-establishes the backend connection state; no-op for the RPC backend.

Parameters

NameTypeDescription
channelkombu.ChannelThe broker channel to use for revival

reload_task_result()

@classmethod
def reload_task_result(
task_id: str
)

Attempts to reload a task result; raises NotImplementedError as this operation is not supported by the RPC backend.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the task

reload_group_result()

@classmethod
def reload_group_result(
task_id: str
)

Reload group result, even if it has been previously fetched.

Parameters

NameTypeDescription
task_idstrThe unique identifier of the group

save_group()

@classmethod
def save_group(
group_id: str,
result: any
)

Attempts to save a group result; raises NotImplementedError as this operation is not supported by the RPC backend.

Parameters

NameTypeDescription
group_idstrThe unique identifier of the group
resultanyThe group result to save

restore_group()

@classmethod
def restore_group(
group_id: str,
cache: bool
)

Attempts to restore a group result; raises NotImplementedError as this operation is not supported by the RPC backend.

Parameters

NameTypeDescription
group_idstrThe unique identifier of the group
cacheboolWhether to use the local cache for restoration

delete_group()

@classmethod
def delete_group(
group_id: str
)

Attempts to delete a group result; raises NotImplementedError as this operation is not supported by the RPC backend.

Parameters

NameTypeDescription
group_idstrThe unique identifier of the group to delete

binding()

@classmethod
def binding() - > RPCBackend.Queue

Defines the queue binding configuration used for receiving results on the process-specific queue.

Returns

TypeDescription
RPCBackend.QueueA configured Queue object for the backend's OID

oid()

@classmethod
def oid() - > str

Provides the unique identifier for the current application thread, used as the queue name for results.

Returns

TypeDescription
strThe thread-specific unique identifier