aws_iot_core_event_listener_driver v1.2

Bases: BaseEventListenerDriver

Source Code in griptape/drivers/event_listener/aws_iot_core_event_listener_driver.py
@define
class AwsIotCoreEventListenerDriver(BaseEventListenerDriver):
    iot_endpoint: str = field(kw_only=True)
    topic: str = field(kw_only=True)
    session: boto3.Session = field(default=Factory(lambda: import_optional_dependency("boto3").Session()), kw_only=True)
    _client: Optional[IoTDataPlaneClient] = field(
        default=None, kw_only=True, alias="client", metadata={"serializable": False}
    )

    @lazy_property()
    def client(self) -> IoTDataPlaneClient:
        return self.session.client("iot-data")

    def try_publish_event_payload(self, event_payload: dict) -> None:
        self.client.publish(topic=self.topic, payload=json.dumps(event_payload))

    def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
        self.client.publish(topic=self.topic, payload=json.dumps(event_payload_batch))
  • _client = field(default=None, kw_only=True, alias='client', metadata={'serializable': False}) class-attribute instance-attribute

  • iot_endpoint = field(kw_only=True) class-attribute instance-attribute

  • session = field(default=Factory(lambda: import_optional_dependency('boto3').Session()), kw_only=True) class-attribute instance-attribute

  • topic = field(kw_only=True) class-attribute instance-attribute

client()

Source Code in griptape/drivers/event_listener/aws_iot_core_event_listener_driver.py
@lazy_property()
def client(self) -> IoTDataPlaneClient:
    return self.session.client("iot-data")

try_publish_event_payload(event_payload)

Source Code in griptape/drivers/event_listener/aws_iot_core_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None:
    self.client.publish(topic=self.topic, payload=json.dumps(event_payload))

try_publish_event_payload_batch(event_payload_batch)

Source Code in griptape/drivers/event_listener/aws_iot_core_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
    self.client.publish(topic=self.topic, payload=json.dumps(event_payload_batch))