Thread-based Concurrency
The thread-based concurrency implementation in Celery provides an execution pool that utilizes Python's standard concurrent.futures.ThreadPoolExecutor. This pool is specifically designed for I/O-bound tasks where multiple operations can progress concurrently within a single process, avoiding the overhead of process creation associated with the default prefork pool.
Core Components
The implementation resides in celery/concurrency/thread.py and consists of two primary classes that bridge Celery's internal pool interface with the standard library's threading tools.
TaskPool
The TaskPool class is the main entry point for thread-based execution. It inherits from BasePool and initializes a ThreadPoolExecutor using the concurrency limit provided during worker startup.
```python
class TaskPool(BasePool):
    """Thread Task Pool."""

    limit: int

    body_can_be_buffer = True
    signal_safe = False

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.executor = ThreadPoolExecutor(max_workers=self.limit)
```
Key characteristics of TaskPool:
- Concurrency Limit: The max_workers value of the underlying executor is set by the limit attribute (configured via worker_concurrency).
- Signal Safety: The pool is explicitly marked signal_safe = False, meaning it cannot be safely manipulated from within signal handlers; this is a significant difference from the prefork implementation.
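The concurrency cap can be illustrated with a small sketch (plain concurrent.futures, not Celery code): a ThreadPoolExecutor with max_workers=3 never runs more than three tasks at once, which is exactly how TaskPool maps worker_concurrency onto the executor.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Track how many tasks run simultaneously; the peak never exceeds
# max_workers, mirroring how TaskPool's `limit` caps concurrency.
peak = 0
active = 0
lock = threading.Lock()

def io_task(n):
    global peak, active
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # stand-in for an I/O wait
    with lock:
        active -= 1
    return n

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(io_task, range(10)))

print(peak <= 3)  # True: concurrency never exceeds max_workers
```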
ApplyResult
When a task is submitted to the pool, TaskPool.on_apply returns an ApplyResult instance. This class acts as a thin wrapper around a concurrent.futures.Future object, providing a Celery-compatible interface for result retrieval.
```python
class ApplyResult:
    def __init__(self, future: Future) -> None:
        self.f = future
        self.get = self.f.result

    def wait(self, timeout: float | None = None) -> None:
        wait([self.f], timeout)
```
By mapping get to self.f.result, ApplyResult allows the rest of the Celery worker to interact with thread-based results using the same API as other pool types.
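A minimal standalone demonstration of this wrapper pattern (using only the standard library, outside of Celery):

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait

# `get` is simply an alias for Future.result, so callers can use the
# same result-retrieval API as with other pool types.
class ApplyResult:
    def __init__(self, future: Future) -> None:
        self.f = future
        self.get = self.f.result

    def wait(self, timeout=None) -> None:
        wait([self.f], timeout)

with ThreadPoolExecutor(max_workers=1) as pool:
    res = ApplyResult(pool.submit(lambda: 21 * 2))
    res.wait(timeout=5)
    print(res.get())  # 42
```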
Task Execution Flow
Task submission is handled by the on_apply method. It uses the apply_target utility function to execute the task within the thread pool.
```python
def on_apply(
    self,
    target: TargetFunction,
    args: tuple[Any, ...] | None = None,
    kwargs: dict[str, Any] | None = None,
    callback: Callable[..., Any] | None = None,
    accept_callback: Callable[..., Any] | None = None,
    **_: Any
) -> ApplyResult:
    f = self.executor.submit(apply_target, target, args, kwargs,
                             callback, accept_callback)
    return ApplyResult(f)
```
The apply_target function (imported from celery.concurrency.base) is responsible for the actual execution of the task function and the invocation of success/failure callbacks.
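The shape of that helper can be sketched as follows. This is a deliberately simplified, hypothetical version (the real celery.concurrency.base.apply_target handles additional arguments and error paths): it signals acceptance, runs the target, and passes the return value to the callback.

```python
import os
import time

# Simplified sketch of an apply_target-style helper (not the real
# Celery implementation): notify acceptance, execute the target,
# then hand the return value to the success callback.
def apply_target(target, args=(), kwargs=None, callback=None,
                 accept_callback=None):
    kwargs = kwargs or {}
    if accept_callback:
        accept_callback(os.getpid(), time.monotonic())
    ret = target(*args, **kwargs)
    if callback:
        callback(ret)
    return ret

events = []
apply_target(lambda x: x + 1, (1,),
             callback=lambda r: events.append(('done', r)),
             accept_callback=lambda pid, t: events.append('accepted'))
print(events)  # ['accepted', ('done', 2)]
```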
Pool Lifecycle and Shutdown
The TaskPool manages the lifecycle of its threads through the on_stop method. When the worker shuts down, it calls executor.shutdown(cancel_futures=True).
```python
def on_stop(self) -> None:
    self.executor.shutdown(cancel_futures=True)
    super().on_stop()
```
Setting cancel_futures=True ensures that any tasks that have been submitted to the pool but have not yet started execution are cancelled immediately. This behavior is verified in t/unit/concurrency/test_thread.py, where a pending task's future is checked for cancellation after the pool is stopped.
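The same semantics can be observed with a bare ThreadPoolExecutor: with a single worker thread, the second submission is still queued when shutdown is called, so cancel_futures=True cancels it before it ever runs.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# One worker thread: the first task occupies it, the second stays queued.
pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(time.sleep, 0.2)
pending = pool.submit(time.sleep, 0.2)

# shutdown waits for the running task but cancels the queued one.
pool.shutdown(cancel_futures=True)

print(running.done(), pending.cancelled())  # True True
```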
Configuration and Usage
The thread pool is registered under the alias threads in celery/concurrency/__init__.py. It is only available if the concurrent.futures module can be imported (which is standard in Python 3).
To use the thread-based pool, you can start the worker with the --pool argument:
```shell
celery -A proj worker --pool=threads --concurrency=10
```
Alternatively, it can be configured in the Celery application settings:
```python
app.conf.worker_pool = 'threads'
app.conf.worker_concurrency = 10
```
Implementation Caveats
While the thread pool is efficient for I/O-bound workloads, it has specific limitations compared to the default prefork pool:
- No Task Termination: Unlike the prefork pool, the thread pool does not support terminating individual tasks (e.g., via terminate_job), because Python provides no safe way to kill a running thread.
- Global Interpreter Lock (GIL): Because all tasks run within the same process, CPU-bound tasks are limited by the GIL; for CPU-intensive workloads, the prefork pool is generally preferred.
- Shared Memory: Since all threads share the same memory space, developers must ensure that tasks are thread-safe, especially when accessing shared resources or global variables.
- Django Integration: The codebase includes specific logic in celery/fixups/django.py that checks the pool type. For example, database connection management logic required by the prefork pool (to handle fork-safety) is skipped when using the thread pool.
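The shared-memory caveat above is worth making concrete. A hypothetical task body (not from the Celery codebase) that mutates module-level state should guard it with a lock, since every task in the pool runs in the same process and address space:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shared state touched by many concurrent tasks.
counter = 0
counter_lock = threading.Lock()

def record_hit():
    global counter
    with counter_lock:  # serialize the read-modify-write
        counter += 1

with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(1000):
        pool.submit(record_hit)
# exiting the `with` block waits for all submitted tasks to finish

print(counter)  # 1000
```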