AWS SQS

Cloud-managed AWS SQS message queue implementation with FIFO queue support and visibility timeout handling.

aws

import "gitlab.com/digitalxero/mqutils/aws"

func NewMessageBuilder

func NewMessageBuilder() mqtypes.MessageBuilder

NewMessageBuilder creates a new SQS message builder. The builder provides a fluent interface for constructing SQS messages with all supported properties and attributes.

SQS has specific limitations:

  • Message body: max 256KB
  • Message attributes: max 10 attributes
  • Attribute name: max 256 characters
  • Attribute value: string and binary values both count toward the 256KB total message size, together with the attribute name and type

Example:

msg := aws.NewMessageBuilder().
    WithBody([]byte(`{"orderId": "12345", "status": "pending"}`)).
    WithContentType("application/json").
    WithCorrelationId("order-12345").
    WithRoutingKey("orders"). // Used as MessageGroupId for FIFO
    WithHeaders(map[string]interface{}{
        "priority": "high",
        "source": "web-api",
    }).
    Build()
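The limits listed above can be checked before a message is built. The following is a minimal sketch using only the standard library; `checkLimits` and the constant names are hypothetical and are not part of this package, which may perform its own validation.

```go
package main

import "fmt"

// Limits taken from the list above. The constant names are illustrative.
const (
	maxBodyBytes     = 256 * 1024 // 256KB message body
	maxAttributes    = 10         // attributes per message
	maxAttrNameChars = 256        // characters per attribute name
)

// checkLimits is a hypothetical helper (not part of mqutils) that reports
// the first documented SQS limit a message would exceed.
func checkLimits(body []byte, attrs map[string]string) error {
	if len(body) > maxBodyBytes {
		return fmt.Errorf("body is %d bytes, limit is %d", len(body), maxBodyBytes)
	}
	if len(attrs) > maxAttributes {
		return fmt.Errorf("%d attributes, limit is %d", len(attrs), maxAttributes)
	}
	for name := range attrs {
		if n := len([]rune(name)); n > maxAttrNameChars {
			return fmt.Errorf("attribute name has %d characters, limit is %d", n, maxAttrNameChars)
		}
	}
	return nil
}

func main() {
	attrs := map[string]string{"priority": "high", "source": "web-api"}
	fmt.Println("small message:", checkLimits([]byte(`{"orderId": "12345"}`), attrs))
	fmt.Println("oversized message:", checkLimits(make([]byte, maxBodyBytes+1), nil))
}
```

Running such a check before calling Build() gives a local error instead of a rejected API call.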

func NewSQSConsumer

func NewSQSConsumer(config *viper.Viper) (types.Consumer, error)

NewSQSConsumer creates a new AWS SQS consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects an SQS URL.

Configuration options:

  • url: SQS queue URL (required, e.g., “sqs://us-east-1/123456789012/my-queue”)
  • queue: Queue name (extracted from URL if not provided)
  • handler: Name of registered handler function (default: “sqsLogger”)
  • region: AWS region (extracted from URL or uses default)
  • access_key_id: AWS access key ID (uses default credentials if not set)
  • secret_access_key: AWS secret access key (uses default credentials if not set)
  • session_token: AWS session token for temporary credentials (optional)
  • max_retries: Maximum retry attempts for failed messages (default: 50)
  • visibility_timeout: Message visibility timeout in seconds (default: 30)
  • wait_time_seconds: Long polling wait time (default: 20, max: 20)
  • max_messages: Max messages per receive (default: 10, max: 10)
  • message_retention: Message retention period in seconds (default: 345600/4 days)
  • fifo_queue: Whether this is a FIFO queue (default: auto-detect from name)
  • content_based_deduplication: Enable deduplication for FIFO (default: false)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.
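Several options above are derived from the URL when not set explicitly (region, queue, and FIFO auto-detection from the queue name). How the package does this internally is not documented here; the sketch below only illustrates one plausible approach for the `sqs://region/account/queue` form, relying on the fact that SQS FIFO queue names end in `.fifo`. The `queueInfo` type and `parseQueueURL` function are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// queueInfo holds the values derived from an sqs:// URL (hypothetical type).
type queueInfo struct {
	Region  string
	Account string
	Queue   string
	FIFO    bool
}

// parseQueueURL splits sqs://<region>/<account>/<queue> into its parts.
// It is an illustration, not the parser used by mqutils.
func parseQueueURL(raw string) (queueInfo, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return queueInfo{}, err
	}
	if u.Scheme != "sqs" {
		return queueInfo{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	parts := strings.Split(strings.Trim(u.Path, "/"), "/")
	if u.Host == "" || len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return queueInfo{}, fmt.Errorf("expected sqs://region/account/queue, got %q", raw)
	}
	return queueInfo{
		Region:  u.Host,
		Account: parts[0],
		Queue:   parts[1],
		// SQS requires FIFO queue names to end with ".fifo".
		FIFO: strings.HasSuffix(parts[1], ".fifo"),
	}, nil
}

func main() {
	for _, raw := range []string{
		"sqs://us-east-1/123456789012/my-queue",
		"sqs://us-east-1/123456789012/orders.fifo",
	} {
		info, err := parseQueueURL(raw)
		fmt.Printf("%+v err=%v\n", info, err)
	}
}
```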

func NewSQSProducer

func NewSQSProducer(config *viper.Viper) (types.Producer, error)

NewSQSProducer creates a new AWS SQS producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects an SQS URL.

Configuration options:

  • url: SQS queue URL (required, e.g., “sqs://us-east-1/123456789012/my-queue”)
  • queue_url: Alternative to url for direct queue URL specification
  • region: AWS region (extracted from URL or uses default)
  • access_key_id: AWS access key ID (uses default credentials if not set)
  • secret_access_key: AWS secret access key (uses default credentials if not set)
  • session_token: AWS session token for temporary credentials (optional)
  • max_message_size: Maximum message size in bytes (default: 262144/256KB)
  • fifo_queue: Whether this is a FIFO queue (default: auto-detect from name)
  • message_group_id_prefix: Prefix for FIFO message group IDs (optional)
  • content_based_deduplication: Enable deduplication for FIFO (default: false)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.

func NewSQSTransport

func NewSQSTransport() mqtypes.Transport

NewSQSTransport creates a new AWS SQS transport implementation. The transport provides low-level SQS operations including queue management, message sending/receiving, and health monitoring.

The transport manages:

  • AWS SDK v2 client with automatic credential resolution
  • Queue URL validation and attribute retrieval
  • Message batching for efficient operations
  • FIFO queue support with deduplication
  • Dead letter queue configuration
  • Connection health monitoring

Unlike message brokers with persistent connections, SQS uses HTTP-based API calls, so there is no persistent connection to maintain.
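As an illustration of the message batching mentioned above: SQS batch operations accept at most 10 entries per request, so a larger set of messages has to be split into groups before sending. The sketch below shows that splitting step only; `chunk` and `maxBatchEntries` are hypothetical names, and the transport's actual batching logic may differ.

```go
package main

import "fmt"

// maxBatchEntries is the SQS per-request limit for batch operations.
const maxBatchEntries = 10

// chunk splits items into consecutive groups of at most size elements.
// It is a hypothetical helper, not part of mqutils.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	msgs := make([]string, 23)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("msg-%d", i)
	}
	// 23 messages are split into batches of 10, 10, and 3.
	for i, b := range chunk(msgs, maxBatchEntries) {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```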

type ConfigurationError

ConfigurationError represents a validation error for a specific configuration field, with a message describing the problem.

type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents a collection of validation errors.

type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

func (e *ConfigurationErrors) Add(field, message string)

Add appends a new configuration error to the collection.

func (*ConfigurationErrors) Error

func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if the collection contains any validation errors.
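The Add / HasErrors / Error pattern can be sketched as follows. The struct fields and method signatures mirror the declarations above, but the method bodies, the "field: message" output format, and the `validate` function are assumptions made for illustration; the package's real implementation is not shown in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// ConfigurationError mirrors the struct documented above.
type ConfigurationError struct {
	Field   string
	Message string
}

// Error formats the error as "field: message" (assumed format).
func (e *ConfigurationError) Error() string {
	return fmt.Sprintf("%s: %s", e.Field, e.Message)
}

// ConfigurationErrors mirrors the struct documented above.
type ConfigurationErrors struct {
	Errors []*ConfigurationError
}

func (e *ConfigurationErrors) Add(field, message string) {
	e.Errors = append(e.Errors, &ConfigurationError{Field: field, Message: message})
}

func (e *ConfigurationErrors) HasErrors() bool { return len(e.Errors) > 0 }

// Error joins all collected errors with "; " (assumed format).
func (e *ConfigurationErrors) Error() string {
	parts := make([]string, 0, len(e.Errors))
	for _, ce := range e.Errors {
		parts = append(parts, ce.Error())
	}
	return strings.Join(parts, "; ")
}

// validate is a hypothetical validator that collects every problem
// instead of stopping at the first one.
func validate(url string, waitTimeSeconds, maxMessages int) error {
	errs := &ConfigurationErrors{}
	if url == "" {
		errs.Add("url", "is required")
	}
	if waitTimeSeconds < 0 || waitTimeSeconds > 20 {
		errs.Add("wait_time_seconds", "must be between 0 and 20")
	}
	if maxMessages < 1 || maxMessages > 10 {
		errs.Add("max_messages", "must be between 1 and 10")
	}
	if errs.HasErrors() {
		return errs
	}
	return nil // explicit nil avoids returning a non-nil interface holding an empty collection
}

func main() {
	if err := validate("", 30, 10); err != nil {
		fmt.Println("invalid configuration:", err)
	}
}
```

Collecting errors this way lets a caller report every invalid field in one pass rather than fixing them one at a time.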

Generated by gomarkdoc