AWS SQS
AWS SQS cloud-managed message queue implementation with FIFO support and visibility timeout
aws
Index
- func NewMessageBuilder() mqtypes.MessageBuilder
- func NewSQSConsumer(config *viper.Viper) (types.Consumer, error)
- func NewSQSProducer(config *viper.Viper) (types.Producer, error)
- func NewSQSTransport() mqtypes.Transport
- type ConfigurationError
- type ConfigurationErrors
func NewMessageBuilder
NewMessageBuilder creates a new SQS message builder. The builder provides a fluent interface for constructing SQS messages with all supported properties and attributes.
SQS has specific limitations:
- Message body: max 256KB
- Message attributes: max 10 attributes
- Attribute name: max 256 characters
- Attribute value: counts toward the message size limit together with the attribute name, type, and message body (string values are not unlimited)
Example:
    msg := aws.NewMessageBuilder().
        WithBody([]byte(`{"orderId": "12345", "status": "pending"}`)).
        WithContentType("application/json").
        WithCorrelationId("order-12345").
        WithRoutingKey("orders"). // Used as MessageGroupId for FIFO
        WithHeaders(map[string]interface{}{
            "priority": "high",
            "source":   "web-api",
        }).
        Build()
func NewSQSConsumer
NewSQSConsumer creates a new AWS SQS consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects an SQS URL.
Configuration options:
- url: SQS queue URL (required, e.g., “sqs://us-east-1/123456789012/my-queue”)
- queue: Queue name (extracted from URL if not provided)
- handler: Name of registered handler function (default: “sqsLogger”)
- region: AWS region (extracted from URL or uses default)
- access_key_id: AWS access key ID (uses default credentials if not set)
- secret_access_key: AWS secret access key (uses default credentials if not set)
- session_token: AWS session token for temporary credentials (optional)
- max_retries: Maximum retry attempts for failed messages (default: 50)
- visibility_timeout: Message visibility timeout in seconds (default: 30)
- wait_time_seconds: Long polling wait time in seconds (default: 20, max: 20)
- max_messages: Max messages per receive (default: 10, max: 10)
- message_retention: Message retention period in seconds (default: 345600, i.e. 4 days)
- fifo_queue: Whether this is a FIFO queue (default: auto-detect from name)
- content_based_deduplication: Enable content-based deduplication for FIFO queues (default: false)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
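The numeric options above have natural bounds that a caller can check before handing the configuration over. This is a sketch only: `consumerOptions` and `validate` are hypothetical names, not this package's types. The defaults and the upper bounds for wait_time_seconds and max_messages come from the list above; the visibility timeout range (0 to 43200 seconds) and retention range (60 to 1209600 seconds) are SQS service limits, and it is an assumption that NewSQSConsumer enforces the same ranges.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerOptions is a hypothetical struct mirroring a few of the
// numeric options listed above.
type consumerOptions struct {
	VisibilityTimeout int // seconds
	WaitTimeSeconds   int // seconds
	MaxMessages       int // messages per receive call
	MessageRetention  int // seconds
}

// defaultConsumerOptions returns the documented defaults.
func defaultConsumerOptions() consumerOptions {
	return consumerOptions{
		VisibilityTimeout: 30,
		WaitTimeSeconds:   20,
		MaxMessages:       10,
		MessageRetention:  345600, // 4 days
	}
}

// validate reports every option that falls outside its allowed range.
func (o consumerOptions) validate() error {
	var errs []error
	check := func(name string, v, lo, hi int) {
		if v < lo || v > hi {
			errs = append(errs, fmt.Errorf("%s: %d is outside [%d, %d]", name, v, lo, hi))
		}
	}
	check("visibility_timeout", o.VisibilityTimeout, 0, 43200)  // SQS limit: 12 hours
	check("wait_time_seconds", o.WaitTimeSeconds, 0, 20)        // documented max
	check("max_messages", o.MaxMessages, 1, 10)                 // documented max
	check("message_retention", o.MessageRetention, 60, 1209600) // SQS limit: 14 days
	return errors.Join(errs...) // nil when errs is empty (Go 1.20+)
}

func main() {
	opts := defaultConsumerOptions()
	opts.WaitTimeSeconds = 30 // deliberately out of range
	if err := opts.validate(); err != nil {
		fmt.Println("invalid configuration:", err)
	}
}
```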
func NewSQSProducer
NewSQSProducer creates a new AWS SQS producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects an SQS URL.
Configuration options:
- url: SQS queue URL (required, e.g., “sqs://us-east-1/123456789012/my-queue”)
- queue_url: Alternative to url for direct queue URL specification
- region: AWS region (extracted from URL or uses default)
- access_key_id: AWS access key ID (uses default credentials if not set)
- secret_access_key: AWS secret access key (uses default credentials if not set)
- session_token: AWS session token for temporary credentials (optional)
- max_message_size: Maximum message size in bytes (default: 262144, i.e. 256KB)
- fifo_queue: Whether this is a FIFO queue (default: auto-detect from name)
- message_group_id_prefix: Prefix for FIFO message group IDs (optional)
- content_based_deduplication: Enable content-based deduplication for FIFO queues (default: false)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
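Several options above are described as extracted from the URL or auto-detected from the queue name. The sketch below shows one way that extraction could work for the `sqs://<region>/<account-id>/<queue-name>` form used in the examples. `queueRef` and `parseQueueURL` are hypothetical names, not this package's API, and the package's actual parsing may differ. The FIFO check relies on the SQS rule that FIFO queue names end in `.fifo`.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// queueRef is a hypothetical holder for the parts of an sqs:// URL.
type queueRef struct {
	Region    string
	AccountID string
	Name      string
	FIFO      bool
}

// parseQueueURL splits sqs://<region>/<account-id>/<queue-name> into
// its parts and detects FIFO queues from the ".fifo" name suffix.
func parseQueueURL(raw string) (queueRef, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return queueRef{}, err
	}
	if u.Scheme != "sqs" {
		return queueRef{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	if u.Host == "" || len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return queueRef{}, fmt.Errorf("expected sqs://<region>/<account-id>/<queue-name>, got %q", raw)
	}
	return queueRef{
		Region:    u.Host,
		AccountID: parts[0],
		Name:      parts[1],
		FIFO:      strings.HasSuffix(parts[1], ".fifo"),
	}, nil
}

func main() {
	ref, err := parseQueueURL("sqs://us-east-1/123456789012/my-queue")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", ref)
}
```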
func NewSQSTransport
NewSQSTransport creates a new AWS SQS transport implementation. The transport provides low-level SQS operations including queue management, message sending/receiving, and health monitoring.
The transport manages:
- AWS SDK v2 client with automatic credential resolution
- Queue URL validation and attribute retrieval
- Message batching for efficient operations
- FIFO queue support with deduplication
- Dead letter queue configuration
- Connection health monitoring
Unlike message brokers with persistent connections, SQS uses stateless HTTP-based API calls, so there is no long-lived connection to maintain.
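To illustrate the message batching mentioned above: SQS batch requests accept at most 10 entries, so a sender has to split larger sets into chunks. The sketch below uses plain strings in place of real messages; `chunk` and `maxBatchEntries` are hypothetical names, and how the transport actually batches is not documented here.

```go
package main

import "fmt"

// maxBatchEntries is the SQS limit on entries per batch request.
const maxBatchEntries = 10

// chunk splits msgs into consecutive slices of at most size elements.
// It returns nil for an empty input or a non-positive size.
func chunk(msgs []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(msgs); start += size {
		end := start + size
		if end > len(msgs) {
			end = len(msgs)
		}
		out = append(out, msgs[start:end])
	}
	return out
}

func main() {
	msgs := make([]string, 23)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("message-%d", i)
	}
	// Each batch would become one batch API call.
	for i, batch := range chunk(msgs, maxBatchEntries) {
		fmt.Printf("batch %d: %d messages\n", i, len(batch))
	}
}
```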
type ConfigurationError
ConfigurationError represents a single validation error, identifying the offending field and carrying a message.
func (*ConfigurationError) Error
type ConfigurationErrors
ConfigurationErrors represents a collection of validation errors.
func (*ConfigurationErrors) Add
Add appends a new configuration error to the collection.
func (*ConfigurationErrors) Error
func (*ConfigurationErrors) HasErrors
HasErrors returns true if the collection contains at least one validation error.
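The field layout and method signatures of these two types are not shown here, so the following is a sketch of the general pattern only. `fieldError` and `fieldErrors` are hypothetical stand-ins with assumed fields and an assumed `Add(field, message)` signature. It shows how an aggregate error type with Add, HasErrors, and Error methods is typically put together in Go.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldError is a hypothetical stand-in for a single validation error.
type fieldError struct {
	Field   string
	Message string
}

func (e *fieldError) Error() string {
	return fmt.Sprintf("%s: %s", e.Field, e.Message)
}

// fieldErrors is a hypothetical stand-in for a collection of them.
type fieldErrors struct {
	errs []*fieldError
}

// Add appends a new validation error to the collection.
func (c *fieldErrors) Add(field, message string) {
	c.errs = append(c.errs, &fieldError{Field: field, Message: message})
}

// HasErrors reports whether any validation error was recorded.
func (c *fieldErrors) HasErrors() bool {
	return len(c.errs) > 0
}

// Error joins all recorded errors into one message.
func (c *fieldErrors) Error() string {
	parts := make([]string, len(c.errs))
	for i, e := range c.errs {
		parts[i] = e.Error()
	}
	return strings.Join(parts, "; ")
}

// orNil returns nil when nothing was recorded, so callers never
// receive a non-nil error interface wrapping an empty collection.
func (c *fieldErrors) orNil() error {
	if c.HasErrors() {
		return c
	}
	return nil
}

func main() {
	var errs fieldErrors
	errs.Add("url", "is required")
	errs.Add("max_messages", "must be between 1 and 10")
	if err := errs.orNil(); err != nil {
		fmt.Println("configuration invalid:", err)
	}
}
```

Checking HasErrors (or an equivalent guard) before returning the collection as an `error` matters in Go: returning an empty collection pointer directly would produce a non-nil error value.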
Generated by gomarkdoc