Maintaining Cluster State
The cluster state in Celery is maintained by the State object, which acts as an in-memory mirror of the entire cluster. It aggregates individual events—sent by workers and clients—into a coherent representation of active workers and the lifecycle of tasks.
This system is primarily implemented in celery/events/state.py through three core classes: State, Worker, and Task.
The State Aggregator
The State class is the central coordinator. It maintains collections of Worker and Task objects and provides a high-performance dispatcher to process incoming event dictionaries.
Event Dispatching
The primary entry point for updating the cluster state is the State.event() method. Internally, it calls a dispatcher function built by _create_dispatcher(), which binds the lookups needed on every event to local variables to keep per-event overhead low.
from celery.events.state import State
state = State()
# Example event dictionary as received from the broker
event = {
    'type': 'task-received',
    'uuid': 'f47ac10b-58cc-4372-a567-0e02b2c3d479',
    'hostname': 'worker1@example.com',
    'timestamp': 1625097600.0,
    'local_received': 1625097600.1,
    'clock': 42,
    'name': 'tasks.add',
    'args': '(2, 2)',
}
# Update the state
state.event(event)
When state.event(event) is called, it:
- Increments event_count.
- Identifies the event group (e.g., task or worker).
- Retrieves or creates the corresponding Worker or Task instance.
- Delegates the specific update to the instance's own event() method.
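The four steps above can be sketched with a small stand-alone model. This is a simplified illustration, not Celery's dispatcher: MiniState, MiniWorker, and MiniTask are hypothetical names, and deriving the task state from the event subject is an assumption made to keep the example short.

```python
class MiniWorker:
    """Hypothetical stand-in for a worker record; not Celery's Worker."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.last_event = None

    def event(self, subject, fields):
        # Remember the most recent event subject, e.g. 'online'.
        self.last_event = subject


class MiniTask:
    """Hypothetical stand-in for a task record; not Celery's Task."""

    def __init__(self, uuid):
        self.uuid = uuid
        self.state = None
        self.worker = None

    def event(self, subject, fields):
        # Simplification: derive the state name from the event subject.
        self.state = subject.upper()


class MiniState:
    """Toy aggregator that follows the four dispatch steps."""

    def __init__(self):
        self.event_count = 0
        self.workers = {}
        self.tasks = {}

    def get_or_create_worker(self, hostname):
        worker = self.workers.get(hostname)
        if worker is None:
            worker = self.workers[hostname] = MiniWorker(hostname)
        return worker

    def event(self, event):
        # Step 1: count every event that is seen.
        self.event_count += 1
        # Step 2: 'task-received' -> group 'task', subject 'received'.
        group, _, subject = event['type'].partition('-')
        # Step 3: retrieve or create the matching record.
        if group == 'worker':
            target = self.get_or_create_worker(event['hostname'])
        elif group == 'task':
            target = self.tasks.get(event['uuid'])
            if target is None:
                target = self.tasks[event['uuid']] = MiniTask(event['uuid'])
            target.worker = self.get_or_create_worker(event['hostname'])
        else:
            return None  # unknown groups are ignored in this sketch
        # Step 4: let the record apply the update itself.
        target.event(subject, event)
        return target
```

Keeping the per-record update logic on the record classes leaves the aggregator responsible only for routing and bookkeeping.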
Tracking Workers
The Worker class represents the state of an individual Celery worker. It tracks metadata such as the process ID (pid), software identity (sw_ident), and load average (loadavg).
Liveness and Heartbeats
A critical role of the Worker object is determining if a worker is still "alive." This is handled through heartbeat tracking.
- Heartbeat Storage: The heartbeats attribute is a list of timestamps of the most recent heartbeats received.
- Expiry: The alive property determines status by checking if the latest heartbeat falls within the heartbeat_expires window.
# From celery/events/state.py
@property
def alive(self, nowfun=time):
    return bool(self.heartbeats and nowfun() < self.heartbeat_expires)
The event handler built by Worker._create_event_handler() maintains this list. It caps the list at heartbeat_max entries (default 4) so that it cannot grow indefinitely, and it logs a clock drift warning when the difference between the event timestamp and the local receipt time exceeds HEARTBEAT_DRIFT_MAX.
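The bounded heartbeat list and the expiry check can be illustrated with a stand-alone sketch. HeartbeatTracker is a hypothetical class, and the expiry formula used here (latest heartbeat plus freq scaled by a percentage window) and its default values are assumptions for the example rather than a statement of Celery's exact arithmetic.

```python
import bisect
from time import time


class HeartbeatTracker:
    """Toy liveness tracker; names and defaults are illustrative."""

    def __init__(self, freq=60.0, expire_window=200, heartbeat_max=4):
        self.freq = freq                    # expected seconds between beats
        self.expire_window = expire_window  # tolerance, as a percentage of freq
        self.heartbeat_max = heartbeat_max  # how many timestamps to keep
        self.heartbeats = []

    def record(self, timestamp):
        # Keep the list sorted even when heartbeats arrive out of order.
        bisect.insort(self.heartbeats, timestamp)
        # Drop the oldest entries once the cap is exceeded.
        del self.heartbeats[:-self.heartbeat_max]

    @property
    def heartbeat_expires(self):
        # Assumed formula: latest heartbeat plus a scaled interval.
        return self.heartbeats[-1] + self.freq * (self.expire_window / 100.0)

    def alive(self, now=None):
        now = time() if now is None else now
        return bool(self.heartbeats and now < self.heartbeat_expires)
```

Passing now explicitly makes the check deterministic in tests, in the same spirit as the nowfun parameter on the alive property.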
Task Lifecycle and Consistency
The Task class tracks the state of individual task instances. Because events can arrive out of order or be duplicated, the Task class implements logical consistency rules.
Handling Out-of-Order Events
Celery uses a precedence system (defined in celery.states) to ensure that a "later" state (like SUCCESS) isn't overwritten by an "earlier" state (like RECEIVED) that arrived late.
In Task.event(), if an incoming event's state has a lower precedence than the current state, the Task object performs a partial merge instead of a full update:
# From celery/events/state.py
merge_rules = {
    states.RECEIVED: (
        'name', 'args', 'kwargs', 'parent_id',
        'root_id', 'retries', 'eta', 'expires',
    ),
}
If a task-received event arrives after the task has already moved to STARTED, the Task object will only update the fields defined in merge_rules (like args and kwargs) while preserving the STARTED state.
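The partial merge can be sketched without Celery. In this simplified model a task is a plain dict, and the PRECEDENCE list below is an assumed, shortened ordering in which a smaller index means a logically later state; apply_event is a hypothetical helper, not the body of Task.event().

```python
# Assumed ordering for this sketch: smaller index = logically later state.
PRECEDENCE = ['SUCCESS', 'FAILURE', 'REVOKED', 'STARTED', 'RECEIVED', 'PENDING']

# Fields a late 'RECEIVED' event is still allowed to contribute.
MERGE_RULES = {
    'RECEIVED': (
        'name', 'args', 'kwargs', 'parent_id',
        'root_id', 'retries', 'eta', 'expires',
    ),
}


def precedence(state):
    return PRECEDENCE.index(state)


def apply_event(task, state, fields):
    """Apply an event to a task dict, merging partially if it is stale."""
    fields = dict(fields)
    current = task.get('state')
    if current is not None and precedence(state) > precedence(current):
        # The event logically happened before the current state:
        # keep only the whitelisted fields and leave the state alone.
        keep = MERGE_RULES.get(state)
        if keep is not None:
            fields = {k: v for k, v in fields.items() if k in keep}
    else:
        # The event is current or newer: accept the new state as well.
        fields['state'] = state
    task.update(fields)
    return task
```

For example, applying STARTED and then a late RECEIVED carrying name and hostname leaves the state at STARTED, fills in name, and ignores the stale hostname.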
Task Relationships
Tasks track their position in a hierarchy using root_id and parent_id. The State object automatically resolves these relationships:
- If a task is created and its parent is already known, it is added to the parent's children collection.
- If the parent is not yet known, the task is placed in _tasks_to_resolve until the parent event arrives.
The children attribute is a WeakSet, ensuring that task objects can be garbage collected even if they are part of a large tree.
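The deferred parent resolution described above can be sketched as follows. Node and Tree are hypothetical names, and the pending mapping only plays the role that _tasks_to_resolve has in the text; its exact shape here is an assumption for illustration.

```python
from collections import defaultdict
from weakref import WeakSet


class Node:
    """Toy task record that tracks its children weakly."""

    def __init__(self, uuid, parent_id=None):
        self.uuid = uuid
        self.parent_id = parent_id
        self.children = WeakSet()


class Tree:
    def __init__(self):
        self.tasks = {}                      # uuid -> Node (strong references)
        self.pending = defaultdict(WeakSet)  # parent uuid -> waiting children

    def add(self, uuid, parent_id=None):
        node = Node(uuid, parent_id)
        self.tasks[uuid] = node
        # Adopt any children that were seen before this node arrived.
        for child in self.pending.pop(uuid, ()):
            node.children.add(child)
        if parent_id is not None:
            parent = self.tasks.get(parent_id)
            if parent is not None:
                parent.children.add(node)          # parent already known
            else:
                self.pending[parent_id].add(node)  # resolve later
        return node
```

Because both children and the pending sets hold weak references, only the tasks mapping keeps a node alive in this sketch.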
Querying and Indexing
The State object provides several ways to query the current cluster representation:
- Time-Ordered Access: tasks_by_time() yields tasks ordered by their logical clock and timestamp using an internal _taskheap.
- Worker/Type Indexes: tasks_by_type and tasks_by_worker provide fast access to tasks grouped by their name or the worker that executed them. These use CallableDefaultdict and WeakSet to maintain memory efficiency.
# Get the 10 most recent tasks
for uuid, task in state.tasks_by_time(limit=10):
    print(f"{uuid}: {task.state}")
# Access tasks by type
add_tasks = state.tasks_by_type['tasks.add']
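To illustrate the idea behind time-ordered access, the sketch below keeps a sorted list of (clock, timestamp, uuid) tuples and walks it newest first while skipping repeated tasks. TimeIndex is a hypothetical class; how Celery stores and trims its own _taskheap is not reproduced here.

```python
import bisect


class TimeIndex:
    """Toy index ordered by logical clock, then by timestamp."""

    def __init__(self):
        self._entries = []  # sorted list of (clock, timestamp, uuid)

    def add(self, clock, timestamp, uuid):
        # Tuples compare element by element, so the clock takes priority
        # and the timestamp breaks ties.
        bisect.insort(self._entries, (clock, timestamp, uuid))

    def by_time(self, limit=None, reverse=True):
        entries = reversed(self._entries) if reverse else iter(self._entries)
        seen = set()
        for _clock, _timestamp, uuid in entries:
            if limit is not None and len(seen) >= limit:
                return
            if uuid not in seen:
                # A task may have several events; report it only once.
                seen.add(uuid)
                yield uuid
```

A task with several events appears at the position of its most recent event when iterating newest first.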
Memory Management
To prevent unbounded memory growth in long-running monitors, State uses LRUCache (from celery.utils.functional) for both tasks and workers.
- Eviction: When max_tasks_in_memory (default 10,000) is reached, the least recently used tasks are evicted.
- Weak References: Because indexes like tasks_by_type and relationship sets like task.children use WeakSet, evicted tasks are automatically removed from these indexes, preventing memory leaks.
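The interaction between a bounded store and weak indexes can be demonstrated with the standard library alone. BoundedStore and Record are hypothetical names, and an OrderedDict stands in for an LRU cache here; this is a sketch of the technique, not of LRUCache itself.

```python
import gc
from collections import OrderedDict, defaultdict
from weakref import WeakSet


class Record:
    """Toy task record."""

    def __init__(self, uuid, name):
        self.uuid = uuid
        self.name = name


class BoundedStore:
    def __init__(self, limit):
        self.limit = limit
        self.records = OrderedDict()         # strong references, LRU order
        self.by_name = defaultdict(WeakSet)  # weak secondary index

    def add(self, uuid, name):
        record = Record(uuid, name)
        self.records[uuid] = record
        self.by_name[name].add(record)
        # Evict from the least recently used end once over the limit.
        while len(self.records) > self.limit:
            self.records.popitem(last=False)

    def get(self, uuid):
        record = self.records[uuid]
        self.records.move_to_end(uuid)  # mark as most recently used
        return record


store = BoundedStore(limit=2)
store.add('t1', 'tasks.add')
store.add('t2', 'tasks.add')
store.get('t1')               # 't2' is now the least recently used entry
store.add('t3', 'tasks.add')  # exceeds the limit, so 't2' is evicted
gc.collect()                  # make collection explicit for the demo
remaining = sorted(r.uuid for r in store.by_name['tasks.add'])
```

After the eviction, the weak index holds only the records that are still strongly referenced by the store, so no separate cleanup of the index is needed.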
Thread Safety and Snapshots
The State object uses a threading lock (_mutex) to ensure that event processing and state queries do not conflict. For operations that need to capture a consistent view of the entire state (like taking a snapshot), the freeze_while method is used.
# From celery/events/snapshot.py
def capture(self):
    # Executes self.shutter while holding the state lock
    self.state.freeze_while(self.shutter, clear_after=self.clear_after)
This ensures that the state does not change while it is being serialized or processed for a snapshot.
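The locking pattern behind freeze_while can be sketched with a plain threading.Lock. LockedState is a hypothetical class, and clearing a simple list stands in for whatever the real state clears; the sketch only shows how a callback runs while writers are blocked.

```python
import threading


class LockedState:
    """Toy state whose writes and snapshots share one lock."""

    def __init__(self):
        self._mutex = threading.Lock()
        self.events = []

    def record(self, event):
        # Writers take the lock for each update.
        with self._mutex:
            self.events.append(event)

    def freeze_while(self, fun, *args, clear_after=False, **kwargs):
        # Hold the lock for the whole callback so no writer can interleave.
        with self._mutex:
            try:
                return fun(*args, **kwargs)
            finally:
                if clear_after:
                    self.events.clear()
```

Note that with a non-reentrant lock, as assumed here, the callback must not call record() itself, or it would deadlock.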