API Reference

Complete API documentation for mqutils

This page provides detailed documentation for all public interfaces and types in mqutils.

Factory Functions

mqutils.NewConsumer

func NewConsumer(url string) (types.Consumer, error)

Creates a new consumer based on the URL scheme. Automatically detects the message queue system and returns the appropriate implementation.

Parameters:

  • url: Connection URL with scheme (e.g., amqp://localhost:5672/queue)

Returns:

  • types.Consumer: Consumer implementation for the detected system
  • error: Configuration or connection error
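
The scheme-based detection described above can be pictured with a small sketch. This is not the mqutils implementation: detectSystem is a hypothetical helper, and every scheme other than amqp (the only one shown in this reference) is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// detectSystem returns the queue system implied by the URL scheme.
// The scheme table is hypothetical; only "amqp" appears in this reference.
func detectSystem(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("parse url: %w", err)
	}
	switch u.Scheme {
	case "amqp", "amqps":
		return "amqp", nil
	case "kafka":
		return "kafka", nil
	case "nats":
		return "nats", nil
	case "":
		return "", fmt.Errorf("url %q has no scheme", rawURL)
	default:
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	system, err := detectSystem("amqp://localhost:5672/queue")
	fmt.Println(system, err)
}
```

A factory built this way can reject an unknown scheme before any connection attempt, which fits the "configuration or connection error" return value.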

Core Interfaces

types.Consumer

Main interface for consuming messages from message queues.

type Consumer interface {
    RegisterHandler(name string, handler HandlerFunc) error
    RegisterBatchHandler(name string, handler BatchHandlerFunc) error
    Run(ctx context.Context) error
    Close() error
    Publisher() Publisher
    HealthChecker
}

Methods:

RegisterHandler

RegisterHandler(name string, handler HandlerFunc) error

Registers a handler function for processing individual messages.

RegisterBatchHandler

RegisterBatchHandler(name string, handler BatchHandlerFunc) error

Registers a handler function for processing message batches.

Run

Run(ctx context.Context) error

Starts the consumer. Blocks until the context is cancelled or an error occurs.

Close

Close() error

Closes the consumer and releases resources.

Publisher

Publisher() Publisher

Returns the associated publisher for sending messages.
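
To show how RegisterHandler, Run, and Close fit together, here is a minimal sketch of the lifecycle. It uses trimmed local stand-ins for Message and HandlerFunc, and memoryConsumer is a hypothetical in-memory type rather than anything in mqutils. Returning ctx.Err() from Run is an assumption of this sketch; the reference only says that Run blocks until cancellation or an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message and HandlerFunc are trimmed local stand-ins for the types package.
type Message interface{ Body() []byte }

type HandlerFunc func(Message) error

type textMessage string

func (m textMessage) Body() []byte { return []byte(m) }

// memoryConsumer is a hypothetical in-memory consumer, not an mqutils type.
type memoryConsumer struct {
	handlers map[string]HandlerFunc
	inbox    chan Message // buffered; holds at most 16 pending messages
}

func newMemoryConsumer() *memoryConsumer {
	return &memoryConsumer{handlers: map[string]HandlerFunc{}, inbox: make(chan Message, 16)}
}

func (c *memoryConsumer) RegisterHandler(name string, h HandlerFunc) error {
	if _, exists := c.handlers[name]; exists {
		return fmt.Errorf("handler %q already registered", name)
	}
	c.handlers[name] = h
	return nil
}

// Run dispatches messages until ctx is cancelled or a handler fails.
func (c *memoryConsumer) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-c.inbox:
			for _, h := range c.handlers {
				if err := h(msg); err != nil {
					return err
				}
			}
		}
	}
}

func (c *memoryConsumer) Close() error { return nil }

// runDemo registers a handler, queues the given bodies, and runs for 50ms.
func runDemo(bodies ...string) ([]string, error) {
	c := newMemoryConsumer()
	defer c.Close()

	var seen []string
	err := c.RegisterHandler("collect", func(m Message) error {
		seen = append(seen, string(m.Body()))
		return nil
	})
	if err != nil {
		return nil, err
	}
	for _, b := range bodies {
		c.inbox <- textMessage(b)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// The deadline is the expected way out here, so it is not treated as a failure.
	if err := c.Run(ctx); err != nil && !errors.Is(err, context.DeadlineExceeded) {
		return seen, err
	}
	return seen, nil
}

func main() {
	seen, err := runDemo("first", "second")
	fmt.Println(seen, err)
}
```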

types.Publisher

Interface for publishing messages to message queues.

type Publisher interface {
    Publish(destination string, body []byte) error
    PublishMessage(destination string, msg Message) error
    NewMessageBuilder() MessageBuilder
    Close() error
}

Methods:

Publish

Publish(destination string, body []byte) error

Publishes a simple message with just a body to the specified destination.

PublishMessage

PublishMessage(destination string, msg Message) error

Publishes a complete message with headers, correlation ID, etc.

NewMessageBuilder

NewMessageBuilder() MessageBuilder

Creates a new message builder for constructing messages.

types.Message

Interface representing a message in the system.

type Message interface {
    ID() string
    CorrelationID() string
    ReplyTo() string
    Exchange() string
    RoutingKey() string
    Headers() map[string]interface{}
    Body() []byte
    Publisher() Publisher
}

Methods:

ID

ID() string

Returns the unique message identifier.

CorrelationID

CorrelationID() string

Returns the correlation ID for request-response patterns.

ReplyTo

ReplyTo() string

Returns the reply destination for responses.

Exchange

Exchange() string

Returns the AMQP exchange name (empty for other systems).

RoutingKey

RoutingKey() string

Returns the message routing key.

Headers

Headers() map[string]interface{}

Returns the message headers as a map.

Body

Body() []byte

Returns the message payload.

Publisher

Publisher() Publisher

Returns the associated publisher for sending replies.
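
CorrelationID, ReplyTo, and Publisher are the three accessors a request-response handler needs. The sketch below shows one way they might combine. It declares trimmed local copies of the interfaces, and memoryMessage and recorder are hypothetical in-memory types used only so the example runs without a broker.

```go
package main

import "fmt"

// Publisher and Message are trimmed local copies of the documented interfaces.
type Publisher interface {
	PublishMessage(destination string, msg Message) error
}

type Message interface {
	CorrelationID() string
	ReplyTo() string
	Body() []byte
	Publisher() Publisher
}

// memoryMessage is a hypothetical in-memory Message implementation.
type memoryMessage struct {
	correlationID string
	replyTo       string
	body          []byte
	pub           Publisher
}

func (m *memoryMessage) CorrelationID() string { return m.correlationID }
func (m *memoryMessage) ReplyTo() string       { return m.replyTo }
func (m *memoryMessage) Body() []byte          { return m.body }
func (m *memoryMessage) Publisher() Publisher  { return m.pub }

// recorder is a hypothetical Publisher that stores messages per destination.
type recorder struct{ sent map[string][]Message }

func (r *recorder) PublishMessage(destination string, msg Message) error {
	r.sent[destination] = append(r.sent[destination], msg)
	return nil
}

// echoHandler answers a request on its ReplyTo destination and copies the
// correlation ID so the caller can match the reply to the request.
func echoHandler(msg Message) error {
	if msg.ReplyTo() == "" {
		return nil // no reply destination: nothing to answer
	}
	reply := &memoryMessage{
		correlationID: msg.CorrelationID(),
		body:          append([]byte("echo: "), msg.Body()...),
	}
	return msg.Publisher().PublishMessage(msg.ReplyTo(), reply)
}

func main() {
	pub := &recorder{sent: map[string][]Message{}}
	req := &memoryMessage{correlationID: "req-1", replyTo: "replies", body: []byte("ping"), pub: pub}
	if err := echoHandler(req); err != nil {
		fmt.Println("handler failed:", err)
		return
	}
	reply := pub.sent["replies"][0]
	fmt.Println(reply.CorrelationID(), string(reply.Body()))
}
```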

types.MessageBuilder

Builder interface for constructing messages.

type MessageBuilder interface {
    SetID(id string) MessageBuilder
    SetCorrelationID(correlationID string) MessageBuilder
    SetReplyTo(replyTo string) MessageBuilder
    SetExchange(exchange string) MessageBuilder
    SetRoutingKey(routingKey string) MessageBuilder
    SetHeaders(headers map[string]interface{}) MessageBuilder
    SetHeader(key string, value interface{}) MessageBuilder
    SetBody(body []byte) MessageBuilder
    Build() Message
}

Methods:

All setter methods return the builder instance for method chaining. Call Build() to create the final message.
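
Method chaining works because each setter returns its receiver. The following is a minimal sketch of that pattern with a hypothetical local builder that covers only a subset of the setters. The real MessageBuilder is obtained from Publisher.NewMessageBuilder(), and its setters return the MessageBuilder interface rather than a concrete pointer.

```go
package main

import "fmt"

// message and builder are hypothetical local types illustrating the pattern.
type message struct {
	id            string
	correlationID string
	routingKey    string
	headers       map[string]interface{}
	body          []byte
}

type builder struct{ msg message }

func newBuilder() *builder {
	return &builder{msg: message{headers: map[string]interface{}{}}}
}

// Each setter mutates the builder and returns it, which enables chaining.
func (b *builder) SetID(id string) *builder            { b.msg.id = id; return b }
func (b *builder) SetCorrelationID(id string) *builder { b.msg.correlationID = id; return b }
func (b *builder) SetRoutingKey(key string) *builder   { b.msg.routingKey = key; return b }
func (b *builder) SetBody(body []byte) *builder        { b.msg.body = body; return b }

func (b *builder) SetHeader(key string, value interface{}) *builder {
	b.msg.headers[key] = value
	return b
}

// Build copies the headers so later SetHeader calls do not alter built messages.
func (b *builder) Build() message {
	out := b.msg
	out.headers = make(map[string]interface{}, len(b.msg.headers))
	for k, v := range b.msg.headers {
		out.headers[k] = v
	}
	return out
}

func main() {
	msg := newBuilder().
		SetID("m-1").
		SetCorrelationID("req-1").
		SetRoutingKey("order.created").
		SetHeader("content-type", "application/json").
		SetBody([]byte(`{"order":42}`)).
		Build()
	fmt.Println(msg.id, msg.routingKey, msg.headers["content-type"], string(msg.body))
}
```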

types.Transport

Low-level interface for message queue operations.

type Transport interface {
    Publisher
    HealthChecker
    Connect() error
    Close() error
    Consume(ctx context.Context, handler HandlerFunc) error
    ConsumeBatch(ctx context.Context, handler BatchHandlerFunc) error
}

Extends the Publisher and HealthChecker interfaces with connection and consumption methods.

types.HealthChecker

Interface for health monitoring.

type HealthChecker interface {
    HealthCheck() HealthCheck
}

HealthCheck

HealthCheck() HealthCheck

Returns the current health status of the component.

types.HealthCheck

Interface representing health check results.

type HealthCheck interface {
    Status() HealthStatus
    Message() string
    Timestamp() time.Time
    Details() map[string]interface{}
}

Methods:

Status

Status() HealthStatus

Returns the health status: one of HealthStatusHealthy, HealthStatusUnhealthy, HealthStatusConnecting, or HealthStatusClosed.

Message

Message() string

Returns a human-readable status message.

Timestamp

Timestamp() time.Time

Returns when the health check was performed.

Details

Details() map[string]interface{}

Returns additional health check details.
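
A common use of Status() is to back an HTTP readiness probe. The sketch below redeclares HealthStatus locally and maps each state to an HTTP status code. The mapping is a design choice of this example, not something mqutils prescribes, and httpStatusFor is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/http"
)

// HealthStatus and its constants are local copies of the documented values.
type HealthStatus string

const (
	HealthStatusHealthy    HealthStatus = "healthy"
	HealthStatusUnhealthy  HealthStatus = "unhealthy"
	HealthStatusConnecting HealthStatus = "connecting"
	HealthStatusClosed     HealthStatus = "closed"
)

// httpStatusFor reports 200 only for a healthy component. Every other state,
// including "connecting", is treated as not ready to receive traffic.
func httpStatusFor(s HealthStatus) int {
	if s == HealthStatusHealthy {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

func main() {
	states := []HealthStatus{
		HealthStatusHealthy, HealthStatusUnhealthy,
		HealthStatusConnecting, HealthStatusClosed,
	}
	for _, s := range states {
		fmt.Printf("%-10s -> %d\n", s, httpStatusFor(s))
	}
}
```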

Handler Functions

types.HandlerFunc

type HandlerFunc func(Message) error

Function signature for processing individual messages. Return nil for successful processing or an error to trigger retry/dead letter behavior.

types.BatchHandlerFunc

type BatchHandlerFunc func([]Message) error

Function signature for processing message batches. Return nil for successful processing of the entire batch.
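
The two signatures compose naturally. As an illustration, the sketch below adapts a per-message handler into a batch handler that fails the whole batch on the first error. The types are local stand-ins, and eachMessage and rejectEmpty are hypothetical helpers, not part of mqutils.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented handler types.
type Message interface{ Body() []byte }

type HandlerFunc func(Message) error

type BatchHandlerFunc func([]Message) error

type textMessage string

func (m textMessage) Body() []byte { return []byte(m) }

// eachMessage wraps h so it can be registered as a batch handler. It stops at
// the first failing message and reports its position within the batch.
func eachMessage(h HandlerFunc) BatchHandlerFunc {
	return func(msgs []Message) error {
		for i, m := range msgs {
			if err := h(m); err != nil {
				return fmt.Errorf("message %d of %d: %w", i+1, len(msgs), err)
			}
		}
		return nil
	}
}

// rejectEmpty is an example handler that refuses messages without a payload.
func rejectEmpty(m Message) error {
	if len(m.Body()) == 0 {
		return errors.New("empty body")
	}
	return nil
}

func main() {
	batch := eachMessage(rejectEmpty)
	fmt.Println(batch([]Message{textMessage("a"), textMessage("b")}))
	fmt.Println(batch([]Message{textMessage("a"), textMessage("")}))
}
```

Stopping at the first error keeps the semantics simple, at the cost of reprocessing messages that already succeeded if the transport redelivers the batch.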

Health Status Types

types.HealthStatus

type HealthStatus string

const (
    HealthStatusHealthy    HealthStatus = "healthy"
    HealthStatusUnhealthy  HealthStatus = "unhealthy"
    HealthStatusConnecting HealthStatus = "connecting"
    HealthStatusClosed     HealthStatus = "closed"
)

Enumeration of possible health states.

Error Types

types.ConfigurationError

type ConfigurationError struct {
    Field   string
    Value   interface{}
    Message string
}

func (e *ConfigurationError) Error() string

Represents configuration validation errors.

types.ValidationError

type ValidationError struct {
    Errors []ConfigurationError
}

func (e *ValidationError) Error() string
func (e *ValidationError) Add(field string, value interface{}, message string)

Contains multiple configuration errors for comprehensive validation reporting.
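
To make the relationship between the two error types concrete, here is a sketch that redeclares them locally with plausible method bodies. The message formats and the validate helper are assumptions. Only the field names, the method signatures, and the 0 to 10,000 buffer range come from this reference.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type ConfigurationError struct {
	Field   string
	Value   interface{}
	Message string
}

// Error formats a single problem; the exact format is an assumption.
func (e *ConfigurationError) Error() string {
	return fmt.Sprintf("%s=%v: %s", e.Field, e.Value, e.Message)
}

type ValidationError struct {
	Errors []ConfigurationError
}

func (e *ValidationError) Add(field string, value interface{}, message string) {
	e.Errors = append(e.Errors, ConfigurationError{Field: field, Value: value, Message: message})
}

func (e *ValidationError) Error() string {
	parts := make([]string, len(e.Errors))
	for i := range e.Errors {
		parts[i] = e.Errors[i].Error()
	}
	return strings.Join(parts, "; ")
}

// validate is a hypothetical check that collects every problem in one pass
// instead of stopping at the first one.
func validate(messageBuffer int, groupID string) error {
	verr := &ValidationError{}
	if messageBuffer < 0 || messageBuffer > 10000 {
		verr.Add("MessageBuffer", messageBuffer, "must be between 0 and 10000")
	}
	if groupID == "" {
		verr.Add("group_id", groupID, "must not be empty")
	}
	if len(verr.Errors) == 0 {
		return nil // return an untyped nil, not a nil *ValidationError
	}
	return verr
}

func main() {
	err := validate(-1, "")
	var verr *ValidationError
	if errors.As(err, &verr) {
		for _, e := range verr.Errors {
			fmt.Printf("%s: %s\n", e.Field, e.Message)
		}
	}
}
```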

Package-Specific Types

AMQP Package

amqp.Consumer

Implementation of types.Consumer for AMQP/RabbitMQ.

amqp.Transport

Implementation of types.Transport for AMQP/RabbitMQ with channel pooling.

amqp.Message

Implementation of types.Message for AMQP messages.

Kafka Package

kafka.Consumer

Implementation of types.Consumer for Apache Kafka.

kafka.Transport

Implementation of types.Transport for Kafka with consumer groups.

kafka.Message

Implementation of types.Message for Kafka messages.

kafka.Producer

Kafka-specific producer with additional configuration options.

NATS Package

nats.Consumer

Implementation of types.Consumer for NATS Core and JetStream.

nats.Transport

Implementation of types.Transport for NATS.

nats.Message

Implementation of types.Message for NATS messages.

nats.Producer

NATS-specific producer supporting both Core and JetStream.

AWS Package

aws.Consumer

Implementation of types.Consumer for AWS SQS.

aws.Transport

Implementation of types.Transport for SQS with visibility timeout handling.

aws.Message

Implementation of types.Message for SQS messages.

GCP Package

gcp.Consumer

Implementation of types.Consumer for GCP Pub/Sub.

gcp.Transport

Implementation of types.Transport for Pub/Sub with acknowledgment handling.

gcp.Message

Implementation of types.Message for Pub/Sub messages.

Redis Package

redis.Consumer

Implementation of types.Consumer for Redis Pub/Sub and Streams.

redis.Transport

Implementation of types.Transport for Redis.

redis.Message

Implementation of types.Message for Redis messages.

Configuration Examples

URL Parameters

Supported URL parameters vary by system:

AMQP

  • exchange: Exchange name
  • routing_key: Routing key for messages
  • durable: Queue durability (true/false)
  • auto_delete: Auto-delete queue (true/false)

Kafka

  • group_id: Consumer group ID
  • partition: Specific partition (optional)
  • offset: Starting offset (earliest/latest)

NATS

  • max_msgs: Maximum messages in subscription
  • durable: Durable subscription name (JetStream)

SQS

  • visibility_timeout: Message visibility timeout
  • max_messages: Maximum messages per receive

Pub/Sub

  • max_outstanding_messages: Flow control limit
  • subscription_id: Subscription identifier

Redis

  • stream: Use Redis Streams instead of Pub/Sub
  • group: Consumer group for streams
  • consumer: Consumer name within group
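
As an illustration of how such parameters could be read, the sketch below parses the AMQP parameters listed above with net/url. It assumes they are passed as query-string parameters and that an absent boolean defaults to false; neither is confirmed by this reference. amqpOptions, parseAMQPOptions, and boolParam are hypothetical names.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// amqpOptions holds the AMQP parameters listed above.
type amqpOptions struct {
	Exchange   string
	RoutingKey string
	Durable    bool
	AutoDelete bool
}

// parseAMQPOptions extracts the options from the URL query string.
func parseAMQPOptions(rawURL string) (amqpOptions, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return amqpOptions{}, fmt.Errorf("parse url: %w", err)
	}
	q := u.Query()
	opts := amqpOptions{Exchange: q.Get("exchange"), RoutingKey: q.Get("routing_key")}
	if opts.Durable, err = boolParam(q, "durable"); err != nil {
		return amqpOptions{}, err
	}
	if opts.AutoDelete, err = boolParam(q, "auto_delete"); err != nil {
		return amqpOptions{}, err
	}
	return opts, nil
}

// boolParam reads a true/false parameter; a missing one yields false (assumed default).
func boolParam(q url.Values, name string) (bool, error) {
	raw := q.Get(name)
	if raw == "" {
		return false, nil
	}
	v, err := strconv.ParseBool(raw)
	if err != nil {
		return false, fmt.Errorf("parameter %s: %w", name, err)
	}
	return v, nil
}

func main() {
	opts, err := parseAMQPOptions(
		"amqp://localhost:5672/orders?exchange=events&routing_key=order.created&durable=true")
	fmt.Printf("%+v %v\n", opts, err)
}
```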

Performance Tuning

Buffer Configuration

Configure message channel buffers for optimal performance:

// Example for AMQP transport
transport := &amqp.Transport{
    MessageBuffer: 1000, // Channel buffer size (0-10,000)
}

Batch Processing

Configure batch processing parameters:

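// Example batch configuration
err := consumer.RegisterBatchHandler("bulk", func(msgs []types.Message) error {
    // Process up to the configured batch size.
    // processBatch is your own function.
    return processBatch(msgs)
})
if err != nil {
    log.Fatalf("register batch handler: %v", err)
}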

Batch size and timeout are transport-specific and configured via URL parameters or transport options.
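
The interplay of batch size and timeout can be sketched independently of any transport: a batch is handed over when it is full or when the timeout fires, whichever comes first. collectBatch and demoBatchSizes below are hypothetical helpers operating on a plain channel, not mqutils functions.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch reads from in until it holds size items, the timeout elapses,
// or in is closed. It returns whatever was gathered, possibly nothing.
func collectBatch(in <-chan string, size int, timeout time.Duration) []string {
	if size <= 0 {
		return nil
	}
	batch := make([]string, 0, size)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < size {
		select {
		case item, ok := <-in:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch // timeout: flush a partial batch
		}
	}
	return batch
}

// demoBatchSizes queues five items and collects two batches of up to three.
func demoBatchSizes() (int, int) {
	in := make(chan string, 5)
	for i := 1; i <= 5; i++ {
		in <- fmt.Sprintf("msg-%d", i)
	}
	first := collectBatch(in, 3, 20*time.Millisecond)  // fills up immediately
	second := collectBatch(in, 3, 20*time.Millisecond) // two items, then times out
	return len(first), len(second)
}

func main() {
	first, second := demoBatchSizes()
	fmt.Println(first, second)
}
```

A larger batch size raises throughput, while a shorter timeout bounds how long a message can wait in a partially filled batch.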

Best Practices

  1. Pass a cancellable context to Run() so the consumer can be stopped cleanly
  2. Return errors from handler functions instead of swallowing them, since a returned error triggers retry/dead letter behavior
  3. Use health checks to monitor connection status
  4. Configure timeouts appropriate for your use case
  5. Release resources on shutdown with defer Close()
  6. Use correlation IDs for request-response patterns
  7. Set appropriate batch sizes for high-throughput scenarios