Skip to main content

Real-time Monitoring with Curses

Real-time monitoring is essential for managing distributed task queues. This tutorial guides you through using the built-in terminal-based monitor provided by the celery.events.cursesmon module. You will learn how to launch the interactive dashboard, navigate the task history, and perform remote control actions like revoking tasks directly from your terminal.

Prerequisites

Before starting, ensure you have the following:

  1. Python Curses Support: The _curses module must be available. On Windows, you may need to install windows-curses.
  2. Active Broker: A running Celery broker (e.g., Redis or RabbitMQ).
  3. Events Enabled: Workers must be started with the -E flag to send events, or you must enable them globally in your configuration:
    worker_send_task_events = True

Step 1: Launching the Monitor

The easiest way to start the monitor is through the Celery command-line interface. This invokes the evtop function located in celery/events/cursesmon.py.

Run the following command in your terminal:

celery events

Internally, this command executes the following logic to initialize the UI and start the event capture loop:

from celery import Celery
from celery.events.cursesmon import evtop

app = Celery('my_project')

# This starts the curses screen, a background refresh thread,
# and the event consumer loop.
evtop(app=app)

When the monitor starts, you will see a header with the Celery version (defined in CursesMonitor.greet) and a table with columns for UUID, TASK, WORKER, TIME, and STATE.

Step 2: Navigating the Task List

The CursesMonitor maintains an internal State object that tracks all tasks received from the broker. You can navigate this list using keyboard shortcuts defined in the keymap.

  1. Move Selection: Use j (or the Down arrow) to move the selection bar down and k (or the Up arrow) to move it up.
  2. Scroll: The monitor automatically limits the display to the height of your terminal window. As new tasks arrive, they appear at the top.

The navigation is handled by the handle_keypress method, which maps terminal keys to internal methods:

# From celery/events.cursesmon.CursesMonitor
keyalias = {
curses.KEY_DOWN: 'J',
curses.KEY_UP: 'K',
curses.KEY_ENTER: 'I'
}

def move_selection(self, direction=1):
if not self.tasks:
return
pos = self.find_position()
try:
self.selected_task = self.tasks[pos + direction][0]
except IndexError:
self.selected_task = self.tasks[0][0]

Step 3: Inspecting Task Details

Once you have selected a task (highlighted in the UI), you can inspect its metadata, including arguments and keyword arguments.

  1. View Info: Press i to open the detail view for the selected task.
  2. View Traceback: If a task has failed (state FAILURE), press t to view the full Python traceback.
  3. View Result: Press r to see the return value of a completed task.

The selection_info method retrieves data from the State object and formats it for the curses window:

def selection_info(self):
if not self.selected_task:
return
# ... logic to fetch task from self.state.tasks and display args/kwargs ...

Step 4: Performing Remote Control Actions

The monitor is not just a passive viewer; it can send control commands back to the workers using the app.control interface.

  1. Revoke a Task: Select a pending or running task and press c. This calls self.app.control.revoke(self.selected_task).
  2. Rate Limiting: Press l to set a new rate limit for a task type. You will be prompted to enter the limit (e.g., "10/m").

When you perform these actions, the monitor waits up to 1 second for replies from workers and displays the results in an alert box via alert_remote_control_reply.

How it Works Internally

The monitor relies on two main components working in parallel:

  1. The Event Loop: The capture_events function runs in the main thread. It connects to the broker and updates the State object every time a new event (like task-succeeded or worker-heartbeat) arrives.
  2. The Display Thread: The DisplayThread class runs in the background, calling CursesMonitor.draw() at a fixed interval (defined by screen_delay).
class DisplayThread(threading.Thread):
def run(self):
while not self.shutdown:
self.display.draw()
self.display.nap() # Default 10ms delay

The draw method uses a threading.RLock to safely read the task state while the main thread is updating it, ensuring the UI remains consistent even under high event volume.

Next Steps

  • Custom Keybindings: You can extend CursesMonitor by passing a custom keymap dictionary to its constructor if you are embedding the monitor in your own tool.
  • Filtering: Explore the celery events --camera option if you need to take snapshots of the state and store them in a database instead of viewing them interactively.