Dynamic Task Arguments with Lazy Functions

In Celery Beat, the task arguments defined in beat_schedule are normally static: any expression in them is evaluated once, when the configuration is loaded. Some tasks, however, need arguments computed at the moment the task is dispatched, such as a timestamp or a configuration value that can change between runs. The BeatLazyFunc class in celery.beat defers evaluation of such an argument until Beat is about to send the task to the broker.

The BeatLazyFunc Class

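BeatLazyFunc wraps a callable together with the positional and keyword arguments to pass to it. It stores them at construction time and calls the function only when the instance itself is called or its delay() method is invoked; calling the instance simply delegates to delay(). Despite the name, this delay() runs the function synchronously in the current process and is unrelated to Task.delay().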

```python
class BeatLazyFunc:
    """A lazy function declared in 'beat_schedule' and called before sending to worker."""

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {
            "args": args,
            "kwargs": kwargs
        }

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])
```
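To see the deferral in isolation, the following sketch uses a copy of the class above (docstring omitted) so it runs without Celery installed, and wraps a hypothetical `scale` function with bound arguments. Constructing the wrapper does not call the function; every call does, and `lazy()` and `lazy.delay()` behave identically.

```python
# Copy of celery.beat.BeatLazyFunc as shown above, so the sketch runs standalone.
class BeatLazyFunc:
    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {"args": args, "kwargs": kwargs}

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])


calls = []  # records every real invocation of the wrapped function


def scale(value, factor=1):
    """Hypothetical helper used only for this illustration."""
    calls.append((value, factor))
    return value * factor


lazy = BeatLazyFunc(scale, 3, factor=10)
assert calls == []      # construction only stores the function and its arguments

first = lazy()          # __call__ delegates to delay()
second = lazy.delay()   # same result, but the function runs again
print(first, second, len(calls))
```

Because the arguments are bound once at construction, each evaluation repeats the same call; only the function's own behavior (reading a clock, a file, or a database) makes the result change between dispatches.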

When Beat dispatches an entry, it checks each top-level item in the entry's args and each top-level value in its kwargs for BeatLazyFunc instances. Each instance found is called, and its return value is passed to the task in place of the wrapper object. Values nested inside lists, tuples, or dicts are not inspected.

Using Lazy Arguments in Schedules

To use a dynamic argument, wrap the callable you want evaluated at dispatch time in BeatLazyFunc and place the wrapper directly in the entry's args or kwargs in beat_schedule.

Example: Dynamic Timestamps

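A common use case is passing the current time to a task. Writing datetime.datetime.now() directly in the schedule would evaluate it only once, when the schedule is loaded at Beat startup, so every run would receive the same stale value. Passing the callable itself (datetime.datetime.now, without parentheses) to BeatLazyFunc makes Beat call it each time the entry is dispatched. The resulting value records when Beat sent the message, not when a worker started executing the task.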

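```python
import datetime

from celery.beat import BeatLazyFunc

# Assumes `app` is your existing Celery application instance.
app.conf.beat_schedule = {
    'generate-hourly-report': {
        'task': 'reports.generate',
        'schedule': 3600.0,  # Every hour
        'kwargs': {
            # Pass the callable itself; Beat calls it at each dispatch
            "dispatched_at": BeatLazyFunc(datetime.datetime.now),
        },
    },
}
```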
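The difference between eager and lazy evaluation can be checked without running Beat. This sketch assumes a hypothetical fake clock, `fake_now`, backed by `itertools.count`, in place of `datetime.datetime.now` so the results are deterministic. It reuses standalone copies of `BeatLazyFunc` and `_evaluate_entry_kwargs` from celery/beat.py to simulate three dispatches of each variant.

```python
import itertools


# Standalone copies of the celery.beat pieces discussed in this section.
class BeatLazyFunc:
    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {"args": args, "kwargs": kwargs}

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])


def _evaluate_entry_kwargs(entry_kwargs):
    if not entry_kwargs:
        return {}
    return {
        k: v() if isinstance(v, BeatLazyFunc) else v
        for k, v in entry_kwargs.items()
    }


# Hypothetical stand-in for datetime.datetime.now: returns 0, 1, 2, ... per call.
_ticks = itertools.count()


def fake_now():
    return next(_ticks)


# Evaluated once, while the "config" is built, like datetime.now() in a schedule.
eager_kwargs = {"dispatched_at": fake_now()}
# Evaluated on every dispatch, like BeatLazyFunc(datetime.datetime.now).
lazy_kwargs = {"dispatched_at": BeatLazyFunc(fake_now)}

# Simulate three dispatches of each entry.
eager_sent = [_evaluate_entry_kwargs(eager_kwargs)["dispatched_at"] for _ in range(3)]
lazy_sent = [_evaluate_entry_kwargs(lazy_kwargs)["dispatched_at"] for _ in range(3)]
print(eager_sent, lazy_sent)
```

The eager entry sends the same value on every run, while the lazy entry gets a fresh value per dispatch.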

Note: The docstring for BeatLazyFunc in celery/beat.py contains a typo referencing a non-existent BeatCallBack class. The correct class to use is BeatLazyFunc.

Internal Evaluation Mechanism

Lazy arguments are resolved in celery.beat.Scheduler at dispatch time. When an entry is due, the scheduler's apply_async method passes the entry's args and kwargs through two module-level helper functions in celery/beat.py: _evaluate_entry_args and _evaluate_entry_kwargs.

Evaluation Helpers

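These functions iterate over the arguments and keyword arguments, replacing each BeatLazyFunc instance with the result of calling it and passing every other value through unchanged. An empty or missing input yields an empty list or dict: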

```python
def _evaluate_entry_args(entry_args):
    if not entry_args:
        return []
    return [
        v() if isinstance(v, BeatLazyFunc) else v
        for v in entry_args
    ]


def _evaluate_entry_kwargs(entry_kwargs):
    if not entry_kwargs:
        return {}
    return {
        k: v() if isinstance(v, BeatLazyFunc) else v
        for k, v in entry_kwargs.items()
    }
```

Dispatch Flow in Scheduler

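In celery/beat.py, Scheduler.apply_async performs these evaluations immediately before handing the task to the broker, either through task.apply_async (when the task is registered in the Beat process's app) or through self.send_task (when only the task name is known):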

```python
def apply_async(self, entry, producer=None, advance=True, **kwargs):
    # ... (timestamp updates)
    entry = self.reserve(entry) if advance else entry
    task = self.app.tasks.get(entry.task)

    try:
        # Lazy arguments are resolved here, before anything is sent
        entry_args = _evaluate_entry_args(entry.args)
        entry_kwargs = _evaluate_entry_kwargs(entry.kwargs)

        if task:
            # Task is registered in Beat's app: send through the task object
            return task.apply_async(entry_args, entry_kwargs,
                                    producer=producer,
                                    **entry.options)
        else:
            # Task is not registered locally: send it by name
            return self.send_task(entry.task, entry_args, entry_kwargs,
                                  producer=producer,
                                  **entry.options)
    except Exception as exc:
        ...  # (error handling)
```
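The ordering inside the try block matters: the lazy values are computed first, and a message is sent only if every one of them returns. The simplified model below mirrors that ordering using a hypothetical `RecordingTask` in place of a registered task and a hypothetical `simplified_apply_async` in place of the real method. It omits reserve(), send_task, and the elided error handling, and simply lets exceptions propagate; in the real method, such an exception is handled by the except branch shown above.

```python
# Standalone copies of the celery.beat pieces shown in this section.
class BeatLazyFunc:
    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {"args": args, "kwargs": kwargs}

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])


def _evaluate_entry_args(entry_args):
    if not entry_args:
        return []
    return [v() if isinstance(v, BeatLazyFunc) else v for v in entry_args]


def _evaluate_entry_kwargs(entry_kwargs):
    if not entry_kwargs:
        return {}
    return {k: v() if isinstance(v, BeatLazyFunc) else v
            for k, v in entry_kwargs.items()}


class RecordingTask:
    """Hypothetical stand-in for a registered task: records what would be sent."""

    def __init__(self):
        self.sent = []

    def apply_async(self, args, kwargs, **options):
        self.sent.append((args, kwargs, options))


def simplified_apply_async(task, entry_args, entry_kwargs, options):
    """Hypothetical reduction of Scheduler.apply_async: evaluate first, then send."""
    args = _evaluate_entry_args(entry_args)
    kwargs = _evaluate_entry_kwargs(entry_kwargs)
    return task.apply_async(args, kwargs, **options)


def unreachable_config():
    """Hypothetical lazy function whose backend is down."""
    raise ConnectionError("config store unreachable")


task = RecordingTask()

# Succeeds: len("abc") is computed at dispatch and the message is recorded.
simplified_apply_async(task, (), {"n": BeatLazyFunc(len, "abc")}, {"queue": "reports"})

# Fails during evaluation, so task.apply_async is never reached.
try:
    simplified_apply_async(task, (BeatLazyFunc(unreachable_config),), {}, {})
except ConnectionError as exc:
    print("not sent:", exc)

print(task.sent)  # only the first dispatch was recorded
```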

Execution Context

The function wrapped by BeatLazyFunc runs inside the Beat process, not in a worker process. This has several implications:

  1. Environment: The function must run correctly on the host where Beat runs, using that host's configuration, environment variables, and clock.
  2. Performance: Beat dispatches due entries one at a time from a single scheduling loop, so a slow lazy function delays every other entry that comes due at the same time. Keep lazy functions fast, and avoid blocking network or database calls that have no timeout.
  3. Dependencies: Any modules, credentials, or services the function needs must be available to the Beat process, not only to the workers.

For example, if you use a lazy function to fetch a configuration value from a database, ensure the Beat process has the necessary database drivers and connectivity.
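One way to keep a database- or network-backed lazy function cheap is to cache its result for a short time inside the Beat process, so most dispatches never touch the backend. The sketch below is a hypothetical pattern, not a Celery feature: `ttl_cached` is an illustrative decorator, `fetch_report_config` stands in for your real query, and the 60-second TTL is an arbitrary choice. It assumes Beat's default single scheduling loop, so it does no locking.

```python
import time


def ttl_cached(ttl_seconds, clock=time.monotonic):
    """Cache a zero-argument function's result for ttl_seconds (hypothetical helper)."""
    def decorator(func):
        state = {"value": None, "expires": float("-inf")}

        def wrapper():
            now = clock()
            if now >= state["expires"]:
                # Cache is empty or stale: query the backend and restart the TTL.
                state["value"] = func()
                state["expires"] = now + ttl_seconds
            return state["value"]

        return wrapper
    return decorator


backend_calls = []  # one entry per real backend query


@ttl_cached(60.0)
def fetch_report_config():
    """Hypothetical stand-in for a database query run inside the Beat process."""
    backend_calls.append(1)
    return {"region": "eu", "version": len(backend_calls)}


# In a schedule, this would be wrapped as BeatLazyFunc(fetch_report_config).
first = fetch_report_config()
second = fetch_report_config()  # served from the cache, no backend query
print(first, second, len(backend_calls))
```

The trade-off is staleness: a value can be up to one TTL old when it reaches the task, so choose the TTL according to how fresh the argument must be.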