Skip to main content

BasePool

Task pool.

Attributes

AttributeTypeDescription
RUNint = 1Integer constant representing the operational state where the pool is actively accepting and executing tasks.
CLOSEint = 2Integer constant representing the state where the pool is finishing current tasks but not accepting new ones.
TERMINATEint = 3Integer constant representing the state where the pool is being shut down immediately.
Timertype = timer2.TimerThe timer class used by the pool for scheduling internal events and timeouts.
signal_safeboolean = trueset to true if the pool can be shutdown from within a signal handler.
is_greenboolean = falseset to true if pool uses greenlets.
uses_semaphoreboolean = falseonly used by multiprocessing pool
task_join_will_blockboolean = trueFlag indicating whether joining the task queue will result in a blocking operation.
body_can_be_bufferboolean = falseFlag indicating whether the task body payload can be handled as a buffer object.

Constructor

Signature

def BasePool(
limit: int = None,
putlocks: bool = True,
forking_enable: bool = True,
callbacks_propagate: tuple = (),
app: Any = None,
options: dict = null
) - > null

Parameters

NameTypeDescription
limitint = NoneThe maximum number of processes or threads in the pool.
putlocksbool = TrueWhether to use locks when putting tasks into the pool.
forking_enablebool = TrueFlag to enable or disable forking support.
callbacks_propagatetuple = ()Exceptions that should be allowed to propagate from callbacks.
appAny = NoneThe application instance associated with this pool.
optionsdict = nullAdditional keyword arguments for pool configuration.

Signature

def BasePool(
limit: int = None,
putlocks: bool = True,
forking_enable: bool = True,
callbacks_propagate: tuple = (),
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = None
)

Parameters

NameTypeDescription
limitint = NoneThe maximum number of concurrent processes or threads allowed in the pool
putlocksbool = TrueEnables or disables locking mechanisms when placing tasks into the pool
forking_enablebool = TrueDetermines if the pool is allowed to use process forking for task execution
callbacks_propagatetuple = ()A collection of exception types that should be allowed to propagate from callbacks
app[Celery](../../app/base/celery.md?sid=celery_app_base_celery) = NoneThe Celery application instance associated with this pool

Methods


on_start()

@classmethod
def on_start()

Hook called when the pool is starting up to perform implementation-specific initialization.


did_start_ok()

@classmethod
def did_start_ok() - > bool

Checks if the pool successfully transitioned to a running state.

Returns

TypeDescription
boolTrue if the pool started without errors, False otherwise

flush()

@classmethod
def flush()

Flushes any buffered tasks or data within the pool.


on_stop()

@classmethod
def on_stop()

Hook called when the pool is stopping to perform cleanup of resources.


register_with_event_loop()

@classmethod
def register_with_event_loop(
loop: EventLoop
)

Registers pool-specific file descriptors or handles with an external event loop.

Parameters

NameTypeDescription
loopEventLoopThe event loop instance to register the pool with

on_apply()

@classmethod
def on_apply(
args: any,
kwargs: any
)

Internal handler for scheduling a task; must be implemented by subclasses to define execution logic.

Parameters

NameTypeDescription
argsanyPositional arguments for the task execution
kwargsanyKeyword arguments for the task execution

on_terminate()

@classmethod
def on_terminate()

Hook called during an ungraceful shutdown to handle immediate termination of pool components.


on_soft_timeout()

@classmethod
def on_soft_timeout(
job: Job
)

Handles a soft timeout event for a specific job, typically allowing for a graceful exit.

Parameters

NameTypeDescription
jobJobThe job instance that has exceeded its soft timeout limit

on_hard_timeout()

@classmethod
def on_hard_timeout(
job: Job
)

Handles a hard timeout event for a specific job, typically resulting in immediate process termination.

Parameters

NameTypeDescription
jobJobThe job instance that has exceeded its hard timeout limit

maintain_pool()

@classmethod
def maintain_pool(
args: any,
kwargs: any
)

Performs maintenance tasks such as replacing dead workers to keep the pool at the desired capacity.

Parameters

NameTypeDescription
argsanyMaintenance-specific positional arguments
kwargsanyMaintenance-specific keyword arguments

terminate_job()

@classmethod
def terminate_job(
pid: int,
signal: int = None
)

Forces the termination of a specific process within the pool; raises NotImplementedError if not overridden.

Parameters

NameTypeDescription
pidintThe process identifier of the job to be terminated
signalint = NoneThe signal number to send to the process for termination

restart()

@classmethod
def restart()

Restarts the pool; raises NotImplementedError if the specific pool implementation does not support restarting.


stop()

@classmethod
def stop()

Signals the pool to stop and sets the internal state to TERMINATE.


terminate()

@classmethod
def terminate()

Immediately terminates the pool and triggers the on_terminate hook.


start()

@classmethod
def start()

Starts the pool, enables debugging if configured, and sets the internal state to RUN.


close()

@classmethod
def close()

Closes the pool to new tasks and sets the internal state to CLOSE.


on_close()

@classmethod
def on_close()

Hook called when the pool is closed to perform implementation-specific shutdown logic.


apply_async()

@classmethod
def apply_async(
target: callable,
args: list = None,
kwargs: dict = None,
options: any
) - > any

Equivalent of the :func:apply built-in function. Callbacks should optimally return as soon as possible since otherwise the thread which handles the result will get blocked.

Parameters

NameTypeDescription
targetcallableThe function or task to be executed asynchronously
argslist = NonePositional arguments to pass to the target function
kwargsdict = NoneKeyword arguments to pass to the target function
optionsanyAdditional execution options passed to the underlying on_apply method

Returns

TypeDescription
anyThe result of the on_apply call, typically a result object or promise

info()

@classmethod
def info() - > dict

Property that retrieves the pool's configuration and statistical information.

Returns

TypeDescription
dictThe dictionary returned by _get_info

active()

@classmethod
def active() - > bool

Property that indicates whether the pool is currently in the RUN state.

Returns

TypeDescription
boolTrue if the pool is running, False otherwise

num_processes()

@classmethod
def num_processes() - > int

Property that returns the configured concurrency limit of the pool.

Returns

TypeDescription
intThe maximum number of processes or threads the pool is configured to use