Getting Started with mqutils
mqutils is a production-ready Go message queue abstraction library that provides unified interfaces for 6 major messaging systems. This guide will help you get up and running quickly.
Installation
Add mqutils to your Go project:
1
| go get gitlab.com/digitalxero/mqutils
|
Basic Usage
Creating a Consumer
The simplest way to start using mqutils is with the factory function:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package main
import (
"context"
"log"
"gitlab.com/digitalxero/mqutils"
"gitlab.com/digitalxero/mqutils/types"
)
func main() {
// Register a message handler globally
types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
log.Printf("Received: %s", string(msg.Body()))
return nil
})
// Create a consumer using URL scheme auto-detection
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=process")
if err != nil {
log.Fatal(err)
}
defer consumer.Publisher().HealthCheck(context.Background()) // Check health on exit
// Start consuming messages
ctx := context.Background()
if err := consumer.Run(ctx); err != nil {
log.Fatal(err)
}
}
|
URL Schemes
mqutils automatically detects the message queue system from the URL scheme:
System | URL Schemes | Example |
---|
AMQP/RabbitMQ | amqp:// , amqps:// | amqp://localhost:5672/myqueue |
Kafka | kafka:// , kafkas:// | kafka://localhost:9092/mytopic |
NATS | nats:// , natss:// , jetstream:// | nats://localhost:4222/mysubject |
AWS SQS | sqs:// , sqss:// | sqs://region/queue-name |
GCP Pub/Sub | pubsub:// | pubsub://project-id/subscription-name |
Redis | redis:// , rediss:// , redisstream:// | redis://localhost:6379/mychannel |
Core Concepts
Messages
All messages in mqutils implement the types.Message
interface:
1
2
3
4
5
6
7
8
9
10
| type Message interface {
ID() string // Unique message identifier
CorrelationID() string // For request-response patterns
ReplyTo() string // Reply destination
Exchange() string // AMQP exchange (empty for other systems)
RoutingKey() string // Message routing key
Headers() map[string]interface{} // Message headers
Body() []byte // Message payload
Publisher() Publisher // Access to publisher for replies
}
|
Message Handlers
Register handlers to process incoming messages:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // Simple handler
types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
// Process the message
log.Printf("Processing: %s", string(msg.Body()))
return nil
})
// Handler with reply
types.RegisterHandler("request", func(ctx context.Context, msg types.Message) error {
// Process request and send reply
response := []byte("processed")
if msg.ReplyTo() != "" {
return msg.Publisher().Publish(ctx,
msg.CorrelationId(), // correlation ID
"", // exchange (empty for default)
msg.ReplyTo(), // routing key (reply destination)
"text/plain", // content type
response) // body
}
return nil
})
|
Batch Processing
Configure batch processing for high-throughput scenarios:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // Register a batch handler globally
types.RegisterBatchHandler("bulk-process", func(ctx context.Context, msgs []types.Message) error {
log.Printf("Processing batch of %d messages", len(msgs))
for _, msg := range msgs {
// Process each message in the batch
log.Printf("Message: %s", string(msg.Body()))
}
return nil
})
// Configure consumer with batch processing
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=bulk-process&batch_size=50&batch_timeout=1000")
|
Health Monitoring
All transports support health monitoring:
1
2
3
4
5
6
7
8
9
10
11
12
| // Check health status
health, err := consumer.HealthCheck(context.Background())
if err != nil {
log.Printf("Health check failed: %v", err)
return
}
if health.Status() == types.HealthStatusHealthy {
log.Println("Consumer is healthy")
} else {
log.Printf("Consumer health: %s - %s", health.Status(), health.Message())
}
|
Publishing Messages
Get the publisher from your consumer or transport:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // Simple publish
publisher := consumer.Publisher()
err := publisher.Publish(
context.Background(),
"req-123", // correlation ID
"", // exchange (empty for default)
"destination", // routing key/destination
"text/plain", // content type
[]byte("Hello, World!") // body
)
// Publish with message builder
builder := amqp.NewMessageBuilder() // Use specific package builder
message := builder.
WithCorrelationId("req-123").
WithHeaders(map[string]interface{}{
"content-type": "application/json",
"priority": 1,
}).
WithBody([]byte(`{"event": "user.created", "id": 123}`)).
Build()
err = publisher.PublishMsg(context.Background(), "", "events", message)
|
Configuration Examples
AMQP/RabbitMQ
1
2
3
4
5
| // Basic connection
consumer, err := mqutils.NewConsumer("amqp://user:pass@localhost:5672/queue")
// With exchange and routing key
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/queue?exchange=events&routing_key=user.created")
|
Kafka
1
2
3
4
5
| // Basic connection
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic")
// With consumer group
consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic?group_id=my-service")
|
AWS SQS
1
2
3
4
5
| // Standard queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue")
// FIFO queue
consumer, err := mqutils.NewConsumer("sqs://us-east-1/my-queue.fifo")
|
Error Handling
mqutils provides structured error handling:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
// Your processing logic here
if err := processMessage(ctx, msg); err != nil {
// Return error to trigger message retry/dead letter
return fmt.Errorf("processing failed: %w", err)
}
return nil // Message will be acknowledged automatically
})
// For manual acknowledgment control
types.RegisterHandler("manual", func(ctx context.Context, msg types.Message) error {
if err := processMessage(ctx, msg); err != nil {
// Explicitly nack the message
return types.NacknowledgeMessage(ctx, msg)
}
// Explicitly ack the message
return types.AcknowledgeMessage(ctx, msg)
})
|
Graceful Shutdown
Always implement graceful shutdown:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func main() {
// Register handler first
types.RegisterHandler("process", func(ctx context.Context, msg types.Message) error {
log.Printf("Processing: %s", string(msg.Body()))
return nil
})
// Enable graceful shutdown in configuration
consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue?handler=process&enable_graceful_shutdown=true&graceful_shutdown_timeout=30")
if err != nil {
log.Fatal(err)
}
// Handle shutdown signals
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutdown signal received, stopping consumer...")
cancel()
}()
// Run will return when context is cancelled
if err := consumer.Run(ctx); err != nil {
log.Printf("Consumer error: %v", err)
}
}
|
Next Steps