
ManagerMixin

Mixin that adds Manager capabilities.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| stdout | TextIO = sys.stdout | The output stream used for printing remarks and status updates, defaulting to sys.stdout. |
| stderr | TextIO = sys.stderr | The error stream used for reporting issues, defaulting to sys.stderr. |
| connerrors | tuple | A tuple of recoverable connection exception types used to catch and handle network failures during operations. |
| block_timeout | float = 1800.0 | The maximum time in seconds to wait for blocking operations to complete. |
| no_join | bool = False | A flag that, when set to True, prevents the manager from waiting for task results in the join method. |

Constructor

Signature

def ManagerMixin()

Methods


remark()

@classmethod
def remark(
s: string,
sep: string = '-'
) -> null

Prints a formatted message to the configured stdout stream.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| s | string | The message string to be printed |
| sep | string = '-' | The separator character to prefix the message with |

Returns

| Type | Description |
| --- | --- |
| null | Nothing |
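
A minimal sketch of the behavior described above, assuming the separator is prefixed directly to the message with no space in between. `RemarkSketch` is a hypothetical stand-in written for illustration, not the library class.

```python
import io
import sys


class RemarkSketch:
    """Hypothetical stand-in that mimics the documented remark() behavior."""

    def __init__(self, stdout=None):
        # Fall back to sys.stdout, as the attribute table describes.
        self.stdout = sys.stdout if stdout is None else stdout

    def remark(self, s, sep='-'):
        # Assumption: the separator is prefixed directly to the message.
        print(f'{sep}{s}', file=self.stdout)


# Capture the output in memory instead of writing to the terminal.
buffer = io.StringIO()
RemarkSketch(stdout=buffer).remark('worker ready', sep='!')
captured = buffer.getvalue()
```

Routing output through a configurable stream makes it possible to capture remarks in tests, as the in-memory buffer does here.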

missing_results()

@classmethod
def missing_results(
r: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)]
) -> array

Identifies which task results are not yet present in the backend cache.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| r | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | A sequence of Celery AsyncResult objects to check |

Returns

| Type | Description |
| --- | --- |
| array | A list of task IDs that are missing from the result backend cache |
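
The check can be sketched with plain objects. The `_cache` attribute name and the use of `SimpleNamespace` in place of real result objects are assumptions made for illustration only.

```python
from types import SimpleNamespace


def missing_results_sketch(results):
    # Assumption: each result exposes `id` and a backend with a
    # dict-like `_cache` keyed by task ID.
    return [res.id for res in results if res.id not in res.backend._cache]


# One shared fake backend that has only cached the first task.
backend = SimpleNamespace(_cache={'task-1': 'done'})
results = [SimpleNamespace(id=i, backend=backend) for i in ('task-1', 'task-2')]
missing = missing_results_sketch(results)
```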

wait_for()

@classmethod
def wait_for(
fun: Callable,
catch: Sequence[Any],
desc: string = 'thing',
args: Tuple = (),
kwargs: Dict = null,
errback: Callable = null,
max_retries: int = 10,
interval_start: float = 0.1,
interval_step: float = 0.5,
interval_max: float = 5.0,
emit_warning: boolean = false
) -> any

Waits for an event to happen by calling fun repeatedly. The catch argument specifies the exception types that mean the event has not happened yet.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | Callable | The function to execute repeatedly until it succeeds or the retries are exhausted |
| catch | Sequence[Any] | Exception types that indicate the event is still pending and should trigger a retry |
| desc | string = 'thing' | A description of the event used in warning messages |
| args | Tuple = () | Positional arguments to pass to the function |
| kwargs | Dict = null | Keyword arguments to pass to the function |
| errback | Callable = null | Optional callback executed on each retry attempt |
| max_retries | int = 10 | Maximum number of retry attempts before giving up |
| interval_start | float = 0.1 | Initial delay between retries in seconds |
| interval_step | float = 0.5 | Amount to increase the delay by on each retry |
| interval_max | float = 5.0 | Maximum delay between retries in seconds |
| emit_warning | boolean = false | Whether to print a warning to stderr while waiting |

Returns

| Type | Description |
| --- | --- |
| any | The result of the successful function call |
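
The retry loop described by these parameters can be approximated as follows. This is a simplified, hypothetical sketch and not the library's implementation: the function name, the `errback(exc, interval, retries)` call shape, and the behavior of re-raising once retries are exhausted are all assumptions.

```python
import time


def wait_for_sketch(fun, catch, args=(), kwargs=None, errback=None,
                    max_retries=10, interval_start=0.1,
                    interval_step=0.5, interval_max=5.0):
    """Hypothetical simplification: call fun until it stops raising `catch`."""
    kwargs = kwargs or {}
    interval = interval_start
    for retries in range(max_retries + 1):
        try:
            return fun(*args, **kwargs)
        except tuple(catch) as exc:
            if retries == max_retries:
                raise  # retries exhausted: propagate the last error
            if errback:
                errback(exc, interval, retries)  # assumed call shape
            time.sleep(interval)
            # Grow the delay by a fixed step, capped at interval_max.
            interval = min(interval + interval_step, interval_max)


class NotReady(Exception):
    """Signals that the awaited event has not happened yet."""


attempts = []


def poll():
    # Fails twice, then reports success on the third call.
    attempts.append(1)
    if len(attempts) < 3:
        raise NotReady()
    return 'ready'


result = wait_for_sketch(poll, (NotReady,), interval_start=0.01,
                         interval_step=0.01, interval_max=0.05)
```

Any exception outside `catch` is not retried and propagates immediately, which keeps genuine failures from being masked as "still pending".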

ensure_not_for_a_while()

@classmethod
def ensure_not_for_a_while(
fun: Callable,
catch: Sequence[Any],
desc: string = 'thing'
) -> null

Ensures that something does not happen, at least for a while.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | Callable | The function that should not succeed |
| catch | Sequence[Any] | Exception types expected to be raised while the event has not happened |
| desc | string = 'thing' | Description of the event for the error message if it unexpectedly occurs |

Returns

| Type | Description |
| --- | --- |
| null | Nothing |
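
The inverse of a wait can be sketched like this. The helper name, the fixed retry count, the absence of delays between attempts, and the exact error message are assumptions for illustration.

```python
def ensure_not_for_a_while_sketch(fun, catch, desc='thing', max_retries=3):
    """Hypothetical simplification: fun must keep raising `catch`."""
    for _ in range(max_retries):
        try:
            fun()
        except tuple(catch):
            continue  # expected: the event has still not happened
        # fun() returned without raising, so the event happened.
        raise AssertionError(f'Should not have happened: {desc}')


class NotYet(Exception):
    """Signals that the event has not happened."""


def never_ready():
    raise NotYet()


def ready_now():
    return True


# Passes silently because never_ready keeps raising NotYet.
ensure_not_for_a_while_sketch(never_ready, (NotYet,))

try:
    ensure_not_for_a_while_sketch(ready_now, (NotYet,), desc='task ran')
    failed = False
except AssertionError as exc:
    failed = True
    message = str(exc)
```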

retry_over_time()

@classmethod
def retry_over_time(
args: any,
kwargs: any
) -> any

Retries a function call over a period of time, sleeping between attempts with a delay that grows on each retry up to a maximum.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| args | any | Positional arguments passed to the underlying retry utility |
| kwargs | any | Keyword arguments passed to the underlying retry utility |

Returns

| Type | Description |
| --- | --- |
| any | The result of the function call once it succeeds |
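
To make the delay schedule concrete, the generator below assumes the delay grows by a fixed step and is capped at a maximum, as the interval_start, interval_step, and interval_max parameters of wait_for suggest. The `intervals` helper is hypothetical and the underlying retry utility may compute its schedule differently.

```python
from itertools import islice


def intervals(start, step, maximum):
    """Yield sleep intervals that grow by `step` and never exceed `maximum`."""
    value = start
    while True:
        yield min(value, maximum)
        value += step


# First five delays for start=1.0, step=2.0, maximum=5.0.
schedule = list(islice(intervals(1.0, 2.0, 5.0), 5))
```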

join()

@classmethod
def join(
r: any,
propagate: boolean = false,
max_retries: int = 10
) -> any

Waits for task results to be available, handling timeouts and connection errors during the process.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| r | any | The result or result set to wait for |
| propagate | boolean = false | Whether to propagate exceptions raised by the tasks |
| max_retries | int = 10 | Maximum number of attempts to retrieve results before failing |

Returns

| Type | Description |
| --- | --- |
| any | The task results retrieved from the backend |
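
A rough sketch of the retry-on-timeout pattern, using a fake result set so that it runs without a broker. `FlakyResultSet`, `join_sketch`, and the error message are hypothetical; connection-error handling is omitted.

```python
import socket


class FlakyResultSet:
    """Hypothetical result set that times out twice before returning."""

    def __init__(self):
        self.calls = 0

    def get(self, propagate=False):
        self.calls += 1
        if self.calls < 3:
            raise socket.timeout('still waiting')
        return [4, 9]


def join_sketch(r, propagate=False, max_retries=10, no_join=False):
    if no_join:
        return None  # the no_join flag skips waiting entirely
    for _ in range(max_retries):
        try:
            return r.get(propagate=propagate)
        except (socket.timeout, TimeoutError):
            continue  # a real manager would report progress here
    # Assumed failure mode once every attempt has timed out.
    raise AssertionError('Test failed: Missing task results')


flaky = FlakyResultSet()
values = join_sketch(flaky)
skipped = join_sketch(FlakyResultSet(), no_join=True)
```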

inspect()

@classmethod
def inspect(
timeout: float = 3.0
) -> object

Creates an inspect object to query the state of workers.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| timeout | float = 3.0 | The time in seconds to wait for workers to respond to the inspect command |

Returns

| Type | Description |
| --- | --- |
| object | A Celery inspect instance |

query_tasks()

@classmethod
def query_tasks(
ids: Sequence[str],
timeout: float = 0.5
) -> Generator

Queries workers for information regarding specific task IDs.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | A list of task IDs to query |
| timeout | float = 0.5 | The time in seconds to wait for the query response |

Returns

| Type | Description |
| --- | --- |
| Generator | A generator yielding tuples of (hostname, reply) for the queried tasks |

query_task_states()

@classmethod
def query_task_states(
ids: Sequence[str],
timeout: float = 0.5
) -> dict

Aggregates the current states of specific tasks across all workers.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | A list of task IDs to check |
| timeout | float = 0.5 | The time in seconds to wait for worker responses |

Returns

| Type | Description |
| --- | --- |
| dict | A dictionary mapping state names to sets of task IDs in that state |
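
The aggregation step can be illustrated with hand-written worker replies. The reply shape used here, a mapping of task ID to a `(state, info)` pair per worker, is an assumption, as are the hostnames and the `aggregate_states` helper.

```python
from collections import defaultdict


def aggregate_states(replies):
    """Group task IDs by state across (hostname, reply) tuples."""
    states = defaultdict(set)
    for _hostname, reply in replies:
        # Assumption: each reply maps task_id -> (state, info).
        for task_id, (state, _info) in reply.items():
            states[state].add(task_id)
    return states


replies = [
    ('worker1@host', {'id-1': ('active', {}), 'id-2': ('reserved', {})}),
    ('worker2@host', {'id-3': ('active', {})}),
]
states = aggregate_states(replies)
```

Grouping by state rather than by worker lets callers ask "is this task active anywhere?" without caring which worker holds it.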

assert_accepted()

@classmethod
def assert_accepted(
ids: Sequence[str],
interval: float = 0.5,
desc: string = 'waiting for tasks to be accepted'
) -> any

Blocks until the specified tasks have been accepted by workers.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | The task IDs expected to be accepted |
| interval | float = 0.5 | The polling interval for checking task states |
| desc | string = 'waiting for tasks to be accepted' | Description used for the wait operation |

Returns

| Type | Description |
| --- | --- |
| any | The result of the wait operation |

assert_received()

@classmethod
def assert_received(
ids: Sequence[str],
interval: float = 0.5,
desc: string = 'waiting for tasks to be received'
) -> any

Blocks until the specified tasks have been received by workers.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | The task IDs expected to be received |
| interval | float = 0.5 | The polling interval for checking task states |
| desc | string = 'waiting for tasks to be received' | Description used for the wait operation |

Returns

| Type | Description |
| --- | --- |
| any | The result of the wait operation |

assert_result_tasks_in_progress_or_completed()

@classmethod
def assert_result_tasks_in_progress_or_completed(
async_results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)],
interval: float = 0.5,
desc: string = 'waiting for tasks to be started or completed'
) -> any

Blocks until the tasks associated with the given results are either in progress or finished.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| async_results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The result objects to monitor for state changes |
| interval | float = 0.5 | The polling interval for checking result states |
| desc | string = 'waiting for tasks to be started or completed' | Description used for the wait operation |

Returns

| Type | Description |
| --- | --- |
| any | The result of the wait operation |

assert_task_state_from_result()

@classmethod
def assert_task_state_from_result(
fun: Callable,
results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)],
interval: float = 0.5
) -> any

Generic assertion that waits until a predicate evaluated against task result objects returns a truthy value.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | Callable | The predicate function to evaluate against the results |
| results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The task results to check |
| interval | float = 0.5 | The polling interval for the wait loop |

Returns

| Type | Description |
| --- | --- |
| any | The result of the wait operation |

is_result_task_in_progress()

@classmethod
def is_result_task_in_progress(
results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)]
) -> boolean

Checks if all provided results are in a STARTED or SUCCESS state.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The result objects to inspect |

Returns

| Type | Description |
| --- | --- |
| boolean | True if all tasks are started or completed, False otherwise |
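
A small sketch of the state check, using plain objects with a `state` attribute in place of real result objects. The helper name and the use of string literals for the state names are assumptions.

```python
from types import SimpleNamespace


def all_started_or_done(results):
    """Return True only if every result is in STARTED or SUCCESS."""
    possible_states = ('STARTED', 'SUCCESS')
    return all(result.state in possible_states for result in results)


running = [SimpleNamespace(state='STARTED'), SimpleNamespace(state='SUCCESS')]
queued = [SimpleNamespace(state='STARTED'), SimpleNamespace(state='PENDING')]

in_progress = all_started_or_done(running)
not_yet = all_started_or_done(queued)
```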

assert_task_worker_state()

@classmethod
def assert_task_worker_state(
fun: Callable,
ids: Sequence[str],
interval: float = 0.5
) -> any

Generic assertion that waits until a predicate evaluated against worker task IDs returns a truthy value.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | Callable | The predicate function to evaluate against the task IDs |
| ids | Sequence[str] | The task IDs to check |
| interval | float = 0.5 | The polling interval for the wait loop |

Returns

| Type | Description |
| --- | --- |
| any | The result of the wait operation |

is_received()

@classmethod
def is_received(
ids: Sequence[str]
) -> boolean

Checks if the specified tasks have been received by a worker (reserved, active, or ready).

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | The task IDs to check |

Returns

| Type | Description |
| --- | --- |
| boolean | True if all tasks are in a received state |

is_accepted()

@classmethod
def is_accepted(
ids: Sequence[str]
) -> boolean

Checks if the specified tasks have been accepted and are active or ready.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| ids | Sequence[str] | The task IDs to check |

Returns

| Type | Description |
| --- | --- |
| boolean | True if all tasks are in an accepted state |
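
The difference between is_received() and is_accepted() comes down to which states count as a match. The sketch below assumes a state mapping like the one returned by query_task_states(); the `ids_match_states` helper is hypothetical.

```python
def ids_match_states(states, expected_states, ids):
    """Return True if every ID appears in at least one expected state."""
    return all(
        any(task_id in states.get(name, set()) for name in expected_states)
        for task_id in ids
    )


# id-1 is only reserved; id-2 is already active.
states = {'reserved': {'id-1'}, 'active': {'id-2'}}
ids = ['id-1', 'id-2']

received = ids_match_states(states, ['reserved', 'active', 'ready'], ids)
accepted = ids_match_states(states, ['active', 'ready'], ids)
```

Here both tasks count as received, but only one has been accepted, so the accepted check fails for the pair.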

true_or_raise()

@classmethod
def true_or_raise(
fun: Callable,
args: any,
kwargs: any
) -> any

Executes a function and raises a Sentinel exception if the result is falsy.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | Callable | The function to evaluate |
| args | any | Positional arguments for the function |
| kwargs | any | Keyword arguments for the function |

Returns

| Type | Description |
| --- | --- |
| any | The result of the function if it is truthy |
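
This pattern turns a boolean predicate into something a retry loop can poll: a falsy result becomes an exception that means "not yet". The sketch defines its own `Sentinel` class as a stand-in for the exception named above.

```python
class Sentinel(Exception):
    """Stand-in for the Sentinel exception; signals a falsy result."""


def true_or_raise_sketch(fun, *args, **kwargs):
    res = fun(*args, **kwargs)
    if not res:
        raise Sentinel()
    return res


# A truthy result is passed through unchanged.
value = true_or_raise_sketch(lambda x: x * 2, 21)

# A falsy result (an empty list here) raises Sentinel instead.
try:
    true_or_raise_sketch(lambda: [])
    raised = False
except Sentinel:
    raised = True
```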

wait_until_idle()

@classmethod
def wait_until_idle() -> null

Purges all queues and blocks until there are no active tasks reported by workers.

Returns

| Type | Description |
| --- | --- |
| null | Nothing |