Building a Custom Task Pool

In this tutorial, you will build a custom task pool by extending the BasePool class. You will create a LoggingPool that logs task execution details and exposes custom statistics through the worker's inspect interface.

Prerequisites

  • Celery installed in your environment.
  • A basic Celery application defined (e.g., in tasks.py; a minimal example follows below).
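
For reference, a minimal tasks.py might look like this. The Redis broker URL is an assumption; point it at whatever broker your setup uses:

from celery import Celery

# Broker URL is an assumption; adjust for your environment
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    """A trivial task used later to exercise the pool."""
    return x + y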

Step 1: Define the Custom Pool Class

All Celery task pools must inherit from celery.concurrency.base.BasePool. This class defines the interface the worker uses to manage execution.

Create a file named my_pool.py:

import logging
from celery.concurrency.base import BasePool, apply_target

logger = logging.getLogger(__name__)

class LoggingPool(BasePool):
"""A custom pool that logs task execution."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# The limit attribute defines the concurrency level
logger.info(f"LoggingPool initialized with limit: {self.limit}")

By inheriting from BasePool, your class gains access to lifecycle management methods and the apply_async entry point used by the worker.

Step 2: Implement Lifecycle Methods

The worker calls on_start when the pool is initialized and on_stop during shutdown. You can use these to set up and tear down resources.

    def on_start(self):
        """Called when the worker starts the pool."""
        logger.info("LoggingPool is starting up...")

    def on_stop(self):
        """Called when the worker is shutting down."""
        logger.info("LoggingPool is shutting down...")

Step 3: Implement Task Execution Logic

The on_apply method is where the actual task execution happens. Celery provides a helper function apply_target in celery.concurrency.base that handles the execution of the task function, manages callbacks (for success and failure), and ensures error propagation.

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, **options):
        """Execute the task."""
        logger.info(f"Executing task: {target.__name__} with args: {args}")

        # We use the built-in apply_target to handle the heavy lifting
        return apply_target(
            target, args, kwargs,
            callback=callback,
            accept_callback=accept_callback,
            **options
        )

In this implementation, on_apply logs the name of the target function before delegating execution to apply_target, which is responsible for invoking accept_callback (to signal that execution has started) and callback (to deliver the result). Note that in a live worker the target is Celery's internal tracing wrapper rather than the task function itself, so the logged name reflects that wrapper.
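
You can observe this callback flow in isolation by calling apply_target directly, outside a worker. The following sketch is illustrative only; demo, on_accept, and on_done are hypothetical names:

from celery.concurrency.base import apply_target

# Hypothetical helpers to observe the callback flow
def demo(x, y):
    return x + y

def on_accept(pid, time_accepted):
    # Invoked before execution, with the executing pid and a monotonic timestamp
    print(f"accepted in pid {pid}")

def on_done(result):
    # Invoked with the target's return value
    print(f"result: {result}")

apply_target(demo, (2, 3), {}, callback=on_done, accept_callback=on_accept)
# accepted in pid <pid>
# result: 5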

Step 4: Provide Pool Statistics

You can expose custom information to the celery inspect stats command by overriding the _get_info method.

    def _get_info(self):
        """Return configuration and statistics information."""
        # Start with the base info (implementation name and limit)
        info = super()._get_info()
        info.update({
            'pool_type': 'Custom Logging Pool',
            'signal_safe': self.signal_safe,
        })
        return info

The BasePool.info property calls this method to return a JSON-friendly dictionary of the pool's state.
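
Once the worker is running, celery -A tasks inspect stats reports these keys under the pool section. Abridged output (the surrounding fields vary by Celery version):

$ celery -A tasks inspect stats
->  worker@host: OK
    {
        ...
        "pool": {
            "implementation": "my_pool:LoggingPool",
            "max-concurrency": 10,
            "pool_type": "Custom Logging Pool",
            "signal_safe": true
        },
        ...
    }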

Step 5: Register and Run the Custom Pool

To use your custom pool, you must tell Celery where to find it using the CELERY_CUSTOM_WORKER_POOL environment variable and the --pool custom flag.

  1. Set the environment variable: The format is module:class.

    export CELERY_CUSTOM_WORKER_POOL='my_pool:LoggingPool'
  2. Start the worker:

    celery -A tasks worker --pool custom --loglevel=info

When the worker starts, you should see your log messages:

[INFO/MainProcess] LoggingPool initialized with limit: 10
[INFO/MainProcess] LoggingPool is starting up...
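
To see on_apply fire, send a task from another shell. This assumes the add task from the prerequisites:

from tasks import add

# .delay() routes the task through the broker to the worker,
# which executes it via LoggingPool.on_apply
add.delay(2, 3)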

Complete Working Result

Your my_pool.py should now look like this:

import logging
from celery.concurrency.base import BasePool, apply_target

logger = logging.getLogger(__name__)

class LoggingPool(BasePool):
"""A custom pool that logs task execution."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
logger.info(f"LoggingPool initialized with limit: {self.limit}")

def on_start(self):
logger.info("LoggingPool is starting up...")

def on_stop(self):
logger.info("LoggingPool is shutting down...")

def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **options):
logger.info(f"Executing task: {target.__name__}")
return apply_target(
target, args, kwargs,
callback=callback,
accept_callback=accept_callback,
**options
)

def _get_info(self):
info = super()._get_info()
info.update({
'pool_type': 'Custom Logging Pool',
})
return info
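
As a quick sanity check outside a worker, you can drive the pool directly through the BasePool lifecycle described above. This is a minimal sketch; in normal operation the worker manages this lifecycle for you:

from my_pool import LoggingPool

pool = LoggingPool(limit=4)
pool.start()  # calls on_start
pool.apply_async(print, ('hello from the pool',), {},
                 callback=lambda ret: None)  # dispatched through on_apply
pool.stop()   # calls on_stop
print(pool.info)  # includes the custom keys from _get_info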

Next Steps

  • Handle Timeouts: Override on_soft_timeout and on_hard_timeout to implement custom behavior when tasks exceed their time limits (a minimal sketch follows this list).
  • Job Termination: Implement terminate_job(pid, signal=None) if your pool supports revoking active tasks.
  • Async Execution: If you want to build a non-blocking pool (like celery.concurrency.eventlet.TaskPool), set is_green = True and use an asynchronous mechanism to call apply_target within on_apply.
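
For the timeout hooks, a minimal starting point might look like the following. The job argument is the pool's in-flight request, and the log messages here are an assumption about what is useful to record:

    def on_soft_timeout(self, job):
        """Called when a task exceeds its soft time limit."""
        # What to log for a given job is up to your pool
        logger.warning(f"Soft time limit exceeded for: {job}")

    def on_hard_timeout(self, job):
        """Called when a task exceeds its hard time limit."""
        logger.error(f"Hard time limit exceeded for: {job}")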