Getting Started

Get up and running with mqutils quickly

Getting Started with mqutils

mqutils is a production-ready Go message queue abstraction library that provides unified interfaces for 6 major messaging systems. This guide will help you get up and running quickly.

Installation

Add mqutils to your Go project:

1
go get gitlab.com/digitalxero/mqutils

Basic Usage

Creating a Consumer

The simplest way to start using mqutils is with the factory function:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

func main() {
    // Register a message handler globally
    types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
        log.Printf("Received: %s", string(msg.Body()))
        return nil
    })

    // Create a consumer using URL scheme auto-detection
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=process")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Publisher().HealthCheck(context.Background()) // Check health on exit

    // Start consuming messages
    ctx := context.Background()
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

URL Schemes

mqutils automatically detects the message queue system from the URL scheme:

SystemURL SchemesExample
AMQP/RabbitMQamqp://, amqps://amqp://localhost:5672/myqueue
Kafkakafka://, kafkas://kafka://localhost:9092/mytopic
NATSnats://, natss://, jetstream://nats://localhost:4222/mysubject
AWS SQSsqs://, sqss://sqs://region/queue-name
GCP Pub/Subpubsub://pubsub://project-id/subscription-name
Redisredis://, rediss://, redisstream://redis://localhost:6379/mychannel

Core Concepts

Messages

All messages in mqutils implement the types.Message interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Message interface {
    ID() string                    // Unique message identifier
    CorrelationID() string         // For request-response patterns
    ReplyTo() string              // Reply destination
    Exchange() string             // AMQP exchange (empty for other systems)
    RoutingKey() string           // Message routing key
    Headers() map[string]interface{} // Message headers
    Body() []byte                 // Message payload
    Publisher() Publisher         // Access to publisher for replies
}

Message Handlers

Register handlers to process incoming messages:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Simple handler
types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
    // Process the message
    log.Printf("Processing: %s", string(msg.Body()))
    return nil
})

// Handler with reply
types.RegisterHandler("request", func(ctx context.Context, msg types.Message) error {
    // Process request and send reply
    response := []byte("processed")
    
    if msg.ReplyTo() != "" {
        return msg.Publisher().Publish(ctx, 
            msg.CorrelationId(), // correlation ID
            "",                  // exchange (empty for default)
            msg.ReplyTo(),       // routing key (reply destination)
            "text/plain",        // content type
            response)            // body
    }
    
    return nil
})

Batch Processing

Configure batch processing for high-throughput scenarios:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Register a batch handler globally
types.RegisterBatchHandler("bulk-process", func(ctx context.Context, msgs []types.Message) error {
    log.Printf("Processing batch of %d messages", len(msgs))
    
    for _, msg := range msgs {
        // Process each message in the batch
        log.Printf("Message: %s", string(msg.Body()))
    }
    
    return nil
})

// Configure consumer with batch processing
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=bulk-process&batch_size=50&batch_timeout=1000")

Health Monitoring

All transports support health monitoring:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Check health status
health, err := consumer.HealthCheck(context.Background())
if err != nil {
    log.Printf("Health check failed: %v", err)
    return
}

if health.Status() == types.HealthStatusHealthy {
    log.Println("Consumer is healthy")
} else {
    log.Printf("Consumer health: %s - %s", health.Status(), health.Message())
}

Publishing Messages

Get the publisher from your consumer or transport:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Simple publish
publisher := consumer.Publisher()
err := publisher.Publish(
    context.Background(),
    "req-123",              // correlation ID
    "",                     // exchange (empty for default)
    "destination",          // routing key/destination
    "text/plain",           // content type
    []byte("Hello, World!") // body
)

// Publish with message builder
builder := amqp.NewMessageBuilder() // Use specific package builder
message := builder.
    WithCorrelationId("req-123").
    WithHeaders(map[string]interface{}{
        "content-type": "application/json",
        "priority": 1,
    }).
    WithBody([]byte(`{"event": "user.created", "id": 123}`)).
    Build()

err = publisher.PublishMsg(context.Background(), "", "events", message)

Configuration Examples

AMQP/RabbitMQ

1
2
3
4
5
// Basic connection
consumer, err := mqutils.NewConsumer("amqp://user:pass@localhost:5672/queue")

// With exchange and routing key
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/queue?exchange=events&routing_key=user.created")

Kafka

1
2
3
4
5
// Basic connection
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic")

// With consumer group
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic?group_id=my-service")

AWS SQS

1
2
3
4
5
// Standard queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue")

// FIFO queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue.fifo")

Error Handling

mqutils provides structured error handling:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
    // Your processing logic here
    if err := processMessage(ctx, msg); err != nil {
        // Return error to trigger message retry/dead letter
        return fmt.Errorf("processing failed: %w", err)
    }
    
    return nil // Message will be acknowledged automatically
})

// For manual acknowledgment control
types.RegisterHandler("manual", func(ctx context.Context, msg types.Message) error {
    if err := processMessage(ctx, msg); err != nil {
        // Explicitly nack the message
        return types.NacknowledgeMessage(ctx, msg)
    }
    
    // Explicitly ack the message  
    return types.AcknowledgeMessage(ctx, msg)
})

Graceful Shutdown

Always implement graceful shutdown:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
    // Register handler first
    types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
        log.Printf("Processing: %s", string(msg.Body()))
        return nil
    })

    // Enable graceful shutdown in configuration
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=process&enable_graceful_shutdown=true&graceful_shutdown_timeout=30")
    if err != nil {
        log.Fatal(err)
    }

    // Handle shutdown signals
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
        <-sigChan
        log.Println("Shutdown signal received, stopping consumer...")
        cancel()
    }()

    // Run will return when context is cancelled
    if err := consumer.Run(ctx); err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

Next Steps