Main Package
Entry points and factory functions for mqutils
mqutils
Package mqutils provides a unified abstraction layer for working with multiple message queue systems. It supports AMQP/RabbitMQ, Apache Kafka, NATS, AWS SQS, GCP Pub/Sub, and Redis with a consistent API.
The package uses URL-based routing to automatically select the appropriate message queue implementation based on the connection URL scheme:
- amqp://, amqps:// - RabbitMQ/AMQP
- kafka://, kafkas:// - Apache Kafka
- nats://, natss://, jetstream:// - NATS Core/JetStream
- sqs://, sqss:// - AWS SQS
- pubsub:// - GCP Pub/Sub
- redis://, rediss://, redisstream:// - Redis Pub/Sub & Streams
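The scheme-to-backend mapping above can be pictured as a lookup keyed on the parsed URL scheme. The following is a minimal sketch under that assumption, not the package's actual implementation; the `backends` map and the `backendFor` function are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"net/url"
)

// backends maps URL schemes to a backend label, mirroring the list above.
// The map and its labels are illustrative and not part of mqutils.
var backends = map[string]string{
	"amqp": "RabbitMQ/AMQP", "amqps": "RabbitMQ/AMQP",
	"kafka": "Apache Kafka", "kafkas": "Apache Kafka",
	"nats": "NATS", "natss": "NATS", "jetstream": "NATS",
	"sqs": "AWS SQS", "sqss": "AWS SQS",
	"pubsub": "GCP Pub/Sub",
	"redis": "Redis", "rediss": "Redis", "redisstream": "Redis",
}

// backendFor parses rawURL and looks up the backend registered for its scheme.
func backendFor(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("invalid url %q: %w", rawURL, err)
	}
	name, ok := backends[u.Scheme]
	if !ok {
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return name, nil
}

func main() {
	urls := []string{"amqp://localhost:5672", "kafka://localhost:9092", "http://example.com"}
	for _, raw := range urls {
		name, err := backendFor(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(raw, "->", name)
	}
}
```

An unrecognized scheme produces an error here, which corresponds to the "no registered consumer" or "no registered producer" failure described for the factory functions.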
Example usage:
    // Create a consumer
    config := viper.New()
    config.Set("url", "amqp://localhost:5672")
    config.Set("queue", "my-queue")
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        log.Fatal(err)
    }
    // Register a handler
    mqutils.RegisterHandler("my-queue", myHandler)
    // Run the consumer
    if err := consumer.Run(ctx); err != nil {
        log.Printf("Consumer error: %v", err)
    }
Index
- Variables
- func NewConsumer(ctx context.Context, config *viper.Viper) (types.Consumer, error)
- func NewProducer(ctx context.Context, config *viper.Viper) (types.Producer, error)
Variables
RegisterBatchHandler is a convenience function that registers a batch message handler with the global handler registry. Batch handlers process multiple messages at once for improved efficiency.
This is an alias for types.RegisterBatchHandler.
Example:
    mqutils.RegisterBatchHandler("metrics.batch", func(ctx context.Context, msgs []types.Message) {
        // Decode each message; nack the ones that fail and keep the rest
        var metrics []Metric
        var valid []types.Message
        for _, msg := range msgs {
            var metric Metric
            if err := json.Unmarshal(msg.Body(), &metric); err != nil {
                msg.Nack()
                continue
            }
            metrics = append(metrics, metric)
            valid = append(valid, msg)
        }
        // Bulk insert metrics
        if err := db.BulkInsertMetrics(ctx, metrics); err != nil {
            // Failed - nack only the messages that were part of the insert,
            // so already-nacked messages are not settled twice
            for _, msg := range valid {
                msg.Nack()
            }
            return
        }
        // Success - ack the messages that were inserted
        for _, msg := range valid {
            msg.Ack()
        }
    })
RegisterHandler is a convenience function that registers a message handler with the global handler registry. Handlers are retrieved by consumers to process messages from specific queues or topics.
This is an alias for types.RegisterHandler.
Example:
    mqutils.RegisterHandler("user.events", func(ctx context.Context, msg types.Message) {
        // Process user event
        var event UserEvent
        if err := json.Unmarshal(msg.Body(), &event); err != nil {
            msg.Nack()
            return
        }
        // Handle the event...
        msg.Ack()
    })
func NewConsumer
NewConsumer creates a new message consumer based on the connection URL in the configuration. It automatically selects the appropriate implementation from the URL scheme (for example amqp://, kafka://, or nats://).
The configuration should contain at minimum:
- url: The connection URL for the message queue system
- queue: The name of the queue or topic to consume from
Additional configuration options vary by message queue system. See the documentation for each implementation for specific options.
Returns an error if:
- The URL scheme is not recognized (no registered consumer)
- The consumer creation fails due to invalid configuration
- Connection to the message queue system fails
Example:
    config := viper.New()
    config.Set("url", "kafka://localhost:9092")
    config.Set("queue", "events")
    config.Set("consumer_group", "my-service")
    consumer, err := mqutils.NewConsumer(ctx, config)
    if err != nil {
        return fmt.Errorf("failed to create consumer: %w", err)
    }
func NewProducer
NewProducer creates a new message producer based on the connection URL in the configuration. It automatically selects the appropriate implementation from the URL scheme (for example amqp://, kafka://, or nats://).
The configuration should contain at minimum:
- url: The connection URL for the message queue system
Additional configuration options vary by message queue system. Some producers may require explicit initialization with Start() before use.
Returns an error if:
- The URL scheme is not recognized (no registered producer)
- The producer creation fails due to invalid configuration
- Initial connection setup fails
Example:
    config := viper.New()
    config.Set("url", "amqp://localhost:5672")
    producer, err := mqutils.NewProducer(ctx, config)
    if err != nil {
        return fmt.Errorf("failed to create producer: %w", err)
    }
    // Some producers need explicit start
    if err := producer.Start(ctx); err != nil {
        return fmt.Errorf("failed to start producer: %w", err)
    }
    // Publish a message
    err = producer.Publish(ctx, "correlation-123", "events", "user.created",
        "application/json", []byte(`{"id": 123}`))
    if err != nil {
        return fmt.Errorf("failed to publish message: %w", err)
    }
Generated by gomarkdoc