API Reference
Complete API documentation for mqutils
API Reference
This page provides detailed documentation for all public interfaces and types in mqutils.
Factory Functions
mqutils.NewConsumer
|
|
Creates a new consumer based on the URL scheme. Automatically detects the message queue system and returns the appropriate implementation.
Parameters:
url
: Connection URL with scheme (e.g.,amqp://localhost:5672/queue
)
Returns:
types.Consumer
: Consumer implementation for the detected systemerror
: Configuration or connection error
Core Interfaces
types.Consumer
Main interface for consuming messages from message queues.
|
|
Methods:
RegisterHandler
|
|
Registers a handler function for processing individual messages.
RegisterBatchHandler
|
|
Registers a handler function for processing message batches.
Run
|
|
Starts the consumer. Blocks until context is cancelled or an error occurs.
Close
|
|
Closes the consumer and releases resources.
Publisher
|
|
Returns the associated publisher for sending messages.
types.Publisher
Interface for publishing messages to message queues.
|
|
Methods:
Publish
|
|
Publishes a simple message with just a body to the specified destination.
PublishMessage
|
|
Publishes a complete message with headers, correlation ID, etc.
NewMessageBuilder
|
|
Creates a new message builder for constructing messages.
types.Message
Interface representing a message in the system.
|
|
Methods:
ID
|
|
Returns the unique message identifier.
CorrelationID
|
|
Returns the correlation ID for request-response patterns.
ReplyTo
|
|
Returns the reply destination for responses.
Exchange
|
|
Returns the AMQP exchange name (empty for other systems).
RoutingKey
|
|
Returns the message routing key.
Headers
|
|
Returns the message headers as a map.
Body
|
|
Returns the message payload.
Publisher
|
|
Returns the associated publisher for sending replies.
types.MessageBuilder
Builder interface for constructing messages.
|
|
Methods:
All setter methods return the builder instance for method chaining. Call Build()
to create the final message.
types.Transport
Low-level interface for message queue operations.
|
|
Extends Publisher
and HealthChecker
interfaces with connection and consumption methods.
types.HealthChecker
Interface for health monitoring.
|
|
HealthCheck
|
|
Returns the current health status of the component.
types.HealthCheck
Interface representing health check results.
|
|
Methods:
Status
|
|
Returns the health status (Healthy, Unhealthy, Connecting, Closed).
Message
|
|
Returns a human-readable status message.
Timestamp
|
|
Returns when the health check was performed.
Details
|
|
Returns additional health check details.
Handler Functions
types.HandlerFunc
|
|
Function signature for processing individual messages. Return nil
for successful processing or an error to trigger retry/dead letter behavior.
types.BatchHandlerFunc
|
|
Function signature for processing message batches. Return nil
for successful processing of the entire batch.
Health Status Types
types.HealthStatus
|
|
Enumeration of possible health states.
Error Types
types.ConfigurationError
|
|
Represents configuration validation errors.
types.ValidationError
|
|
Contains multiple configuration errors for comprehensive validation reporting.
Package-Specific Types
AMQP Package
amqp.Consumer
Implementation of types.Consumer
for AMQP/RabbitMQ.
amqp.Transport
Implementation of types.Transport
for AMQP/RabbitMQ with channel pooling.
amqp.Message
Implementation of types.Message
for AMQP messages.
Kafka Package
kafka.Consumer
Implementation of types.Consumer
for Apache Kafka.
kafka.Transport
Implementation of types.Transport
for Kafka with consumer groups.
kafka.Message
Implementation of types.Message
for Kafka messages.
kafka.Producer
Kafka-specific producer with additional configuration options.
NATS Package
nats.Consumer
Implementation of types.Consumer
for NATS Core and JetStream.
nats.Transport
Implementation of types.Transport
for NATS.
nats.Message
Implementation of types.Message
for NATS messages.
nats.Producer
NATS-specific producer supporting both Core and JetStream.
AWS Package
aws.Consumer
Implementation of types.Consumer
for AWS SQS.
aws.Transport
Implementation of types.Transport
for SQS with visibility timeout handling.
aws.Message
Implementation of types.Message
for SQS messages.
GCP Package
gcp.Consumer
Implementation of types.Consumer
for GCP Pub/Sub.
gcp.Transport
Implementation of types.Transport
for Pub/Sub with acknowledgment handling.
gcp.Message
Implementation of types.Message
for Pub/Sub messages.
Redis Package
redis.Consumer
Implementation of types.Consumer
for Redis Pub/Sub and Streams.
redis.Transport
Implementation of types.Transport
for Redis.
redis.Message
Implementation of types.Message
for Redis messages.
Configuration Examples
URL Parameters
Different systems support various URL parameters:
AMQP
exchange
: Exchange namerouting_key
: Routing key for messagesdurable
: Queue durability (true/false)auto_delete
: Auto-delete queue (true/false)
Kafka
group_id
: Consumer group IDpartition
: Specific partition (optional)offset
: Starting offset (earliest/latest)
NATS
max_msgs
: Maximum messages in subscriptiondurable
: Durable subscription name (JetStream)
SQS
visibility_timeout
: Message visibility timeoutmax_messages
: Maximum messages per receive
Pub/Sub
max_outstanding_messages
: Flow control limitsubscription_id
: Subscription identifier
Redis
stream
: Use Redis Streams instead of Pub/Subgroup
: Consumer group for streamsconsumer
: Consumer name within group
Performance Tuning
Buffer Configuration
Configure message channel buffers for optimal performance:
|
|
Batch Processing
Configure batch processing parameters:
|
|
Batch size and timeout are transport-specific and configured via URL parameters or transport options.
Best Practices
- Always use context for cancellation in
Run()
methods - Implement proper error handling in handler functions
- Use health checks to monitor connection status
- Configure appropriate timeouts for your use case
- Handle graceful shutdown with
defer Close()
- Use correlation IDs for request-response patterns
- Set appropriate batch sizes for high-throughput scenarios