Examples

Comprehensive examples for all supported message queue systems

Examples

This page provides comprehensive examples for all supported message queue systems in mqutils.

Quick Start Examples

Basic Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main registers a logging handler under the name "process", builds a
// consumer for an AMQP queue, and runs it on a background context.
func main() {
    // The configuration below refers to this handler by its registered name.
    onMessage := func(ctx context.Context, msg types.Message) error {
        body := string(msg.Body())
        log.Printf("Received: %s", body)
        return nil
    }
    types.RegisterHandler("process", onMessage)

    // Point the consumer at the broker URL and name the handler to invoke.
    cfg := viper.New()
    cfg.Set("url", "amqp://localhost:5672/myqueue")
    cfg.Set("handler", "process")

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Any error returned from Run is treated as fatal.
    err = c.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

Basic Publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/kafka"
)

func main() {
    // Create configuration
    config := viper.New()
    config.Set("url", "kafka://localhost:9092/mytopic")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    publisher := consumer.Publisher()
    
    // Simple publish
    err = publisher.Publish(
        context.Background(),
        "req-123",              // correlation ID
        "",                     // exchange (not used in Kafka)
        "mytopic",              // routing key/topic
        "text/plain",           // content type
        []byte("Hello, World!") // body
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // Publish with message builder
    builder := kafka.NewMessageBuilder()
    message := builder.
        WithCorrelationId("req-123").
        WithHeaders(map[string]interface{}{"type": "greeting"}).
        WithBody([]byte("Hello from mqutils!")).
        Build()
    
    err = publisher.PublishMsg(context.Background(), "", "mytopic", message)
    if err != nil {
        log.Fatal(err)
    }
}

System-Specific Examples

AMQP/RabbitMQ

Basic AMQP Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main consumes order messages from RabbitMQ, publishes a JSON reply for
// messages that carry a reply-to address, and cancels the run context when
// SIGINT or SIGTERM is received.
func main() {
    handleOrder := func(ctx context.Context, msg types.Message) error {
        log.Printf("Processing order: %s", string(msg.Body()))

        // No reply destination means there is nothing further to send.
        if msg.ReplyTo() == "" {
            return nil
        }

        // Reply on the default exchange, routed to the reply destination and
        // tagged with the request's correlation ID.
        ack := []byte(`{"status": "processed"}`)
        return msg.Publisher().Publish(ctx, msg.CorrelationId(), "", msg.ReplyTo(), "application/json", ack)
    }
    types.RegisterHandler("order_processor", handleOrder)

    // AMQP settings: broker URL plus exchange, routing key, and handler name.
    cfg := viper.New()
    for _, kv := range [][2]string{
        {"url", "amqp://guest:guest@localhost:5672/orders"},
        {"exchange", "events"},
        {"routing_key", "order.created"},
        {"handler", "order_processor"},
    } {
        cfg.Set(kv[0], kv[1])
    }

    c, err := mqutils.NewConsumer(context.Background(), cfg)
    if err != nil {
        log.Fatal(err)
    }

    // The run context is cancelled by the signal goroutine below, or by the
    // deferred call when main returns.
    runCtx, stop := context.WithCancel(context.Background())
    defer stop()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-signals
        log.Println("Shutting down...")
        stop()
    }()

    runErr := c.Run(runCtx)
    if runErr != nil {
        log.Printf("Consumer error: %v", runErr)
    }
}

AMQP with Exchange Routing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Publisher that routes messages to different queues
func publishToExchange() {
    // Create configuration
    config := viper.New()
    config.Set("url", "amqp://localhost:5672/unused")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    publisher := consumer.Publisher()

    // Route to different queues using routing keys
    events := []struct {
        routingKey string
        message    string
    }{
        {"user.created", `{"event": "user_created", "id": 123}`},
        {"user.updated", `{"event": "user_updated", "id": 123}`},
        {"order.placed", `{"event": "order_placed", "order_id": 456}`},
    }

    for _, event := range events {
        // Use direct publish method for routing
        err := publisher.Publish(
            context.Background(),
            "",                     // correlation ID
            "events",               // exchange
            event.routingKey,       // routing key
            "application/json",     // content type
            []byte(event.message)   // body
        )
        if err != nil {
            log.Printf("Failed to publish %s: %v", event.routingKey, err)
        }
    }
}

