Redis

Redis Pub/Sub and Streams messaging with in-memory performance and optional persistence

redis

1
import "gitlab.com/digitalxero/mqutils/redis"

Index

func NewMessageBuilder

1
func NewMessageBuilder() mqtypes.MessageBuilder

NewMessageBuilder creates a new Redis message builder. The builder provides a fluent interface for constructing Redis messages with support for both Pub/Sub and Streams modes.

Redis message considerations:

  • Pub/Sub: Messages are ephemeral, no size limits beyond memory
  • Streams: Messages are persistent with configurable retention
  • Both modes serialize the entire message structure to JSON

Example:

msg := redis.NewMessageBuilder().
    WithBody([]byte(`{"action": "process", "itemId": 789}`)).
    WithContentType("application/json").
    WithCorrelationId("task-789").
    WithRoutingKey("*"). // Stream ID (* = auto-generate)
    WithHeaders(map[string]interface{}{
        "priority": 1,
        "retry_count": 0,
        "timestamp": time.Now().Unix(),
    }).
    Build()

func NewRedisConsumer

1
func NewRedisConsumer(config *viper.Viper) (types.Consumer, error)

NewRedisConsumer creates a new Redis consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Redis URL.

Configuration options:

  • url: Redis connection URL (required, e.g., “redis://localhost:6379/0”)
  • queue: Channel/key name for Pub/Sub or stream (required)
  • handler: Name of registered handler function (default: “redisLogger”)
  • mode: Consumer mode - “pubsub” or “stream” (default: auto-detect)
  • password: Redis password for authentication (optional)
  • username: Redis username for ACL (Redis 6.0+) (optional)
  • database: Redis database number (default: 0)
  • max_retries: Maximum retry attempts for failed operations (default: 50)
  • min_retry_backoff: Min retry backoff in ms (default: 8)
  • max_retry_backoff: Max retry backoff in ms (default: 512)
  • dial_timeout: Connection timeout in ms (default: 5000)
  • read_timeout: Read timeout in ms (default: 3000)
  • write_timeout: Write timeout in ms (default: 3000)
  • pool_size: Connection pool size (default: 10)
  • min_idle_conns: Min idle connections in pool (default: 5)
  • consumer_group: Redis Streams consumer group name (stream mode only)
  • consumer_name: Redis Streams consumer name (stream mode only)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.

func NewRedisProducer

1
func NewRedisProducer(config *viper.Viper) (types.Producer, error)

NewRedisProducer creates a new Redis producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Redis URL.

Configuration options:

  • url: Redis connection URL (required, e.g., “redis://localhost:6379/0”)
  • channel_prefix: Prefix to add to all channel/stream names (optional)
  • mode: Producer mode - “pubsub” or “stream” (default: “pubsub”)
  • password: Redis password for authentication (optional)
  • username: Redis username for ACL (Redis 6.0+) (optional)
  • database: Redis database number (default: 0)
  • dial_timeout: Connection timeout in ms (default: 5000)
  • read_timeout: Read timeout in ms (default: 3000)
  • write_timeout: Write timeout in ms (default: 3000)
  • pool_size: Connection pool size (default: 10)
  • min_idle_conns: Min idle connections in pool (default: 5)
  • max_len: Max stream length for trimming (stream mode, default: 10000)
  • approximate: Use approximate trimming for streams (default: true)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.

func NewRedisTransport

1
func NewRedisTransport() mqtypes.Transport

NewRedisTransport creates a new Redis transport implementation. The transport provides low-level Redis operations for both Pub/Sub and Streams modes, with automatic mode detection based on URL scheme.

The transport manages:

  • Redis UniversalClient supporting single node, sentinel, and cluster
  • Connection pooling with configurable limits
  • Automatic reconnection with exponential backoff
  • Mode switching between Pub/Sub and Streams
  • Health monitoring with server statistics

Supported URL schemes:

  • redis:// - Standard Redis with Pub/Sub mode
  • rediss:// - Redis with TLS
  • redisstream:// - Redis Streams mode
  • redisstreams:// - Redis Streams with TLS

type ConfigurationError

ConfigurationError represents a validation error with specific field and message

1
2
3
4
type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

1
func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents multiple validation errors

1
2
3
type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

1
func (e *ConfigurationErrors) Add(field, message string)

Add adds a new configuration error to the collection

func (*ConfigurationErrors) Error

1
func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

1
func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if there are any validation errors

Generated by gomarkdoc