
Cooperative Multitasking: Eventlet and Gevent

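To handle high-throughput, I/O-bound workloads efficiently, you can use cooperative multitasking with the Eventlet or Gevent worker pools. These pools run each task in a greenlet, a lightweight coroutine that switches only at points where it would otherwise wait (for example, on network I/O). This lets a single process handle thousands of concurrent connections without the overhead of operating-system threads. The same property means a CPU-bound task holds the whole process until it finishes or yields, so these pools are a poor fit for CPU-heavy work.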

Enabling Cooperative Pools

You can enable these pools using the --pool (or -P) command-line argument when starting the worker.

# Use the Gevent pool with 1000 concurrent greenlets
celery -A proj worker -P gevent -c 1000

# Use the Eventlet pool with 1000 concurrent greenlets
celery -A proj worker -P eventlet -c 1000

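You can also set the pool in your Celery application settings, but Celery's configuration documentation advises against selecting eventlet or gevent this way. Monkey patching is triggered by the -P/--pool command-line option, so a pool chosen only in configuration may not be patched in time. Prefer the command-line option; the equivalent settings are:

app.conf.update(
    worker_pool='gevent',
    worker_concurrency=1000,
)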

How Cooperative Multitasking Works

In Celery, both celery.concurrency.gevent.TaskPool and celery.concurrency.eventlet.TaskPool set the class attribute is_green = True. This flag tells the worker that the pool uses greenlets, that is, cooperative multitasking, rather than preemptive multitasking like threads or processes.
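To make the cooperative/preemptive distinction concrete, here is a toy round-robin scheduler built on plain Python generators. It is only an illustration: generators stand in for greenlets, and neither gevent nor eventlet is implemented this way. The worker and run_round_robin names are hypothetical. Each task runs until it voluntarily yields; nothing interrupts it in between.

```python
from collections import deque


def worker(name, steps, log):
    """A toy task that records each step and then yields control."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # voluntary switch point, like a greenlet waiting on I/O


def run_round_robin(tasks):
    """Resume each generator in turn until all of them are exhausted."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)       # run the task up to its next yield
        except StopIteration:
            continue         # task finished; drop it from the queue
        ready.append(task)   # task yielded; requeue it behind the others


log = []
run_round_robin([worker("a", 2, log), worker("b", 3, log)])
print(log)
```

The two tasks interleave only because each one yields after every step. A task that never reaches a yield (for example, a tight CPU loop) would keep every other task waiting, which is the same limitation green pools have.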

Task Execution

When a task is applied to the pool, the on_apply method spawns a new greenlet.

For Gevent, the TaskPool uses gevent.pool.Pool to manage execution:

# From celery/concurrency/gevent.py
def on_apply(self, target, args=None, kwargs=None, callback=None,
             accept_callback=None, timeout=None,
             timeout_callback=None, apply_target=apply_target, **_):
    timeout = self.timeout if timeout is None else timeout
    target = self._make_killable_target(target)
    # Spawns a greenlet using the gevent pool
    greenlet = self._quick_put(apply_timeout if timeout else apply_target,
                               target, args, kwargs, callback, accept_callback,
                               self.getpid, timeout=timeout, timeout_callback=timeout_callback)
    self._add_to_pool_map(id(greenlet), greenlet)
    return greenlet
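The bookkeeping in on_apply (spawn into a size-limited pool, then record each greenlet under id(greenlet) so it can be found again later) can be sketched with asyncio, the standard library's own cooperative-multitasking framework. This is an analogy, not Celery code: asyncio stands in for gevent, an asyncio.Semaphore plays the role of the pool's size limit, and BoundedTaskPool, spawn, and fake_io are hypothetical names.

```python
import asyncio


class BoundedTaskPool:
    """Hypothetical asyncio analogue of a size-limited green pool."""

    def __init__(self, limit):
        self._slots = asyncio.Semaphore(limit)  # caps concurrently running jobs
        self.pool_map = {}                      # id(task) -> task

    async def _run(self, func, *args):
        async with self._slots:                 # wait for a free slot
            return await func(*args)

    def spawn(self, func, *args):
        task = asyncio.create_task(self._run(func, *args))
        key = id(task)
        self.pool_map[key] = task
        # Forget the task once it finishes, whether it succeeded or failed.
        task.add_done_callback(lambda _t: self.pool_map.pop(key, None))
        return task


async def fake_io(n):
    await asyncio.sleep(0.01)  # stands in for a network call
    return n * 2


async def main():
    pool = BoundedTaskPool(limit=2)
    tasks = [pool.spawn(fake_io, n) for n in range(5)]
    in_flight = len(pool.pool_map)   # every spawned task is tracked
    results = await asyncio.gather(*tasks)
    await asyncio.sleep(0)           # let any pending done-callbacks run
    return in_flight, results, len(pool.pool_map)


in_flight, results, remaining = asyncio.run(main())
print(in_flight, results, remaining)
```

One design difference is worth noting: gevent.pool.Pool.spawn blocks the caller when the pool is full, whereas this sketch creates every task immediately and makes the extra ones wait inside for a free slot.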

For Eventlet, the TaskPool uses eventlet.greenpool.GreenPool and sends the eventlet_pool_apply signal just before spawning each greenlet:

# From celery/concurrency/eventlet.py
def on_apply(self, target, args=None, kwargs=None, callback=None,
             accept_callback=None, **_):
    target = TaskPool._make_killable_target(target)
    self._quick_apply_sig(sender=self, target=target, args=args, kwargs=kwargs,)
    # Spawns a greenlet using the eventlet pool
    greenlet = self._quick_put(
        apply_target,
        target, args,
        kwargs,
        callback,
        accept_callback,
        self.getpid
    )
    self._add_to_pool_map(id(greenlet), greenlet)

Scheduling and Timers

Both implementations provide a custom Timer class that schedules each delayed callback (for example, the one that releases a task with an ETA or countdown) as its own greenlet, so waiting for a due time never blocks the other greenlets.

  • Gevent Timer: Uses gevent.Greenlet.spawn_later to schedule execution.
  • Eventlet Timer: Uses eventlet.greenthread.spawn_after to schedule execution.
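Both timers follow the same pattern: convert an absolute due time (an eta on the time.monotonic() clock) into a relative delay, schedule the callback without blocking, and keep a handle so the entry can be cancelled. The sketch below reproduces that pattern with asyncio's loop.call_later standing in for spawn_later/spawn_after; the GreenTimerSketch class and its methods are hypothetical.

```python
import asyncio
from time import monotonic


class GreenTimerSketch:
    """Hypothetical non-blocking timer: each entry is a scheduled callback."""

    def __init__(self, loop):
        self._loop = loop
        self._queue = set()  # handles that have not fired or been cancelled

    def enter(self, eta, callback, *args):
        secs = max(eta - monotonic(), 0)  # an overdue eta fires right away

        def fire():
            self._queue.discard(handle)
            callback(*args)

        handle = self._loop.call_later(secs, fire)
        self._queue.add(handle)
        return handle

    def cancel(self, handle):
        handle.cancel()
        self._queue.discard(handle)

    @property
    def pending(self):
        return len(self._queue)


async def main():
    fired = []
    timer = GreenTimerSketch(asyncio.get_running_loop())
    timer.enter(monotonic() + 0.05, fired.append, "late")
    timer.enter(monotonic() - 1, fired.append, "overdue")
    doomed = timer.enter(monotonic() + 0.01, fired.append, "cancelled")
    timer.cancel(doomed)
    await asyncio.sleep(0.1)  # other coroutines keep running while we wait
    return fired, timer.pending


fired, pending = asyncio.run(main())
print(fired, pending)
```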

Important: Monkey Patching

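For cooperative multitasking to work, blocking standard-library modules (e.g., socket, select, threading) must be "monkey patched" with cooperative versions that yield to other greenlets. Celery does this automatically via maybe_patch_concurrency in celery/__init__.py. The celery command calls it at startup, before loading the rest of its command-line program; it reads the -P/--pool option and applies the matching patch function.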

# From celery/__init__.py
def _patch_gevent():
    import gevent.monkey
    gevent.monkey.patch_all()

def _patch_eventlet():
    import eventlet
    eventlet.monkey_patch()

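Because the patch is chosen by reading the command line, the scan of the arguments is what makes early patching possible. The sketch below shows that kind of argv scan. It is not Celery's implementation: find_pool_option and choose_patch are hypothetical names, and the patch functions are stubs that record which patch would run instead of calling gevent or eventlet.

```python
def find_pool_option(argv, short_opts=("-P",), long_opts=("--pool",)):
    """Return the value given for -P/--pool in argv, or None if absent."""
    for i, arg in enumerate(argv):
        if arg.startswith("--"):
            name, sep, value = arg.partition("=")
            if name in long_opts:
                if sep:
                    return value  # --pool=gevent
                return argv[i + 1] if i + 1 < len(argv) else None  # --pool gevent
        elif arg in short_opts:
            return argv[i + 1] if i + 1 < len(argv) else None  # -P gevent
    return None


def choose_patch(argv, patches):
    """Run the patch registered for the selected pool, if there is one."""
    pool = find_pool_option(argv)
    patcher = patches.get(pool)
    if patcher is not None:
        patcher()
    return pool


applied = []
patches = {  # stubs standing in for the real patch functions
    "gevent": lambda: applied.append("gevent.monkey.patch_all"),
    "eventlet": lambda: applied.append("eventlet.monkey_patch"),
}
choose_patch(["celery", "-A", "proj", "worker", "-P", "gevent"], patches)
choose_patch(["celery", "-A", "proj", "worker", "--pool=eventlet"], patches)
choose_patch(["celery", "-A", "proj", "worker"], patches)  # no pool option given
print(applied)
```

The third call applies nothing, which mirrors why a pool selected only through worker_pool in configuration is not patched at startup.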
Timing of Monkey Patching

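Monkey patching must happen as early as possible. Code that imports socket or threading (or names from them, such as from socket import socket) before the patch is applied keeps references to the original blocking implementations. A call through one of those references blocks the entire process instead of yielding, so every greenlet stalls and the worker can appear to hang.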

The celery.concurrency.eventlet module also runs an import-time check intended to warn you when billiard, Celery, or Kombu modules were imported before patching:

# From celery/concurrency/eventlet.py
import sys
import warnings

# W_RACE is a warning-message template defined in the same module.
RACE_MODS = ('billiard.', 'celery.', 'kombu.')
for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
    for side in ('thread', 'threading', 'socket'):
        if getattr(mod, side, None):
            warnings.warn(RuntimeWarning(W_RACE % side))
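In the excerpt, the loop variable mod is a module name taken from sys.modules, which is a string, so getattr(mod, side, None) looks the attribute up on the string rather than on the module. The sketch below shows the intended check with the module objects themselves. find_early_imports is a hypothetical helper, the warning text is made up for the example, and the fake modules exist only to keep it self-contained.

```python
import types
import warnings

RACE_MODS = ("billiard.", "celery.", "kombu.")


def find_early_imports(modules, prefixes=RACE_MODS,
                       sides=("thread", "threading", "socket")):
    """Yield (module_name, side) for watched modules holding a `side` attribute."""
    for name, module in list(modules.items()):
        if not name.startswith(prefixes) or module is None:
            continue  # not a watched module, or a placeholder entry
        for side in sides:
            if getattr(module, side, None) is not None:
                yield name, side


# Fake modules keep the demo self-contained; a real check would pass sys.modules.
early = types.ModuleType("kombu.fake_transport")
early.socket = object()  # pretend it ran "import socket" before patching
clean = types.ModuleType("celery.fake_utils")
modules = {
    "kombu.fake_transport": early,
    "celery.fake_utils": clean,
    "socket": types.ModuleType("socket"),  # not watched: no matching prefix
}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    for name, side in find_early_imports(modules):
        warnings.warn(RuntimeWarning(f"{name} imported {side} before eventlet patched"))

messages = [str(w.message) for w in caught]
print(messages)
```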

Eventlet Lifecycle Signals

The Eventlet pool provides specific signals that you can hook into to manage resources during the pool's lifecycle:

  • eventlet_pool_started: Sent when the pool starts.
  • eventlet_pool_preshutdown: Sent before the pool begins shutting down.
  • eventlet_pool_postshutdown: Sent after the pool has finished shutting down.
  • eventlet_pool_apply: Sent when a task is about to be applied to the pool.

Example usage:

from celery import signals

@signals.eventlet_pool_started.connect
def on_pool_start(sender, **kwargs):
    print(f"Eventlet pool started with limit: {sender.limit}")

Troubleshooting and Gotchas

Signal Safety

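Both the Gevent and Eventlet pools set signal_safe = False. This flag, defined on Celery's base pool class, tells the worker whether the pool can be shut down from within a signal handler; for the green pools it cannot. As a result, signal-driven worker behavior can differ from the default prefork pool, so avoid relying on signal-based process management within tasks.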

Blocking Detection (Eventlet)

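If your worker seems to hang, a greenlet may be performing a blocking operation that wasn't patched. With the Eventlet pool, you can enable Eventlet's hub blocking detection by setting the EVENTLET_NOBLOCK environment variable to a float value (the timeout in seconds). Celery reads this variable when it applies the Eventlet patch at startup and passes it to eventlet.debug.hub_blocking_detection, so select the pool with -P eventlet on the command line.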

# Enable blocking detection with a 2-second timeout
EVENTLET_NOBLOCK=2.0 celery -A proj worker -P eventlet
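What a blocking detector catches is easiest to see in a small experiment. The sketch below uses asyncio purely as a standard-library analogy; it is not Eventlet's mechanism. In debug mode, the asyncio event loop logs a warning for any callback or task step that runs longer than loop.slow_callback_duration. Here time.sleep plays the part of an unpatched blocking call, and ListHandler and blocking_step are hypothetical names.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log records so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


async def blocking_step():
    time.sleep(0.2)  # blocks the whole loop, like an unpatched socket call


async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # report any step that holds the loop > 50 ms
    await blocking_step()


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio.run(main(), debug=True)
asyncio_logger.removeHandler(handler)

slow = [r for r in handler.records if "took" in r.getMessage()]
print(len(slow))
```

Replacing time.sleep(0.2) with await asyncio.sleep(0.2) makes the warning disappear, because the coroutine then yields to the loop while it waits.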

Gevent Task Timeouts

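The Gevent TaskPool can enforce a per-task timeout at the pool level with gevent.Timeout. If a task exceeds the timeout, the Timeout exception is raised inside the task's greenlet, aborting the task, and timeout_callback is called. Because the exception can only be delivered when the greenlet yields to the hub, a task stuck in CPU-bound code or an unpatched blocking call is not interrupted until it next yields.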

# From celery/concurrency/gevent.py
def apply_timeout(target, ..., timeout=None, timeout_callback=None, ...):
    try:
        with Timeout(timeout):
            return apply_target(...)
    except Timeout:
        return timeout_callback(False, timeout)
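The same shape can be sketched with asyncio, where asyncio.wait_for stands in for gevent.Timeout. As with gevent, the timeout only takes effect at a point where the task yields. run_with_timeout, slow_io, and on_timeout are hypothetical names, and the on_timeout(False, timeout) call simply mirrors the excerpt's timeout_callback(False, timeout).

```python
import asyncio


async def run_with_timeout(coro_func, args, timeout, timeout_callback):
    """Run coro_func(*args); on timeout, return timeout_callback(False, timeout)."""
    try:
        return await asyncio.wait_for(coro_func(*args), timeout)
    except asyncio.TimeoutError:
        return timeout_callback(False, timeout)


async def slow_io(delay, value):
    await asyncio.sleep(delay)  # a cooperative wait, so it can be cancelled
    return value


def on_timeout(soft, timeout):
    # soft=False mirrors the excerpt, which always passes False here.
    return f"timed out after {timeout}s (soft={soft})"


async def main():
    fast = await run_with_timeout(slow_io, (0.01, "done"), 0.5, on_timeout)
    slow = await run_with_timeout(slow_io, (1.0, "never"), 0.05, on_timeout)
    return fast, slow


result = asyncio.run(main())
print(result)
```

If slow_io called time.sleep instead of awaiting asyncio.sleep, wait_for could not cancel it until the blocking call returned, which is the same caveat that applies to gevent.Timeout.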