Apache Kafka

Kafka Consumer with Consumer Groups

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main registers a batch handler named "user_events" and runs a Kafka
// consumer configured with a consumer group and batch settings.
func main() {
    // The handler receives a slice of messages and logs each body.
    handleBatch := func(ctx context.Context, msgs []types.Message) error {
        log.Printf("Processing batch of %d messages", len(msgs))
        for i := range msgs {
            log.Printf("User event: %s", string(msgs[i].Body()))
        }
        return nil
    }
    types.RegisterBatchHandler("user_events", handleBatch)

    // Kafka settings: topic URL, consumer group, starting offset, handler
    // name, and batch size/timeout.
    settings := []struct {
        key   string
        value interface{}
    }{
        {"url", "kafka://localhost:9092/user-events"},
        {"group_id", "user-service"},
        {"offset", "earliest"},
        {"handler", "user_events"},
        {"batch_size", 50},
        {"batch_timeout", 1000},
    }
    cfg := viper.New()
    for _, s := range settings {
        cfg.Set(s.key, s.value)
    }

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    err = c.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

Kafka Producer with Compression

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// kafkaProducerExample publishes 100 JSON login events to the "user-events"
// topic. A failure to create the consumer is fatal; individual publish
// failures are logged and the loop moves on to the next message.
func kafkaProducerExample() {
    cfg := viper.New()
    cfg.Set("url", "kafka://localhost:9092/events")

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    pub := c.Publisher()

    const total = 100
    for n := 0; n < total; n++ {
        // Each message gets a correlation ID and a payload derived from its
        // index; the exchange argument is left empty for Kafka.
        correlationID := fmt.Sprintf("msg-%d", n)
        payload := []byte(fmt.Sprintf(`{"user_id": %d, "action": "login"}`, n))

        if pubErr := pub.Publish(context.Background(), correlationID, "", "user-events", "application/json", payload); pubErr != nil {
            log.Printf("Failed to publish message %d: %v", n, pubErr)
        }
    }
}

NATS Core and JetStream

NATS Core Consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main registers a handler named "event_processor" and runs a NATS Core
// consumer subscribed to the "events.>" subject pattern.
func main() {
    // Log the routing key (used as the NATS subject here) with the body.
    onEvent := func(ctx context.Context, msg types.Message) error {
        log.Printf("Received on %s: %s", msg.RoutingKey(), string(msg.Body()))
        return nil
    }
    types.RegisterHandler("event_processor", onEvent)

    // The subject pattern is carried in the URL path.
    cfg := viper.New()
    cfg.Set("url", "nats://localhost:4222/events.>")
    cfg.Set("handler", "event_processor")

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    err = c.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

NATS JetStream with Persistence

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// jetStreamExample registers a handler named "order_processor" and runs a
// JetStream consumer configured with a stream, a consumer name, and the
// durable flag set.
func jetStreamExample() {
    // The handler only logs the order payload.
    logOrder := func(ctx context.Context, msg types.Message) error {
        log.Printf("Processing order: %s", string(msg.Body()))
        return nil
    }
    types.RegisterHandler("order_processor", logOrder)

    // JetStream settings, applied in order.
    settings := []struct {
        key   string
        value interface{}
    }{
        {"url", "jetstream://localhost:4222/orders.created"},
        {"stream", "orders"},
        {"consumer", "order-processor"},
        {"durable", true},
        {"handler", "order_processor"},
    }
    cfg := viper.New()
    for _, s := range settings {
        cfg.Set(s.key, s.value)
    }

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    err = c.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

AWS SQS

SQS Consumer with Visibility Timeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
    "context"
    "log"
    "time"

    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
)

// main registers a handler named "sqs_processor" that hands each message
// body to processLongRunningTask, then runs an SQS consumer configured with
// a visibility timeout and a maximum message count.
func main() {
    // The handler's result is whatever the long-running task returns.
    handleSQS := func(ctx context.Context, msg types.Message) error {
        log.Printf("Processing SQS message: %s", string(msg.Body()))
        return processLongRunningTask(msg.Body())
    }
    types.RegisterHandler("sqs_processor", handleSQS)

    // SQS settings: region and queue in the URL, plus tuning values.
    settings := []struct {
        key   string
        value interface{}
    }{
        {"url", "sqs://us-east-1/my-queue"},
        {"visibility_timeout", 300},
        {"max_messages", 10},
        {"handler", "sqs_processor"},
    }
    cfg := viper.New()
    for _, s := range settings {
        cfg.Set(s.key, s.value)
    }

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    err = c.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
}

// processLongRunningTask stands in for slow work by blocking for one minute.
// The data argument is not inspected and the function always returns nil.
func processLongRunningTask(data []byte) error {
    const simulatedDuration = 60 * time.Second
    time.Sleep(simulatedDuration)
    return nil
}

SQS FIFO Queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// sqsFifoExample publishes five sequenced JSON messages to the
// "my-queue.fifo" queue. A failure to create the consumer is fatal;
// individual publish failures are logged and the loop continues.
func sqsFifoExample() {
    cfg := viper.New()
    cfg.Set("url", "sqs://us-east-1/my-queue.fifo")

    ctx := context.Background()
    c, err := mqutils.NewConsumer(ctx, cfg)
    if err != nil {
        log.Fatal(err)
    }

    pub := c.Publisher()

    const count = 5
    for seq := 0; seq < count; seq++ {
        // The correlation ID slot carries the per-message ID (presumably used
        // as the FIFO deduplication ID — confirm against the SQS backend).
        messageID := fmt.Sprintf("msg-%d", seq)
        payload := []byte(fmt.Sprintf(`{"sequence": %d}`, seq))

        if pubErr := pub.Publish(context.Background(), messageID, "", "my-queue.fifo", "application/json", payload); pubErr != nil {
            log.Printf("Failed to publish FIFO message: %v", pubErr)
        }
    }
}

GCP Pub/Sub

Pub/Sub with Message Ordering

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
    _ "gitlab.com/digitalxero/mqutils/gcp"
)

// main registers a handler named "pubsub_processor" and runs a GCP Pub/Sub
// consumer for the "my-subscription" subscription in "my-project". The
// handler logs each message body and, when the headers contain an
// "ordering_key" entry, logs that as well.
func main() {
    // Register Pub/Sub processor handler
    types.RegisterHandler("pubsub_processor", func(ctx context.Context, msg types.Message) error {
        log.Printf("Pub/Sub message: %s", string(msg.Body()))

        // Extract Pub/Sub attributes from headers
        if attrs := msg.Headers(); attrs != nil {
            if orderKey, ok := attrs["ordering_key"]; ok {
                log.Printf("Ordering key: %v", orderKey)
            }
        }

        return nil
    })

    // Create configuration for GCP Pub/Sub with project and subscription
    config := viper.New()
    config.Set("url", "pubsub://my-project/my-subscription")
    config.Set("max_outstanding_messages", 1000)
    config.Set("handler", "pubsub_processor")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

Redis Pub/Sub and Streams

Redis Pub/Sub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
    "context"
    "log"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
    _ "gitlab.com/digitalxero/mqutils/redis"
)

// main registers a handler named "redis_processor" and runs a Redis Pub/Sub
// consumer for the "events.*" channel pattern. The handler logs the routing
// key (used as the channel name here) together with the message body.
func main() {
    // Register Redis processor handler
    types.RegisterHandler("redis_processor", func(ctx context.Context, msg types.Message) error {
        channel := msg.RoutingKey()
        log.Printf("Redis channel %s: %s", channel, string(msg.Body()))
        return nil
    })

    // Create configuration for Redis Pub/Sub
    config := viper.New()
    config.Set("url", "redis://localhost:6379/events.*")
    config.Set("handler", "redis_processor")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

Redis Streams with Consumer Groups

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// redisStreamsExample registers a handler named "stream_processor" and runs
// a Redis Streams consumer for "mystream" as consumer "worker-1" in group
// "processors". The handler logs each message ID and body.
func redisStreamsExample() {
    // Register stream processor handler
    types.RegisterHandler("stream_processor", func(ctx context.Context, msg types.Message) error {
        log.Printf("Stream message ID %s: %s", msg.MessageId(), string(msg.Body()))

        // Redis Streams provide message ID and ordering
        return nil
    })

    // Create configuration for Redis Streams with consumer group
    config := viper.New()
    config.Set("url", "redisstream://localhost:6379/mystream")
    config.Set("group", "processors")
    config.Set("consumer", "worker-1")
    config.Set("handler", "stream_processor")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

Advanced Examples

Request-Response Pattern

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main

import (
    "context"
    "fmt"
    "log"
    "time"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
    _ "gitlab.com/digitalxero/mqutils/amqp"
)

// Request-response pattern implementation
// requestResponseExample starts the responder in a goroutine, pauses for one
// second so it has time to come up, and then sends a single request.
func requestResponseExample() {
    const startupDelay = time.Second

    // The responder blocks while consuming, so it runs concurrently.
    go setupResponder()

    // Crude startup wait before issuing the request.
    time.Sleep(startupDelay)
    sendRequest()
}

// setupResponder registers a handler named "request_handler" and runs an
// AMQP consumer on the "requests" queue. For each message that carries a
// reply-to address, the handler publishes a "Processed: <body>" text reply
// to that address with the request's correlation ID. A failure to create the
// consumer is fatal; an error returned from Run is only logged.
func setupResponder() {
    // Register request handler
    types.RegisterHandler("request_handler", func(ctx context.Context, msg types.Message) error {
        log.Printf("Processing request: %s", string(msg.Body()))

        // Process request and send response
        if msg.ReplyTo() != "" {
            response := fmt.Sprintf("Processed: %s", string(msg.Body()))
            return msg.Publisher().Publish(ctx,
                msg.CorrelationId(), // correlation ID
                "",                  // exchange (use default)
                msg.ReplyTo(),       // routing key (reply destination)
                "text/plain",        // content type
                []byte(response))    // body
        }

        return nil
    })

    // Create configuration for responder
    config := viper.New()
    config.Set("url", "amqp://localhost:5672/requests")
    config.Set("handler", "request_handler")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Printf("Responder error: %v", err)
    }
}

func sendRequest() {
    // Register response handler
    types.RegisterHandler("response_handler", func(ctx context.Context, msg types.Message) error {
        log.Printf("Received response: %s", string(msg.Body()))
        return nil
    })

    // Create configuration for requester
    config := viper.New()
    config.Set("url", "amqp://localhost:5672/responses")
    config.Set("handler", "response_handler")
    
    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    publisher := consumer.Publisher()

    // Send request
    err = publisher.Publish(
        context.Background(),
        "req-123",                              // correlation ID
        "",                                     // exchange (use default)
        "requests",                             // routing key (destination)
        "text/plain",                           // content type
        []byte("Hello, please process this")   // body
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    if err := consumer.Run(ctx); err != nil {
        log.Printf("Requester error: %v", err)
    }
}

Health Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
    "context"
    "log"
    "time"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
    _ "gitlab.com/digitalxero/mqutils/kafka"
)

// healthMonitoringExample registers a no-op handler named "health_processor",
// creates a Kafka consumer for the "health-test" topic, starts monitorHealth
// in a goroutine for that consumer, and then runs the consumer. Errors from
// creating or running the consumer are fatal.
func healthMonitoringExample() {
    // Register health processor handler
    types.RegisterHandler("health_processor", func(ctx context.Context, msg types.Message) error {
        return nil
    })

    // Create configuration for health monitoring
    config := viper.New()
    config.Set("url", "kafka://localhost:9092/health-test")
    config.Set("handler", "health_processor")

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Start health monitoring
    go monitorHealth(consumer)

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

// monitorHealth polls consumer.HealthCheck every 30 seconds and logs the
// reported status, message, timestamp, and any details. A failed check is
// logged and skipped; a status other than HealthStatusHealthy produces a
// warning. The loop has no exit condition, so the function never returns.
func monitorHealth(consumer types.Consumer) {
    const interval = 30 * time.Second

    tick := time.NewTicker(interval)
    defer tick.Stop()

    for {
        <-tick.C

        report, err := consumer.HealthCheck(context.Background())
        if err != nil {
            log.Printf("Health check failed: %v", err)
            continue
        }

        log.Printf("Health Status: %s", report.Status())
        log.Printf("Health Message: %s", report.Message())
        log.Printf("Health Timestamp: %s", report.Timestamp())

        extra := report.Details()
        if extra != nil {
            log.Printf("Health Details: %+v", extra)
        }

        if report.Status() == types.HealthStatusHealthy {
            continue
        }
        // Alerting logic would hook in here.
        log.Printf("WARNING: Consumer is not healthy!")
    }
}

Batch Processing with Error Handling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// batchProcessingExample registers a batch handler named "batch_processor"
// and runs a Kafka consumer for the "batch-events" topic with batch settings.
// The handler processes each message individually, collects the ones that
// fail, and passes them to handleFailedMessages; its return value becomes the
// handler's result. Errors from creating or running the consumer are fatal.
func batchProcessingExample() {
    // Register batch processor handler
    types.RegisterBatchHandler("batch_processor", func(ctx context.Context, msgs []types.Message) error {
        log.Printf("Processing batch of %d messages", len(msgs))

        var failed []types.Message

        for _, msg := range msgs {
            if err := processIndividualMessage(msg); err != nil {
                log.Printf("Failed to process message %s: %v", msg.MessageId(), err)
                failed = append(failed, msg)
            }
        }

        if len(failed) > 0 {
            log.Printf("Batch processing completed with %d failures", len(failed))
            // Handle failed messages (retry, dead letter, etc.)
            return handleFailedMessages(failed)
        }

        log.Printf("Batch processing completed successfully")
        return nil
    })

    // Create configuration for batch processing
    config := viper.New()
    config.Set("url", "kafka://localhost:9092/batch-events")
    config.Set("group_id", "batch-processor")
    config.Set("handler", "batch_processor")
    config.Set("batch_size", 100)
    config.Set("batch_timeout", 5000)

    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run on the same context that was used to create the consumer.
    if err := consumer.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

// processIndividualMessage is a placeholder for per-message processing.
// As written it ignores msg and always returns nil.
func processIndividualMessage(msg types.Message) error {
    // Simulate processing
    return nil
}

// handleFailedMessages is a placeholder for dealing with messages that could
// not be processed. As written it ignores failed and always returns nil.
func handleFailedMessages(failed []types.Message) error {
    // Implement retry logic or send to dead letter queue
    return nil
}

Testing Examples

Unit Testing with mqutils

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
    "context"
    "testing"
    "github.com/spf13/viper"
    "gitlab.com/digitalxero/mqutils"
    "gitlab.com/digitalxero/mqutils/types"
    _ "gitlab.com/digitalxero/mqutils/amqp"
)

func TestMessageProcessing(t *testing.T) {
    // Test message handler
    var processedMessage types.Message
    types.RegisterHandler("test_handler", func(ctx context.Context, msg types.Message) error {
        processedMessage = msg
        return nil
    })

    // Create configuration for test consumer
    config := viper.New()
    config.Set("url", "amqp://localhost:5672/test-queue")
    config.Set("handler", "test_handler")
    
    ctx := context.Background()
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        t.Fatalf("Failed to create consumer: %v", err)
    }

    // Publish test message
    publisher := consumer.Publisher()
    testMessage := []byte("test message")
    
    err = publisher.Publish(
        context.Background(),
        "test-corr-id",      // correlation ID
        "",                  // exchange (use default)
        "test-queue",        // routing key/destination
        "text/plain",        // content type
        testMessage          // body
    )
    if err != nil {
        t.Fatalf("Failed to publish test message: %v", err)
    }

    // Verify message was processed
    // (In real tests, you'd need proper synchronization)
    if processedMessage == nil {
        t.Error("Message was not processed")
    }
}

These examples cover the major use cases and message queue systems supported by mqutils. Each one demonstrates best practices for its specific system while using the same unified mqutils API.