The Control Command Registry
The Control Command Registry is the central mechanism through which Celery workers expose their internal state and management functions to the outside world. This system allows developers to interact with running workers via the celery inspect and celery control CLI commands, or programmatically through the app.control API.
At the heart of this system is the Panel class, located in celery/worker/control.py.
The Panel Registry
The Panel class acts as a global registry for all remote control commands. It is implemented as a UserDict but primarily operates through class-level attributes that store the command implementations and their associated metadata.
class Panel(UserDict):
"""Global registry of remote control commands."""
data = {} # global dict mapping command names to functions
meta = {} # global dict mapping command names to controller_info_t
The registry separates commands into two logical types:
- Control Commands: Actions that modify the worker's state or behavior (e.g.,
revoke,rate_limit,pool_grow). - Inspect Commands: Read-only operations that return information about the worker (e.g.,
stats,active,registered).
Registering Commands
Commands are registered using the @control_command and @inspect_command decorators. These decorators are wrappers around Panel.register, which populates the data and meta dictionaries.
Metadata and CLI Discovery
When registering a command, you can provide metadata that the Celery CLI uses for argument parsing and help generation. This metadata is stored in a controller_info_t namedtuple.
@inspect_command(
alias='dump_conf',
signature='[include_defaults=False]',
args=[('with_defaults', strtobool)],
)
def conf(state, with_defaults=False, **kwargs):
"""List configuration."""
return jsonify(state.app.conf.table(with_defaults=with_defaults),
keyfilter=_wanted_config_key,
unknown_type_filter=safe_repr)
In this example from celery/worker/control.py:
alias: Provides a backward-compatible name (dump_conf).signature: Describes the expected arguments for the CLI help text.args: Defines a list of tuples for argument conversion (e.g., converting a string to a boolean usingstrtobool).
The Handler Interface
Every command function registered in the Panel must follow a specific signature. The first argument is always state, which provides the command with context about the worker environment.
The State Argument
The state object is an AttributeDict passed by the Pidbox component. It typically contains:
app: The current Celery application instance.hostname: The worker's hostname.consumer: Thecelery.worker.consumer.Consumerinstance.tset: A utility for handling sets (varies depending on whether the event loop is used).
Return Values
Commands typically return a dictionary. The helper functions ok(value) and nok(value) are frequently used to standardize successful and failed responses:
def ok(value):
return {'ok': value}
def nok(value):
return {'error': value}
Integration with Pidbox
The Pidbox component in celery/worker/pidbox.py is responsible for the actual networking aspect of remote control. It initializes a kombu.pidbox.Node and passes Panel.data as the message handlers.
# From celery/worker/pidbox.py
class Pidbox:
def __init__(self, c):
self.c = c
self.hostname = c.hostname
self.node = c.app.control.mailbox.Node(
safe_str(c.hostname),
handlers=control.Panel.data,
state=AttributeDict(
app=c.app,
hostname=c.hostname,
consumer=c,
tset=pass1 if c.controller.use_eventloop else set),
)
When a message arrives via the broadcast queue (the "pidbox"), the Node looks up the command name in Panel.data and executes the corresponding function, passing the prepared state and any arguments extracted from the message body.
Design Tradeoffs and Constraints
Global Registry
The Panel uses class-level dictionaries (data and meta), making the registry global to the Python process. While this simplifies registration via decorators, it means that if multiple Celery workers were to run in the same process (an uncommon configuration), they would share the same set of available commands.
Synchronous Execution
Control commands are generally executed within the worker's strategy or consumer context. If a command performs heavy I/O or blocks, it can impact the worker's ability to process tasks or respond to other control signals. This is why many commands, such as add_consumer, use state.consumer.call_soon to schedule the actual work on the event loop rather than executing it immediately.
@control_command(args=[('queue', str)])
def cancel_consumer(state, queue, **_):
"""Tell worker(s) to stop consuming from task queue by name."""
state.consumer.call_soon(
state.consumer.cancel_task_queue, queue,
)
return ok(f'no longer consuming from {queue}')
Argument Validation
While the meta dictionary provides hints for the CLI, the worker-side implementation must still be robust. Commands often use **kwargs in their signatures to remain compatible with older or newer versions of the protocol that might send unexpected parameters.