Customizing Signature Types
To create specialized task signatures that Celery can recognize and reconstruct during serialization, you must extend the base Signature class and register it with the internal type registry. This allows canvas primitives and factory functions like signature() to correctly instantiate your custom class.
Creating and Registering a Custom Signature
The following example demonstrates how to create a custom signature type that adds specific execution options by default.
from celery.canvas import Signature
@Signature.register_type(name='priority_sig')
class PrioritySignature(Signature):
def __init__(self, task=None, args=None, kwargs=None, **options):
# Ensure the base Signature (which is a dict) is initialized
super().__init__(task, args, kwargs, **options)
# CRITICAL: subtask_type must match the registered name for deserialization
self.subtask_type = 'priority_sig'
# Apply custom logic or defaults
self.options.setdefault('priority', 10)
@classmethod
def from_dict(cls, d, app=None):
"""Reconstruct the signature from a dictionary."""
# Signature.from_dict uses the 'subtask_type' key in the dict
# to find this class in the registry.
return cls(
task=d.get('task'),
args=d.get('args'),
kwargs=d.get('kwargs'),
app=app,
**d.get('options', {})
)
Key Implementation Requirements
When customizing signatures in this codebase, you must adhere to these requirements to ensure compatibility with the canvas system:
- The
@Signature.register_type()Decorator: This adds your class to theSignature.TYPESregistry. If you don't provide anameargument, it defaults to the class name. - The
subtask_typeAttribute: You must setself.subtask_typein your__init__method. When a signature is serialized to a dictionary, this field is used bySignature.from_dictto determine which subclass to instantiate upon reconstruction. - Dictionary Inheritance:
Signatureinherits fromdict. All core data (task name, args, kwargs, options) is stored as dictionary keys. If you add custom instance variables that are not stored in the underlying dictionary, they will be lost during serialization unless you specifically handle them infrom_dict.
Customizing Reconstruction Logic
For complex signatures like chain or group, you may need to override from_dict to handle nested signatures. This pattern is used internally in celery/canvas.py for the _chain class:
from celery.canvas import Signature, maybe_signature
@Signature.register_type(name='chain')
class _chain(Signature):
@classmethod
def from_dict(cls, d, app=None):
# Extract tasks from kwargs and ensure they are converted back to signatures
tasks = d['kwargs']['tasks']
if tasks:
if isinstance(tasks, tuple):
tasks = d['kwargs']['tasks'] = list(tasks)
# Use maybe_signature to reconstruct nested task dicts
tasks = [maybe_signature(task, app=app) for task in tasks]
return cls(tasks, app=app, **d['options'])
def __init__(self, *tasks, **options):
# Initialize with the internal task name 'celery.chain'
super().__init__('celery.chain', (), {'tasks': tasks}, **options)
self.subtask_type = 'chain'
Subclassing Existing Primitives
You can also extend existing canvas primitives like group or chord to add specialized behavior. This is often used in tests to verify subclass behavior (see t/unit/tasks/test_canvas.py):
from celery.canvas import group, Signature
@Signature.register_type()
class group_subclass(group):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.subtask_type = "group_subclass"
def apply_async(self, args=None, kwargs=None, **options):
# Custom behavior before the group is dispatched
print(f"Dispatching group subclass with {len(self.tasks)} tasks")
return super().apply_async(args, kwargs, **options)
Troubleshooting
- Signature is reconstructed as base
Signature: Ensure thatself.subtask_typeis set in__init__and that it exactly matches the string passed to@Signature.register_type(name=...). If they don't match,Signature.from_dictwill fall back to the base class. - Custom attributes missing after serialization: Since
Signatureis adict, only data stored in the dictionary keys (or handled by a customfrom_dict) survives serialization. If you need extra metadata, store it insideself.optionsorself.kwargs. TypeErrorduringfrom_dict: Ensure yourfrom_dictimplementation matches the signature of your__init__. The baseSignature.from_dictpasses the entire dictionary to the constructor, while custom implementations often unpack specific keys.