NATS

NATS Core and JetStream cloud-native messaging with clustering and persistence support

nats

1
import "gitlab.com/digitalxero/mqutils/nats"

Index

func AcknowledgeMessage

1
func AcknowledgeMessage(ctx context.Context, message types.Message) error

AcknowledgeMessage provides a robust way for handlers to acknowledge messages with retry logic This function is exported to allow handlers to use the same acknowledgment retry mechanism

func NacknowledgeMessage

1
func NacknowledgeMessage(ctx context.Context, message types.Message) error

NacknowledgeMessage provides a robust way for handlers to nack messages with retry logic This function is exported to allow handlers to use the same nack retry mechanism

func NewMessageBuilder

1
func NewMessageBuilder() types.MessageBuilder

NewMessageBuilder creates a new NATS message builder. The builder provides a fluent interface for constructing NATS messages with all supported properties and attributes.

Example:

msg := nats.NewMessageBuilder().
    WithBody([]byte(`{"sensor": "temp-01", "value": 23.5}`)).
    WithContentType("application/json").
    WithCorrelationId("reading-456").
    WithHeaders(map[string]interface{}{
        "location": "warehouse-1",
        "timestamp": time.Now().Unix(),
    }).
    WithReplyTo("sensor.responses"). // For request-reply pattern
    Build()

func NewNatsConsumer

1
func NewNatsConsumer(config *viper.Viper) (types.Consumer, error)

NewNatsConsumer creates a new NATS/JetStream consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a NATS URL.

Configuration options:

  • url: NATS server URLs (required, e.g., “nats://localhost:4222”)
  • queue: Subject/stream name to consume from (required)
  • handler: Name of registered handler function (default: “natsLogger”)
  • queue_group: Queue group for load balancing (optional)
  • durable: Durable consumer name for JetStream (optional)
  • jetstream: Enable JetStream mode (default: false)
  • credentials_file: Path to NATS credentials file (optional)
  • tls_cert: Path to TLS certificate (optional)
  • tls_key: Path to TLS key (optional)
  • tls_ca: Path to TLS CA certificate (optional)
  • skip_verify: Skip TLS certificate verification (default: false)
  • max_reconnects: Max reconnection attempts (default: 60)
  • reconnect_wait: Wait between reconnects in ms (default: 2000)
  • message_channel_buffer: Buffer size for message channel (default: 10)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.

func NewNatsProducer

1
func NewNatsProducer(config *viper.Viper) (types.Producer, error)

NewNatsProducer creates a new NATS/JetStream producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a NATS URL.

Configuration options:

  • url: NATS server URLs (required, e.g., “nats://localhost:4222”)
  • subject_prefix: Prefix to add to all subjects (optional)
  • jetstream: Enable JetStream mode for persistence (default: false)
  • credentials_file: Path to NATS credentials file (optional)
  • tls_cert: Path to TLS certificate (optional)
  • tls_key: Path to TLS key (optional)
  • tls_ca: Path to TLS CA certificate (optional)
  • skip_verify: Skip TLS certificate verification (default: false)
  • max_reconnects: Max reconnection attempts (default: 60)
  • reconnect_wait: Wait between reconnects in ms (default: 2000)
  • request_timeout: Timeout for request-reply in ms (default: 5000)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.

func NewNatsTransport

1
func NewNatsTransport() types.Transport

NewNatsTransport creates a new NATS transport implementation. The transport provides low-level NATS operations including connection management, publishing, and subscription handling for both Core NATS and JetStream.

The transport manages:

  • NATS connections with automatic reconnection
  • JetStream context for persistent messaging
  • Subscription lifecycle management
  • Authentication (credentials, TLS)
  • Health monitoring

The transport supports both standard NATS (at-most-once delivery) and JetStream (at-least-once with persistence) messaging patterns.

type ConfigurationError

ConfigurationError represents a validation error with specific field and message

1
2
3
4
type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

1
func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents multiple validation errors

1
2
3
type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

1
func (e *ConfigurationErrors) Add(field, message string)

Add adds a new configuration error to the collection

func (*ConfigurationErrors) Error

1
func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

1
func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if there are any validation errors

Generated by gomarkdoc