Main Package

Entry points and factory functions for mqutils

mqutils

import "gitlab.com/digitalxero/mqutils"

Package mqutils provides a unified abstraction layer for working with multiple message queue systems. It supports AMQP/RabbitMQ, Apache Kafka, NATS, AWS SQS, GCP Pub/Sub, and Redis with a consistent API.

The package uses URL-based routing to automatically select the appropriate message queue implementation based on the connection URL scheme:

  • amqp://, amqps:// - RabbitMQ/AMQP
  • kafka://, kafkas:// - Apache Kafka
  • nats://, natss://, jetstream:// - NATS Core/JetStream
  • sqs://, sqss:// - AWS SQS
  • pubsub:// - GCP Pub/Sub
  • redis://, rediss://, redisstream:// - Redis Pub/Sub & Streams
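The scheme-based selection described above can be sketched with the standard library alone. This is a minimal illustration, not the package's internal registry: `backendForScheme` and `resolveBackend` are hypothetical names, and the table simply mirrors the scheme list.

```go
package main

import (
	"fmt"
	"net/url"
)

// backendForScheme is a hypothetical lookup table built from the scheme list
// above. It is an assumption for illustration, not mqutils' own registry.
var backendForScheme = map[string]string{
	"amqp": "RabbitMQ/AMQP", "amqps": "RabbitMQ/AMQP",
	"kafka": "Apache Kafka", "kafkas": "Apache Kafka",
	"nats": "NATS", "natss": "NATS", "jetstream": "NATS",
	"sqs": "AWS SQS", "sqss": "AWS SQS",
	"pubsub": "GCP Pub/Sub",
	"redis": "Redis", "rediss": "Redis", "redisstream": "Redis",
}

// resolveBackend parses rawURL and returns the backend name for its scheme.
// It returns an error when the URL is malformed or the scheme is unknown.
func resolveBackend(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", rawURL, err)
	}
	backend, ok := backendForScheme[u.Scheme]
	if !ok {
		return "", fmt.Errorf("unrecognized URL scheme %q", u.Scheme)
	}
	return backend, nil
}

func main() {
	urls := []string{"amqp://localhost:5672", "kafkas://broker:9093", "http://example.com"}
	for _, raw := range urls {
		backend, err := resolveBackend(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", raw, backend)
	}
}
```

Keying the table on the parsed scheme rather than on a string prefix keeps the lookup independent of host, port, and query parameters.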

Example usage:

// Create a consumer
config := viper.New()
config.Set("url", "amqp://localhost:5672")
config.Set("queue", "my-queue")

consumer, err := mqutils.NewConsumer(ctx, config)
if err != nil {
    log.Fatal(err)
}

// Register a handler
mqutils.RegisterHandler("my-queue", myHandler)

// Run the consumer
if err := consumer.Run(ctx); err != nil {
    log.Printf("Consumer error: %v", err)
}

Index

Variables

RegisterBatchHandler is a convenience function that registers a batch message handler with the global handler registry. Batch handlers process multiple messages at once for improved efficiency.

This is an alias for types.RegisterBatchHandler.

Example:

mqutils.RegisterBatchHandler("metrics.batch", func(ctx context.Context, msgs []types.Message) {
    // Decode each message; nack the ones that fail and keep track of the rest
    metrics := make([]Metric, 0, len(msgs))
    valid := make([]types.Message, 0, len(msgs))
    for _, msg := range msgs {
        var metric Metric
        if err := json.Unmarshal(msg.Body(), &metric); err != nil {
            msg.Nack()
            continue
        }
        metrics = append(metrics, metric)
        valid = append(valid, msg)
    }

    // Bulk insert metrics
    if err := db.BulkInsertMetrics(ctx, metrics); err != nil {
        // Failed - nack the messages that have not already been nacked
        for _, msg := range valid {
            msg.Nack()
        }
        return
    }

    // Success - ack only the messages that were decoded and inserted
    for _, msg := range valid {
        msg.Ack()
    }
})
var RegisterBatchHandler = types.RegisterBatchHandler

RegisterHandler is a convenience function that registers a message handler with the global handler registry. Handlers are retrieved by consumers to process messages from specific queues or topics.

This is an alias for types.RegisterHandler.

Example:

mqutils.RegisterHandler("user.events", func(ctx context.Context, msg types.Message) {
    // Process user event
    var event UserEvent
    if err := json.Unmarshal(msg.Body(), &event); err != nil {
        msg.Nack()
        return
    }
    // Handle the event...
    msg.Ack()
})
var RegisterHandler = types.RegisterHandler

func NewConsumer

func NewConsumer(ctx context.Context, config *viper.Viper) (types.Consumer, error)

NewConsumer creates a new message consumer based on the connection URL in the configuration. It automatically selects the appropriate implementation based on the URL scheme (e.g., amqp://, kafka://, nats://, etc.).

The configuration should contain at minimum:

  • url: The connection URL for the message queue system
  • queue: The name of the queue or topic to consume from

Additional configuration options vary by message queue system. See the documentation for each implementation for specific options.

Returns an error if:

  • The URL scheme is not recognized (no registered consumer)
  • The consumer creation fails due to invalid configuration
  • Connection to the message queue system fails

Example:

config := viper.New()
config.Set("url", "kafka://localhost:9092")
config.Set("queue", "events")
config.Set("consumer_group", "my-service")

consumer, err := mqutils.NewConsumer(ctx, config)
if err != nil {
    return fmt.Errorf("failed to create consumer: %w", err)
}
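The minimum configuration listed for NewConsumer (url and queue) can be checked before the call so that a missing key fails early with a clear message. The sketch below uses a plain map in place of *viper.Viper to stay dependency-free; `missingKeys` is a hypothetical helper, not part of mqutils.

```go
package main

import (
	"fmt"
	"strings"
)

// missingKeys returns the required keys that are absent or blank in cfg.
// cfg is a plain map standing in for a *viper.Viper (an assumption made to
// keep this sketch free of external dependencies).
func missingKeys(cfg map[string]string, required ...string) []string {
	var missing []string
	for _, key := range required {
		if strings.TrimSpace(cfg[key]) == "" {
			missing = append(missing, key)
		}
	}
	return missing
}

func main() {
	cfg := map[string]string{"url": "kafka://localhost:9092"}

	// "url" and "queue" are the minimum keys named in the NewConsumer docs.
	if missing := missingKeys(cfg, "url", "queue"); len(missing) > 0 {
		fmt.Println("missing required settings:", strings.Join(missing, ", "))
	}
}
```

With a real *viper.Viper, the same check could be written with its IsSet or GetString methods.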

func NewProducer

func NewProducer(ctx context.Context, config *viper.Viper) (types.Producer, error)

NewProducer creates a new message producer based on the connection URL in the configuration. It automatically selects the appropriate implementation based on the URL scheme (e.g., amqp://, kafka://, nats://, etc.).

The configuration should contain at minimum:

  • url: The connection URL for the message queue system

Additional configuration options vary by message queue system. Some producers may require explicit initialization with Start() before use.

Returns an error if:

  • The URL scheme is not recognized (no registered producer)
  • The producer creation fails due to invalid configuration
  • Initial connection setup fails

Example:

config := viper.New()
config.Set("url", "amqp://localhost:5672")

producer, err := mqutils.NewProducer(ctx, config)
if err != nil {
    return fmt.Errorf("failed to create producer: %w", err)
}

// Some producers need explicit start
if err := producer.Start(ctx); err != nil {
    return fmt.Errorf("failed to start producer: %w", err)
}

// Publish a message and check the result
if err := producer.Publish(ctx, "correlation-123", "events", "user.created",
    "application/json", []byte(`{"id": 123}`)); err != nil {
    return fmt.Errorf("failed to publish message: %w", err)
}

Generated by gomarkdoc