Asynchronous I/O Pools
The AsynPool implementation in celery/concurrency/asynpool.py provides a non-blocking, thread-less alternative to the standard multiprocessing pool. It is specifically designed to integrate with Celery's event loop (the Hub) to manage a pool of worker processes using asynchronous I/O. This implementation is used by the Prefork task pool when the threads option is disabled.
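For context, the selection happens in TaskPool.on_start (celery/concurrency/prefork.py); abridged, and details may vary between versions:

# celery/concurrency/prefork.py (abridged)
class TaskPool(BasePool):
    Pool = AsynPool
    BlockingPool = BlockingPool

    def on_start(self):
        # threads=True selects the classic blocking pool;
        # otherwise the asynchronous pool is used.
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        self._pool = Pool(processes=self.limit)  # other arguments elided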
Core Concepts
The asynchronous pool replaces traditional blocking IPC (Inter-Process Communication) with a system of non-blocking pipes and event-driven callbacks. Instead of threads waiting on results, the main process uses the Hub to monitor file descriptors for readiness.
Worker Readiness Handshake
When a Worker process starts, it does not immediately receive tasks. It must first signal that it is ready to accept work.
- Loop Start: In Worker.on_loop_start, the child process puts a WORKER_UP message into its output queue.
- Tracking: The AsynPool tracks these processes in self._waiting_to_start.
- Timeout: If a process fails to send WORKER_UP within _proc_alive_timeout (defaulting to PROC_ALIVE_TIMEOUT, 4.0 seconds), the pool terminates it in verify_process_alive.
# celery/concurrency/asynpool.py
class Worker(_pool.Worker):

    def on_loop_start(self, pid):
        # Sends WORKER_UP to tell the parent the inqueue fd is writable.
        self.outq.put((WORKER_UP, (pid,)))
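The enforcement side of the timeout can be sketched roughly as follows (abridged from verify_process_alive; it is scheduled with hub.call_later and receives a weak reference to the process):

def verify_process_alive(proc):
    proc = proc()  # dereference the weakref
    if (proc is not None and proc._is_alive()
            and proc in waiting_to_start):
        error('Timed out waiting for UP message from %r', proc)
        os.kill(proc.pid, 9)  # never sent WORKER_UP: kill and respawn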
Event Loop Integration
The AsynPool must be registered with the Hub to function. The register_with_event_loop method sets up the necessary readers and writers.
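Abridged, the registration looks roughly like this (time-limit and process handlers elided):

def register_with_event_loop(self, hub):
    self._result_handler.register_with_event_loop(hub)
    self.handle_result_event = self._result_handler.handle_event
    self._create_write_handlers(hub)
    # Watch each worker's sentinel fd so exits are noticed immediately.
    [self._track_child_process(w, hub) for w in self._pool]
    # handle_result_event runs whenever a result pipe becomes readable.
    [hub.add_reader(fd, self.handle_result_event, fd)
     for fd in self._fileno_to_outq]
    # Periodic timers (e.g. maintain_pool every 5 seconds).
    for handler, interval in self.timers.items():
        hub.call_repeatedly(interval, handler)
    # on_poll_start runs on every loop tick to schedule pending writes.
    hub.on_tick.add(self.on_poll_start)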
File Descriptor Management
The pool maintains several mappings to link file descriptors (fds) back to their respective Worker processes:
- _fileno_to_inq: Maps the write-end of the worker's input pipe.
- _fileno_to_outq: Maps the read-end of the worker's output pipe.
- _fileno_to_synq: Maps the write-end of the synchronization pipe (used if synack is enabled).
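These maps are populated once the WORKER_UP handshake completes. A simplified sketch of the on_process_alive handler (error handling elided):

def on_process_alive(pid):
    proc = next(w for w in self._pool if w.pid == pid)
    waiting_to_start.discard(proc)        # handshake complete
    fileno_to_inq[proc.inqW_fd] = proc    # now eligible to receive jobs
    fileno_to_synq[proc.synqW_fd] = proc
    all_inqueues.add(proc.inqW_fd)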
Sentinel Tracking
To detect process exits immediately, the pool tracks the process "sentinel" fd. In _track_child_process, the pool uses os.dup on the sentinel to ensure it can safely unregister the fd from the event loop even after the original process object is cleaned up, preventing 100% CPU poll loops.
def _track_child_process(self, proc, hub):
    try:
        fd = proc._sentinel_poll
    except AttributeError:
        # Duplicate the fd so we control when it is unregistered
        # from epoll/poll, even after the original fd is closed.
        fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
    iterate_file_descriptors_safely(
        [fd], None, hub.add_reader,
        self._event_process_exit, hub, proc)
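The callback registered above is small; roughly:

def _event_process_exit(self, hub, proc):
    # The sentinel became readable: the process has exited.
    self._untrack_child_process(proc, hub)
    self.maintain_pool()  # replace the missing worker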
Asynchronous Job Distribution
Jobs are not written to workers immediately. Instead, they are placed in an outbound_buffer (a deque).
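A simplified sketch of the enqueue path (send_job, defined in _create_write_handlers): the payload is serialized once up front, attached to the job, and appended to the buffer; no socket I/O happens at this point.

def send_job(tup):
    body = dumps(tup, protocol=protocol)
    header = pack('>I', len(body))
    job = get_job(tup[1][0])  # index 1, 0 is the job ID
    job._payload = buf_t(header), buf_t(body), len(body)
    put_message(job)          # put_message = outbound_buffer.append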
Writing via Generators
The pool uses a generator-based approach in _write_job to write task payloads without blocking the event loop. If a write would block (raising an error with errno EAGAIN or EWOULDBLOCK, the errnos collected in UNAVAIL), the generator yields, allowing the event loop to handle other events before resuming the write.
def _write_job(proc, fd, job):
    header, body, body_size = job._payload
    # ...
    while Hw < 4:  # write the 4-byte header first
        try:
            Hw += send(header, Hw)
        except Exception as exc:
            if getattr(exc, 'errno', None) not in UNAVAIL:
                raise
            yield  # Suspend until the fd is writable again
Scheduling Writes
The on_poll_start method is called on every event loop tick. It determines which workers are idle and which fds are writable. It then triggers schedule_writes, which cycles through ready file descriptors to distribute jobs from the outbound_buffer.
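Roughly, for each ready fd schedule_writes pops a job and starts a writer generator, re-registering it with the hub only if the write could not complete immediately (error handling elided):

job = pop_message()                 # popleft from outbound_buffer
cor = _write_job(proc, ready_fd, job)
job._writer = ref(cor)              # weakref, used for recovery
try:
    next(cor)                       # attempt the first write now
except StopIteration:
    pass                            # everything fit in one write
else:
    add_writer(ready_fd, cor)       # resume when fd is writable again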
Result Handling
The ResultHandler class manages the asynchronous reception of task results and state updates (like WORKER_UP).
Coroutine-based Reading
Similar to the writing process, ResultHandler._recv_message is a generator that reads from the worker's output pipe in chunks. It reads a 4-byte header to determine the message size, then reads the body.
def _recv_message(self, add_reader, fd, callback, ...):
    # ... read the 4-byte header ...
    body_size, = unpack_from('>i', bufv)
    # ... then read the body ...
    while Br < body_size:
        try:
            n = __read__(fd, bufv[Br:], body_size - Br)
        except OSError as exc:
            if exc.errno not in UNAVAIL:
                raise
            yield  # Wait for more data
        else:
            Br += n  # advance past the bytes just read
    # ...
    callback(message)
Error Recovery and Maintenance
The pool is designed to be resilient to process failures and stale file descriptors.
- maintain_pool: A timer-based handler (running every 5 seconds) that ensures the required number of processes are running.
- on_partial_read: If a process dies while a job is being written to it, the pool attempts to recover the job and put it back into the outbound_buffer if it hasn't been accepted yet.
- iterate_file_descriptors_safely: This utility is used throughout the pool to apply Hub methods (like add_reader). If an OSError occurs (indicating a stale fd), the utility automatically removes the fd from the pool's internal tracking structures (see the sketch at the end of this section).
def on_partial_read(self, job, proc):
    if not job._accepted:
        # Job was not acked, so find another worker to send it to.
        self._put_back(job)
    # ... replace queues to avoid reuse of damaged sockets ...
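Finally, iterate_file_descriptors_safely can be sketched roughly as follows (logging and some container handling elided):

def iterate_file_descriptors_safely(fds_iter, source_data,
                                    hub_method, *args, **kwargs):
    stale_fds = []
    for fd in fds_iter:
        try:
            hub_method(fd, *args, **kwargs)
        except (OSError, FileNotFoundError):
            stale_fds.append(fd)              # fd is stale; mark for removal
    for fd in stale_fds:
        if source_data is not None:
            if hasattr(source_data, 'discard'):   # a set of fds
                source_data.discard(fd)
            else:                                 # a dict keyed by fd
                source_data.pop(fd, None)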