Real-time Monitoring with Curses
Real-time monitoring is essential for managing distributed task queues. This tutorial guides you through using the built-in terminal-based monitor provided by the celery.events.cursesmon module. You will learn how to launch the interactive dashboard, navigate the task history, and perform remote control actions like revoking tasks directly from your terminal.
Prerequisites
Before starting, ensure you have the following:
- Python Curses Support: The _curses module must be available. On Windows, you may need to install windows-curses.
- Active Broker: A running Celery broker (e.g., Redis or RabbitMQ).
- Events Enabled: Workers must be started with the -E flag to send events, or you must enable them globally in your configuration: worker_send_task_events = True
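Before launching the monitor, it can help to confirm that curses support is actually present. The following is a minimal preflight sketch using only the standard library; the helper name curses_available is hypothetical and not part of Celery.

```python
import importlib.util


def curses_available() -> bool:
    """Return True if the C extension backing the curses module can be located."""
    # find_spec only locates the module; it does not import or initialize it.
    return importlib.util.find_spec("_curses") is not None


if curses_available():
    print("curses support found")
else:
    print("curses support missing (on Windows, try installing windows-curses)")
```

The check only covers the first prerequisite. The broker and the event setting still have to be verified against your own deployment.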
Step 1: Launching the Monitor
The easiest way to start the monitor is through the Celery command-line interface. This invokes the evtop function located in celery/events/cursesmon.py.
Run the following command in your terminal:
celery events
Internally, the command does roughly the following to initialize the UI and start the event capture loop (the application name here is only a placeholder):
from celery import Celery
from celery.events.cursesmon import evtop
app = Celery('my_project')
# This starts the curses screen, a background refresh thread,
# and the event consumer loop.
evtop(app=app)
When the monitor starts, you will see a header with the Celery version (defined in CursesMonitor.greet) and a table with columns for UUID, TASK, WORKER, TIME, and STATE.
Step 2: Navigating the Task List
The CursesMonitor maintains an internal State object that tracks all tasks received from the broker. You can navigate this list using keyboard shortcuts defined in the keymap.
- Move Selection: Use j (or the Down arrow) to move the selection bar down and k (or the Up arrow) to move it up.
- Scroll: The monitor automatically limits the display to the height of your terminal window. As new tasks arrive, they appear at the top.
The navigation is handled by the handle_keypress method, which maps terminal keys to internal methods:
# From celery.events.cursesmon.CursesMonitor
# Special keys are translated to the same letters used by the keymap.
keyalias = {
    curses.KEY_DOWN: 'J',
    curses.KEY_UP: 'K',
    curses.KEY_ENTER: 'I',
}

def move_selection(self, direction=1):
    if not self.tasks:
        return
    pos = self.find_position()
    try:
        self.selected_task = self.tasks[pos + direction][0]
    except IndexError:
        # Moving past the last row wraps back to the first task.
        self.selected_task = self.tasks[0][0]
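To see how key aliases, the keymap, and move_selection fit together, the dispatch can be modeled without a terminal. This is a simplified stand-in rather than the Celery class: SelectionModel, the integer key codes, and the sample task ids are all assumptions made for illustration.

```python
# Hypothetical key codes standing in for curses.KEY_DOWN / curses.KEY_UP.
KEY_DOWN, KEY_UP = 258, 259


class SelectionModel:
    """Simplified stand-in for the selection logic; not the Celery class."""

    keyalias = {KEY_DOWN: "J", KEY_UP: "K"}

    def __init__(self, tasks):
        # Each entry mimics a (uuid, task) pair, newest first.
        self.tasks = tasks
        self.selected_task = None
        self.keymap = {
            "J": lambda: self.move_selection(1),
            "K": lambda: self.move_selection(-1),
        }

    def find_position(self):
        # Index of the selected task, or 0 when nothing matches.
        for i, entry in enumerate(self.tasks):
            if entry[0] == self.selected_task:
                return i
        return 0

    def move_selection(self, direction=1):
        if not self.tasks:
            return
        pos = self.find_position()
        try:
            self.selected_task = self.tasks[pos + direction][0]
        except IndexError:
            # Moving past the last row wraps back to the first one.
            self.selected_task = self.tasks[0][0]

    def handle_keypress(self, ch):
        # Translate special keys via keyalias, otherwise treat ch as a character.
        key = self.keyalias.get(ch) or chr(ch)
        handler = self.keymap.get(key.upper())
        if handler is not None:
            handler()


model = SelectionModel([("id-1", None), ("id-2", None), ("id-3", None)])
model.handle_keypress(ord("j"))   # selects id-2 (position 0 + 1)
model.handle_keypress(KEY_DOWN)   # selects id-3
model.handle_keypress(ord("j"))   # index out of range, wraps to id-1
print(model.selected_task)
```

In this sketch, moving up from the first row selects the last row: index -1 is valid in Python, so no IndexError is raised.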
Step 3: Inspecting Task Details
Once you have selected a task (highlighted in the UI), you can inspect its metadata, including arguments and keyword arguments.
- View Info: Press i to open the detail view for the selected task.
- View Traceback: If a task has failed (state FAILURE), press t to view the full Python traceback.
- View Result: Press r to see the return value of a completed task.
The selection_info method retrieves data from the State object and formats it for the curses window:
def selection_info(self):
    if not self.selected_task:
        return
    # ... logic to fetch task from self.state.tasks and display args/kwargs ...
Step 4: Performing Remote Control Actions
The monitor is not just a passive viewer; it can send control commands back to the workers using the app.control interface.
- Revoke a Task: Select a pending or running task and press c. This calls self.app.control.revoke(self.selected_task).
- Rate Limiting: Press l to set a new rate limit for a task type. You will be prompted to enter the limit (e.g., "10/m").
When you perform these actions, the monitor waits up to 1 second for replies from workers and displays the results in an alert box via alert_remote_control_reply.
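The same request-and-reply pattern can be reproduced outside the monitor. The sketch below assumes that app is a configured Celery application and that each reply is a dict mapping a worker name to a response containing an "ok" or "error" key. The helper names revoke_and_summarize and summarize_replies are hypothetical, and the output format is illustrative, not the monitor's own.

```python
def summarize_replies(replies, timeout=1.0):
    """Turn a list of worker replies into one human-readable line each."""
    if not replies:
        return [f"No replies received within {timeout}s."]
    lines = []
    for reply in replies:
        # Assumed shape: {worker_name: {"ok": ...}} or {worker_name: {"error": ...}}
        for worker, response in reply.items():
            status = "error" if "error" in response else "ok"
            lines.append(f"{worker}: {status}: {response.get(status)}")
    return lines


def revoke_and_summarize(app, task_id, timeout=1.0):
    """Revoke task_id via app.control and summarize what the workers answered."""
    # reply=True asks the broadcast to collect worker responses until the timeout.
    replies = app.control.revoke(task_id, reply=True, timeout=timeout)
    return summarize_replies(replies, timeout)
```

With a live broker this would be called as revoke_and_summarize(app, task_id). An empty result means that no worker answered before the timeout, not necessarily that the revoke failed.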
How it Works Internally
The monitor relies on two main components working in parallel:
- The Event Loop: The capture_events function runs in the main thread. It connects to the broker and updates the State object every time a new event (like task-succeeded or worker-heartbeat) arrives.
- The Display Thread: The DisplayThread class runs in the background, calling CursesMonitor.draw() at a fixed interval (defined by screen_delay).
class DisplayThread(threading.Thread):
    def run(self):
        while not self.shutdown:
            self.display.draw()
            self.display.nap()  # Default 10ms delay
The draw method uses a threading.RLock to safely read the task state while the main thread is updating it, ensuring the UI remains consistent even under high event volume.
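The interplay between the two threads can be reproduced with the standard library alone. In the sketch below, Display is a hypothetical stand-in for the monitor, on_event plays the role of the event loop, and frames are recorded in a list instead of being painted with curses.

```python
import threading
import time


class Display:
    """Hypothetical stand-in for the monitor: shared state guarded by an RLock."""

    def __init__(self):
        self.lock = threading.RLock()
        self.events = []   # written by the main thread
        self.frames = []   # snapshots "drawn" by the display thread

    def on_event(self, event):
        with self.lock:
            self.events.append(event)

    def draw(self):
        with self.lock:
            # Copy under the lock so a frame never sees a half-applied update.
            self.frames.append(tuple(self.events))

    def nap(self, delay=0.01):
        time.sleep(delay)


class DisplayThread(threading.Thread):
    def __init__(self, display):
        super().__init__(daemon=True)
        self.display = display
        self.shutdown = False

    def run(self):
        while not self.shutdown:
            self.display.draw()
            self.display.nap()


display = Display()
refresher = DisplayThread(display)
refresher.start()
for name in ("task-received", "task-started", "task-succeeded"):
    display.on_event(name)   # the main thread acts as the event consumer
    time.sleep(0.02)
refresher.shutdown = True
refresher.join()
display.draw()               # one final frame after the last event
print(display.frames[-1])
```

Because the lock is reentrant, a method that already holds it can call another locked method on the same object without deadlocking.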
Next Steps
- Custom Keybindings: You can extend CursesMonitor by passing a custom keymap dictionary to its constructor if you are embedding the monitor in your own tool.
- Filtering: Explore the celery events --camera option if you need to take snapshots of the state and store them in a database instead of viewing them interactively.