types
1
| import "gitlab.com/digitalxero/mqutils/types"
|
Index
Constants
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| const (
// DefaultRetryQueueTTL is the default time-to-live for retry queue messages in milliseconds
DefaultRetryQueueTTL = 1000
// DefaultMaxRetries is the default maximum number of retry attempts for failed messages
DefaultMaxRetries = 50
// DefaultAckRetries is the default number of acknowledgment retry attempts
DefaultAckRetries = 3
// DefaultAckRetryDelay is the base delay for acknowledgment retries in milliseconds
DefaultAckRetryDelay = 100
// DefaultMessageChannelBuffer is the default buffer size for message channels
// A buffer of 10 provides good throughput while avoiding excessive memory usage
DefaultMessageChannelBuffer = 10
// DefaultChannelPoolSize is the default maximum size for AMQP channel pools
// A pool of 20 channels provides good resource reuse while avoiding excessive connections
DefaultChannelPoolSize = 20
// DefaultGracefulShutdownTimeout is the default timeout for graceful shutdown in seconds
// Allows in-flight messages to complete processing before forcing shutdown
DefaultGracefulShutdownTimeout = 300
// DefaultBatchSize is the default number of messages to process in a batch
// A batch size of 5 provides good throughput while maintaining responsiveness
DefaultBatchSize = 5
// DefaultBatchTimeout is the default timeout for batch collection in milliseconds
// A timeout of 100ms ensures batches are processed regularly even with low message volume
DefaultBatchTimeout = 100
)
|
Variables
Common message acknowledgment errors that can occur during message processing. These errors help identify specific failure conditions in message handling.
1
2
3
4
5
6
7
8
9
10
11
| var (
// ErrAlreadyAcknowledged is returned when attempting to acknowledge
// or reject a message that has already been acknowledged. This prevents
// duplicate acknowledgments which could cause issues with some brokers.
ErrAlreadyAcknowledged = errors.New("message has already been acknowledged")
// ErrTransportNotSet is returned when attempting to acknowledge a message
// that doesn't have an associated transport. This typically indicates
// a programming error where a message was created without proper initialization.
ErrTransportNotSet = errors.New("message transport is not set")
)
|
1
| func NewErrUnknownPublisherType(url string) error
|
NewErrUnknownPublisherType creates a new ErrUnknownPublisherType error with the specified URL. The URL parameter typically contains the full connection URL that could not be parsed or recognized.
1
| func RegisterBatchHandler(name string, handler BatchHandlerFunc)
|
RegisterBatchHandler registers a batch message handler function with the given name. Batch handlers process multiple messages at once, which can be more efficient for certain operations like bulk database inserts. The handler registry provides a centralized location for all batch handlers in the application.
Example:
types.RegisterBatchHandler("events.bulk", handleEventsBatch)
types.RegisterBatchHandler("metrics.aggregate", handleMetricsBatch)
1
| func RegisterConsumer(consumer NewConsumerFunc, urlPrefixes ...string)
|
RegisterConsumer registers a consumer factory function with one or more URL prefixes. This allows the mqutils package to create the appropriate consumer type based on the connection URL scheme. Each message queue implementation should register its consumer factory during package initialization.
Example:
func init() {
types.RegisterConsumer(NewAMQPConsumer, "amqp://", "amqps://")
}
1
| func RegisterHandler(name string, handler HandlerFunc)
|
RegisterHandler registers a message handler function with the given name. Handlers can be retrieved by name and used to process messages from specific queues or topics. This provides a centralized registry for message handlers across the application.
Example:
types.RegisterHandler("user.created", handleUserCreated)
types.RegisterHandler("order.placed", handleOrderPlaced)
1
| func RegisterProducer(producer NewProducerFunc, urlPrefixes ...string)
|
RegisterProducer registers a producer factory function with one or more URL prefixes. This allows the mqutils package to create the appropriate producer type based on the connection URL scheme. Each message queue implementation should register its producer factory during package initialization.
Example:
func init() {
types.RegisterProducer(NewKafkaProducer, "kafka://", "kafkas://")
}
BatchHandlerFunc handles a batch of messages as a single unit. This is useful for operations that benefit from processing multiple messages together, such as bulk database inserts or aggregations.
The handler receives all messages in the batch and is responsible for acknowledging or rejecting each message individually based on the processing results. If the entire batch fails, all messages should be rejected. If only some messages fail, the handler can acknowledge the successful ones and reject the failures.
Batch handlers should be careful about transaction boundaries and partial failures. Consider using database transactions or other mechanisms to ensure consistency when processing batches.
Example:
func myBatchHandler(ctx context.Context, msgs []types.Message) {
// Prepare batch data, tracking which messages decoded successfully
var records []Record
var valid []types.Message
for _, msg := range msgs {
var record Record
if err := json.Unmarshal(msg.Body(), &record); err != nil {
msg.Logger(ctx).Error("Invalid message", zap.Error(err))
msg.Nack()
continue
}
records = append(records, record)
valid = append(valid, msg)
}
// Bulk insert
if err := db.BulkInsert(ctx, records); err != nil {
// Failed - reject the messages that have not already been rejected
for _, msg := range valid {
msg.Nack()
}
return
}
// Success - acknowledge the messages that were inserted
for _, msg := range valid {
msg.Ack()
}
}
1
| type BatchHandlerFunc func(ctx context.Context, msgs []Message)
|
1
| func GetBatchHandler(name string) BatchHandlerFunc
|
GetBatchHandler retrieves a registered batch message handler by name. Returns nil if no batch handler is registered with the given name. This is typically used by consumers that support batch processing to find the appropriate handler for a specific queue or topic.
Consumer defines the high-level interface for message consumers. It provides a simple abstraction for running message processing loops with built-in health monitoring capabilities.
Consumer implementations handle all the complexity of connecting to message queues, receiving messages, managing acknowledgments, and processing messages through registered handlers. The consumer runs continuously until the context is canceled or an unrecoverable error occurs.
Consumer embeds HealthChecker to provide health monitoring capabilities, allowing systems to check the consumer’s connection status and overall health.
Example usage:
consumer, err := mqutils.NewConsumer("amqp://localhost:5672", "my-queue",
handlers.WithHandler("my-queue", myHandler))
if err != nil {
log.Fatal(err)
}
// Run the consumer until context is canceled
if err := consumer.Run(ctx); err != nil {
log.Printf("Consumer stopped: %v", err)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type Consumer interface {
HealthChecker
// Run starts the consumer's message processing loop. It will continuously
// receive messages from the configured queue and dispatch them to the
// appropriate handlers until the context is canceled or an error occurs.
//
// The consumer will:
// - Establish connections to the message broker
// - Subscribe to the configured queue(s)
// - Receive messages and dispatch them to handlers
// - Handle message acknowledgments based on handler results
// - Implement graceful shutdown when context is canceled
//
// Run blocks until the consumer stops. It returns nil if the consumer
// was stopped due to context cancellation, or an error if the consumer
// encountered an unrecoverable error.
Run(ctx context.Context) error
}
|
ErrUnknownConsumerType is returned when attempting to create a consumer with an unrecognized URL scheme. This typically indicates that the requested message queue system is not supported or the URL is malformed.
1
2
3
| type ErrUnknownConsumerType struct {
// contains filtered or unexported fields
}
|
1
| func NewErrUnknownConsumerType(typeName string) *ErrUnknownConsumerType
|
NewErrUnknownConsumerType creates a new ErrUnknownConsumerType error with the specified type name. The type name is typically the URL scheme that was not recognized (e.g., “unknown://”).
func (ErrUnknownConsumerType) Error
1
| func (e ErrUnknownConsumerType) Error() string
|
Error implements the error interface for ErrUnknownConsumerType. It returns a descriptive error message including the unknown type name.
ErrUnknownPublisherType is returned when attempting to create a publisher with an unrecognized URL scheme. This indicates that the requested message queue system is not supported for publishing or the URL is malformed.
1
2
3
| type ErrUnknownPublisherType struct {
// contains filtered or unexported fields
}
|
func (ErrUnknownPublisherType) Error
1
| func (e ErrUnknownPublisherType) Error() string
|
Error implements the error interface for ErrUnknownPublisherType. It returns a descriptive error message including the unknown type name.
HandlerFunc is the function signature for processing individual messages. Handlers receive a context for cancellation/timeout control and a Message to process. The handler is responsible for acknowledging or rejecting the message based on the processing result.
Handlers should be designed to be idempotent where possible, as messages may be redelivered in case of failures. Long-running operations should respect context cancellation to enable graceful shutdown.
Example:
func myHandler(ctx context.Context, msg types.Message) {
// Decode the message
var data MyData
if err := json.Unmarshal(msg.Body(), &data); err != nil {
msg.Logger(ctx).Error("Invalid message format", zap.Error(err))
msg.Nack() // Reject invalid messages
return
}
// Process the data
if err := processData(ctx, data); err != nil {
msg.Logger(ctx).Error("Processing failed", zap.Error(err))
msg.Nack() // Retry on failure
return
}
// Acknowledge successful processing
if err := msg.Ack(); err != nil {
msg.Logger(ctx).Error("Failed to ack message", zap.Error(err))
}
}
1
| type HandlerFunc func(ctx context.Context, msg Message)
|
1
| func GetHandler(name string) HandlerFunc
|
GetHandler retrieves a registered message handler by name. Returns nil if no handler is registered with the given name. This is typically used by consumers to find the appropriate handler for messages from a specific queue or topic.
HealthCheck represents the result of a health check for a connection.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| type HealthCheck interface {
// Status returns the current health status
Status() HealthStatus
// LastChecked returns when the health check was last performed
LastChecked() time.Time
// Message returns a human-readable status message
Message() string
// ConsumerType returns the type of consumer/transport
ConsumerType() string
// ConnectionURL returns the connection URL if available
ConnectionURL() string
// Details returns additional status details
Details() map[string]string
}
|
HealthCheckBuilder is a fluent builder interface for constructing HealthCheck results.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| type HealthCheckBuilder interface {
// WithStatus sets the health status
WithStatus(status HealthStatus) HealthCheckBuilder
// WithMessage sets the status message
WithMessage(message string) HealthCheckBuilder
// WithConsumerType sets the consumer type
WithConsumerType(consumerType string) HealthCheckBuilder
// WithConnectionURL sets the connection URL
WithConnectionURL(url string) HealthCheckBuilder
// WithDetail adds a detail key-value pair
WithDetail(key, value string) HealthCheckBuilder
// WithDetails sets all details at once
WithDetails(details map[string]string) HealthCheckBuilder
// Build creates the final HealthCheck result
Build() HealthCheck
}
|
1
| func NewHealthCheckBuilder() HealthCheckBuilder
|
NewHealthCheckBuilder creates a new health check builder for constructing HealthCheck results. The builder provides a fluent interface for setting all health check properties before building the final immutable result.
The builder initializes with sensible defaults:
- Status: HealthStatusClosed
- LastChecked: Current time
- Details: Empty map
Example usage:
health := NewHealthCheckBuilder().
WithStatus(HealthStatusHealthy).
WithMessage("Connection established").
WithConsumerType("amqp").
WithConnectionURL("amqp://localhost:5672").
WithDetail("queue_depth", "42").
Build()
HealthCheckFunc is a function type that performs a health check. It allows health check logic to be defined as simple functions that can be composed or wrapped with additional behavior. The function should respect the provided context for cancellation and timeout control.
1
| type HealthCheckFunc func(ctx context.Context) (HealthCheck, error)
|
HealthChecker is the interface for components that support health checks. This interface is implemented by all transports, consumers, and producers to provide standardized health monitoring capabilities.
Health checks are essential for monitoring the status of message queue connections in production environments. They enable load balancers, orchestration systems, and monitoring tools to detect and respond to connection failures.
Example usage:
health, err := consumer.HealthCheck(ctx)
if err != nil {
log.Printf("Health check failed: %v", err)
return
}
if health.Status() != types.HealthStatusHealthy {
log.Printf("Unhealthy: %s - %s", health.Status(), health.Message())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| type HealthChecker interface {
// HealthCheck performs a health check and returns the current status.
// It should verify the component's connection to the message broker
// and return appropriate status information.
//
// The implementation should:
// - Check if the connection is established and active
// - Verify the ability to perform basic operations if applicable
// - Return detailed status information including any error messages
// - Respect the context timeout to avoid hanging health checks
//
// Returns a HealthCheck result containing the status information,
// or an error if the health check operation itself fails.
HealthCheck(ctx context.Context) (HealthCheck, error)
}
|
HealthStatus represents the health status of a connection.
1
| type HealthStatus string
|
1
2
3
4
5
6
7
8
9
10
| const (
// HealthStatusHealthy indicates the connection is healthy
HealthStatusHealthy HealthStatus = "healthy"
// HealthStatusUnhealthy indicates the connection is unhealthy
HealthStatusUnhealthy HealthStatus = "unhealthy"
// HealthStatusConnecting indicates the connection is being established
HealthStatusConnecting HealthStatus = "connecting"
// HealthStatusClosed indicates the connection is closed
HealthStatusClosed HealthStatus = "closed"
)
|
Message represents a message in the message queue system. It provides methods to access message properties, acknowledge processing, and interact with the messaging system. Message implementations are typically immutable once created.
Messages can be acknowledged (Ack) to indicate successful processing or negatively acknowledged (Nack) to indicate processing failure. The exact behavior of these operations depends on the underlying transport and broker configuration.
Example usage:
func handleMessage(ctx context.Context, msg types.Message) {
// Process the message
var event UserEvent
if err := json.Unmarshal(msg.Body(), &event); err != nil {
msg.Logger(ctx).Error("Failed to unmarshal", zap.Error(err))
msg.Nack() // Reject the message
return
}
// Process the event...
// Acknowledge successful processing
if err := msg.Ack(); err != nil {
msg.Logger(ctx).Error("Failed to ack", zap.Error(err))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| type Message interface {
// Ack acknowledges successful message processing.
// This tells the broker that the message has been handled
// and can be removed from the queue.
// Returns an error if acknowledgment fails.
Ack() error
// Nack negatively acknowledges the message, indicating processing failure.
// The broker will handle the message according to its configuration
// (requeue, dead letter, discard, etc.).
// Returns an error if the negative acknowledgment fails.
Nack() error
// RetryCount returns the number of times this message has been retried.
// This is useful for implementing retry limits and backoff strategies.
RetryCount() int
// Body returns the message payload as a byte slice.
// The caller should not modify the returned slice.
Body() []byte
// Headers returns the message headers as a map.
// Headers contain metadata about the message.
// The returned map should not be modified.
Headers() map[string]interface{}
// ReplyTo returns the destination for response messages.
// Used in request-reply messaging patterns.
ReplyTo() string
// RoutingKey returns the routing key used for message delivery.
// The meaning varies by transport (AMQP routing key, Kafka topic, etc.).
RoutingKey() string
// Exchange returns the exchange or topic namespace for the message.
// The meaning varies by transport (AMQP exchange, Kafka topic prefix, etc.).
Exchange() string
// ContentType returns the MIME type of the message body.
// Common values: "application/json", "application/xml", "text/plain".
ContentType() string
// ContentEncoding returns the encoding of the message body.
// Examples: "gzip", "deflate", "identity" (no encoding).
ContentEncoding() string
// CorrelationId returns the correlation identifier for tracking
// related messages across systems.
CorrelationId() string
// DeliveryMode returns the message persistence mode.
// Common values: 1 (non-persistent), 2 (persistent).
DeliveryMode() uint8
// Priority returns the message priority (0-9, where 9 is highest).
// Not all transports support message priority.
Priority() uint8
// MessageId returns the unique identifier for this message.
// This can be used for deduplication or tracking.
MessageId() string
// UserId returns the user who created this message.
// May be empty if not set.
UserId() string
// AppId returns the application that created this message.
// Useful for identifying the source in multi-application systems.
AppId() string
// Expiration returns the message expiration time.
// Format varies by transport but often milliseconds or RFC3339.
Expiration() string
// MessageType returns the message type or event name.
// This helps identify the kind of message without parsing the body.
MessageType() string
// Logger returns a logger instance with message context.
// The logger includes relevant message metadata for correlation.
// This enables consistent logging across message processing.
Logger(ctx context.Context) *zap.Logger
// Publisher returns the associated publisher for this message.
// This can be used to publish response messages or forward messages.
// May return nil if no publisher is associated.
Publisher() Publisher
}
|
MessageBuilder provides a fluent interface for constructing Message objects. It follows the builder pattern to allow step-by-step configuration of message properties before creating the final immutable Message instance.
MessageBuilder implementations should be thread-safe if they are intended to be reused across goroutines. Most implementations create a new builder instance for each message, making thread safety a non-issue.
Example usage:
msg := NewMessageBuilder().
WithBody([]byte(`{"event": "user.created"}`)).
WithContentType("application/json").
WithCorrelationId("req-123").
WithRoutingKey("user.events").
WithHeaders(map[string]interface{}{
"source": "user-service",
"version": "1.0",
}).
Build()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
| type MessageBuilder interface {
// WithBody sets the message payload as a byte slice.
// The body contains the actual message content that will be
// processed by message handlers.
WithBody(body []byte) MessageBuilder
// WithHeaders sets custom message headers as key-value pairs.
// Headers can contain metadata about the message that handlers
// can use for routing, filtering, or processing decisions.
WithHeaders(headers map[string]interface{}) MessageBuilder
// WithRoutingKey sets the routing key used for message delivery.
// In AMQP, this determines which queues receive the message.
// Other transports may use this for topic-based routing.
WithRoutingKey(routingKey string) MessageBuilder
// WithContentType sets the MIME type of the message body.
// Common values include "application/json", "application/xml",
// "text/plain", or custom application-specific types.
WithContentType(contentType string) MessageBuilder
// WithContentEncoding sets the encoding of the message body.
// Examples include "gzip", "deflate", or "identity" (no encoding).
// This helps receivers properly decode the message body.
WithContentEncoding(contentEncoding string) MessageBuilder
// WithCorrelationId sets a unique identifier for tracking related messages.
// This is commonly used to match requests with responses or to trace
// message flows through distributed systems.
WithCorrelationId(correlationId string) MessageBuilder
// WithTimestamp sets when the message was created.
// If not set, implementations may use the current time.
WithTimestamp(timestamp time.Time) MessageBuilder
// WithTransport associates a Transport implementation with the message.
// This is typically used internally to enable message acknowledgment
// and other transport-specific operations.
WithTransport(transport Transport) MessageBuilder
// WithPublisher associates a Publisher implementation with the message.
// This enables the message to be published or republished using
// the associated publisher.
WithPublisher(publisher Publisher) MessageBuilder
// WithReplyTo sets the destination where responses should be sent.
// This is used in request-reply messaging patterns to indicate
// where the receiver should send the response message.
WithReplyTo(replyTo string) MessageBuilder
// WithDeliveryMode sets the message persistence mode.
// Common values: 1 (non-persistent), 2 (persistent).
// Persistent messages survive broker restarts.
WithDeliveryMode(mode uint8) MessageBuilder
// WithPriority sets the message priority (0-9, where 9 is highest).
// Higher priority messages may be delivered before lower priority ones,
// depending on broker support and configuration.
WithPriority(priority uint8) MessageBuilder
// WithMessageId sets a unique identifier for this message.
// This should be unique across all messages in the system
// and can be used for deduplication or tracking.
WithMessageId(messageId string) MessageBuilder
// WithUserId sets the user who created this message.
// This can be used for authorization or audit purposes.
WithUserId(userId string) MessageBuilder
// WithAppId sets the application that created this message.
// Useful for identifying the source application in multi-app systems.
WithAppId(appId string) MessageBuilder
// WithExpiration sets the message expiration time.
// Format is typically a string representing milliseconds or an RFC3339 timestamp.
// Expired messages may be discarded by the broker.
WithExpiration(expiration string) MessageBuilder
// WithMessageType sets the message type or event name.
// This helps handlers identify what kind of message they're processing
// without parsing the body.
WithMessageType(msgType string) MessageBuilder
// WithContext associates a context with the message.
// This can be used to propagate request-scoped values,
// deadlines, and cancellation signals.
WithContext(ctx context.Context) MessageBuilder
// Build creates the final Message instance with all configured properties.
// This method should be called after setting all desired properties.
// The returned Message is immutable.
Build() Message
// BuildWithContext creates the final Message instance with the provided context.
// This is equivalent to calling WithContext(ctx).Build() but may be more
// convenient in some cases. The returned Message is immutable.
BuildWithContext(ctx context.Context) Message
}
|
NewConsumerFunc is a factory function that creates a new Consumer instance with the provided configuration. Each message queue implementation provides its own factory function that is registered with the system.
1
| type NewConsumerFunc func(config *viper.Viper) (Consumer, error)
|
1
| func GetConsumer(url string) (NewConsumerFunc, bool)
|
GetConsumer retrieves the registered consumer factory function for the given URL. It matches the URL against registered prefixes and returns the appropriate factory function. Returns false if no matching consumer is found.
This function is used internally by mqutils.NewConsumer to create the appropriate consumer implementation based on the connection URL.
NewProducerFunc is a factory function that creates a new Producer instance with the provided configuration. Each message queue implementation provides its own factory function that is registered with the system.
1
| type NewProducerFunc func(config *viper.Viper) (Producer, error)
|
1
| func GetProducer(url string) (NewProducerFunc, bool)
|
GetProducer retrieves the registered producer factory function for the given URL. It matches the URL against registered prefixes and returns the appropriate factory function. Returns false if no matching producer is found.
This function is used internally by mqutils.NewProducer to create the appropriate producer implementation based on the connection URL.
Producer defines the interface for message producers that require explicit initialization before they can publish messages. It combines health checking, publishing capabilities, and lifecycle management. Some message queue systems require producers to establish connections or perform setup before publishing.
Producer embeds both HealthChecker and Publisher interfaces to provide comprehensive producer functionality with health monitoring.
1
2
3
4
5
6
7
8
9
10
| type Producer interface {
HealthChecker
Publisher
// Start initializes the producer and establishes any necessary connections.
// This method must be called before attempting to publish messages.
// The context can be used to control the startup timeout.
// Returns an error if initialization fails.
Start(ctx context.Context) error
}
|
Publisher defines the interface for publishing messages to a message queue. It provides two methods for publishing: a simple method that accepts individual parameters, and a message-based method that accepts a pre-built Message object.
Implementations must be thread-safe and support concurrent publishing. Publishers should handle connection failures gracefully and may implement retry logic internally.
Example usage:
// Simple publish
err := publisher.Publish(ctx, "correlation-123", "events", "user.created",
"application/json", []byte(`{"id": 123, "name": "John"}`))
// Message-based publish
msg := NewMessageBuilder().
WithBody([]byte(`{"id": 123, "name": "John"}`)).
WithContentType("application/json").
WithCorrelationId("correlation-123").
Build()
err = publisher.PublishMsg(ctx, "events", "user.created", msg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| type Publisher interface {
// Publish sends a message with the specified parameters to the message queue.
// This is a convenience method for simple publishing scenarios.
//
// Parameters:
// - ctx: Context for cancellation and timeout control
// - correlationID: Unique identifier for tracking related messages
// - exchange: Target exchange or topic namespace (usage varies by transport)
// - topic: Routing key or topic name for message delivery
// - contentType: MIME type of the message body (e.g., "application/json")
// - body: The message payload as a byte slice
//
// Returns an error if the message cannot be published.
Publish(ctx context.Context, correlationID, exchange, topic, contentType string, body []byte) error
// PublishMsg sends a pre-built Message to the specified exchange and topic.
// This method provides more control over message properties and is preferred
// for complex publishing scenarios where additional message attributes
// (headers, priority, TTL, etc.) need to be set.
//
// Parameters:
// - ctx: Context for cancellation and timeout control
// - exchange: Target exchange or topic namespace (usage varies by transport)
// - topic: Routing key or topic name for message delivery
// - msg: The message object containing all message properties and payload
//
// Returns an error if the message cannot be published.
PublishMsg(ctx context.Context, exchange, topic string, msg Message) error
}
|
Transport defines the low-level interface for message broker operations. It provides the foundational methods for connecting to, interacting with, and managing message queue systems. Transport implementations handle the protocol-specific details of each messaging system while exposing a unified API.
Transport embeds both HealthChecker and Publisher interfaces to provide health monitoring and message publishing capabilities. All Transport implementations must be thread-safe and support concurrent operations.
Example usage:
transport := amqp.NewTransport()
err := transport.Dial(ctx, "amqp://localhost:5672", false)
if err != nil {
log.Fatal(err)
}
defer transport.Close()
messages, err := transport.Messages(ctx, "my-queue")
if err != nil {
log.Fatal(err)
}
for msg := range messages {
// Process message
if err := processMessage(msg); err != nil {
msg.Nack()
} else {
msg.Ack()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| type Transport interface {
HealthChecker
Publisher // Embed Publisher interface to formalize the publishing contract
// Dial establishes a connection to the message broker using the provided URL.
// The url parameter should be in the format appropriate for the transport type
// (e.g., "amqp://user:pass@host:port/vhost" for AMQP).
// If skipVerify is true, TLS certificate verification is disabled (use with caution).
// Returns an error if the connection cannot be established.
Dial(ctx context.Context, url string, skipVerify bool) error
// Close gracefully shuts down the transport connection, ensuring all
// pending operations are completed or canceled. It should be called
// when the transport is no longer needed to free resources.
// Returns an error if the shutdown process encounters issues.
Close() error
// Ack acknowledges successful processing of a message. This tells the
// message broker that the message has been handled and can be removed
// from the queue. The exact behavior depends on the underlying transport.
// Returns an error if the acknowledgment fails.
Ack(Message) error
// Nack negatively acknowledges a message, indicating processing failure.
// The message broker will handle the message according to its configuration
// (requeue, move to dead letter queue, etc.). The exact behavior depends
// on the underlying transport and broker configuration.
// Returns an error if the negative acknowledgment fails.
Nack(Message) error
// DeclareExchange declares an exchange with the given name and type.
// This is primarily used by AMQP-based transports. Other transports may
// implement this as a no-op or map it to equivalent concepts.
// Common exchange types include "direct", "topic", "fanout", and "headers".
// Returns an error if the exchange cannot be declared.
DeclareExchange(name string, exchangeType string) error
// BindQueue binds a queue to an exchange with the specified routing keys
// and arguments. The autoDelete parameter indicates if the binding should
// be automatically deleted when no longer in use. This is primarily used
// by AMQP-based transports; other transports may adapt or ignore this.
// Returns an error if the binding cannot be created.
BindQueue(queue string, exchange string, routingKeys []string, args map[string]interface{}, autoDelete bool) error
// Messages returns a channel that delivers messages from the specified queue.
// The channel will close when the context is canceled or an error occurs.
// Messages should be acknowledged (Ack) or rejected (Nack) after processing.
// This method uses the default message channel buffer size.
// Returns an error if the message channel cannot be established.
Messages(ctx context.Context, queue string) (<-chan Message, error)
// MessagesWithBuffer returns a channel that delivers messages from the specified
// queue with a custom buffer size. A larger buffer can improve throughput but
// uses more memory. A buffer size of 0 creates an unbuffered channel.
// The channel will close when the context is canceled or an error occurs.
// Returns an error if the message channel cannot be established.
MessagesWithBuffer(ctx context.Context, queue string, bufferSize int) (<-chan Message, error)
// ChannelClosed returns a channel that receives an error when the underlying
// transport connection is closed unexpectedly. This allows consumers to
// detect connection failures and implement reconnection logic.
// The channel is closed when the transport is closed normally via Close().
ChannelClosed() chan error
}
|
Generated by gomarkdoc