AMQP

AMQP/RabbitMQ message queue implementation with exchanges, routing keys, and dead letter queues

amqp

import "gitlab.com/digitalxero/mqutils/amqp"

func AcknowledgeMessage

func AcknowledgeMessage(ctx context.Context, message types.Message) error

AcknowledgeMessage provides a robust way for handlers to acknowledge messages, retrying when an acknowledgment fails. It is exported so that handlers can reuse the same acknowledgment retry mechanism.

func NacknowledgeMessage

func NacknowledgeMessage(ctx context.Context, message types.Message) error

NacknowledgeMessage provides a robust way for handlers to negatively acknowledge (nack) messages, retrying when the nack fails. It is exported so that handlers can reuse the same nack retry mechanism.

func NewAMQPConsumer

func NewAMQPConsumer(config *viper.Viper) (types.Consumer, error)

NewAMQPConsumer creates a new AMQP/RabbitMQ consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects an AMQP URL.

Configuration options:

  • url: AMQP connection URL (required, e.g., “amqp://user:pass@host:port/vhost”)
  • queue: Queue name to consume from (required)
  • exchange: Exchange name (optional, for binding)
  • exchange_type: Exchange type: direct, fanout, topic, or headers (default: direct)
  • routing_key: Routing key for queue binding (optional)
  • transient: Whether the queue is non-durable (default: false)
  • auto_declare: Auto-create queue/exchange if missing (default: false)
  • auto_reconnect: Automatically reconnect on connection loss (default: false)
  • skip_verify: Skip TLS certificate verification (default: false)
  • handler: Name of registered handler function (default: “amqpLogger”)
  • dead_letter_exchange: DLX for failed messages (optional)
  • retry_queue_name: Queue for message retries (optional)
  • retry_queue_ttl: TTL for retry messages in ms (default: 1000)
  • retry_queue_max_retries: Max retry attempts (default: 50)
  • message_channel_buffer: Buffer size for message channel (default: 10)
  • graceful_shutdown_timeout: Shutdown timeout in seconds (default: 300)
  • enable_graceful_shutdown: Enable graceful shutdown (default: false)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.

func NewAMQPProducer

func NewAMQPProducer(config *viper.Viper) (types.Producer, error)

NewAMQPProducer creates a new AMQP/RabbitMQ producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects an AMQP URL.

Configuration options:

  • url: AMQP connection URL (required, e.g., “amqp://user:pass@host:port/vhost”)
  • exchange: Exchange name for publishing (required)
  • exchange_type: Exchange type: direct, fanout, topic, or headers (default: topic)
  • routing_key_prefix: Prefix to add to all routing keys (optional)
  • durable: Whether the exchange is durable (default: true)
  • auto_declare: Auto-create exchange if missing (default: false)
  • skip_verify: Skip TLS certificate verification (default: false)
  • priority: Default message priority 0-9 (default: 0)
  • delivery_mode: Message persistence: 1 (non-persistent) or 2 (persistent) (default: 2)
  • channel_pool_size: Max pooled channels for publishing (default: 20)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.

func NewAMQPTransport

func NewAMQPTransport(retry bool, prefetch int) types.Transport

NewAMQPTransport creates a new AMQP transport implementation. The transport provides low-level AMQP operations including connection management, channel pooling, message publishing, and consumption.

Parameters:

  • retry: Whether to retry failed operations (used for retry queue handling)
  • prefetch: Number of messages to prefetch for fair dispatching (0 = unlimited)

The transport uses connection pooling to efficiently manage AMQP resources and provides separate channels for consumer operations and publishing to avoid conflicts and improve performance.
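
As a rough illustration of the pooling idea, the sketch below bounds the number of idle channels with a buffered Go channel. The fakeChannel and channelPool types are hypothetical and say nothing about how the transport is actually implemented; the pool size of 2 is chosen only to keep the demonstration short.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeChannel stands in for an AMQP channel; it is hypothetical.
type fakeChannel struct{ id int64 }

// channelPool keeps at most cap(idle) channels around for reuse.
type channelPool struct {
	idle   chan *fakeChannel
	opened atomic.Int64
}

func newChannelPool(size int) *channelPool {
	return &channelPool{idle: make(chan *fakeChannel, size)}
}

// get reuses an idle channel when one is available and otherwise
// "opens" a new one.
func (p *channelPool) get() *fakeChannel {
	select {
	case ch := <-p.idle:
		return ch
	default:
		return &fakeChannel{id: p.opened.Add(1)}
	}
}

// put returns a channel to the pool. It reports false when the pool is
// already full, in which case the caller should close the channel.
func (p *channelPool) put(ch *fakeChannel) bool {
	select {
	case p.idle <- ch:
		return true
	default:
		return false
	}
}

func main() {
	pool := newChannelPool(2)
	a, b, c := pool.get(), pool.get(), pool.get()
	fmt.Println(pool.put(a), pool.put(b), pool.put(c))
	fmt.Println(pool.get().id)
}
```

A buffered channel gives a bounded pool without explicit locking: returning a channel to a full pool simply fails, and taking one from an empty pool falls back to opening a new channel.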

func NewMessageBuilder

func NewMessageBuilder() types.MessageBuilder

NewMessageBuilder creates a new AMQP message builder. The builder provides a fluent interface for constructing AMQP messages with all supported properties and attributes.

Example:

msg := amqp.NewMessageBuilder().
    WithBody([]byte("Hello, World!")).
    WithContentType("text/plain").
    WithCorrelationId("req-123").
    WithHeaders(map[string]interface{}{"source": "service-a"}).
    WithDeliveryMode(2). // Persistent
    Build()

type ConfigurationError

ConfigurationError represents a single validation error, pairing the offending field with a descriptive message.

type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents a collection of validation errors.

type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

func (e *ConfigurationErrors) Add(field, message string)

Add appends a new configuration error to the collection.

func (*ConfigurationErrors) Error

func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if the collection contains any validation errors.
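
The sketch below shows how these two types might fit together when validating a configuration. It redeclares the types locally so that it compiles on its own. The method bodies, the "field: message" error format, and the validateConsumer helper are assumptions made for illustration, not the package's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local mirrors of the documented types. The fields match the
// declarations above; every method body is an assumption.
type ConfigurationError struct {
	Field   string
	Message string
}

func (e *ConfigurationError) Error() string {
	return fmt.Sprintf("%s: %s", e.Field, e.Message) // assumed format
}

type ConfigurationErrors struct {
	Errors []*ConfigurationError
}

func (e *ConfigurationErrors) Add(field, message string) {
	e.Errors = append(e.Errors, &ConfigurationError{Field: field, Message: message})
}

func (e *ConfigurationErrors) HasErrors() bool { return len(e.Errors) > 0 }

func (e *ConfigurationErrors) Error() string {
	parts := make([]string, len(e.Errors))
	for i, err := range e.Errors {
		parts[i] = err.Error()
	}
	return strings.Join(parts, "; ")
}

// validateConsumer is a hypothetical check for the two settings that the
// consumer documentation marks as required.
func validateConsumer(settings map[string]string) error {
	errs := &ConfigurationErrors{}
	for _, field := range []string{"url", "queue"} {
		if settings[field] == "" {
			errs.Add(field, "is required")
		}
	}
	if !errs.HasErrors() {
		return nil // never wrap an empty collection in a non-nil error
	}
	return errs
}

func main() {
	err := validateConsumer(map[string]string{"url": "amqp://guest:guest@localhost:5672/"})

	// Callers can recover the individual field errors with errors.As.
	var cfgErrs *ConfigurationErrors
	if errors.As(err, &cfgErrs) {
		for _, e := range cfgErrs.Errors {
			fmt.Printf("field %q: %s\n", e.Field, e.Message)
		}
	}
}
```

Collecting every problem before returning lets a caller report all misconfigured fields at once instead of fixing them one failed start at a time.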

Generated by gomarkdoc