Filterer
This class manages the consumption and filtering of messages from specified message broker queues. It provides functionality to process messages through custom callbacks, track state, and automatically acknowledge messages based on configurable limits and task filters. The class coordinates the underlying consumer lifecycle and event loop to facilitate message migration or transformation tasks.
Attributes
| Attribute | Type | Description |
|---|---|---|
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) | The Celery application instance used to access configuration and AMQP components. |
| conn | kombu.Connection | The broker connection object used for consuming and managing messages. |
| filter | callable | A callback function applied to every message received by the consumer. |
| limit | int = None | Maximum number of messages to process before stopping the filtering operation. |
| timeout | float = 1.0 | The maximum time in seconds to wait for new messages during the event loop. |
| ack_messages | bool = False | Flag indicating whether processed messages should be acknowledged to the broker. |
| tasks | set | A set of task names used to restrict filtering and callbacks to specific message types. |
| queues | dict | A mapping or collection of queue definitions to be managed during the filtering process. |
| callback | callable = None | An optional user-defined function executed after a message is successfully filtered. |
| forever | bool = False | If `True`, the event loop ignores socket timeouts and continues running indefinitely. |
| on_declare_queue | callable = None | A callback triggered whenever a queue is declared on the broker. |
| consume_from | list | A list of queue objects from which the consumer will fetch messages. |
| state | [State](../../events/state/state.md?sid=celery_events_state_state) | An object tracking the progress and statistics of the filtering operation. |
| accept | list = None | A list of content types or serialization formats to allow for incoming messages. |
Constructor
Signature
def Filterer(
app: Any,
conn: Any,
filter: Callable,
limit: int = None,
timeout: float = 1.0,
ack_messages: bool = False,
tasks: Union[str, list] = None,
queues: Any = None,
callback: Callable = None,
forever: bool = False,
on_declare_queue: Callable = None,
consume_from: list = None,
state: Any = None,
accept: Any = None
) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| app | Any | The application instance. |
| conn | Any | The connection object to the broker. |
| filter | Callable | The filter function to apply to messages. |
| limit | int = None | Maximum number of messages to process. |
| timeout | float = 1.0 | Timeout for the event loop. |
| ack_messages | bool = False | Whether to acknowledge messages after processing. |
| tasks | Union[str, list] = None | A list or string of task names to filter. |
| queues | Any = None | The queues to be prepared for filtering. |
| callback | Callable = None | Optional callback function to execute on messages. |
| forever | bool = False | Whether to ignore timeouts and run indefinitely. |
| on_declare_queue | Callable = None | Callback triggered when a queue is declared. |
| consume_from | list = None | Specific queues to consume from. |
| state | Any = None | Initial state object for tracking progress. |
| accept | Any = None | Content types to accept. |
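As a rough usage sketch, the constructor arguments above might be combined as follows to tally task names on a queue without acknowledging anything. It assumes that `Filterer` is importable from `celery.contrib.migrate`, that `queues` accepts a list of queue names, and that the task name is found in the message headers (or in the body for older messages). `make_task_counter` and `inspect_queue` are hypothetical helpers introduced for illustration, not part of the library.

```python
from collections import Counter


def make_task_counter(counts):
    """Build a filter callback that tallies task names into ``counts``."""
    def count_task(body, message):
        # Assumption: newer task messages carry the name in the headers,
        # older ones carry it in a dict body.
        headers = getattr(message, "headers", None) or {}
        name = headers.get("task")
        if name is None and isinstance(body, dict):
            name = body.get("task")
        counts[name or "<unknown>"] += 1
    return count_task


def inspect_queue(app, queue_name, limit=100):
    """Look at up to ``limit`` messages on ``queue_name`` without acking them."""
    # Imported lazily so this sketch can be loaded without Celery installed.
    from celery.contrib.migrate import Filterer

    counts = Counter()
    with app.connection_for_read() as conn:
        state = Filterer(
            app, conn, make_task_counter(counts),
            limit=limit,
            timeout=1.0,
            ack_messages=False,  # do not acknowledge what we read
            queues=[queue_name],
        ).start()
    return state.count, counts
```

Calling `inspect_queue(app, "celery", limit=10)` against a configured application would then return the number of messages seen together with a per-task tally.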
Methods
start()
def start() -> [State](../../events/state/state.md?sid=celery_events_state_state)
Starts migrating messages: runs the consumer event loop until the message limit is reached or the timeout expires, then returns the state.
Returns
| Type | Description |
|---|---|
| [State](../../events/state/state.md?sid=celery_events_state_state) | The final state object containing message counts and processing statistics after filtering completes. |
update_state()
def update_state(
body: Any,
message: Message
)
Increments the internal message counter and raises StopFiltering if the processing limit has been reached.
Parameters
| Name | Type | Description |
|---|---|---|
| body | Any | The decoded content of the message being processed. |
| message | Message | The raw message object received from the broker. |
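The counting and limit check described above can be sketched in isolation. This is a simplified stand-in, not the library's code: `CountingState`, `make_update_state`, and `drain` are hypothetical names, and the local `StopFiltering` class only mimics the exception mentioned in the description.

```python
class StopFiltering(Exception):
    """Signals that the processing limit has been reached."""


class CountingState:
    """Hypothetical stand-in for the state object; only tracks a count."""

    def __init__(self):
        self.count = 0


def make_update_state(state, limit=None):
    """Return a callback that counts messages and enforces ``limit``."""
    def update_state(body, message):
        state.count += 1
        # A limit of None (or 0) means "no limit" in this sketch.
        if limit and state.count >= limit:
            raise StopFiltering()
    return update_state


def drain(messages, callback):
    """Feed messages to ``callback`` until it signals a stop."""
    try:
        for body in messages:
            callback(body, None)  # no real broker message in this sketch
    except StopFiltering:
        pass


state = CountingState()
drain(["a", "b", "c", "d", "e"], make_update_state(state, limit=3))
print(state.count)  # 3
```

Raising an exception from the callback lets the limit interrupt the consuming loop from the inside, so the loop itself needs no knowledge of the limit.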
ack_message()
def ack_message(
body: Any,
message: Message
)
Sends an acknowledgment to the broker for the specific message to mark it as successfully processed.
Parameters
| Name | Type | Description |
|---|---|---|
| body | Any | The decoded content of the message being acknowledged. |
| message | Message | The message instance to be acknowledged. |
create_consumer()
def create_consumer() -> TaskConsumer
Initializes a new TaskConsumer instance configured with the specified connection, queues, and content types.
Returns
| Type | Description |
|---|---|
| TaskConsumer | A configured AMQP consumer ready to be bound to callbacks. |
prepare_consumer()
def prepare_consumer(
consumer: TaskConsumer
) -> TaskConsumer
Configures the consumer by registering filtering, state tracking, and acknowledgment callbacks based on the instance settings.
Parameters
| Name | Type | Description |
|---|---|---|
| consumer | TaskConsumer | The consumer instance to be configured with callbacks and queue declarations. |
Returns
| Type | Description |
|---|---|
| TaskConsumer | The consumer instance with all necessary callbacks and queue declarations applied. |
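The registration order described for `prepare_consumer()` (filter, then state tracking, then optional acknowledgment, each restricted to the configured task names) can be illustrated with stand-ins. This is a simplified sketch under stated assumptions, not the library's implementation: `RecordingConsumer`, `only_tasks`, and `prepare` are hypothetical, and messages are plain dicts rather than broker message objects.

```python
class RecordingConsumer:
    """Hypothetical stand-in for a consumer; stores callbacks in order."""

    def __init__(self):
        self.callbacks = []

    def register_callback(self, callback):
        self.callbacks.append(callback)

    def deliver(self, body, message):
        # Run every registered callback, in registration order.
        for callback in self.callbacks:
            callback(body, message)


def only_tasks(callback, tasks):
    """Wrap ``callback`` so it only fires for task names in ``tasks``."""
    def filtered(body, message):
        if message.get("task") in tasks:  # message is a plain dict here
            return callback(body, message)
    return filtered


def prepare(consumer, filter_func, update_state, ack_message,
            tasks=(), ack_messages=False):
    """Register the callbacks on ``consumer`` and return it."""
    handlers = [filter_func, update_state]
    if ack_messages:
        handlers.append(ack_message)
    for handler in handlers:
        if tasks:
            handler = only_tasks(handler, set(tasks))
        consumer.register_callback(handler)
    return consumer


log = []
consumer = prepare(
    RecordingConsumer(),
    filter_func=lambda body, message: log.append(("filter", body)),
    update_state=lambda body, message: log.append(("count", body)),
    ack_message=lambda body, message: log.append(("ack", body)),
    tasks={"tasks.add"},
    ack_messages=True,
)
consumer.deliver(1, {"task": "tasks.add"})
consumer.deliver(2, {"task": "tasks.mul"})  # skipped by the task filter
print(log)  # [('filter', 1), ('count', 1), ('ack', 1)]
```

Wrapping each handler separately keeps the task restriction independent of what the handler does, which is why the same wrapper can serve filtering, counting, and acknowledgment.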
declare_queues()
def declare_queues(
consumer: TaskConsumer
)
Declares all queues on the new broker, invoking `on_declare_queue` for each declared queue if that callback is set.
Parameters
| Name | Type | Description |
|---|---|---|
| consumer | TaskConsumer | The consumer whose associated queues will be declared on the broker. |