Getting Started

Get up and running with mqutils quickly

Getting Started with mqutils

mqutils is a production-ready Go message queue abstraction library that provides unified interfaces for 6 major messaging systems. This guide will help you get up and running quickly.

Installation

Add mqutils to your Go project:

1
go get gitlab.com/digitalxero/mqutils

Basic Usage

Creating a Consumer

The simplest way to start using mqutils is with the factory function:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

func main() {
    // Create a consumer using URL scheme auto-detection
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Register a message handler
    consumer.RegisterHandler("process", func(msg types.Message) error {
        log.Printf("Received: %s", string(msg.Body()))
        return nil
    })

    // Start consuming messages
    ctx := context.Background()
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

URL Schemes

mqutils automatically detects the message queue system from the URL scheme:

SystemURL SchemesExample
AMQP/RabbitMQamqp://, amqps://amqp://localhost:5672/myqueue
Kafkakafka://, kafkas://kafka://localhost:9092/mytopic
NATSnats://, natss://, jetstream://nats://localhost:4222/mysubject
AWS SQSsqs://, sqss://sqs://region/queue-name
GCP Pub/Subpubsub://pubsub://project-id/subscription-name
Redisredis://, rediss://, redisstream://redis://localhost:6379/mychannel

Core Concepts

Messages

All messages in mqutils implement the types.Message interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Message interface {
    ID() string                    // Unique message identifier
    CorrelationID() string         // For request-response patterns
    ReplyTo() string              // Reply destination
    Exchange() string             // AMQP exchange (empty for other systems)
    RoutingKey() string           // Message routing key
    Headers() map[string]interface{} // Message headers
    Body() []byte                 // Message payload
    Publisher() Publisher         // Access to publisher for replies
}

Message Handlers

Register handlers to process incoming messages:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Simple handler
consumer.RegisterHandler("process", func(msg types.Message) error {
    // Process the message
    log.Printf("Processing: %s", string(msg.Body()))
    return nil
})

// Handler with reply
consumer.RegisterHandler("request", func(msg types.Message) error {
    // Process request and send reply
    response := []byte("processed")
    
    if msg.ReplyTo() != "" {
        builder := msg.Publisher().NewMessageBuilder()
        reply := builder.
            SetCorrelationID(msg.CorrelationID()).
            SetBody(response).
            Build()
        
        return msg.Publisher().PublishMessage(msg.ReplyTo(), reply)
    }
    
    return nil
})

Batch Processing

Configure batch processing for high-throughput scenarios:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Register a batch handler
consumer.RegisterBatchHandler("bulk-process", func(msgs []types.Message) error {
    log.Printf("Processing batch of %d messages", len(msgs))
    
    for _, msg := range msgs {
        // Process each message in the batch
        log.Printf("Message: %s", string(msg.Body()))
    }
    
    return nil
})

Health Monitoring

All transports support health monitoring:

1
2
3
4
5
6
7
// Check health status
health := consumer.HealthCheck()
if health.Status() == types.HealthStatusHealthy {
    log.Println("Consumer is healthy")
} else {
    log.Printf("Consumer health: %s - %s", health.Status(), health.Message())
}

Publishing Messages

Get the publisher from your consumer or transport:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Simple publish
publisher := consumer.Publisher()
err := publisher.Publish("destination", []byte("Hello, World!"))

// Publish with message builder
builder := publisher.NewMessageBuilder()
message := builder.
    SetCorrelationID("req-123").
    SetHeaders(map[string]interface{}{
        "content-type": "application/json",
        "priority": 1,
    }).
    SetBody([]byte(`{"event": "user.created", "id": 123}`)).
    Build()

err = publisher.PublishMessage("events", message)

Configuration Examples

AMQP/RabbitMQ

1
2
3
4
5
// Basic connection
consumer, err := mqutils.NewConsumer("amqp://user:pass@localhost:5672/queue")

// With exchange and routing key
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/queue?exchange=events&routing_key=user.created")

Kafka

1
2
3
4
5
// Basic connection
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic")

// With consumer group
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic?group_id=my-service")

AWS SQS

1
2
3
4
5
// Standard queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue")

// FIFO queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue.fifo")

Error Handling

mqutils provides structured error handling:

1
2
3
4
5
6
7
8
9
consumer.RegisterHandler("process", func(msg types.Message) error {
    // Your processing logic here
    if err := processMessage(msg); err != nil {
        // Return error to trigger message retry/dead letter
        return fmt.Errorf("processing failed: %w", err)
    }
    
    return nil // Message will be acknowledged
})

Graceful Shutdown

Always implement graceful shutdown:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Handle shutdown signals
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
        <-sigChan
        cancel()
    }()

    // Run will return when context is cancelled
    if err := consumer.Run(ctx); err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

Next Steps