AMQP
AMQP/RabbitMQ message queue implementation with exchanges, routing keys, and dead letter queues
amqp
|
|
Index
- func AcknowledgeMessage(ctx context.Context, message types.Message) error
- func NacknowledgeMessage(ctx context.Context, message types.Message) error
- func NewAMQPConsumer(config *viper.Viper) (types.Consumer, error)
- func NewAMQPProducer(config *viper.Viper) (types.Producer, error)
- func NewAMQPTransport(retry bool, prefetch int) types.Transport
- func NewMessageBuilder() types.MessageBuilder
- type ConfigurationError
- type ConfigurationErrors
func AcknowledgeMessage
|
|
AcknowledgeMessage provides a robust way for handlers to acknowledge messages with retry logic This function is exported to allow handlers to use the same acknowledgment retry mechanism
func NacknowledgeMessage
|
|
NacknowledgeMessage provides a robust way for handlers to nack messages with retry logic This function is exported to allow handlers to use the same nack retry mechanism
func NewAMQPConsumer
|
|
NewAMQPConsumer creates a new AMQP/RabbitMQ consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects an AMQP URL.
Configuration options:
- url: AMQP connection URL (required, e.g., “amqp://user:pass@host:port/vhost”)
- queue: Queue name to consume from (required)
- exchange: Exchange name (optional, for binding)
- exchange_type: Exchange type - direct, fanout, topic, headers (default: direct)
- routing_key: Routing key for queue binding (optional)
- transient: Whether queue is non-durable (default: false)
- auto_declare: Auto-create queue/exchange if missing (default: false)
- auto_reconnect: Automatically reconnect on connection loss (default: false)
- skip_verify: Skip TLS certificate verification (default: false)
- handler: Name of registered handler function (default: “amqpLogger”)
- dead_letter_exchange: DLX for failed messages (optional)
- retry_queue_name: Queue for message retries (optional)
- retry_queue_ttl: TTL for retry messages in ms (default: 1000)
- retry_queue_max_retries: Max retry attempts (default: 50)
- message_channel_buffer: Buffer size for message channel (default: 10)
- graceful_shutdown_timeout: Shutdown timeout in seconds (default: 300)
- enable_graceful_shutdown: Enable graceful shutdown (default: false)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
func NewAMQPProducer
|
|
NewAMQPProducer creates a new AMQP/RabbitMQ producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects an AMQP URL.
Configuration options:
- url: AMQP connection URL (required, e.g., “amqp://user:pass@host:port/vhost”)
- exchange: Exchange name for publishing (required)
- exchange_type: Exchange type - direct, fanout, topic, headers (default: topic)
- routing_key_prefix: Prefix to add to all routing keys (optional)
- durable: Whether exchange is durable (default: true)
- auto_declare: Auto-create exchange if missing (default: false)
- skip_verify: Skip TLS certificate verification (default: false)
- priority: Default message priority 0-9 (default: 0)
- delivery_mode: Message persistence - 1 (non-persistent), 2 (persistent) (default: 2)
- channel_pool_size: Max pooled channels for publishing (default: 20)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
func NewAMQPTransport
|
|
NewAMQPTransport creates a new AMQP transport implementation. The transport provides low-level AMQP operations including connection management, channel pooling, message publishing, and consumption.
Parameters:
- retry: Whether to retry failed operations (used for retry queue handling)
- prefetch: Number of messages to prefetch for fair dispatching (0 = unlimited)
The transport uses connection pooling to efficiently manage AMQP resources and provides separate channels for consumer operations and publishing to avoid conflicts and improve performance.
func NewMessageBuilder
|
|
NewMessageBuilder creates a new AMQP message builder. The builder provides a fluent interface for constructing AMQP messages with all supported properties and attributes.
Example:
msg := amqp.NewMessageBuilder().
WithBody([]byte("Hello, World!")).
WithContentType("text/plain").
WithCorrelationId("req-123").
WithHeaders(map[string]interface{}{"source": "service-a"}).
WithDeliveryMode(2). // Persistent
Build()
type ConfigurationError
ConfigurationError represents a validation error with specific field and message
|
|
func (*ConfigurationError) Error
|
|
type ConfigurationErrors
ConfigurationErrors represents multiple validation errors
|
|
func (*ConfigurationErrors) Add
|
|
Add adds a new configuration error to the collection
func (*ConfigurationErrors) Error
|
|
func (*ConfigurationErrors) HasErrors
|
|
HasErrors returns true if there are any validation errors
Generated by gomarkdoc