Skip to main content

EventReceiver

Capture events.

Attributes

AttributeTypeDescription
app[Celery](../../app/base/celery.md?sid=celery_app_base_celery) = nullThe Celery application instance used for configuration and clock synchronization.

Constructor

Signature

def EventReceiver(
channel: kombu.Connection,
handlers: Mapping[Callable] = null,
routing_key: string = '#',
node_id: string = null,
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = null,
queue_prefix: string = null,
accept: Set[string] = null,
queue_ttl: float = null,
queue_expires: float = null,
queue_exclusive: boolean = null,
queue_durable: boolean = null
) - > null

Parameters

NameTypeDescription
channelkombu.ConnectionThe channel or connection used to communicate with the broker.
handlersMapping[Callable] = nullA map of event type names to their respective handler functions.
routing_keystring = '#'The routing key used for the event queue.
node_idstring = nullUnique identifier for the node; defaults to a new UUID.
app[Celery](../../app/base/celery.md?sid=celery_app_base_celery) = nullThe Celery application instance.
queue_prefixstring = nullPrefix for the event queue name.
acceptSet[string] = nullA set of content types to accept.
queue_ttlfloat = nullTime-to-live for messages in the queue.
queue_expiresfloat = nullExpiration time for the queue itself.
queue_exclusiveboolean = nullWhether the queue is exclusive to this connection.
queue_durableboolean = nullWhether the queue is durable and survives broker restarts.

Methods


process()

@classmethod
def process(
type: string,
event: dict
)

Process event by dispatching to configured handler.

Parameters

NameTypeDescription
typestringThe event type name used to look up the appropriate handler function
eventdictThe event data payload to be passed to the handler

get_consumers()

@classmethod
def get_consumers(
Consumer: callable,
channel: kombu.Connection.channel
) - > list

Creates and returns a list of Kombu consumers configured to listen on the event queue.

Parameters

NameTypeDescription
ConsumercallableThe Consumer class or factory used to instantiate the consumer
channelkombu.Connection.channelThe AMQP channel to use for the consumer

Returns

TypeDescription
listA list containing a single Kombu Consumer instance configured for event processing

on_consume_ready()

@classmethod
def on_consume_ready(
connection: kombu.Connection,
channel: kombu.Connection.channel,
consumers: list,
wakeup: boolean
)

Triggers post-consumption setup, such as waking up workers when the consumer is ready.

Parameters

NameTypeDescription
connectionkombu.ConnectionThe active broker connection
channelkombu.Connection.channelThe active AMQP channel
consumerslistThe list of active consumer instances
wakeupbooleanFlag indicating whether to broadcast a heartbeat to wake up workers

itercapture()

@classmethod
def itercapture(
limit: int,
timeout: float,
wakeup: boolean
) - > generator

Returns a generator that yields captured events as they arrive from the broker.

Parameters

NameTypeDescription
limitintThe maximum number of events to capture before stopping
timeoutfloatThe maximum time in seconds to wait for new events
wakeupbooleanWhether to signal workers to start sending heartbeats

Returns

TypeDescription
generatorAn iterable stream of events processed by the consumer

capture()

@classmethod
def capture(
limit: int,
timeout: float,
wakeup: boolean
)

Open up a consumer capturing events.

Parameters

NameTypeDescription
limitintThe maximum number of events to process before exiting
timeoutfloatThe maximum time in seconds to wait for events
wakeupbooleanWhether to broadcast a heartbeat request to workers upon starting

wakeup_workers()

@classmethod
def wakeup_workers(
channel: kombu.Connection.channel
)

Broadcasts a heartbeat control command to all workers to ensure they are active and sending events.

Parameters

NameTypeDescription
channelkombu.Connection.channelThe channel used to send the broadcast command

event_from_message()

@classmethod
def event_from_message(
body: dict,
localize: boolean,
now: callable,
tzfields: callable,
adjust_timestamp: callable,
CLIENT_CLOCK_SKEW: int
) - > tuple

Parses a raw message body into a structured event, adjusting logical clocks and localizing timestamps.

Parameters

NameTypeDescription
bodydictThe raw message payload received from the broker
localizebooleanWhether to convert the event timestamp to the local timezone
nowcallableFunction that returns the current local time
tzfieldscallableFunction used to extract timezone information from the event body
adjust_timestampcallableFunction used to apply timezone offsets to the timestamp
CLIENT_CLOCK_SKEWintConstant used to adjust clock values for task-sent events to account for network delay

Returns

TypeDescription
tupleA tuple containing the event type string and the processed event dictionary

connection()

@classmethod
def connection() - > kombu.Connection

Retrieves the underlying broker connection from the current channel.

Returns

TypeDescription
kombu.ConnectionThe client connection object, or None if no channel is active