Skip to main content

Filterer

This class manages the consumption and filtering of messages from specified message broker queues. It provides functionality to process messages through custom callbacks, track state, and automatically acknowledge messages based on configurable limits and task filters. The class coordinates the underlying consumer lifecycle and event loop to facilitate message migration or transformation tasks.

Attributes

AttributeTypeDescription
app[Celery](../../app/base/celery.md?sid=celery_app_base_celery)The Celery application instance used to access configuration and AMQP components.
connkombu.ConnectionThe broker connection object used for consuming and managing messages.
filtercallableA callback function applied to every message received by the consumer.
limitint = NoneMaximum number of messages to process before stopping the filtering operation.
timeoutfloat = 1.0The maximum time in seconds to wait for new messages during the event loop.
ack_messagesboolean = falseFlag indicating whether processed messages should be acknowledged to the broker.
taskssetA set of task names used to restrict filtering and callbacks to specific message types.
queuesdictA mapping or collection of queue definitions to be managed during the filtering process.
callbackcallable = NoneAn optional user-defined function executed after a message is successfully filtered.
foreverboolean = falseIf true, the event loop will ignore socket timeouts and continue running indefinitely.
on_declare_queuecallable = NoneA callback triggered whenever a queue is declared on the broker.
consume_fromlistA list of queue objects from which the consumer will fetch messages.
state[State](../../events/state/state.md?sid=celery_events_state_state)An object tracking the progress and statistics of the filtering operation.
acceptlist = NoneA list of content types or serialization formats to allow for incoming messages.

Constructor

Signature

def Filterer(
app: Any,
conn: Any,
filter: Callable,
limit: int = None,
timeout: float = 1.0,
ack_messages: bool = False,
tasks: Union[str, list] = None,
queues: Any = None,
callback: Callable = None,
forever: bool = False,
on_declare_queue: Callable = None,
consume_from: list = None,
state: Any = None,
accept: Any = None
) - > null

Parameters

NameTypeDescription
appAnyThe application instance.
connAnyThe connection object to the broker.
filterCallableThe filter function to apply to messages.
limitint = NoneMaximum number of messages to process.
timeoutfloat = 1.0Timeout for the event loop.
ack_messagesbool = FalseWhether to acknowledge messages after processing.
tasksUnion[str, list] = NoneA list or string of task names to filter.
queuesAny = NoneThe queues to be prepared for filtering.
callbackCallable = NoneOptional callback function to execute on messages.
foreverbool = FalseWhether to ignore timeouts and run indefinitely.
on_declare_queueCallable = NoneCallback triggered when a queue is declared.
consume_fromlist = NoneSpecific queues to consume from.
stateAny = NoneInitial state object for tracking progress.
acceptAny = NoneContent types to accept.

Methods


start()

@classmethod
def start() - > [State](../../events/state/state.md?sid=celery_events_state_state)

start migrating messages.

Returns

TypeDescription
[State](../../events/state/state.md?sid=celery_events_state_state)The final state object containing message counts and processing statistics after filtering completes

update_state()

@classmethod
def update_state(
body: Any,
message: Message
)

Increments the internal message counter and raises StopFiltering if the processing limit has been reached.

Parameters

NameTypeDescription
bodyAnyThe decoded content of the message being processed
messageMessageThe raw message object received from the broker

ack_message()

@classmethod
def ack_message(
body: Any,
message: Message
)

Sends an acknowledgment to the broker for the specific message to mark it as successfully processed.

Parameters

NameTypeDescription
bodyAnyThe decoded content of the message being acknowledged
messageMessageThe message instance to be acknowledged

create_consumer()

@classmethod
def create_consumer() - > TaskConsumer

Initializes a new TaskConsumer instance configured with the specified connection, queues, and content types.

Returns

TypeDescription
TaskConsumerA configured AMQP consumer ready to be bound to callbacks

prepare_consumer()

@classmethod
def prepare_consumer(
consumer: TaskConsumer
) - > TaskConsumer

Configures the consumer by registering filtering, state tracking, and acknowledgment callbacks based on the instance settings.

Parameters

NameTypeDescription
consumerTaskConsumerThe consumer instance to be configured with callbacks and queue declarations

Returns

TypeDescription
TaskConsumerThe consumer instance with all necessary callbacks and queue declarations applied

declare_queues()

@classmethod
def declare_queues(
consumer: TaskConsumer
)

declare all queues on the new broker.

Parameters

NameTypeDescription
consumerTaskConsumerThe consumer whose associated queues will be declared on the broker