amazon_sqs v1.2
Adapted from the Griptape AI Framework documentation.
__all__ = ['AmazonSqsEventListenerDriver']module-attribute
Bases:
BaseEventListenerDriver
Source Code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
@define class AmazonSqsEventListenerDriver(BaseEventListenerDriver): queue_url: str = field(kw_only=True) session: boto3.Session = field(default=Factory(lambda: import_optional_dependency("boto3").Session()), kw_only=True) _client: Optional[SQSClient] = field(default=None, kw_only=True, alias="client", metadata={"serializable": False}) @lazy_property() def client(self) -> SQSClient: return self.session.client("sqs") def try_publish_event_payload(self, event_payload: dict) -> None: self.client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload)) def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None: entries: Sequence[SendMessageBatchRequestEntryTypeDef] = [ {"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)} for event_payload in event_payload_batch ] self.client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)
_client = field(default=None, kw_only=True, alias='client', metadata={'serializable': False})class-attribute instance-attributequeue_url = field(kw_only=True)class-attribute instance-attributesession = field(default=Factory(lambda: import_optional_dependency('boto3').Session()), kw_only=True)class-attribute instance-attribute
client()
Source Code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
@lazy_property() def client(self) -> SQSClient: return self.session.client("sqs")
try_publish_event_payload(event_payload)
Source Code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
def try_publish_event_payload(self, event_payload: dict) -> None: self.client.send_message(QueueUrl=self.queue_url, MessageBody=json.dumps(event_payload))
try_publish_event_payload_batch(event_payload_batch)
Source Code in griptape/drivers/event_listener/amazon_sqs_event_listener_driver.py
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None: entries: Sequence[SendMessageBatchRequestEntryTypeDef] = [ {"Id": str(event_payload["id"]), "MessageBody": json.dumps(event_payload)} for event_payload in event_payload_batch ] self.client.send_message_batch(QueueUrl=self.queue_url, Entries=entries)