EventDispatcher
Dispatches event messages.
Attributes
| Attribute | Type | Description |
|---|---|---|
| DISABLED_TRANSPORTS | set = {'sql'} | A set of transport driver types for which event dispatching is automatically disabled. |
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = None | The Celery application instance associated with this dispatcher. |
| on_enabled | set = None | Set of callbacks called when the dispatcher is enabled via enable(). |
| on_disabled | set = None | Set of callbacks called when the dispatcher is disabled via disable(). |
| publisher | kombu.Producer | Alias for the internal producer instance used to publish messages to the broker. |
Constructor
Signature
def EventDispatcher(
connection: kombu.Connection = None,
hostname: str = None,
enabled: bool = True,
channel: kombu.Channel = None,
buffer_while_offline: bool = True,
app: [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = None,
serializer: str = None,
groups: Sequence[str] = None,
delivery_mode: int = 1,
buffer_group: frozenset = None,
buffer_limit: int = 24,
on_send_buffered: Callable = None
)
Parameters
| Name | Type | Description |
|---|---|---|
| connection | kombu.Connection = None | Connection to the broker. |
| hostname | str = None | Hostname to identify the dispatcher. |
| enabled | bool = True | Whether to actually publish events. |
| channel | kombu.Channel = None | Specific channel to use for sending events. |
| buffer_while_offline | bool = True | If true, events are buffered while the connection is down. |
| app | [Celery](../../app/base/celery.md?sid=celery_app_base_celery) = None | The Celery app instance. |
| serializer | str = None | The serialization method to use. |
| groups | Sequence[str] = None | List of event groups to send. |
| delivery_mode | int = 1 | Transient (1) or Persistent (2) delivery. |
| buffer_group | frozenset = None | Groups that should be buffered. |
| buffer_limit | int = 24 | Maximum number of events in a group buffer before flushing. |
| on_send_buffered | Callable = None | Callback for when an event is buffered. |
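The groups parameter acts as a filter on the group prefix of each event type. The following is a minimal standalone sketch of that idea, not Celery's code: should_send is a hypothetical helper, and the rule that an empty groups set means "send everything" is an assumption.

```python
def group_from(event_type: str) -> str:
    """Return the group prefix of an event type, e.g. 'task-sent' -> 'task'."""
    return event_type.split("-", 1)[0]


def should_send(event_type: str, groups: set) -> bool:
    """Hypothetical helper. Assumed rule: an empty set disables filtering."""
    return not groups or group_from(event_type) in groups


allowed = {"worker"}
print(should_send("worker-online", allowed))  # True
print(should_send("task-sent", allowed))      # False
print(should_send("task-sent", set()))        # True
```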
Methods
enable()
def enable()
Initializes the message producer and triggers all registered on_enabled callbacks.
disable()
def disable()
Deactivates the dispatcher, closes the producer, and triggers all registered on_disabled callbacks.
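The callback behaviour of enable() and disable() can be illustrated with a small stand-in class. ToggleSketch is hypothetical and leaves out the producer entirely. Firing on_disabled only when the dispatcher was previously enabled is an assumption of this sketch.

```python
class ToggleSketch:
    """Hypothetical stand-in; not Celery's EventDispatcher."""

    def __init__(self):
        self.enabled = False
        self.on_enabled = set()
        self.on_disabled = set()

    def enable(self):
        self.enabled = True
        for callback in self.on_enabled:
            callback()

    def disable(self):
        # Assumption: callbacks fire only on an actual state change.
        if self.enabled:
            self.enabled = False
            for callback in self.on_disabled:
                callback()


log = []
toggle = ToggleSketch()
toggle.on_enabled.add(lambda: log.append("enabled"))
toggle.on_disabled.add(lambda: log.append("disabled"))
toggle.enable()
toggle.disable()
toggle.disable()  # already disabled, so no callback fires
print(log)  # ['enabled', 'disabled']
```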
publish()
def publish(
type: str,
fields: dict,
producer: kombu.Producer,
blind: bool,
Event: Callable
) -> Any
Publish an event using a custom kombu.Producer.
Parameters
| Name | Type | Description |
|---|---|---|
| type | str | Event type name, with group separated by dash (-). |
| fields | dict | Dictionary of event fields; must be JSON-serializable. |
| producer | kombu.Producer | Producer instance to use: only the publish method will be called. |
| blind | bool | Don't set logical clock value (also don't forward the internal logical clock). |
| Event | Callable | Event type used to create the event. Defaults to Event. |
Returns
| Type | Description |
|---|---|
| Any | The result of the internal _publish call. |
send()
def send(
type: str,
blind: bool,
utcoffset: Callable,
retry: bool,
retry_policy: dict,
Event: Callable,
**fields: Any
) -> Any
Send event.
Parameters
| Name | Type | Description |
|---|---|---|
| type | str | Event type name, with group separated by dash (-). |
| blind | bool | Don't set logical clock value (also don't forward the internal logical clock). |
| utcoffset | Callable | Function returning the current UTC offset in hours. |
| retry | bool | Retry in the event of connection failure. |
| retry_policy | dict | Map of custom retry policy options. |
| Event | Callable | Event type used to create the event. Defaults to Event. |
| **fields | Any | Event fields; must be JSON-serializable. |
Returns
| Type | Description |
|---|---|
| Any | The result of the publication, or None if the event is buffered or ignored. |
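The interaction between buffer_group, buffer_limit, and on_send_buffered can be modelled roughly as follows. GroupBufferSketch is a hypothetical simplification, not Celery's implementation: it has no broker, clock, or retry logic, and the exact flush condition (buffer length reaching buffer_limit) is an assumption.

```python
from collections import defaultdict


class GroupBufferSketch:
    """Hypothetical model of per-group buffering; not Celery's implementation."""

    def __init__(self, buffer_group=frozenset(), buffer_limit=24,
                 on_send_buffered=None):
        self.buffer_group = buffer_group
        self.buffer_limit = buffer_limit
        self.on_send_buffered = on_send_buffered
        self.pending = defaultdict(list)  # group -> buffered events
        self.published = []               # stands in for the broker

    def send(self, type, **fields):
        group = type.split("-", 1)[0]
        event = {"type": type, **fields}
        if group not in self.buffer_group:
            self.published.append(event)  # unbuffered groups go out at once
            return event
        self.pending[group].append(event)
        if len(self.pending[group]) >= self.buffer_limit:
            self.flush()                  # assumed: limit reached, flush all
        elif self.on_send_buffered:
            self.on_send_buffered()
        return None                       # buffered events return None

    def flush(self):
        for events in self.pending.values():
            self.published.extend(events)
            events.clear()


sketch = GroupBufferSketch(buffer_group=frozenset({"task"}), buffer_limit=3)
sketch.send("worker-heartbeat", load=0.1)  # published immediately
sketch.send("task-sent", id="a")           # buffered
sketch.send("task-sent", id="b")           # buffered
print(len(sketch.published))               # 1
sketch.send("task-sent", id="c")           # reaches the limit, flushes
print(len(sketch.published))               # 4
```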
flush()
def flush(
errors: bool,
groups: bool
)
Flush the outbound buffer.
Parameters
| Name | Type | Description |
|---|---|---|
| errors | bool | If true, attempts to send events that were buffered due to previous connection errors. |
| groups | bool | If true, flushes events that were intentionally buffered by group. |
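The two flags select which of two buffers is drained. The sketch below models that split with a hypothetical FlushSketch class. The attribute names and the default values of True for both flags are assumptions made for illustration.

```python
from collections import deque


class FlushSketch:
    """Hypothetical model of the two buffers; not Celery's implementation."""

    def __init__(self):
        self.error_buffer = deque()       # events that failed to publish
        self.group_buffer = {"task": []}  # events held back on purpose
        self.published = []

    def flush(self, errors=True, groups=True):
        if errors:
            # Retry events that were queued after a connection error.
            while self.error_buffer:
                self.published.append(self.error_buffer.popleft())
        if groups:
            # Release events that were buffered by group.
            for events in self.group_buffer.values():
                self.published.extend(events)
                events.clear()


buffers = FlushSketch()
buffers.error_buffer.append({"type": "worker-online"})
buffers.group_buffer["task"].append({"type": "task-sent"})

buffers.flush(errors=True, groups=False)
print([e["type"] for e in buffers.published])  # ['worker-online']

buffers.flush(errors=False, groups=True)
print([e["type"] for e in buffers.published])  # ['worker-online', 'task-sent']
```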
extend_buffer()
def extend_buffer(
other: [EventDispatcher](eventdispatcher.md?sid=celery_events_dispatcher_eventdispatcher)
)
Copy the outbound buffer of another instance.
Parameters
| Name | Type | Description |
|---|---|---|
| other | [EventDispatcher](eventdispatcher.md?sid=celery_events_dispatcher_eventdispatcher) | The other dispatcher instance from which to copy buffered events. |
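Copying a buffer from one instance to another might look like the sketch below. OutboundSketch and its outbound attribute are hypothetical names. The source only states that the buffer is copied, so leaving the other instance's buffer untouched is an assumption.

```python
from collections import deque


class OutboundSketch:
    """Hypothetical stand-in holding only an outbound buffer."""

    def __init__(self, events=()):
        self.outbound = deque(events)

    def extend_buffer(self, other):
        # Append the other instance's pending events after our own.
        self.outbound.extend(other.outbound)


old = OutboundSketch([{"type": "task-sent", "id": "a"}])
new = OutboundSketch([{"type": "task-sent", "id": "b"}])
new.extend_buffer(old)
print([e["id"] for e in new.outbound])  # ['b', 'a']
print(len(old.outbound))                # 1
```

A pattern like this is useful when a dispatcher is replaced, for example after a reconnect, and events queued on the old instance should not be lost.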
close()
def close()
Close the event dispatcher.