Ephemeral Results via RPC

The RPC backend in Celery provides a low-latency mechanism for transient result delivery. Unlike backends that persist results in an external store (such as Redis or a SQLAlchemy-managed database), the RPC backend treats results as ephemeral messages routed back through the AMQP broker itself. This design prioritizes low latency and reduced infrastructure overhead at the cost of long-term result persistence.

The "One Queue per Client" Model

The core design principle of the RPC backend is the use of a single, dedicated queue for each client process or thread. This is managed through the oid (Object ID) property in RPCBackend, which maps directly to self.app.thread_oid.

@property
def binding(self):
    return self.Queue(
        self.oid, self.exchange, self.oid,
        durable=True,
        auto_delete=True,
        expires=self.expires,
    )

@cached_property
def oid(self):
    # cached here is the app thread OID: name of queue we receive results on.
    return self.app.thread_oid

Because the OID is thread-local, the backend ensures that results are delivered back to the specific thread that requested them. Each client listens only on its own unique queue, so multiple clients never poll a shared result set, avoiding the "thundering herd" that a single shared reply queue would create.
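
As a quick illustration, the per-thread queue name can be inspected directly on a configured app. This is a minimal sketch ('proj' is a placeholder app name, and no broker connection is needed just to read these properties):

from celery import Celery

app = Celery('proj', broker='amqp://', backend='rpc://')

# Both resolve to the same thread-local UUID string: the name of the
# queue this client thread will consume results from.
print(app.backend.oid)
print(app.backend.binding.name)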

Routing via Reply-To and Correlation ID

When a task is called using the RPC backend, Celery leverages standard AMQP headers to facilitate the return trip. The RPCBackend.destination_for method extracts the reply_to and correlation_id from the task request.

  • reply_to: The name of the client's unique OID queue.
  • correlation_id: A unique identifier (usually the task_id) used by the client to match the incoming result message to the pending task.
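
A condensed sketch of destination_for, simplified from the actual method (the real implementation can also fall back to the currently executing task's request when none is passed in):

def destination_for(self, task_id, request):
    # Returns (routing_key, correlation_id) for the result message.
    if request is None:
        raise RuntimeError(f'RPC backend missing task request for {task_id!r}')
    return request.reply_to, request.correlation_id or task_id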

In RPCBackend.store_result, the worker publishes the result directly to the anonymous exchange using these parameters:

def store_result(self, task_id, result, state,
                 traceback=None, request=None, **kwargs):
    routing_key, correlation_id = self.destination_for(task_id, request)
    if not routing_key:
        return
    with self.app.amqp.producer_pool.acquire(block=True) as producer:
        producer.publish(
            self._to_result(task_id, state, result, traceback, request),
            exchange=self.exchange,
            routing_key=routing_key,
            correlation_id=correlation_id,
            serializer=self.serializer,
            retry=True, retry_policy=self.retry_policy,
            declare=self.on_reply_declare(task_id),
            delivery_mode=self.delivery_mode,
        )
    return result

Because persistent is set to False by default in RPCBackend, the delivery_mode is typically 1 (transient), meaning results exist only in memory within the broker and are lost if the broker restarts or the queue is deleted.
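
Both the backend choice and the persistence flag are ordinary Celery configuration. A minimal sketch ('proj' and the broker URL are placeholders):

from celery import Celery

app = Celery('proj', broker='amqp://guest@localhost//', backend='rpc://')

# result_persistent defaults to False for rpc://; flipping it to True
# publishes result messages with delivery_mode 2 (persistent) instead.
app.conf.result_persistent = False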

Result Consumption and Reconnection

The ResultConsumer class manages the lifecycle of the connection used to retrieve results. It is designed to be resilient, implementing a _reconnect method that rebuilds the consumer and re-subscribes to all previously active queues if the connection to the broker is lost.

def _reconnect(self):
    # ... (closes the stale connection; `old_queues` holds the queues
    # that were being consumed before the connection dropped)
    self._connection = self.app.connection()
    self._consumer = self.Consumer(
        self._connection.default_channel,
        old_queues,
        callbacks=[self.on_state_change],
        no_ack=self._no_ack,
        accept=self.accept,
    )
    self._consumer.consume()

This ensures that even in unstable network conditions, the client can continue to drain results from its ephemeral queues once the connection is restored.
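
From calling code, this recovery is transparent: a blocking get() simply resumes draining the client's OID queue once the consumer reconnects. A minimal usage sketch, where add is a hypothetical task defined against the app above:

from proj.tasks import add  # hypothetical task module

res = add.delay(2, 2)

# Blocks until the result message lands on this client's OID queue;
# any mid-wait broker reconnect is handled inside ResultConsumer.
print(res.get(timeout=30))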

Trade-offs and Constraints

The RPC backend's reliance on message queues for state management introduces several specific constraints:

No Chord Support

The RPC backend does not support chords (groups of tasks with a callback). Chords require a persistent shared state to track the completion of group members, which the ephemeral nature of RPC queues cannot provide. The ensure_chords_allowed method explicitly raises a NotImplementedError.
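
The guard itself is essentially a one-liner; roughly (error message paraphrased):

def ensure_chords_allowed(self):
    # Raised eagerly when a chord, or a group chained into a task,
    # is applied while the rpc:// backend is configured.
    raise NotImplementedError(
        'The "rpc" result backend does not support chords!')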

Backlog Limits

When polling for results via get_task_meta, the backend "slurps" messages from the queue. To bound how much queued-up result history a single poll will fast-forward through, a backlog_limit (defaulting to 1000) is enforced.

def _slurp_from_queue(self, task_id, accept, limit=1000, no_ack=False):
    # ... (acquires a channel and declares `binding`, the client's
    # OID reply queue)
    for _ in range(limit):
        msg = binding.get(accept=accept, no_ack=no_ack)
        if not msg:
            break
        yield msg
    else:
        raise self.BacklogLimitExceeded(task_id)

If the limit is reached before the queue is drained, the backend raises BacklogLimitExceeded, signaling that more result history has accumulated than a single poll is willing to fast-forward through.
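
On the client side this surfaces as an exception from the polling call. A hedged handling sketch, assuming the exception class is imported from celery.backends.rpc (where it is defined) and reusing the app from the earlier examples:

from celery.backends.rpc import BacklogLimitExceeded

try:
    meta = app.backend.get_task_meta(task_id)
except BacklogLimitExceeded:
    # More than backlog_limit messages were queued on the OID queue;
    # the caller decides whether to retry, purge the queue, or give up.
    raise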

Out-of-Band Results

Because one queue handles all results for a client, a poll for task_A might encounter a message for task_B. The RPCBackend handles this by storing these "out-of-band" results in an internal buffer (self._out_of_band), ensuring they are available when the client eventually requests the status of task_B.

def get_task_meta(self, task_id, backlog_limit=1000):
    buffered = self._out_of_band.pop(task_id, None)
    if buffered:
        return self._set_cache_by_message(task_id, buffered)
    # ... (slurps from queue and populates _out_of_band for other IDs)
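
The elided portion follows roughly this shape (a condensed paraphrase, not the verbatim source; treat the details as illustrative): each task's result is collapsed to its most recent state, and messages addressed to other tasks are parked in _out_of_band:

# Condensed paraphrase of the elided slurp-and-buffer loop.
latest_by_id = {}
for msg in self._slurp_from_queue(task_id, self.accept, backlog_limit):
    tid = self._get_message_task_id(msg)
    prev = latest_by_id.get(tid)
    if prev:
        prev.ack()  # keep only the most recent state per task
    latest_by_id[tid] = msg

latest = latest_by_id.pop(task_id, None)
for tid, msg in latest_by_id.items():
    self._out_of_band[tid] = msg  # buffer results that belong to other tasks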

This mechanism allows the RPC backend to simulate the behavior of a persistent store while remaining entirely message-driven.