Redis
Redis Pub/Sub and Streams messaging with in-memory performance and optional persistence
redis
Index
- func NewMessageBuilder() mqtypes.MessageBuilder
- func NewRedisConsumer(config *viper.Viper) (types.Consumer, error)
- func NewRedisProducer(config *viper.Viper) (types.Producer, error)
- func NewRedisTransport() mqtypes.Transport
- type ConfigurationError
- type ConfigurationErrors
func NewMessageBuilder
func NewMessageBuilder() mqtypes.MessageBuilder
NewMessageBuilder creates a new Redis message builder. The builder provides a fluent interface for constructing Redis messages with support for both Pub/Sub and Streams modes.
Redis message considerations:
- Pub/Sub: Messages are ephemeral (delivered only to subscribers connected at publish time), with no size limit beyond available memory
- Streams: Messages are persistent with configurable retention
- Both modes serialize the entire message structure to JSON
Example:
    msg := redis.NewMessageBuilder().
        WithBody([]byte(`{"action": "process", "itemId": 789}`)).
        WithContentType("application/json").
        WithCorrelationId("task-789").
        WithRoutingKey("*"). // Stream ID (* = auto-generate)
        WithHeaders(map[string]interface{}{
            "priority":    1,
            "retry_count": 0,
            "timestamp":   time.Now().Unix(),
        }).
        Build()
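Because both modes serialize the whole message to JSON, what reaches Redis is an envelope rather than the raw body. The following is a minimal sketch of that idea using a hypothetical `envelope` struct; the field names and JSON keys are assumptions for illustration and may not match the library's actual wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical stand-in for the serialized message structure.
// The real field names and JSON keys used by the library may differ.
type envelope struct {
	Body          []byte                 `json:"body"`
	ContentType   string                 `json:"content_type"`
	CorrelationID string                 `json:"correlation_id"`
	Headers       map[string]interface{} `json:"headers,omitempty"`
}

// encode marshals the whole envelope, not just the body, to JSON.
func encode(e envelope) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decode reverses encode on the consumer side.
func decode(s string) (envelope, error) {
	var e envelope
	err := json.Unmarshal([]byte(s), &e)
	return e, err
}

func main() {
	payload, err := encode(envelope{
		Body:          []byte(`{"action": "process", "itemId": 789}`),
		ContentType:   "application/json",
		CorrelationID: "task-789",
		Headers:       map[string]interface{}{"priority": 1},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload)
}
```

In this sketch `encoding/json` encodes the `[]byte` body as a base64 string, so the body is not human-readable inside the envelope.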
func NewRedisConsumer
func NewRedisConsumer(config *viper.Viper) (types.Consumer, error)
NewRedisConsumer creates a new Redis consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Redis URL.
Configuration options:
- url: Redis connection URL (required, e.g., “redis://localhost:6379/0”)
- queue: Channel name (Pub/Sub mode) or stream key (stream mode) to consume from (required)
- handler: Name of registered handler function (default: “redisLogger”)
- mode: Consumer mode - “pubsub” or “stream” (default: auto-detect)
- password: Redis password for authentication (optional)
- username: Redis username for ACL (Redis 6.0+) (optional)
- database: Redis database number (default: 0)
- max_retries: Maximum retry attempts for failed operations (default: 50)
- min_retry_backoff: Min retry backoff in ms (default: 8)
- max_retry_backoff: Max retry backoff in ms (default: 512)
- dial_timeout: Connection timeout in ms (default: 5000)
- read_timeout: Read timeout in ms (default: 3000)
- write_timeout: Write timeout in ms (default: 3000)
- pool_size: Connection pool size (default: 10)
- min_idle_conns: Min idle connections in pool (default: 5)
- consumer_group: Redis Streams consumer group name (stream mode only)
- consumer_name: Redis Streams consumer name (stream mode only)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
func NewRedisProducer
func NewRedisProducer(config *viper.Viper) (types.Producer, error)
NewRedisProducer creates a new Redis producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Redis URL.
Configuration options:
- url: Redis connection URL (required, e.g., “redis://localhost:6379/0”)
- channel_prefix: Prefix to add to all channel/stream names (optional)
- mode: Producer mode - “pubsub” or “stream” (default: “pubsub”)
- password: Redis password for authentication (optional)
- username: Redis username for ACL (Redis 6.0+) (optional)
- database: Redis database number (default: 0)
- dial_timeout: Connection timeout in ms (default: 5000)
- read_timeout: Read timeout in ms (default: 3000)
- write_timeout: Write timeout in ms (default: 3000)
- pool_size: Connection pool size (default: 10)
- min_idle_conns: Min idle connections in pool (default: 5)
- max_len: Max stream length for trimming (stream mode, default: 10000)
- approximate: Use approximate trimming for streams (default: true)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
func NewRedisTransport
func NewRedisTransport() mqtypes.Transport
NewRedisTransport creates a new Redis transport implementation. The transport provides low-level Redis operations for both Pub/Sub and Streams modes and detects the mode automatically from the URL scheme.
The transport manages:
- A Redis UniversalClient supporting single-node, Sentinel, and Cluster deployments
- Connection pooling with configurable limits
- Automatic reconnection with exponential backoff
- Mode switching between Pub/Sub and Streams
- Health monitoring with server statistics
Supported URL schemes:
- redis:// - Standard Redis with Pub/Sub mode
- rediss:// - Redis with TLS
- redisstream:// - Redis Streams mode
- redisstreams:// - Redis Streams with TLS
type ConfigurationError
ConfigurationError represents a validation error for a specific configuration field, together with a message describing the problem.
func (*ConfigurationError) Error
type ConfigurationErrors
ConfigurationErrors represents a collection of validation errors.
func (*ConfigurationErrors) Add
Add adds a new configuration error to the collection.
func (*ConfigurationErrors) Error
func (*ConfigurationErrors) HasErrors
HasErrors returns true if there are any validation errors.
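The pattern behind these types is to collect every validation problem and report them together, rather than failing on the first one. The sketch below shows that pattern with hypothetical stand-in types `fieldError` and `fieldErrors`; their fields, method signatures, and message format are assumptions, since the actual definitions are not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldError and fieldErrors are hypothetical stand-ins for the package's
// ConfigurationError and ConfigurationErrors; the real fields may differ.
type fieldError struct {
	Field   string
	Message string
}

func (e *fieldError) Error() string { return e.Field + ": " + e.Message }

type fieldErrors struct {
	errs []*fieldError
}

// Add records one more validation problem.
func (e *fieldErrors) Add(field, message string) {
	e.errs = append(e.errs, &fieldError{Field: field, Message: message})
}

// HasErrors reports whether any problem was recorded.
func (e *fieldErrors) HasErrors() bool { return len(e.errs) > 0 }

// Error joins all recorded problems into one message.
func (e *fieldErrors) Error() string {
	parts := make([]string, len(e.errs))
	for i, fe := range e.errs {
		parts[i] = fe.Error()
	}
	return strings.Join(parts, "; ")
}

// validate checks the two options the consumer documents as required.
func validate(cfg map[string]string) error {
	errs := &fieldErrors{}
	if cfg["url"] == "" {
		errs.Add("url", "is required")
	}
	if cfg["queue"] == "" {
		errs.Add("queue", "is required")
	}
	if errs.HasErrors() {
		return errs
	}
	return nil // explicit nil avoids returning a non-nil interface holding an empty collection
}

func main() {
	err := validate(map[string]string{"url": "redis://localhost:6379/0"})
	fmt.Println(err) // prints "queue: is required"
}
```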