Examples

Comprehensive examples for all supported message queue systems

Examples

This page provides comprehensive examples for all supported message queue systems in mqutils.

Quick Start Examples

Basic Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main creates a consumer for an AMQP queue, registers a handler that logs
// each message body, and starts consuming.
func main() {
    // Create the consumer; the same call is used for any supported system.
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/myqueue")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Message handler: log the payload and report success.
    logMessage := func(msg types.Message) error {
        log.Printf("Received: %s", string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("process", logMessage)

    // Start consuming with a background context.
    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

Basic Publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
    "log"
    "gitlab.com/digitalxero/mqutils"
)

// main publishes two messages to the "mytopic" destination: one from a raw
// byte slice and one assembled with the message builder.
func main() {
    consumer, err := mqutils.NewConsumer("kafka://localhost:9092/mytopic")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // The publisher is obtained from the consumer.
    publisher := consumer.Publisher()

    // Simple publish: destination plus raw payload.
    if err := publisher.Publish("mytopic", []byte("Hello, World!")); err != nil {
        log.Fatal(err)
    }

    // Builder publish: correlation ID, headers and body set explicitly.
    message := publisher.NewMessageBuilder().
        SetCorrelationID("req-123").
        SetHeaders(map[string]interface{}{"type": "greeting"}).
        SetBody([]byte("Hello from mqutils!")).
        Build()
    if err := publisher.PublishMessage("mytopic", message); err != nil {
        log.Fatal(err)
    }
}

System-Specific Examples

AMQP/RabbitMQ

Basic AMQP Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main consumes order messages over AMQP, replies to senders that set a
// reply-to address, and stops consuming when SIGINT or SIGTERM arrives.
func main() {
    // AMQP with exchange and routing key
    const url = "amqp://guest:guest@localhost:5672/orders?exchange=events&routing_key=order.created"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Order processing handler.
    consumer.RegisterHandler("order_processor", func(msg types.Message) error {
        log.Printf("Processing order: %s", string(msg.Body()))

        // Nothing more to do unless the sender asked for a reply.
        if msg.ReplyTo() == "" {
            return nil
        }

        // The reply reuses the request's correlation ID.
        reply := msg.Publisher().NewMessageBuilder().
            SetCorrelationID(msg.CorrelationID()).
            SetBody([]byte(`{"status": "processed"}`)).
            Build()

        return msg.Publisher().PublishMessage(msg.ReplyTo(), reply)
    })

    // Graceful shutdown: the first SIGINT/SIGTERM cancels the run context.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-signals
        log.Println("Shutting down...")
        cancel()
    }()

    err = consumer.Run(ctx)
    if err != nil {
        log.Printf("Consumer error: %v", err)
    }
}

AMQP with Exchange Routing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// publishToExchange publishes three sample events to the "events" exchange,
// each with its own routing key so they can be routed to different queues.
// A failed publish is logged and the remaining events are still sent.
func publishToExchange() {
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/unused")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    publisher := consumer.Publisher()

    // Routing key and JSON payload for each event to publish.
    type event struct {
        routingKey string
        message    string
    }
    events := []event{
        {routingKey: "user.created", message: `{"event": "user_created", "id": 123}`},
        {routingKey: "user.updated", message: `{"event": "user_updated", "id": 123}`},
        {routingKey: "order.placed", message: `{"event": "order_placed", "order_id": 456}`},
    }

    for i := range events {
        ev := events[i]

        msg := publisher.NewMessageBuilder().
            SetRoutingKey(ev.routingKey).
            SetExchange("events").
            SetHeaders(map[string]interface{}{
                "event_type": ev.routingKey,
                "timestamp":  time.Now().Unix(),
            }).
            SetBody([]byte(ev.message)).
            Build()

        // Empty destination, routing key used
        if err := publisher.PublishMessage("", msg); err != nil {
            log.Printf("Failed to publish %s: %v", ev.routingKey, err)
        }
    }
}

Apache Kafka

Kafka Consumer with Consumer Groups

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main consumes the user-events topic as part of the user-service consumer
// group and logs every message of each delivered batch.
func main() {
    // Kafka with consumer group
    const url = "kafka://localhost:9092/user-events?group_id=user-service&offset=earliest"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Batch handler for high throughput: receives a slice of messages per call.
    handleBatch := func(msgs []types.Message) error {
        log.Printf("Processing batch of %d messages", len(msgs))

        // Process each message
        for i := range msgs {
            log.Printf("User event: %s", string(msgs[i].Body()))
        }

        return nil
    }
    consumer.RegisterBatchHandler("user_events", handleBatch)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

Kafka Producer with Message Headers

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// kafkaProducerExample publishes 100 login events to the "user-events"
// destination. Each message gets an ID of the form msg-N and headers carrying
// a partition number, a Unix timestamp and a message type. A failed publish is
// logged and the loop continues.
func kafkaProducerExample() {
    consumer, err := mqutils.NewConsumer("kafka://localhost:9092/events")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    publisher := consumer.Publisher()

    // Produce messages with headers
    for n := 0; n < 100; n++ {
        id := fmt.Sprintf("msg-%d", n)
        payload := fmt.Sprintf(`{"user_id": %d, "action": "login"}`, n)

        msg := publisher.NewMessageBuilder().
            SetID(id).
            SetHeaders(map[string]interface{}{
                "partition":    n % 10, // Distribute across partitions
                "timestamp":    time.Now().Unix(),
                "message_type": "user_event",
            }).
            SetBody([]byte(payload)).
            Build()

        if err := publisher.PublishMessage("user-events", msg); err != nil {
            log.Printf("Failed to publish message %d: %v", n, err)
        }
    }
}

NATS Core and JetStream

NATS Core Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main subscribes to the "events.>" subject pattern on NATS Core and logs the
// subject and body of every message received.
func main() {
    // NATS Core with subject pattern
    const url = "nats://localhost:4222/events.>"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    logEvent := func(msg types.Message) error {
        // The routing key carries the NATS subject.
        log.Printf("Received on %s: %s", msg.RoutingKey(), string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("event_processor", logEvent)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

NATS JetStream with Persistence

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// jetStreamExample consumes orders.created from the "orders" stream using the
// durable consumer "order-processor" and logs each message body.
func jetStreamExample() {
    // JetStream with durable consumer
    const url = "jetstream://localhost:4222/orders.created?stream=orders&consumer=order-processor&durable=true"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // JetStream provides message replay and persistence; the handler itself
    // only logs the order.
    logOrder := func(msg types.Message) error {
        log.Printf("Processing order: %s", string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("order_processor", logOrder)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

AWS SQS

SQS Consumer with Visibility Timeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
    "context"
    "log"
    "time"

    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main consumes an SQS queue with a 300-second visibility timeout and hands
// each message body to processLongRunningTask.
func main() {
    // SQS with region and visibility timeout
    const url = "sqs://us-east-1/my-queue?visibility_timeout=300&max_messages=10"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    handle := func(msg types.Message) error {
        log.Printf("Processing SQS message: %s", string(msg.Body()))

        // Long processing time handled by visibility timeout
        return processLongRunningTask(msg.Body())
    }
    consumer.RegisterHandler("sqs_processor", handle)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

// processLongRunningTask stands in for slow work: it blocks for one minute and
// then reports success. The data argument is not inspected.
func processLongRunningTask(data []byte) error {
    // Simulated processing time.
    time.Sleep(time.Minute)
    return nil
}

SQS FIFO Queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// sqsFifoExample publishes five sequenced messages to my-queue.fifo. All of
// them carry the same MessageGroupId header and a distinct msg-N ID. A failed
// publish is logged and the loop continues.
func sqsFifoExample() {
    // FIFO queue ensures message ordering
    const url = "sqs://us-east-1/my-queue.fifo"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    publisher := consumer.Publisher()

    // Publish messages with group ID for ordering
    for seq := 0; seq < 5; seq++ {
        dedupID := fmt.Sprintf("msg-%d", seq) // Deduplication ID
        payload := fmt.Sprintf(`{"sequence": %d}`, seq)

        msg := publisher.NewMessageBuilder().
            SetID(dedupID).
            SetHeaders(map[string]interface{}{
                "MessageGroupId": "order-processing", // FIFO group
            }).
            SetBody([]byte(payload)).
            Build()

        if err := publisher.PublishMessage("my-queue.fifo", msg); err != nil {
            log.Printf("Failed to publish FIFO message: %v", err)
        }
    }
}

GCP Pub/Sub

Pub/Sub with Message Ordering

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main consumes a GCP Pub/Sub subscription, logs each message body and, when
// the headers contain an "ordering_key" entry, logs that value as well.
func main() {
    // GCP Pub/Sub with project and subscription
    const url = "pubsub://my-project/my-subscription?max_outstanding_messages=1000"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    handle := func(msg types.Message) error {
        log.Printf("Pub/Sub message: %s", string(msg.Body()))

        // Pub/Sub attributes are read from the message headers.
        attrs := msg.Headers()
        if attrs == nil {
            return nil
        }
        if orderKey, found := attrs["ordering_key"]; found {
            log.Printf("Ordering key: %v", orderKey)
        }

        return nil
    }
    consumer.RegisterHandler("pubsub_processor", handle)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

Redis Pub/Sub and Streams

Redis Pub/Sub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
    "context"
    "log"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main subscribes to the "events.*" pattern on Redis Pub/Sub and logs the
// channel and body of every message received.
func main() {
    // Redis Pub/Sub
    const url = "redis://localhost:6379/events.*"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    logMessage := func(msg types.Message) error {
        // The routing key is used as the Redis channel name.
        log.Printf("Redis channel %s: %s", msg.RoutingKey(), string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("redis_processor", logMessage)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

Redis Streams with Consumer Groups

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// redisStreamsExample reads "mystream" as consumer worker-1 of the
// "processors" group and logs the ID and body of each message.
func redisStreamsExample() {
    // Redis Streams with consumer group
    const url = "redisstream://localhost:6379/mystream?group=processors&consumer=worker-1"

    consumer, err := mqutils.NewConsumer(url)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Redis Streams provide message ID and ordering; the handler logs both
    // the ID and the payload.
    logEntry := func(msg types.Message) error {
        log.Printf("Stream message ID %s: %s", msg.ID(), string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("stream_processor", logEntry)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

Advanced Examples

Request-Response Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

import (
    "context"
    "fmt"
    "log"
    "time"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// requestResponseExample demonstrates the request-response pattern: it starts
// the responder in a background goroutine, pauses for one second, and then
// sends a single request.
func requestResponseExample() {
    // Responder side.
    go setupResponder()

    // Requester side; the pause gives the responder time to start.
    const startupDelay = time.Second
    time.Sleep(startupDelay)

    sendRequest()
}

// setupResponder consumes the "requests" queue and, for every message that
// names a reply-to destination, publishes "Processed: <body>" there with the
// request's correlation ID.
func setupResponder() {
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/requests")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    consumer.RegisterHandler("request_handler", func(msg types.Message) error {
        log.Printf("Processing request: %s", string(msg.Body()))

        // Requests without a reply-to destination need no response.
        if msg.ReplyTo() == "" {
            return nil
        }

        // Process request and send response
        response := fmt.Sprintf("Processed: %s", string(msg.Body()))
        reply := msg.Publisher().NewMessageBuilder().
            SetCorrelationID(msg.CorrelationID()).
            SetBody([]byte(response)).
            Build()

        return msg.Publisher().PublishMessage(msg.ReplyTo(), reply)
    })

    err = consumer.Run(context.Background())
    if err != nil {
        log.Printf("Responder error: %v", err)
    }
}

// sendRequest publishes one request to "requests" with reply-to set to
// "responses", then consumes the "responses" queue under a ten-second timeout
// and logs every response received.
func sendRequest() {
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/responses")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    publisher := consumer.Publisher()

    // Response handler: log whatever comes back.
    logResponse := func(msg types.Message) error {
        log.Printf("Received response: %s", string(msg.Body()))
        return nil
    }
    consumer.RegisterHandler("response_handler", logResponse)

    // Build and send the request.
    request := publisher.NewMessageBuilder().
        SetCorrelationID("req-123").
        SetReplyTo("responses").
        SetBody([]byte("Hello, please process this")).
        Build()
    if err := publisher.PublishMessage("requests", request); err != nil {
        log.Fatal(err)
    }

    // Consume responses under a context that expires after ten seconds.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    err = consumer.Run(ctx)
    if err != nil {
        log.Printf("Requester error: %v", err)
    }
}

Health Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
    "context"
    "log"
    "time"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// healthMonitoringExample runs a Kafka consumer with a no-op handler while
// monitorHealth reports the consumer's health from a background goroutine.
func healthMonitoringExample() {
    consumer, err := mqutils.NewConsumer("kafka://localhost:9092/health-test")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Health monitoring runs concurrently with message consumption.
    go monitorHealth(consumer)

    // The handler accepts every message without doing any work.
    acceptAll := func(msg types.Message) error {
        return nil
    }
    consumer.RegisterHandler("health_processor", acceptAll)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

// monitorHealth logs the consumer's health status, message, timestamp and
// (when present) details every 30 seconds, and logs a warning whenever the
// status is not types.HealthStatusHealthy. It loops for as long as the ticker
// keeps firing.
func monitorHealth(consumer types.Consumer) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        // Wait for the next tick before polling.
        <-ticker.C

        health := consumer.HealthCheck()

        log.Printf("Health Status: %s", health.Status())
        log.Printf("Health Message: %s", health.Message())
        log.Printf("Health Timestamp: %s", health.Timestamp())

        details := health.Details()
        if details != nil {
            log.Printf("Health Details: %+v", details)
        }

        if health.Status() == types.HealthStatusHealthy {
            continue
        }

        log.Printf("WARNING: Consumer is not healthy!")
        // Implement alerting logic here
    }
}

Batch Processing with Error Handling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// batchProcessingExample consumes batch-events in batches. Each message is
// processed individually; messages that fail are collected and passed to
// handleFailedMessages, whose result becomes the batch handler's result.
func batchProcessingExample() {
    consumer, err := mqutils.NewConsumer("kafka://localhost:9092/batch-events?group_id=batch-processor")
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    handleBatch := func(msgs []types.Message) error {
        log.Printf("Processing batch of %d messages", len(msgs))

        // Collect the messages that could not be processed.
        var failed []types.Message
        for i := range msgs {
            msg := msgs[i]
            err := processIndividualMessage(msg)
            if err == nil {
                continue
            }
            log.Printf("Failed to process message %s: %v", msg.ID(), err)
            failed = append(failed, msg)
        }

        if len(failed) == 0 {
            log.Printf("Batch processing completed successfully")
            return nil
        }

        log.Printf("Batch processing completed with %d failures", len(failed))
        // Handle failed messages (retry, dead letter, etc.)
        return handleFailedMessages(failed)
    }
    consumer.RegisterBatchHandler("batch_processor", handleBatch)

    err = consumer.Run(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

// processIndividualMessage is a placeholder for per-message work. It does not
// inspect msg and always reports success.
func processIndividualMessage(msg types.Message) error {
    // Real processing would go here.
    return nil
}

// handleFailedMessages is a placeholder for failure handling such as retrying
// or forwarding to a dead letter queue. It does not inspect failed and always
// reports success.
func handleFailedMessages(failed []types.Message) error {
    // Retry or dead-letter logic would go here.
    return nil
}

Testing Examples

Unit Testing with mqutils

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
    "context"
    "testing"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// TestMessageProcessing publishes one message to test-queue and verifies that
// the registered handler receives it with the same body.
//
// The test connects to amqp://localhost:5672. It sets no timeout of its own,
// so if the message is never delivered the wait is bounded only by the
// `go test -timeout` limit.
func TestMessageProcessing(t *testing.T) {
    // Create test consumer (you might use a test transport)
    consumer, err := mqutils.NewConsumer("amqp://localhost:5672/test-queue")
    if err != nil {
        t.Fatalf("Failed to create consumer: %v", err)
    }
    defer consumer.Close()

    // The handler may run on a different goroutine than the test, so the
    // message is handed over through a channel instead of a shared variable.
    received := make(chan types.Message, 1)
    consumer.RegisterHandler("test_handler", func(msg types.Message) error {
        select {
        case received <- msg:
        default:
            // One message has already been captured; ignore any others.
        }
        return nil
    })

    // Publish test message
    publisher := consumer.Publisher()
    testMessage := []byte("test message")

    if err := publisher.Publish("test-queue", testMessage); err != nil {
        t.Fatalf("Failed to publish test message: %v", err)
    }

    // Start consuming. Presumably no handler is invoked until Run is called,
    // as in the other examples; without this step nothing is ever processed.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // stops the consumer when the test returns

    runErr := make(chan error, 1) // buffered so the goroutine can always exit
    go func() {
        runErr <- consumer.Run(ctx)
    }()

    // Verify message was processed
    select {
    case msg := <-received:
        if got := string(msg.Body()); got != string(testMessage) {
            t.Errorf("Processed body = %q, want %q", got, testMessage)
        }
    case err := <-runErr:
        t.Fatalf("Consumer stopped before the message was processed: %v", err)
    }
}

These examples cover the major use cases and message queue systems supported by mqutils. Each one demonstrates best practices for its specific system while using the same unified mqutils API.