Dynamic Pool Scaling
To dynamically adjust the number of worker processes based on the current workload, use the Autoscaler component. This allows the worker to scale up to a maximum concurrency when tasks are queued and scale down to a minimum concurrency during idle periods.
Enabling Autoscaling
Enable the autoscaler by providing the --autoscale option when starting the worker. This option accepts the maximum and minimum number of processes.
celery -A proj worker --autoscale=10,3
In this example, the worker starts with 3 processes and can grow up to 10 processes as the number of reserved tasks increases.
How the Autoscaler Works
The Autoscaler (found in celery.worker.autoscale) monitors the number of tasks currently reserved by the worker. It compares this against the current pool size to decide whether to grow or shrink the pool.
The core logic resides in the _maybe_scale method:
# celery/worker/autoscale.py
def _maybe_scale(self, req=None):
    procs = self.processes
    cur = min(self.qty, self.max_concurrency)
    if cur > procs:
        self.scale_up(cur - procs)
        return True
    cur = max(self.qty, self.min_concurrency)
    if cur < procs:
        self.scale_down(procs - cur)
        return True
- Scaling Up: Occurs immediately when qty (the number of reserved requests in celery.worker.state.reserved_requests) exceeds the current number of processes.
- Scaling Down: Occurs only if the keepalive period has passed since the last scale-up event. This prevents "thrashing" (rapidly adding and removing processes).
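The grow/shrink decision can be modeled in isolation. The ToyAutoscaler below is a hypothetical stand-in (not Celery's class) that applies the same min/max clamping from _maybe_scale to a plain integer pool size:

```python
class ToyAutoscaler:
    """Simplified model of the _maybe_scale decision (not the real class)."""

    def __init__(self, min_concurrency, max_concurrency):
        self.min_concurrency = min_concurrency
        self.max_concurrency = max_concurrency
        self.processes = min_concurrency  # the worker starts at the minimum
        self.qty = 0                      # number of reserved tasks

    def maybe_scale(self):
        # Grow toward qty, but never past max_concurrency.
        cur = min(self.qty, self.max_concurrency)
        if cur > self.processes:
            self.processes += cur - self.processes
            return True
        # Shrink toward qty, but never below min_concurrency.
        cur = max(self.qty, self.min_concurrency)
        if cur < self.processes:
            self.processes -= self.processes - cur
            return True
        return False

scaler = ToyAutoscaler(min_concurrency=3, max_concurrency=10)
scaler.qty = 25          # a burst of reserved tasks
scaler.maybe_scale()
print(scaler.processes)  # 10: capped at max_concurrency
scaler.qty = 0           # queue drained
scaler.maybe_scale()
print(scaler.processes)  # 3: floored at min_concurrency
```

Note that the pool size always converges toward the number of reserved tasks, clamped into the [min, max] window, which is exactly why a burst of 25 tasks still only yields 10 processes.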
Adjusting Scaling Limits at Runtime
You can modify the autoscale limits without restarting the worker using remote control commands. This triggers the Autoscaler.update method, which adjusts the concurrency limits and immediately scales the pool if necessary.
celery -A proj control autoscale 15 5
The implementation in celery/worker/control.py handles this command:
@control_command(
    args=[('max', int), ('min', int)],
    signature='[max [min]]',
)
def autoscale(state, max=None, min=None):
    """Modify autoscale settings."""
    autoscaler = state.consumer.controller.autoscaler
    if autoscaler:
        max_, min_ = autoscaler.update(max, min)
        return ok(f'autoscale now max={max_} min={min_}')
    raise ValueError('Autoscale not enabled')
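The "immediately scales the pool if necessary" behavior of update can be sketched as a pure function. This is a simplified model paraphrased from the description above (update_limits is a hypothetical helper, not Celery's method):

```python
def update_limits(processes, new_max=None, new_min=None):
    """Sketch of the update behavior: clamp the current pool size
    into the new [min, max] bounds as soon as they change."""
    if new_max is not None and processes > new_max:
        processes = new_max   # shrink immediately to the new ceiling
    if new_min is not None and processes < new_min:
        processes = new_min   # grow immediately to the new floor
    return processes

print(update_limits(processes=10, new_max=15, new_min=5))  # 10: already in range
print(update_limits(processes=10, new_max=8))              # 8: shrunk to new max
```

So running `celery -A proj control autoscale 15 5` against a worker currently at 10 processes changes only the limits; the pool itself is touched only when it falls outside the new window.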
Configuring the Keepalive Period
The keepalive period determines how long the autoscaler waits before scaling down after a scale-up event. By default, this is 30 seconds. You can override this using the AUTOSCALE_KEEPALIVE environment variable:
export AUTOSCALE_KEEPALIVE=60
celery -A proj worker --autoscale=10,3
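The keepalive gate amounts to a timestamp comparison. The helper below is a hypothetical sketch of the described rule (a caller-supplied clock is used for clarity):

```python
def may_scale_down(last_scale_up, keepalive, now):
    """Allow a scale-down only once `keepalive` seconds have passed
    since the last scale-up event (sketch of the described rule)."""
    return now - last_scale_up >= keepalive

# Scale-up happened at t=100 with the default 30-second keepalive:
print(may_scale_down(last_scale_up=100, keepalive=30, now=120))  # False
print(may_scale_down(last_scale_up=100, keepalive=30, now=135))  # True
```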
Integration with the Worker Lifecycle
The WorkerComponent bootstep manages the lifecycle of the autoscaler. When the worker runs with an asynchronous event loop (the hub), the scaling check is registered as hub callbacks; otherwise the autoscaler runs as a background thread that polls on its own:
# celery/worker/autoscale.py
def register_with_event_loop(self, w, hub):
    # Trigger scaling check on every new task message
    w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
    # Periodically check scaling status
    hub.call_repeatedly(
        w.autoscaler.keepalive, w.autoscaler.maybe_scale,
    )
Troubleshooting and Caveats
Scale-Down Delays
If the worker does not scale down immediately after tasks are finished, it is likely waiting for the keepalive timer. The timer resets every time the worker scales up.
Busy Process Protection
The autoscaler will not scale down if all processes in the pool are currently busy. The _shrink method catches the ValueError raised by the pool and logs a debug message:
def _shrink(self, n):
    info('Scaling down %s processes.', n)
    try:
        self.pool.shrink(n)
    except ValueError:
        debug("Autoscaler won't scale down: all processes busy.")
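The guard can be demonstrated with a toy pool. ToyPool and safe_shrink are hypothetical illustrations of the pattern, not Celery's classes:

```python
class ToyPool:
    """Hypothetical pool whose shrink() raises ValueError when
    every process is busy, mimicking the real pool's behavior."""

    def __init__(self, size, busy):
        self.size = size
        self.busy = busy

    def shrink(self, n):
        if self.busy >= self.size:
            raise ValueError('all processes busy')
        self.size -= n

def safe_shrink(pool, n):
    """Mirror of the _shrink guard: swallow ValueError, keep the pool as-is."""
    try:
        pool.shrink(n)
    except ValueError:
        pass  # the autoscaler just logs and retries on a later check

pool = ToyPool(size=4, busy=4)
safe_shrink(pool, 2)
print(pool.size)  # 4: shrink refused while every process is busy
pool.busy = 1
safe_shrink(pool, 2)
print(pool.size)  # 2: shrink succeeds once processes free up
```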
Manual Scaling Conflicts
When the autoscaler is enabled, manual pool adjustment commands like pool_grow or pool_shrink should be avoided. Use the autoscale remote control command instead to ensure the Autoscaler component remains synchronized with the pool state.