Consumer and Event Loop Architecture
The consumer architecture in Celery is built on a modular system of "bootsteps" that manage the lifecycle of message consumption, broker connectivity, and the event loop. The central coordinator for these operations is the Consumer class, which orchestrates several sub-components through a dedicated Blueprint.
The Consumer Blueprint
The Consumer (found in celery.worker.consumer.consumer) does not operate as a monolithic entity. Instead, it uses a Blueprint to define a graph of execution steps. This allows for a highly decoupled architecture where features like gossip, heartbeats, and remote control are independent components.
The default steps in the Blueprint include:
- Connection: Manages the broker connection.
- Events: Sends worker-related events (e.g., worker-online).
- Heart: Sends AMQP heartbeats to keep the connection alive.
- Control: Handles remote control commands (e.g., celery inspect).
- Tasks: The core step that starts consuming from queues.
- Evloop: The final step that starts the I/O event loop.
class Blueprint(bootsteps.Blueprint):
    """Consumer blueprint."""

    name = 'Consumer'
    default_steps = [
        'celery.worker.consumer.connection:Connection',
        'celery.worker.consumer.mingle:Mingle',
        'celery.worker.consumer.events:Events',
        'celery.worker.consumer.gossip:Gossip',
        'celery.worker.consumer.heart:Heart',
        'celery.worker.consumer.control:Control',
        'celery.worker.consumer.tasks:Tasks',
        'celery.worker.consumer.delayed_delivery:DelayedDelivery',
        'celery.worker.consumer.consumer:Evloop',
        'celery.worker.consumer.agent:Agent',
    ]
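The step order is not simply the list order: each step declares its requirements, and the blueprint resolves a dependency graph before starting anything. The resolution can be sketched with the standard library (the edges below are hypothetical, not Celery's actual requires declarations):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency edges between a few consumer steps: each
# step maps to the steps that must be started before it.
requires = {
    "Connection": [],
    "Events": ["Connection"],
    "Heart": ["Connection"],
    "Tasks": ["Connection", "Events"],
    "Evloop": ["Tasks"],
}

# static_order() yields steps so that every requirement comes first.
start_order = list(TopologicalSorter(requires).static_order())
```

With these edges, Connection always starts first and Evloop last, regardless of how the steps are listed.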
Event Loop Architecture
The worker supports both synchronous and asynchronous event loops, depending on the transport and concurrency pool used.
The Hub and Timer
The Hub (in celery.worker.components) is the abstraction for the event loop, typically powered by kombu.asynchronous.Hub. It manages file descriptor registration and callbacks. The Timer component provides the scheduling mechanism for ETAs and periodic tasks like heartbeats.
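The scheduling half can be illustrated with a toy heap-based timer in the spirit of the Timer component (an illustration only, not kombu's Timer API): entries are ordered by their ETA and fired once they come due.

```python
import heapq
from itertools import count

class ToyTimer:
    """Toy ETA scheduler: not kombu's Timer, just the core idea."""

    def __init__(self):
        self._events = []
        self._seq = count()  # tie-breaker for entries with equal ETAs

    def call_at(self, eta, fn):
        """Schedule fn to run at the given (monotonic) time."""
        heapq.heappush(self._events, (eta, next(self._seq), fn))

    def fire_due(self, now):
        """Run every callback whose ETA has passed; return their results."""
        results = []
        while self._events and self._events[0][0] <= now:
            _, _, fn = heapq.heappop(self._events)
            results.append(fn())
        return results
```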
If the worker is configured to use an event loop (e.g., with the amqp transport), the Hub component is initialized:
class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""

    requires = (Timer,)

    def create(self, w):
        w.hub = get_event_loop()
        # ... patches thread primitives for greenlet safety if needed
        return self
Evloop Bootstep
The Evloop bootstep is always the last step to start in the Consumer blueprint. Its job is to hand over control to the actual loop implementation.
class Evloop(bootsteps.StartStopStep):
    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())
The Consumer selects the loop type in its __init__:
- asynloop: A non-blocking loop used when a Hub is available.
- synloop: A blocking fallback loop for transports that do not support asynchronous I/O.
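The selection itself reduces to a single conditional; a sketch of the idea (hypothetical names, not the exact Consumer code):

```python
def choose_loop(hub, synloop, asynloop):
    # With a hub the worker can multiplex sockets asynchronously;
    # without one it falls back to blocking consumption.
    return asynloop if hub is not None else synloop
```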
Connection Management and Retries
The Consumer manages the broker connection lifecycle, including initial connection and automatic recovery.
Connection Recovery
When a connection is lost, the Consumer.start() method catches recoverable errors and triggers a restart of the blueprint.
def start(self):
    blueprint = self.blueprint
    while blueprint.state not in STOP_CONDITIONS:
        # ... retry logic ...
        try:
            blueprint.start(self)
        except recoverable_errors as exc:
            if blueprint.state not in STOP_CONDITIONS:
                self.on_connection_error_after_connected(exc)
            self.on_close()
            blueprint.restart(self)
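The pattern is easiest to see with the details stripped away. A self-contained sketch of restart-until-stopped (FakeBlueprint is a hypothetical stand-in, not a Celery class):

```python
STOP_CONDITIONS = {"CLOSE", "TERMINATE"}

class FakeBlueprint:
    """Stand-in blueprint that fails a fixed number of times, then stops."""

    def __init__(self, failures):
        self.state = "RUN"
        self.failures = failures

    def start(self):
        if self.failures:
            self.failures -= 1
            raise ConnectionError("broker connection lost")
        self.state = "CLOSE"

    def restart(self):
        pass  # would tear down and rebuild the steps

def run(blueprint):
    """Keep restarting after recoverable errors until a stop condition."""
    restarts = 0
    while blueprint.state not in STOP_CONDITIONS:
        try:
            blueprint.start()
        except ConnectionError:
            if blueprint.state not in STOP_CONDITIONS:
                restarts += 1
                blueprint.restart()
    return restarts
```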
Prefetch Reduction Mechanism
A critical feature during reconnection is the temporary reduction of the prefetch count. If a worker was processing tasks when the connection was lost, those tasks are still in memory. To prevent the worker from being overwhelmed by new tasks immediately upon reconnecting, Celery reduces the prefetch count based on the number of active requests.
The on_connection_error_after_connected method calculates the reduced count:
active_count = len(active_requests)
self.initial_prefetch_count = max(
    self.prefetch_multiplier,
    self.max_prefetch_count - active_count * self.prefetch_multiplier,
)
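Plugging in concrete (hypothetical) numbers makes the behavior clear: with a multiplier of 4 and a maximum prefetch of 40, seven in-memory tasks shrink the count to 12, while the multiplier acts as a floor when many tasks are active.

```python
def reduced_prefetch(prefetch_multiplier, max_prefetch_count, active_count):
    # Mirrors the calculation above: reserve prefetch capacity for
    # tasks already held in memory, but never drop below one
    # multiplier's worth of slots.
    return max(
        prefetch_multiplier,
        max_prefetch_count - active_count * prefetch_multiplier,
    )
```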
The prefetch count is then gradually restored to its maximum value as tasks are acknowledged via _restore_prefetch_count_after_connection_restart.
Task Handling and QoS
The Consumer uses "strategies" to determine how different task types should be handled. When a message is received, the create_task_handler returns a callback that routes the message to the appropriate strategy.
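The routing step can be sketched as a dictionary dispatch (hypothetical names; the real create_task_handler also handles decoding, acknowledgement, and unknown-task errors):

```python
def create_task_handler(strategies, on_unknown_task):
    def on_task_received(message):
        # Route by task name; unknown names fall through to an error path.
        strategy = strategies.get(message["task"])
        if strategy is None:
            return on_unknown_task(message)
        return strategy(message)
    return on_task_received
```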
Quality of Service (QoS)
QoS manages the prefetch limit (how many unacknowledged messages the worker can hold). In the asynloop, QoS updates are deferred until the loop is idle to group multiple updates together and prioritize control commands over task messages.
# Inside asynloop (celery/worker/loops.py)
if qos.prev != qos.value:
    update_qos()
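The deferral can be sketched with a minimal QoS object (modelled loosely on kombu's QoS, but a simplified stand-in): callers mutate a pending value any number of times, and update() touches the channel at most once, and only if the value actually changed.

```python
class ToyQoS:
    """Simplified QoS: defer prefetch changes until the loop is idle."""

    def __init__(self, initial):
        self.value = initial   # desired (pending) prefetch count
        self.prev = initial    # last value applied to the channel
        self.applied = 0       # number of actual channel updates

    def increment_eventually(self, n=1):
        self.value += n

    def decrement_eventually(self, n=1):
        self.value = max(1, self.value - n)

    def update(self):
        # Called when the loop is idle: apply at most one change.
        if self.prev != self.value:
            self.prev = self.value
            self.applied += 1
```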
Rate Limiting
Rate limiting is implemented using a TokenBucket for each task type. If a task exceeds its rate limit, it is held in a buffer and scheduled for later execution using the Timer.
def _limit_task(self, request, bucket, tokens):
    bucket.add((request, tokens))
    return self._schedule_bucket_request(bucket)
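A minimal token-bucket sketch (a simplified stand-in, not kombu's TokenBucket): tokens refill at fill_rate per second up to capacity, and a task may run only when it can consume a token; otherwise the caller buffers it and retries later via the Timer.

```python
import time

class ToyTokenBucket:
    """Toy rate limiter: refill over time, spend a token per task."""

    def __init__(self, fill_rate, capacity):
        self.fill_rate = fill_rate    # tokens added per second
        self.capacity = capacity      # maximum stored tokens
        self._tokens = float(capacity)
        self._last = time.monotonic()

    def can_consume(self, tokens=1):
        # Refill based on elapsed time, capped at capacity.
        now = time.monotonic()
        self._tokens = min(
            self.capacity,
            self._tokens + (now - self._last) * self.fill_rate,
        )
        self._last = now
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False  # caller should buffer the task and retry later
```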
This architecture ensures that the worker remains responsive to remote control commands and heartbeats even under heavy task load or during connection instability.