Mixin that adds :class:Manager capabilities.
Attributes
| Attribute | Type | Description |
|---|
| stdout | TextIO = sys.stdout | The output stream used for printing remarks and status updates, defaulting to sys.stdout. |
| stderr | TextIO = sys.stderr | The error stream used for reporting issues, defaulting to sys.stderr. |
| connerrors | tuple | A tuple of recoverable connection exception types used to catch and handle network failures during operations. |
| block_timeout | float = 1800.0 | The maximum time in seconds to wait for blocking operations to complete. |
| no_join | bool = False | A flag that, when set to True, prevents the manager from waiting for task results in the join method. |
Constructor
Signature
Methods
@classmethod
def remark(
s: string,
sep: string = '-'
) - > null
Prints a formatted message to the configured stdout stream.
Parameters
| Name | Type | Description |
|---|
| s | string | The message string to be printed |
| sep | string = '-' | The separator character to prefix the message with |
Returns
| Type | Description |
|---|
null | Nothing |
missing_results()
@classmethod
def missing_results(
r: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)]
) - > array
Identifies which task results are not yet present in the backend cache.
Parameters
| Name | Type | Description |
|---|
| r | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | A sequence of Celery AsyncResult objects to check |
Returns
| Type | Description |
|---|
array | A list of task IDs that are missing from the result backend cache |
wait_for()
@classmethod
def wait_for(
fun: Callable,
catch: Sequence[Any],
desc: string = 'thing',
args: Tuple = (),
kwargs: Dict = null,
errback: Callable = null,
max_retries: int = 10,
interval_start: float = 0.1,
interval_step: float = 0.5,
interval_max: float = 5.0,
emit_warning: boolean = false
) - > any
Wait for event to happen. The catch argument specifies the exception that means the event has not happened yet.
Parameters
| Name | Type | Description |
|---|
| fun | Callable | The function to execute repeatedly until it succeeds or times out |
| catch | Sequence[Any] | Exception types that indicate the event is still pending and should trigger a retry |
| desc | string = 'thing' | A description of the event used in warning messages |
| args | Tuple = () | Positional arguments to pass to the function |
| kwargs | Dict = null | Keyword arguments to pass to the function |
| errback | Callable = null | Optional callback executed on each retry attempt |
| max_retries | int = 10 | Maximum number of retry attempts before giving up |
| interval_start | float = 0.1 | Initial delay between retries in seconds |
| interval_step | float = 0.5 | Amount to increase the delay by on each retry |
| interval_max | float = 5.0 | Maximum delay between retries in seconds |
| emit_warning | boolean = false | Whether to print a warning to stderr while waiting |
Returns
| Type | Description |
|---|
any | The result of the successful function call |
ensure_not_for_a_while()
@classmethod
def ensure_not_for_a_while(
fun: Callable,
catch: Sequence[Any],
desc: string = 'thing'
) - > null
Make sure something does not happen (at least for a while).
Parameters
| Name | Type | Description |
|---|
| fun | Callable | The function that should not succeed |
| catch | Sequence[Any] | The exception expected to be raised if the event has not happened |
| desc | string = 'thing' | Description of the event for the error message if it unexpectedly occurs |
Returns
| Type | Description |
|---|
null | Nothing |
retry_over_time()
@classmethod
def retry_over_time(
args: any,
kwargs: any
) - > any
Retries a function call over a period of time with exponential backoff.
Parameters
| Name | Type | Description |
|---|
| args | any | Positional arguments passed to the underlying retry utility |
| kwargs | any | Keyword arguments passed to the underlying retry utility |
Returns
| Type | Description |
|---|
any | The result of the function call once it succeeds |
join()
@classmethod
def join(
r: any,
propagate: boolean = false,
max_retries: int = 10
) - > any
Waits for task results to be available, handling timeouts and connection errors during the process.
Parameters
| Name | Type | Description |
|---|
| r | any | The result or result set to wait for |
| propagate | boolean = false | Whether to propagate exceptions raised by the tasks |
| max_retries | int = 10 | Maximum number of attempts to retrieve results before failing |
Returns
| Type | Description |
|---|
any | The task results retrieved from the backend |
inspect()
@classmethod
def inspect(
timeout: float = 3.0
) - > object
Creates an inspect object to query the state of workers.
Parameters
| Name | Type | Description |
|---|
| timeout | float = 3.0 | The time in seconds to wait for workers to respond to the inspect command |
Returns
| Type | Description |
|---|
object | A Celery inspect instance |
query_tasks()
@classmethod
def query_tasks(
ids: Sequence[str],
timeout: float = 0.5
) - > Generator
Queries workers for information regarding specific task IDs.
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | A list of task IDs to query |
| timeout | float = 0.5 | The time in seconds to wait for the query response |
Returns
| Type | Description |
|---|
Generator | A generator yielding tuples of (hostname, reply) for the queried tasks |
query_task_states()
@classmethod
def query_task_states(
ids: Sequence[str],
timeout: float = 0.5
) - > dict
Aggregates the current states of specific tasks across all workers.
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | A list of task IDs to check |
| timeout | float = 0.5 | The time in seconds to wait for worker responses |
Returns
| Type | Description |
|---|
dict | A dictionary mapping state names to sets of task IDs in that state |
assert_accepted()
@classmethod
def assert_accepted(
ids: Sequence[str],
interval: float = 0.5,
desc: string = 'waiting for tasks to be accepted'
) - > any
Blocks until the specified tasks have been accepted by workers.
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | The task IDs expected to be accepted |
| interval | float = 0.5 | The polling interval for checking task states |
| desc | string = 'waiting for tasks to be accepted' | Description used for the wait operation |
Returns
| Type | Description |
|---|
any | The result of the wait operation |
assert_received()
@classmethod
def assert_received(
ids: Sequence[str],
interval: float = 0.5,
desc: string = 'waiting for tasks to be received'
) - > any
Blocks until the specified tasks have been received by workers.
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | The task IDs expected to be received |
| interval | float = 0.5 | The polling interval for checking task states |
| desc | string = 'waiting for tasks to be received' | Description used for the wait operation |
Returns
| Type | Description |
|---|
any | The result of the wait operation |
assert_result_tasks_in_progress_or_completed()
@classmethod
def assert_result_tasks_in_progress_or_completed(
async_results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)],
interval: float = 0.5,
desc: string = 'waiting for tasks to be started or completed'
) - > any
Blocks until the tasks associated with the given results are either in progress or finished.
Parameters
| Name | Type | Description |
|---|
| async_results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The result objects to monitor for state changes |
| interval | float = 0.5 | The polling interval for checking result states |
| desc | string = 'waiting for tasks to be started or completed' | Description used for the wait operation |
Returns
| Type | Description |
|---|
any | The result of the wait operation |
assert_task_state_from_result()
@classmethod
def assert_task_state_from_result(
fun: Callable,
results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)],
interval: float = 0.5
) - > any
Generic assertion that waits for a condition to be true based on task result objects.
Parameters
| Name | Type | Description |
|---|
| fun | Callable | The predicate function to evaluate against the results |
| results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The task results to check |
| interval | float = 0.5 | The polling interval for the wait loop |
Returns
| Type | Description |
|---|
any | The result of the wait operation |
is_result_task_in_progress()
@classmethod
def is_result_task_in_progress(
results: Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)]
) - > boolean
Checks if all provided results are in a STARTED or SUCCESS state.
Parameters
| Name | Type | Description |
|---|
| results | Sequence[[AsyncResult](../../../result/asyncresult.md?sid=celery_result_asyncresult)] | The result objects to inspect |
Returns
| Type | Description |
|---|
boolean | True if all tasks are started or completed, False otherwise |
assert_task_worker_state()
@classmethod
def assert_task_worker_state(
fun: Callable,
ids: Sequence[str],
interval: float = 0.5
) - > any
Generic assertion that waits for a condition to be true based on worker task IDs.
Parameters
| Name | Type | Description |
|---|
| fun | Callable | The predicate function to evaluate against the task IDs |
| ids | Sequence[str] | The task IDs to check |
| interval | float = 0.5 | The polling interval for the wait loop |
Returns
| Type | Description |
|---|
any | The result of the wait operation |
is_received()
@classmethod
def is_received(
ids: Sequence[str]
) - > boolean
Checks if the specified tasks have been received by a worker (reserved, active, or ready).
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | The task IDs to check |
Returns
| Type | Description |
|---|
boolean | True if all tasks are in a received state |
is_accepted()
@classmethod
def is_accepted(
ids: Sequence[str]
) - > boolean
Checks if the specified tasks have been accepted and are active or ready.
Parameters
| Name | Type | Description |
|---|
| ids | Sequence[str] | The task IDs to check |
Returns
| Type | Description |
|---|
boolean | True if all tasks are in an accepted state |
true_or_raise()
@classmethod
def true_or_raise(
fun: Callable,
args: any,
kwargs: any
) - > any
Executes a function and raises a Sentinel exception if the result is falsy.
Parameters
| Name | Type | Description |
|---|
| fun | Callable | The function to evaluate |
| args | any | Positional arguments for the function |
| kwargs | any | Keyword arguments for the function |
Returns
| Type | Description |
|---|
any | The result of the function if it is truthy |
wait_until_idle()
@classmethod
def wait_until_idle() - > null
Purges all queues and blocks until there are no active tasks reported by workers.
Returns
| Type | Description |
|---|
null | Nothing |