Dynamic Task Arguments with Lazy Functions
In Celery Beat, task arguments defined in the beat_schedule are typically static. However, certain tasks require arguments that must be calculated at the exact moment the task is dispatched, such as a timestamp or a dynamic configuration value. The BeatLazyFunc class in celery.beat provides a mechanism to defer the evaluation of these arguments until the task is sent to the broker.
The BeatLazyFunc Class
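The BeatLazyFunc class wraps a callable together with any positional and keyword arguments it should receive. It stores these and runs the callable only when the wrapper itself is called or its delay() method is invoked. Calling the instance simply delegates to delay(); despite the name, this delay() runs the function synchronously and is unrelated to Task.delay().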
```python
class BeatLazyFunc:
    """A lazy function declared in 'beat_schedule' and called before sending to worker."""

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {
            "args": args,
            "kwargs": kwargs
        }

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])
```
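To see the deferral without a running Beat service, the snippet below uses a standalone copy of the class above (so it runs without Celery installed) and a hypothetical next_batch_id helper that counts its own invocations. Nothing runs when the wrapper is constructed; every call, whether through __call__ or delay(), invokes the function again with the stored arguments.

```python
import itertools


class BeatLazyFunc:
    """Standalone copy of celery.beat.BeatLazyFunc, reproduced for this demo."""

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._func_params = {"args": args, "kwargs": kwargs}

    def __call__(self):
        return self.delay()

    def delay(self):
        return self._func(*self._func_params["args"], **self._func_params["kwargs"])


# A counter makes it observable exactly when the wrapped function runs.
calls = itertools.count(1)


def next_batch_id(prefix, sep="-"):
    # Hypothetical helper: builds IDs such as "batch-1", "batch-2", ...
    return f"{prefix}{sep}{next(calls)}"


lazy = BeatLazyFunc(next_batch_id, "batch", sep="-")  # nothing runs yet
first = lazy()         # __call__ delegates to delay()
second = lazy.delay()  # runs the function again with the same stored arguments
print(first, second)   # batch-1 batch-2
```

Because the counter starts at 1 and the first result is "batch-1", the function demonstrably did not run at construction time.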
When the Celery Beat scheduler dispatches an entry, it checks each top-level value in the entry's args and kwargs for BeatLazyFunc instances. Each instance it finds is called, and the return value is passed to the task in place of the BeatLazyFunc object. Values nested inside lists, tuples, or dicts are not inspected.
Using Lazy Arguments in Schedules
To use dynamic arguments, wrap the function you want to evaluate at dispatch time with BeatLazyFunc inside your beat_schedule configuration.
Example: Dynamic Timestamps
A common use case is passing the current time to a task. If you called datetime.datetime.now() directly in the schedule, it would be evaluated only once, when the configuration is loaded, and every run would receive that same stale timestamp. Passing the callable itself (without calling it) to BeatLazyFunc defers the call until each dispatch.
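```python
import datetime

from celery import Celery
from celery.beat import BeatLazyFunc

app = Celery('proj')  # placeholder name; use your existing application instance

app.conf.beat_schedule = {
    'generate-hourly-report': {
        'task': 'reports.generate',
        'schedule': 3600.0,  # every hour
        'kwargs': {
            # Pass the callable (no parentheses) so Beat calls it at dispatch time.
            "dispatched_at": BeatLazyFunc(datetime.datetime.now),
        },
    },
}
```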
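The difference between eager and lazy evaluation can be simulated without Celery. This sketch assumes a hypothetical FakeClock that advances one hour per reading and a minimal stand-in for BeatLazyFunc with the same call semantics; the resolve function mirrors the top-level substitution the scheduler performs. Two simulated dispatches of the eager kwargs see the same timestamp, while the lazy version reads the clock on every dispatch.

```python
import datetime


class BeatLazyFunc:
    # Minimal stand-in for celery.beat.BeatLazyFunc (same call semantics).
    def __init__(self, func, *args, **kwargs):
        self._func, self._args, self._kwargs = func, args, kwargs

    def __call__(self):
        return self._func(*self._args, **self._kwargs)


class FakeClock:
    """Hypothetical clock that advances one hour each time it is read."""

    def __init__(self, start):
        self._now = start

    def now(self):
        current = self._now
        self._now += datetime.timedelta(hours=1)
        return current


clock = FakeClock(datetime.datetime(2024, 1, 1, 0, 0))

# Eager: the clock is read once, when this dict is built.
eager_kwargs = {"dispatched_at": clock.now()}
# Lazy: the clock is read only when a dispatch resolves the value.
lazy_kwargs = {"dispatched_at": BeatLazyFunc(clock.now)}


def resolve(kwargs):
    # Mirrors the scheduler's replacement of top-level BeatLazyFunc values.
    return {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in kwargs.items()}


eager_runs = [resolve(eager_kwargs)["dispatched_at"] for _ in range(2)]
lazy_runs = [resolve(lazy_kwargs)["dispatched_at"] for _ in range(2)]
print(eager_runs[0] == eager_runs[1])  # True: every run sees the same stale time
print(lazy_runs[0] < lazy_runs[1])     # True: each run gets a fresh value
```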
Note: The docstring for BeatLazyFunc in celery/beat.py contains a typo referencing a non-existent BeatCallBack class. The correct class to use is BeatLazyFunc.
Internal Evaluation Mechanism
Lazy arguments are resolved in the celery.beat.Scheduler class during task dispatch. When an entry is due, the scheduler's apply_async method resolves the entry's arguments with two module-level helper functions: _evaluate_entry_args and _evaluate_entry_kwargs.
Evaluation Helpers
These functions walk the top-level positional and keyword arguments and replace each BeatLazyFunc instance with the result of calling it. Empty or missing arguments become an empty list or dict:
```python
def _evaluate_entry_args(entry_args):
    if not entry_args:
        return []
    return [
        v() if isinstance(v, BeatLazyFunc) else v
        for v in entry_args
    ]


def _evaluate_entry_kwargs(entry_kwargs):
    if not entry_kwargs:
        return {}
    return {
        k: v() if isinstance(v, BeatLazyFunc) else v
        for k, v in entry_kwargs.items()
    }
```
Dispatch Flow in Scheduler
In celery/beat.py, Scheduler.apply_async performs these evaluations immediately before sending the task to the broker, either through task.apply_async (when the task is registered with the Beat process's app) or through self.send_task by name (when it is not):
```python
def apply_async(self, entry, producer=None, advance=True, **kwargs):
    # ... (timestamp updates)
    entry = self.reserve(entry) if advance else entry
    task = self.app.tasks.get(entry.task)

    try:
        # Lazy arguments are resolved here
        entry_args = _evaluate_entry_args(entry.args)
        entry_kwargs = _evaluate_entry_kwargs(entry.kwargs)
        if task:
            return task.apply_async(entry_args, entry_kwargs,
                                    producer=producer,
                                    **entry.options)
        else:
            return self.send_task(entry.task, entry_args, entry_kwargs,
                                  producer=producer,
                                  **entry.options)
    except Exception as exc:
        # ... (error handling)
```
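The toy model below (hypothetical ToyScheduler and DispatchError, not Celery classes) reproduces the ordering in the excerpt above: lazy values are resolved inside the try block, immediately before the send. A lazy function that raises therefore follows the same exception path as a failed send, and nothing reaches the broker for that run. The real error handling is elided above, so DispatchError only stands in for it here. In the real method, reserve() runs before the try block, so the entry has presumably already been advanced when a lazy function fails.

```python
class BeatLazyFunc:
    # Minimal stand-in for celery.beat.BeatLazyFunc (same call semantics).
    def __init__(self, func, *args, **kwargs):
        self._func, self._args, self._kwargs = func, args, kwargs

    def __call__(self):
        return self._func(*self._args, **self._kwargs)


class DispatchError(Exception):
    """Hypothetical error type standing in for the elided error handling."""


class ToyScheduler:
    """Hypothetical, heavily simplified model of Scheduler.apply_async."""

    def __init__(self):
        self.sent = []  # records (task_name, args, kwargs) "sent to the broker"

    def apply_async(self, task_name, args=(), kwargs=None):
        try:
            # Resolve lazy values first, once per dispatch, then "send".
            entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in args]
            entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v
                            for k, v in (kwargs or {}).items()}
            self.sent.append((task_name, entry_args, entry_kwargs))
        except Exception as exc:
            raise DispatchError(f"failed to dispatch {task_name}: {exc}") from exc


def broken():
    # Hypothetical lazy function whose backing service is down.
    raise RuntimeError("config store unreachable")


sched = ToyScheduler()
sched.apply_async("reports.generate", kwargs={"window": BeatLazyFunc(lambda: 24)})

error_message = None
try:
    sched.apply_async("reports.generate", kwargs={"window": BeatLazyFunc(broken)})
except DispatchError as err:
    error_message = str(err)

print(error_message)  # failed to dispatch reports.generate: config store unreachable
print(sched.sent)     # [('reports.generate', [], {'window': 24})]
```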
Execution Context
The function wrapped by BeatLazyFunc runs in the Beat process, not in a worker process. This has several implications:
- Environment: The function must be able to run in the environment where Celery Beat is hosted.
- Performance: Beat dispatches due entries one at a time from a single scheduling loop and relies on precise timing, so a slow lazy function delays every entry waiting behind it. Keep lazy functions fast, and avoid blocking calls that have no timeout.
- Dependencies: Any modules or resources required by the lazy function must be available to the Beat service.
For example, if you use a lazy function to fetch a configuration value from a database, ensure the Beat process has the necessary database drivers and connectivity.
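One way to keep a database-backed lazy function cheap is to cache its result for a short period, so Beat queries the database only occasionally. The sketch below is not part of Celery: TTLCached and fetch_report_window are hypothetical names, and the injectable clock exists only to make the demo deterministic. A cache bounds how often the query runs, but it does not make an individual query non-blocking, so a driver-level timeout is still advisable.

```python
import time


class TTLCached:
    """Hypothetical helper: reuse the wrapped function's result for ttl_seconds."""

    def __init__(self, func, ttl_seconds, clock=time.monotonic):
        self.func = func
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._value = None
        self._expires = float("-inf")  # forces a fetch on the first call

    def __call__(self):
        now = self.clock()
        if now >= self._expires:
            self._value = self.func()  # hit the slow source only when stale
            self._expires = now + self.ttl_seconds
        return self._value


db_calls = []


def fetch_report_window():
    # Hypothetical stand-in for a database query run inside the Beat process.
    db_calls.append(1)
    return 24


fake_time = [0.0]
cached_window = TTLCached(fetch_report_window, ttl_seconds=300,
                          clock=lambda: fake_time[0])

results = []
for t in (0.0, 60.0, 299.0, 300.0):
    fake_time[0] = t
    results.append(cached_window())

print(results, len(db_calls))  # [24, 24, 24, 24] 2
```

In a schedule, the cached callable would be wrapped like any other, for example 'kwargs': {'window_hours': BeatLazyFunc(TTLCached(fetch_report_window, ttl_seconds=300))}. The trade-off is freshness: a value may be up to ttl_seconds old when a task is dispatched.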