EventReceiver
Capture events.
Attributes
| Attribute | Type | Description |
|---|---|---|
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = null | The Celery application instance used for configuration and clock synchronization. |
Constructor
Signature
def EventReceiver(
channel: kombu.Connection,
handlers: Mapping[string, Callable] = null,
routing_key: string = '#',
node_id: string = null,
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = null,
queue_prefix: string = null,
accept: Set[string] = null,
queue_ttl: float = null,
queue_expires: float = null,
queue_exclusive: boolean = null,
queue_durable: boolean = null
) -> null
Parameters
| Name | Type | Description |
|---|---|---|
| channel | kombu.Connection | The channel or connection used to communicate with the broker. |
| handlers | Mapping[string, Callable] = null | A map of event type names to their respective handler functions. |
| routing_key | string = '#' | The routing key used for the event queue. |
| node_id | string = null | Unique identifier for the node; defaults to a new UUID. |
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = null | The Celery application instance. |
| queue_prefix | string = null | Prefix for the event queue name. |
| accept | Set[string] = null | A set of content types to accept. |
| queue_ttl | float = null | Time-to-live, in seconds, for messages in the queue. |
| queue_expires | float = null | Expiration time, in seconds, for the queue itself. |
| queue_exclusive | boolean = null | Whether the queue is exclusive to this connection. |
| queue_durable | boolean = null | Whether the queue is durable and survives broker restarts. |
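The constructor parameters above can be exercised with a short sketch. It assumes a configured Celery application object is passed in as app and that the receiver is created through app.events.Receiver, which binds the application for you. The helper build_receiver and the handler on_task_failed are hypothetical names used only for illustration.

```python
def on_task_failed(event):
    """Hypothetical handler: report the id of a failed task."""
    print("task failed:", event.get("uuid"))


def build_receiver(app, connection):
    """Create a receiver that handles task-failed events.

    Assumes `app` is a configured Celery application and `connection`
    is an open broker connection, for example from app.connection().
    """
    return app.events.Receiver(
        connection,
        handlers={"task-failed": on_task_failed},
        routing_key="#",  # the documented default: match every event
    )
```

Going through app.events.Receiver rather than the bare class avoids having to pass app explicitly.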
Methods
process()
def process(
type: string,
event: dict
)
Process event by dispatching to configured handler.
Parameters
| Name | Type | Description |
|---|---|---|
| type | string | The event type name used to look up the appropriate handler function |
| event | dict | The event data payload to be passed to the handler |
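The dispatch step can be illustrated with a standalone sketch. This is a simplified stand-in, not the library's code: the function name dispatch is hypothetical, and the fallback to a "*" catch-all handler is an assumption about how unmatched event types are treated.

```python
def dispatch(handlers, type, event):
    """Call the handler registered for `type` with `event`.

    The parameter is named `type` to mirror the documented signature.
    Falls back to a "*" entry if one exists (assumed convention);
    events with no matching handler are ignored.
    """
    handler = handlers.get(type) or handlers.get("*")
    if handler is not None:
        handler(event)


seen = []
handlers = {
    "task-failed": lambda event: seen.append(("failed", event["uuid"])),
    "*": lambda event: seen.append(("other", event["type"])),
}
dispatch(handlers, "task-failed", {"type": "task-failed", "uuid": "abc"})
dispatch(handlers, "worker-online", {"type": "worker-online"})
```

After the two calls, seen holds one entry from the specific handler and one from the catch-all.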
get_consumers()
def get_consumers(
Consumer: callable,
channel: kombu.Connection.channel
) -> list
Creates and returns a list of Kombu consumers configured to listen on the event queue.
Parameters
| Name | Type | Description |
|---|---|---|
| Consumer | callable | The Consumer class or factory used to instantiate the consumer |
| channel | kombu.Connection.channel | The AMQP channel to use for the consumer |
Returns
| Type | Description |
|---|---|
| list | A list containing a single Kombu Consumer instance configured for event processing |
on_consume_ready()
def on_consume_ready(
connection: kombu.Connection,
channel: kombu.Connection.channel,
consumers: list,
wakeup: boolean
)
Called once the consumers are ready to receive events. When wakeup is true, it asks workers to send a heartbeat so that they start reporting.
Parameters
| Name | Type | Description |
|---|---|---|
| connection | kombu.Connection | The active broker connection |
| channel | kombu.Connection.channel | The active AMQP channel |
| consumers | list | The list of active consumer instances |
| wakeup | boolean | Flag indicating whether to broadcast a heartbeat to wake up workers |
itercapture()
def itercapture(
limit: int,
timeout: float,
wakeup: boolean
) -> generator
Returns a generator that yields captured events as they arrive from the broker.
Parameters
| Name | Type | Description |
|---|---|---|
| limit | int | The maximum number of events to capture before stopping |
| timeout | float | The maximum time in seconds to wait for new events |
| wakeup | boolean | Whether to signal workers to start sending heartbeats |
Returns
| Type | Description |
|---|---|
| generator | An iterable stream of events processed by the consumer |
capture()
def capture(
limit: int,
timeout: float,
wakeup: boolean
)
Open up a consumer capturing events.
Parameters
| Name | Type | Description |
|---|---|---|
| limit | int | The maximum number of events to process before exiting |
| timeout | float | The maximum time in seconds to wait for events |
| wakeup | boolean | Whether to broadcast a heartbeat request to workers upon starting |
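A typical capture loop might look like the sketch below. It assumes a configured Celery application object is passed in as app, that app.connection() returns a connection usable as a context manager, and that the receiver is created through app.events.Receiver. The function name run_monitor is hypothetical.

```python
def run_monitor(app, handlers, limit=None, timeout=None):
    """Open a connection and capture events with the given handlers.

    `limit` and `timeout` are passed straight through to capture();
    wakeup=True asks workers for a heartbeat when capturing starts.
    """
    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers=handlers)
        receiver.capture(limit=limit, timeout=timeout, wakeup=True)
```

With limit left as None the call is expected to block until it is interrupted, so a monitor like this usually runs in its own process.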
wakeup_workers()
def wakeup_workers(
channel: kombu.Connection.channel
)
Broadcasts a heartbeat control command to all workers to ensure they are active and sending events.
Parameters
| Name | Type | Description |
|---|---|---|
| channel | kombu.Connection.channel | The channel used to send the broadcast command |
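The same effect can be approximated outside the receiver with the application's remote-control interface. This sketch assumes a configured Celery application object is passed in as app and that the wakeup amounts to broadcasting the "heartbeat" control command. The helper name request_heartbeats is hypothetical.

```python
def request_heartbeats(app):
    """Ask every worker to emit a heartbeat event.

    Assumes `app` is a configured Celery application; the command is
    broadcast without waiting for replies.
    """
    return app.control.broadcast("heartbeat")
```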
event_from_message()
def event_from_message(
body: dict,
localize: boolean,
now: callable,
tzfields: callable,
adjust_timestamp: callable,
CLIENT_CLOCK_SKEW: int
) -> tuple
Parses a raw message body into a structured event, adjusting logical clocks and localizing timestamps.
Parameters
| Name | Type | Description |
|---|---|---|
| body | dict | The raw message payload received from the broker |
| localize | boolean | Whether to convert the event timestamp to the local timezone |
| now | callable | Function that returns the current local time |
| tzfields | callable | Function used to extract timezone information from the event body |
| adjust_timestamp | callable | Function used to apply timezone offsets to the timestamp |
| CLIENT_CLOCK_SKEW | int | Constant used to adjust clock values for task-sent events to account for network delay |
Returns
| Type | Description |
|---|---|
| tuple | A tuple containing the event type string and the processed event dictionary |
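The clock handling described above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the library's code: LogicalClock and event_from_body are hypothetical names, the skew value of -1 is assumed, and timestamp localization is left out.

```python
import time

CLIENT_CLOCK_SKEW = -1  # assumed value, for illustration only


class LogicalClock:
    """Minimal Lamport-style clock (hypothetical stand-in)."""

    def __init__(self):
        self.value = 0

    def forward(self):
        """Advance the clock by one tick and return the new value."""
        self.value += 1
        return self.value

    def adjust(self, other):
        """Move past both the local value and a remote value."""
        self.value = max(self.value, other) + 1
        return self.value


def event_from_body(body, clock, now=time.time):
    """Return (type, event) after reconciling the event's clock value."""
    event_type = body["type"]
    if event_type == "task-sent":
        # Clients do not synchronize clocks, so derive the value locally.
        body["clock"] = (clock.value or 1) + CLIENT_CLOCK_SKEW
        clock.adjust(body["clock"])
    elif "clock" in body:
        clock.adjust(body["clock"])
    else:
        # No clock supplied by the sender: stamp the event ourselves.
        body["clock"] = clock.forward()
    body["local_received"] = now()
    return event_type, body
```

Passing now as a parameter, as the documented signature does, makes the receive time easy to control in tests.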
connection()
@property
def connection() -> kombu.Connection
Retrieves the underlying broker connection from the current channel.
Returns
| Type | Description |
|---|---|
| kombu.Connection | The client connection object, or None if no channel is active |