Asynchronous I/O Pools

The AsynPool implementation in celery/concurrency/asynpool.py provides a non-blocking, thread-less alternative to the standard multiprocessing pool. It is specifically designed to integrate with Celery's event loop (the Hub) to manage a pool of worker processes using asynchronous I/O. This implementation is used by the Prefork task pool when the threads option is disabled.

Core Concepts

The asynchronous pool replaces traditional blocking IPC (Inter-Process Communication) with a system of non-blocking pipes and event-driven callbacks. Instead of threads waiting on results, the main process uses the Hub to monitor file descriptors for readiness.
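The readiness-callback pattern can be sketched with the standard-library `selectors` module. This is not Celery's Hub, just a minimal stand-in showing the idea: register a non-blocking pipe fd with a callback, and read only when the event loop reports readiness.

```python
import os
import selectors

# Minimal sketch (not Celery's Hub): watch a non-blocking pipe with a
# readiness callback instead of a thread blocking on read().
r, w = os.pipe()
os.set_blocking(r, False)

sel = selectors.DefaultSelector()
received = []

def on_readable(fd):
    # Invoked only when the fd is ready, so this read will not block.
    received.append(os.read(fd, 4096))

sel.register(r, selectors.EVENT_READ, on_readable)

os.write(w, b'task-payload')
for key, _events in sel.select(timeout=1.0):
    key.data(key.fd)

print(received)  # [b'task-payload']
```

Celery's Hub plays the role of the selector here, dispatching to per-fd reader and writer callbacks on each tick.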

Worker Readiness Handshake

When a Worker process starts, it does not immediately receive tasks. It must first signal that it is ready to accept work.

  1. Loop Start: In Worker.on_loop_start, the child process puts a WORKER_UP message into its output queue.
  2. Tracking: The AsynPool tracks these processes in self._waiting_to_start.
  3. Timeout: If a process fails to send WORKER_UP within _proc_alive_timeout (defaulting to PROC_ALIVE_TIMEOUT of 4.0 seconds), the pool terminates it in verify_process_alive.
# celery/concurrency/asynpool.py

class Worker(_pool.Worker):

    def on_loop_start(self, pid):
        # Sends WORKER_UP to tell the parent the inqueue fd is writable.
        self.outq.put((WORKER_UP, (pid,)))
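The timeout side of the handshake can be sketched as follows. The `Proc` class and the `now` parameter are illustrative stand-ins, not Celery's actual API; the real `verify_process_alive` checks the pool's `_waiting_to_start` set.

```python
import time

# Hedged sketch of the WORKER_UP timeout check: if a process has not
# signalled readiness within the deadline, terminate it.
PROC_ALIVE_TIMEOUT = 4.0  # matches the default described above

class Proc:
    # Illustrative stand-in for a tracked worker process.
    def __init__(self):
        self.started_at = time.monotonic()
        self.ready = False       # set True when WORKER_UP arrives
        self.terminated = False

    def terminate(self):
        self.terminated = True

def verify_process_alive(proc, now=None):
    now = time.monotonic() if now is None else now
    if not proc.ready and now - proc.started_at > PROC_ALIVE_TIMEOUT:
        proc.terminate()

p = Proc()
verify_process_alive(p, now=p.started_at + 5.0)  # 5s > 4s deadline
print(p.terminated)  # True
```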

Event Loop Integration

The AsynPool must be registered with the Hub to function. The register_with_event_loop method sets up the necessary readers and writers.
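The shape of that registration can be sketched with stand-in classes. The `Hub` and `StubPool` below are hypothetical simplifications, not Celery's real objects; the actual method also registers timers and writer callbacks.

```python
# Hypothetical sketch of what register_with_event_loop wires up.
class Hub:
    def __init__(self):
        self.readers = {}
        self.on_tick = set()

    def add_reader(self, fd, callback, *args):
        self.readers[fd] = (callback, args)


class StubPool:
    def __init__(self):
        # read-ends of two workers' output pipes (illustrative fd numbers)
        self._fileno_to_outq = {8: 'worker-1', 12: 'worker-2'}

    def handle_result_event(self, fd):
        pass

    def on_poll_start(self):
        pass

    def register_with_event_loop(self, hub):
        # Watch each worker's output pipe for results...
        for fd in self._fileno_to_outq:
            hub.add_reader(fd, self.handle_result_event, fd)
        # ...and run the write scheduler on every loop tick.
        hub.on_tick.add(self.on_poll_start)


hub = Hub()
pool = StubPool()
pool.register_with_event_loop(hub)
print(sorted(hub.readers))  # [8, 12]
```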

File Descriptor Management

The pool maintains several mappings to link file descriptors (fds) back to their respective Worker processes:

  • _fileno_to_inq: Maps the write-end of the worker's input pipe.
  • _fileno_to_outq: Maps the read-end of the worker's output pipe.
  • _fileno_to_synq: Maps the write-end of the synchronization pipe (used if synack is enabled).
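These mappings let a readiness event on a bare fd be routed back to the owning process. A minimal illustration, with a stand-in `WorkerProc` class and made-up fd numbers:

```python
# Illustrative only: fd-to-process maps route readiness events back to
# the worker that owns the pipe.
class WorkerProc:
    def __init__(self, name, inq_fd, outq_fd):
        self.name = name
        self.inqW_fd = inq_fd    # write-end of the input pipe
        self.outqR_fd = outq_fd  # read-end of the output pipe

fileno_to_inq = {}
fileno_to_outq = {}

proc = WorkerProc('worker-1', inq_fd=7, outq_fd=8)
fileno_to_inq[proc.inqW_fd] = proc
fileno_to_outq[proc.outqR_fd] = proc

# When the event loop reports fd 8 readable, find the worker that owns it:
ready_fd = 8
owner = fileno_to_outq[ready_fd]
print(owner.name)  # worker-1
```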

Sentinel Tracking

To detect process exits immediately, the pool tracks the process "sentinel" fd. In _track_child_process, the pool uses os.dup on the sentinel to ensure it can safely unregister the fd from the event loop even after the original process object is cleaned up, preventing 100% CPU poll loops.

def _track_child_process(self, proc, hub):
    try:
        fd = proc._sentinel_poll
    except AttributeError:
        # Duplicate fd to control unregistration from epoll/poll
        fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
    iterate_file_descriptors_safely(
        [fd], None, hub.add_reader,
        self._event_process_exit, hub, proc)

Asynchronous Job Distribution

Jobs are not written to workers immediately. Instead, they are placed in an outbound_buffer (a deque).

Writing via Generators

The pool uses a generator-based approach in _write_job to write task payloads without blocking the event loop. If a write operation would block (the OS raises an error with errno EAGAIN or EWOULDBLOCK), the generator yields, allowing the event loop to handle other events before resuming the write.

def _write_job(proc, fd, job):
    header, body, body_size = job._payload
    # ...
    while Hw < 4:
        try:
            Hw += send(header, Hw)
        except Exception as exc:
            if getattr(exc, 'errno', None) not in UNAVAIL:
                raise
            yield  # Suspend until the fd is writable again
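A self-contained sketch of this suspend/resume pattern, using a raw non-blocking pipe (names are illustrative, not Celery's):

```python
import errno
import os

# Generator-based write: yield when the pipe is full, resume when the
# event loop reports the fd writable again.
def write_job(fd, payload):
    sent = 0
    while sent < len(payload):
        try:
            sent += os.write(fd, payload[sent:])
        except OSError as exc:
            if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
            yield  # suspend; the event loop resumes us later

r, w = os.pipe()
os.set_blocking(w, False)
writer = write_job(w, b'x' * 100_000)  # larger than the pipe buffer

resumes = 0
try:
    while True:
        next(writer)       # runs until the pipe fills, then yields
        os.read(r, 65536)  # event-loop analogue: drain so fd is writable
        resumes += 1
except StopIteration:
    pass
print('resumed', resumes, 'time(s)')
```

In the real pool the Hub re-registers the fd as a writer and calls `next()` on the suspended generator when the fd becomes writable, rather than the inline drain loop used here.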

Scheduling Writes

The on_poll_start method is called on every event loop tick. It determines which workers are idle and which fds are writable. It then triggers schedule_writes, which cycles through ready file descriptors to distribute jobs from the outbound_buffer.
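The cycling idea can be sketched in a few lines. The fd numbers and job names are illustrative; the real schedule_writes also checks per-worker busy state and drives the `_write_job` generators.

```python
from collections import deque
from itertools import cycle

# Rough sketch: each tick, pair buffered jobs with currently-writable
# worker fds, round-robin.
outbound_buffer = deque(['job1', 'job2', 'job3'])
writable_fds = [7, 9]  # fds the event loop reported writable

assigned = []
fd_cycle = cycle(writable_fds)
while outbound_buffer:
    fd = next(fd_cycle)
    assigned.append((fd, outbound_buffer.popleft()))

print(assigned)  # [(7, 'job1'), (9, 'job2'), (7, 'job3')]
```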

Result Handling

The ResultHandler class manages the asynchronous reception of task results and state updates (like WORKER_UP).

Coroutine-based Reading

Similar to the writing process, ResultHandler._recv_message is a generator that reads from the worker's output pipe in chunks. It reads a 4-byte header to determine the message size, then reads the body.

def _recv_message(self, add_reader, fd, callback, ...):
    # ... read header ...
    body_size, = unpack_from('>i', bufv)
    # ... read body ...
    while Br < body_size:
        try:
            n = __read__(fd, bufv[Br:], body_size - Br)
        except OSError as exc:
            if exc.errno not in UNAVAIL:
                raise
            yield  # Wait for more data
    # ...
    callback(message)
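The framing itself is simple: a 4-byte big-endian length prefix followed by the payload. A minimal sketch (the real messages are pickled tuples, and reading happens incrementally as shown above):

```python
import struct

# Length-prefixed framing: 4-byte big-endian size header, then the body.
def frame(payload: bytes) -> bytes:
    return struct.pack('>i', len(payload)) + payload

def unframe(data: bytes) -> bytes:
    body_size, = struct.unpack_from('>i', data)
    return data[4:4 + body_size]

msg = frame(b'hello')
print(unframe(msg))  # b'hello'
```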

Error Recovery and Maintenance

The pool is designed to be resilient to process failures and stale file descriptors.

  • maintain_pool: A timer-based handler (running every 5 seconds) that ensures the required number of processes are running.
  • on_partial_read: If a process dies while a job is being written to it, the pool attempts to recover the job and put it back into the outbound_buffer if it hasn't been accepted yet.
  • iterate_file_descriptors_safely: This utility is used throughout the pool to apply Hub methods (like add_reader). If an OSError occurs (indicating a stale fd), the utility automatically removes the fd from the pool's internal tracking sets.
def on_partial_read(self, job, proc):
    if not job._accepted:
        # Job was not acked, find another worker
        self._put_back(job)
    # ... replace queues to avoid reuse of damaged sockets ...
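The stale-fd cleanup pattern can be sketched as below. The helper's real signature takes several tracking structures and extra hub arguments; this simplified version just shows the try/except-and-discard idea.

```python
import os
import tempfile

# Simplified sketch of the iterate_file_descriptors_safely idea: apply a
# hub operation to each fd, dropping stale fds from tracking on OSError.
def iterate_fds_safely(fds, tracking, hub_method):
    for fd in list(fds):
        try:
            hub_method(fd)
        except OSError:
            # Stale fd: remove it from the internal tracking set.
            tracking.discard(fd)

valid_fd, path = tempfile.mkstemp()
stale_fd = 10 ** 6  # almost certainly not an open fd
tracking = {valid_fd, stale_fd}

# os.fstat stands in for hub.add_reader: it raises OSError on a bad fd.
iterate_fds_safely(list(tracking), tracking, os.fstat)
print(tracking == {valid_fd})  # True

os.close(valid_fd)
os.remove(path)
```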