NATS
NATS Core and JetStream cloud-native messaging with clustering and persistence support
nats
Index
- func AcknowledgeMessage(ctx context.Context, message types.Message) error
- func NacknowledgeMessage(ctx context.Context, message types.Message) error
- func NewMessageBuilder() types.MessageBuilder
- func NewNatsConsumer(config *viper.Viper) (types.Consumer, error)
- func NewNatsProducer(config *viper.Viper) (types.Producer, error)
- func NewNatsTransport() types.Transport
- type ConfigurationError
- type ConfigurationErrors
func AcknowledgeMessage
    func AcknowledgeMessage(ctx context.Context, message types.Message) error
AcknowledgeMessage acknowledges a message on behalf of a handler, retrying if the acknowledgment fails. It is exported so that handlers can reuse the package's own acknowledgment retry mechanism.
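The retry logic itself is not documented here. As a rough illustration of the idea, the sketch below retries an acknowledgment with a doubling delay and stops early if the context is cancelled. The `acker` interface, `ackWithRetry`, `flakyMessage`, and `tryAck` are hypothetical names for this example only; the attempt count and delays are assumptions, not the package's actual values.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// acker is a hypothetical stand-in for the part of types.Message that
// confirms delivery; the real interface is not shown in this document.
type acker interface {
	Ack() error
}

// ackWithRetry calls Ack up to attempts times, doubling the delay after each
// failure and stopping early if ctx is cancelled.
func ackWithRetry(ctx context.Context, m acker, attempts int, delay time.Duration) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = m.Ack(); lastErr == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2
		}
	}
	return fmt.Errorf("ack failed after %d attempts: %w", attempts, lastErr)
}

// flakyMessage fails a fixed number of times before acknowledging.
type flakyMessage struct {
	failures int
	calls    int
}

func (f *flakyMessage) Ack() error {
	f.calls++
	if f.calls <= f.failures {
		return errors.New("temporary ack failure")
	}
	return nil
}

// tryAck runs ackWithRetry against a message that fails `failures` times and
// reports how many Ack calls were made.
func tryAck(failures, attempts int) (int, error) {
	msg := &flakyMessage{failures: failures}
	err := ackWithRetry(context.Background(), msg, attempts, time.Millisecond)
	return msg.calls, err
}

func main() {
	calls, err := tryAck(2, 5)
	fmt.Println(calls, err) // 3 <nil>
}
```

In real handler code the call would simply be `AcknowledgeMessage(ctx, message)`, with the returned error checked after the retries are exhausted.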
func NacknowledgeMessage
    func NacknowledgeMessage(ctx context.Context, message types.Message) error
NacknowledgeMessage negatively acknowledges (nacks) a message on behalf of a handler, retrying if the nack fails. It is exported so that handlers can reuse the package's own nack retry mechanism.
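A handler usually picks between the two calls based on whether processing succeeded. The sketch below shows that decision using a hypothetical `settler` interface and `recorder` type in place of the real message type. Whether a nack leads to redelivery depends on the broker mode, so treat the comments as assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// settler is a hypothetical stand-in for a message that can be acknowledged
// or negatively acknowledged.
type settler interface {
	Ack() error
	Nack() error
}

// handle runs process on the body and settles the message based on the
// result: Ack on success, Nack on failure so the broker may redeliver.
func handle(m settler, body []byte, process func([]byte) error) error {
	if err := process(body); err != nil {
		if nackErr := m.Nack(); nackErr != nil {
			return fmt.Errorf("%w (nack also failed: %v)", err, nackErr)
		}
		return err
	}
	return m.Ack()
}

// recorder remembers how it was settled, for demonstration purposes.
type recorder struct{ state string }

func (r *recorder) Ack() error  { r.state = "acked"; return nil }
func (r *recorder) Nack() error { r.state = "nacked"; return nil }

// outcome reports how a message ends up when processing succeeds or fails.
func outcome(fail bool) string {
	r := &recorder{}
	_ = handle(r, []byte(`{}`), func([]byte) error {
		if fail {
			return errors.New("processing failed")
		}
		return nil
	})
	return r.state
}

func main() {
	fmt.Println(outcome(false), outcome(true)) // acked nacked
}
```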
func NewMessageBuilder
    func NewMessageBuilder() types.MessageBuilder
NewMessageBuilder creates a new NATS message builder. The builder provides a fluent interface for constructing NATS messages with all supported properties and attributes.
Example:
    msg := nats.NewMessageBuilder().
        WithBody([]byte(`{"sensor": "temp-01", "value": 23.5}`)).
        WithContentType("application/json").
        WithCorrelationId("reading-456").
        WithHeaders(map[string]interface{}{
            "location":  "warehouse-1",
            "timestamp": time.Now().Unix(),
        }).
        WithReplyTo("sensor.responses"). // For request-reply pattern
        Build()
func NewNatsConsumer
    func NewNatsConsumer(config *viper.Viper) (types.Consumer, error)
NewNatsConsumer creates a new NATS/JetStream consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a NATS URL.
Configuration options:
- url: NATS server URLs (required, e.g., “nats://localhost:4222”)
- queue: Subject/stream name to consume from (required)
- handler: Name of registered handler function (default: “natsLogger”)
- queue_group: Queue group for load balancing (optional)
- durable: Durable consumer name for JetStream (optional)
- jetstream: Enable JetStream mode (default: false)
- credentials_file: Path to NATS credentials file (optional)
- tls_cert: Path to TLS certificate (optional)
- tls_key: Path to TLS key (optional)
- tls_ca: Path to TLS CA certificate (optional)
- skip_verify: Skip TLS certificate verification (default: false)
- max_reconnects: Max reconnection attempts (default: 60)
- reconnect_wait: Wait between reconnects in ms (default: 2000)
- message_channel_buffer: Buffer size for message channel (default: 10)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
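The batch options are easiest to read as a "size or timeout, whichever comes first" rule. The sketch below shows one common way to implement that rule with a timer and a buffered channel, using the documented defaults (`batch_size` 5, `batch_timeout` 100 ms, `message_channel_buffer` 10). `collectBatch` and `demoBatches` are hypothetical helpers; the consumer's real batching code is not shown in this document and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch reads from in until it has size items or timeout elapses,
// whichever comes first, and returns what it gathered.
func collectBatch(in <-chan string, size int, timeout time.Duration) []string {
	batch := make([]string, 0, size)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < size {
		select {
		case msg, ok := <-in:
			if !ok {
				return batch // channel closed: flush what we have
			}
			batch = append(batch, msg)
		case <-timer.C:
			return batch // timeout: flush a partial batch
		}
	}
	return batch
}

// demoBatches queues n messages (n must not exceed the buffer of 10) and
// returns the size of each batch collected with the default settings.
func demoBatches(n int) []int {
	in := make(chan string, 10) // message_channel_buffer default
	for i := 0; i < n; i++ {
		in <- fmt.Sprintf("msg-%d", i)
	}
	var sizes []int
	for remaining := n; remaining > 0; {
		b := collectBatch(in, 5, 100*time.Millisecond) // batch_size, batch_timeout
		sizes = append(sizes, len(b))
		remaining -= len(b)
	}
	return sizes
}

func main() {
	fmt.Println(demoBatches(7)) // [5 2]
}
```

With seven queued messages the first batch fills immediately and the second is flushed by the timeout with only two messages, which is the trade-off `batch_timeout` controls.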
func NewNatsProducer
    func NewNatsProducer(config *viper.Viper) (types.Producer, error)
NewNatsProducer creates a new NATS/JetStream producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a NATS URL.
Configuration options:
- url: NATS server URLs (required, e.g., “nats://localhost:4222”)
- subject_prefix: Prefix to add to all subjects (optional)
- jetstream: Enable JetStream mode for persistence (default: false)
- credentials_file: Path to NATS credentials file (optional)
- tls_cert: Path to TLS certificate (optional)
- tls_key: Path to TLS key (optional)
- tls_ca: Path to TLS CA certificate (optional)
- skip_verify: Skip TLS certificate verification (default: false)
- max_reconnects: Max reconnection attempts (default: 60)
- reconnect_wait: Wait between reconnects in ms (default: 2000)
- request_timeout: Timeout for request-reply in ms (default: 5000)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
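Several of these settings are integers in milliseconds. The sketch below converts the documented defaults to `time.Duration` values and estimates how long a client would keep trying to reconnect. `producerTimings`, `fromMillis`, and `reconnectBudget` are hypothetical names, and the estimate is only an approximation: it ignores the time each connection attempt takes, any jitter, and multiple server URLs.

```go
package main

import (
	"fmt"
	"time"
)

// producerTimings holds the millisecond-based settings as durations.
type producerTimings struct {
	MaxReconnects  int
	ReconnectWait  time.Duration
	RequestTimeout time.Duration
}

// fromMillis converts the integer millisecond settings into durations.
func fromMillis(maxReconnects, reconnectWaitMs, requestTimeoutMs int) producerTimings {
	return producerTimings{
		MaxReconnects:  maxReconnects,
		ReconnectWait:  time.Duration(reconnectWaitMs) * time.Millisecond,
		RequestTimeout: time.Duration(requestTimeoutMs) * time.Millisecond,
	}
}

// reconnectBudget approximates the total time spent waiting between
// reconnection attempts before the client gives up.
func (p producerTimings) reconnectBudget() time.Duration {
	return time.Duration(p.MaxReconnects) * p.ReconnectWait
}

func main() {
	timings := fromMillis(60, 2000, 5000) // documented defaults
	fmt.Println(timings.ReconnectWait, timings.RequestTimeout, timings.reconnectBudget())
	// 2s 5s 2m0s
}
```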
func NewNatsTransport
    func NewNatsTransport() types.Transport
NewNatsTransport creates a new NATS transport implementation. The transport provides low-level NATS operations including connection management, publishing, and subscription handling for both Core NATS and JetStream.
The transport manages:
- NATS connections with automatic reconnection
- JetStream context for persistent messaging
- Subscription lifecycle management
- Authentication (credentials, TLS)
- Health monitoring
The transport supports both standard NATS (at-most-once delivery) and JetStream (at-least-once with persistence) messaging patterns.
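The difference between the two delivery guarantees can be illustrated with a toy in-memory simulation, which does not touch NATS at all. `deliver` and `failOnce` are hypothetical helpers: with redelivery off, a message whose handler fails is lost; with redelivery on, it is retried until it is handled.

```go
package main

import "fmt"

// deliver hands each message to handle. When redeliver is false
// (at-most-once), a message whose handler fails is dropped. When redeliver is
// true (at-least-once), it is retried until the handler succeeds.
func deliver(msgs []string, redeliver bool, handle func(string) error) (processed []string, attempts int) {
	for _, m := range msgs {
		for {
			attempts++
			if err := handle(m); err == nil {
				processed = append(processed, m)
				break
			}
			if !redeliver {
				break // lost: nothing was stored to replay
			}
		}
	}
	return processed, attempts
}

// failOnce returns a handler that rejects the first attempt for one message.
func failOnce(name string) func(string) error {
	failed := false
	return func(m string) error {
		if m == name && !failed {
			failed = true
			return fmt.Errorf("transient failure handling %q", m)
		}
		return nil
	}
}

func main() {
	msgs := []string{"a", "b", "c"}

	got, n := deliver(msgs, false, failOnce("b"))
	fmt.Println(got, n) // [a c] 3

	got, n = deliver(msgs, true, failOnce("b"))
	fmt.Println(got, n) // [a b c] 4
}
```

Because at-least-once delivery can present the same message more than once, handlers used in JetStream mode are generally written to be idempotent.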
type ConfigurationError
ConfigurationError represents a validation error for a specific configuration field, together with a message describing the problem.
func (*ConfigurationError) Error
type ConfigurationErrors
ConfigurationErrors represents a collection of validation errors.
func (*ConfigurationErrors) Add
Add adds a new configuration error to the collection.
func (*ConfigurationErrors) Error
func (*ConfigurationErrors) HasErrors
HasErrors returns true if there are any validation errors.
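The struct fields and the parameters of Add are not shown in this document. As an illustration of the pattern these two types suggest, the sketch below collects per-field validation failures and returns them as a single error. `fieldError`, `fieldErrors`, and `validateConsumer` are hypothetical analogues, not the package's actual definitions.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldError is a hypothetical analogue of ConfigurationError.
type fieldError struct {
	Field   string
	Message string
}

func (e *fieldError) Error() string { return e.Field + ": " + e.Message }

// fieldErrors is a hypothetical analogue of ConfigurationErrors.
type fieldErrors struct{ errs []*fieldError }

// Add records one more validation failure.
func (c *fieldErrors) Add(field, message string) {
	c.errs = append(c.errs, &fieldError{Field: field, Message: message})
}

// HasErrors reports whether any failure has been recorded.
func (c *fieldErrors) HasErrors() bool { return len(c.errs) > 0 }

// Error joins all recorded failures into one message.
func (c *fieldErrors) Error() string {
	parts := make([]string, len(c.errs))
	for i, e := range c.errs {
		parts[i] = e.Error()
	}
	return strings.Join(parts, "; ")
}

// validateConsumer checks the two consumer settings documented as required.
func validateConsumer(cfg map[string]string) error {
	errs := &fieldErrors{}
	for _, key := range []string{"url", "queue"} {
		if cfg[key] == "" {
			errs.Add(key, "is required")
		}
	}
	if errs.HasErrors() {
		return errs
	}
	return nil // avoid returning a typed nil pointer as a non-nil error
}

func main() {
	err := validateConsumer(map[string]string{"url": "nats://localhost:4222"})
	fmt.Println(err) // queue: is required
}
```

Collecting every failure before returning lets a caller fix all configuration problems in one pass instead of discovering them one at a time.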
Generated by gomarkdoc