Getting Started with mqutils
mqutils is a production-ready Go message queue abstraction library that provides unified interfaces for 6 major messaging systems. This guide will help you get up and running quickly.
Installation
Add mqutils to your Go project:
1
| go get gitlab.com/digitalxero/mqutils
|
Basic Usage
Creating a Consumer
The simplest way to start using mqutils is with the factory function:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package main
import (
"context"
"log"
"gitlab.com/digitalxero/mqutils"
"gitlab.com/digitalxero/mqutils/types"
)
func main() {
// Create a consumer using URL scheme auto-detection
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue")
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Register a message handler
consumer.RegisterHandler("process", func(msg types.Message) error {
log.Printf("Received: %s", string(msg.Body()))
return nil
})
// Start consuming messages
ctx := context.Background()
if err := consumer.Run(ctx); err != nil {
log.Fatal(err)
}
}
|
URL Schemes
mqutils automatically detects the message queue system from the URL scheme:
System | URL Schemes | Example |
---|
AMQP/RabbitMQ | amqp:// , amqps:// | amqp://localhost:5672/myqueue |
Kafka | kafka:// , kafkas:// | kafka://localhost:9092/mytopic |
NATS | nats:// , natss:// , jetstream:// | nats://localhost:4222/mysubject |
AWS SQS | sqs:// , sqss:// | sqs://region/queue-name |
GCP Pub/Sub | pubsub:// | pubsub://project-id/subscription-name |
Redis | redis:// , rediss:// , redisstream:// | redis://localhost:6379/mychannel |
Core Concepts
Messages
All messages in mqutils implement the types.Message
interface:
1
2
3
4
5
6
7
8
9
10
| type Message interface {
ID() string // Unique message identifier
CorrelationID() string // For request-response patterns
ReplyTo() string // Reply destination
Exchange() string // AMQP exchange (empty for other systems)
RoutingKey() string // Message routing key
Headers() map[string]interface{} // Message headers
Body() []byte // Message payload
Publisher() Publisher // Access to publisher for replies
}
|
Message Handlers
Register handlers to process incoming messages:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| // Simple handler
consumer.RegisterHandler("process", func(msg types.Message) error {
// Process the message
log.Printf("Processing: %s", string(msg.Body()))
return nil
})
// Handler with reply
consumer.RegisterHandler("request", func(msg types.Message) error {
// Process request and send reply
response := []byte("processed")
if msg.ReplyTo() != "" {
builder := msg.Publisher().NewMessageBuilder()
reply := builder.
SetCorrelationID(msg.CorrelationID()).
SetBody(response).
Build()
return msg.Publisher().PublishMessage(msg.ReplyTo(), reply)
}
return nil
})
|
Batch Processing
Configure batch processing for high-throughput scenarios:
1
2
3
4
5
6
7
8
9
10
11
| // Register a batch handler
consumer.RegisterBatchHandler("bulk-process", func(msgs []types.Message) error {
log.Printf("Processing batch of %d messages", len(msgs))
for _, msg := range msgs {
// Process each message in the batch
log.Printf("Message: %s", string(msg.Body()))
}
return nil
})
|
Health Monitoring
All transports support health monitoring:
1
2
3
4
5
6
7
| // Check health status
health := consumer.HealthCheck()
if health.Status() == types.HealthStatusHealthy {
log.Println("Consumer is healthy")
} else {
log.Printf("Consumer health: %s - %s", health.Status(), health.Message())
}
|
Publishing Messages
Get the publisher from your consumer or transport:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // Simple publish
publisher := consumer.Publisher()
err := publisher.Publish("destination", []byte("Hello, World!"))
// Publish with message builder
builder := publisher.NewMessageBuilder()
message := builder.
SetCorrelationID("req-123").
SetHeaders(map[string]interface{}{
"content-type": "application/json",
"priority": 1,
}).
SetBody([]byte(`{"event": "user.created", "id": 123}`)).
Build()
err = publisher.PublishMessage("events", message)
|
Configuration Examples
AMQP/RabbitMQ
1
2
3
4
5
| // Basic connection
consumer, err := mqutils.NewConsumer("amqp://user:pass@localhost:5672/queue")
// With exchange and routing key
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/queue?exchange=events&routing_key=user.created")
|
Kafka
1
2
3
4
5
| // Basic connection
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic")
// With consumer group
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic?group_id=my-service")
|
AWS SQS
1
2
3
4
5
| // Standard queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue")
// FIFO queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue.fifo")
|
Error Handling
mqutils provides structured error handling:
1
2
3
4
5
6
7
8
9
| consumer.RegisterHandler("process", func(msg types.Message) error {
// Your processing logic here
if err := processMessage(msg); err != nil {
// Return error to trigger message retry/dead letter
return fmt.Errorf("processing failed: %w", err)
}
return nil // Message will be acknowledged
})
|
Graceful Shutdown
Always implement graceful shutdown:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| func main() {
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue")
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// Handle shutdown signals
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
cancel()
}()
// Run will return when context is cancelled
if err := consumer.Run(ctx); err != nil {
log.Printf("Consumer error: %v", err)
}
}
|
Next Steps