Skip to main content

Map, Starmap, and Chunks

The celery.canvas module provides several primitives for applying tasks to collections of data. While they share similar interfaces, they differ significantly in how they distribute work across a cluster.

Sequential Mapping with xmap and xstarmap

The xmap and xstarmap classes are designed for sequential execution of a task over an iterable. Unlike a group, which distributes each item as a separate task message to be processed in parallel, these mapping primitives send the entire iterable to a single worker.

Core Implementation

Both classes inherit from _basemap, which handles the initialization and serialization of the task and its arguments. A key feature of _basemap is its use of regen(it) to ensure that generators are evaluated before being sent to the worker:

class _basemap(Signature):
_task_name = None
_unpack_args = itemgetter('task', 'it')

def __init__(self, task, it, **options):
super().__init__(self._task_name, (),
{'task': task, 'it': regen(it)}, immutable=True, **options
)

When apply_async is called, the primitive invokes a built-in Celery task (celery.map or celery.starmap). The worker receiving this task then iterates through the list locally:

  • xmap: Applies the task to each item in the iterable: [task(item) for item in it].
  • xstarmap: Unpacks each item in the iterable as arguments: [task(*item) for item in it].

Why use Sequential Mapping?

Sequential mapping is useful when the overhead of sending many small messages to the broker outweighs the benefits of parallel execution, or when you specifically need the operations to happen on the same worker (e.g., for local caching or resource locality).

Parallel Processing with Chunks

The chunks primitive is a performance-oriented tool that bridges the gap between sequential mapping and full parallelism. It partitions a large iterable into smaller "chunks" of size n.

How Chunks Work

When you create a chunks signature, it doesn't execute as a single task. Instead, it transforms itself into a group of xstarmap tasks. Each task in the group processes one chunk of the data.

The transformation happens in the group() method:

def group(self):
# need to evaluate generators
task, it, n = self._unpack_args(self.kwargs)
return group((xstarmap(task, part, app=self._app)
for part in _chunks(iter(it), n)),
app=self._app)

This approach provides two major benefits:

  1. Parallelism: Since it becomes a group, different chunks can be processed by different workers simultaneously.
  2. Efficiency: By grouping multiple items into a single xstarmap message, it significantly reduces the number of messages sent to the broker compared to a standard group.

Task Shortcuts

The most common way to interact with these primitives is through the shortcut methods provided by the Task class in celery/app/task.py. These methods allow you to create signatures directly from your task instances.

# In celery/app/task.py
def chunks(self, it, n):
from celery import chunks
return chunks(self.s(), it, n, app=self.app)

def map(self, it):
from celery import xmap
return xmap(self.s(), it, app=self.app)

def starmap(self, it):
from celery import xstarmap
return xstarmap(self.s(), it, app=self.app)

Practical Example

If you have a task add(x, y), you can apply it to a dataset using these shortcuts:

from proj.tasks import add

# Sequential: one message, one worker processes all 1000 pairs
s_map = add.starmap([(i, i) for i in range(1000)])
s_map.apply_async()

# Parallel: 10 messages, each containing 100 pairs,
# distributed across available workers
s_chunks = add.chunks([(i, i) for i in range(1000)], 100)
s_chunks.apply_async()

Summary of Differences

PrimitiveParallel?Execution LogicUse Case
xmapNoSingle worker iterates task(item)Small datasets, low overhead.
xstarmapNoSingle worker iterates task(*item)Small datasets with multiple arguments.
chunksYesMultiple workers process starmap chunksLarge datasets where message overhead is a concern.

All three primitives are defined as immutable=True by default in their __init__ methods, meaning they do not accept additional arguments from previous tasks in a chain.