pusher_event_listener_driver v1.2
Bases:
BaseEventListenerDriver
Source Code in griptape/drivers/event_listener/pusher_event_listener_driver.py
@define
class PusherEventListenerDriver(BaseEventListenerDriver):
    """Event listener driver that publishes event payloads through Pusher.

    Attributes:
        app_id: Pusher application id passed to the Pusher client.
        key: Pusher application key passed to the Pusher client.
        secret: Pusher application secret passed to the Pusher client.
            Marked non-serializable.
        cluster: Pusher cluster passed to the Pusher client.
        channel: Channel every event is published to.
        event_name: Event name every payload is published under.
        ssl: Forwarded to the Pusher client as ``ssl``. Defaults to ``True``.
        _client: Optional pre-built Pusher client, accepted at construction
            time under the ``client`` alias. Marked non-serializable.
    """

    app_id: str = field(kw_only=True, metadata={"serializable": True})
    key: str = field(kw_only=True, metadata={"serializable": True})
    secret: str = field(kw_only=True, metadata={"serializable": False})
    cluster: str = field(kw_only=True, metadata={"serializable": True})
    channel: str = field(kw_only=True, metadata={"serializable": True})
    event_name: str = field(kw_only=True, metadata={"serializable": True})
    ssl: bool = field(default=True, kw_only=True, metadata={"serializable": True})
    _client: Optional[Pusher] = field(default=None, kw_only=True, alias="client", metadata={"serializable": False})

    @lazy_property()
    def client(self) -> Pusher:
        """Build a Pusher client from this driver's credentials.

        The ``pusher`` package is loaded through ``import_optional_dependency``
        rather than a module-level import.

        NOTE(review): ``lazy_property`` is defined elsewhere; presumably it
        stores the result in ``_client`` so this runs at most once — confirm.

        Returns:
            A ``Pusher`` instance configured with ``app_id``, ``key``,
            ``secret``, ``cluster`` and ``ssl``.
        """
        return import_optional_dependency("pusher").Pusher(
            app_id=self.app_id,
            key=self.key,
            secret=self.secret,
            cluster=self.cluster,
            ssl=self.ssl,
        )

    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        """Publish a batch of event payloads to the configured Pusher channel.

        Each payload is wrapped as a Pusher batch event addressed to
        ``self.channel`` under ``self.event_name`` and sent through
        ``self.client.trigger_batch``.

        Args:
            event_payload_batch: Event payloads to publish, in order. An empty
                list is a no-op.
        """
        # Pusher's batch endpoint documents a cap of 10 events per call, so
        # larger batches go out as consecutive slices instead of one oversized
        # request that would be rejected.
        max_events_per_call = 10

        events = [
            {"channel": self.channel, "name": self.event_name, "data": event_payload}
            for event_payload in event_payload_batch
        ]
        # An empty batch produces no slices, so no request is made at all.
        for start in range(0, len(events), max_events_per_call):
            self.client.trigger_batch(events[start : start + max_events_per_call])

    def try_publish_event_payload(self, event_payload: dict) -> None:
        """Publish a single event payload to the configured Pusher channel.

        Args:
            event_payload: Payload sent as the event's ``data``.
        """
        self.client.trigger(channels=self.channel, event_name=self.event_name, data=event_payload)
_client = field(default=None, kw_only=True, alias='client', metadata={'serializable': False}) — class-attribute instance-attribute
app_id = field(kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
channel = field(kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
cluster = field(kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
event_name = field(kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
key = field(kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
secret = field(kw_only=True, metadata={'serializable': False}) — class-attribute instance-attribute
ssl = field(default=True, kw_only=True, metadata={'serializable': True}) — class-attribute instance-attribute
client()
Source Code in griptape/drivers/event_listener/pusher_event_listener_driver.py
@lazy_property()
def client(self) -> Pusher:
    """Build a Pusher client from this driver's credentials.

    The ``pusher`` package is loaded through ``import_optional_dependency``
    rather than a module-level import.

    Returns:
        A ``Pusher`` instance configured with ``app_id``, ``key``, ``secret``,
        ``cluster`` and ``ssl``.
    """
    pusher_module = import_optional_dependency("pusher")
    pusher_cls = pusher_module.Pusher
    return pusher_cls(
        app_id=self.app_id,
        key=self.key,
        secret=self.secret,
        cluster=self.cluster,
        ssl=self.ssl,
    )
try_publish_event_payload(event_payload)
Source Code in griptape/drivers/event_listener/pusher_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None:
    """Publish a single event payload to the configured Pusher channel.

    The payload is sent via ``self.client.trigger`` to ``self.channel`` under
    ``self.event_name``.

    Args:
        event_payload: Payload sent as the event's ``data``.
    """
    pusher_client = self.client
    pusher_client.trigger(
        channels=self.channel,
        event_name=self.event_name,
        data=event_payload,
    )
try_publish_event_payload_batch(event_payload_batch)
Source Code in griptape/drivers/event_listener/pusher_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    """Publish a batch of event payloads to the configured Pusher channel.

    Each payload is wrapped as a Pusher batch event addressed to
    ``self.channel`` under ``self.event_name`` and sent through
    ``self.client.trigger_batch``.

    Args:
        event_payload_batch: Event payloads to publish, in order. An empty
            list is a no-op.
    """
    # Pusher's batch endpoint documents a cap of 10 events per call, so larger
    # batches go out as consecutive slices instead of one oversized request
    # that would be rejected.
    max_events_per_call = 10

    events = [
        {"channel": self.channel, "name": self.event_name, "data": event_payload}
        for event_payload in event_payload_batch
    ]
    # An empty batch produces no slices, so no request is made at all.
    for start in range(0, len(events), max_events_per_call):
        self.client.trigger_batch(events[start : start + max_events_per_call])