Kafka

Apache Kafka event streaming implementation with consumer groups, partitions, and SASL/TLS support

kafka

import "gitlab.com/digitalxero/mqutils/kafka"


func AcknowledgeMessage

func AcknowledgeMessage(ctx context.Context, message types.Message) error

AcknowledgeMessage provides a robust way for handlers to acknowledge messages with retry logic. This function is exported so that handlers can use the same acknowledgment retry mechanism.

func NacknowledgeMessage

func NacknowledgeMessage(ctx context.Context, message types.Message) error

NacknowledgeMessage provides a robust way for handlers to negatively acknowledge (nack) messages with retry logic. This function is exported so that handlers can use the same nack retry mechanism.
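
For illustration, a handler might combine the two helpers, acking on success and nacking on failure. The handler and decode functions below are hypothetical; only AcknowledgeMessage and NacknowledgeMessage come from this package:

func handleOrder(ctx context.Context, message types.Message) error {
    // decode is a hypothetical application function; on failure, nack so
    // the message can be redelivered or routed to the retry topic.
    if err := decode(message); err != nil {
        return kafka.NacknowledgeMessage(ctx, message)
    }
    // On success, ack using the package's acknowledgment retry logic.
    return kafka.AcknowledgeMessage(ctx, message)
}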

func NewKafkaConsumer

func NewKafkaConsumer(config *viper.Viper) (types.Consumer, error)

NewKafkaConsumer creates a new Apache Kafka consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Kafka URL.

Configuration options:

  • url: Kafka broker URLs (required, e.g., "kafka://broker1:9092,broker2:9092")
  • queue: Topic name to consume from (required)
  • consumer_group: Consumer group ID (default: "mqutils-default-group")
  • handler: Name of registered handler function (default: "kafkaLogger")
  • sasl_mechanism: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 (optional)
  • sasl_username: SASL username for authentication (optional)
  • sasl_password: SASL password for authentication (optional)
  • tls_enabled: Enable TLS encryption (default: false)
  • skip_verify: Skip TLS certificate verification (default: false)
  • retry_topic: Topic for message retries (optional)
  • retry_max_retries: Max retry attempts (default: 50)
  • session_timeout: Session timeout in ms (default: 30000)
  • heartbeat_interval: Heartbeat interval in ms (default: 3000)
  • auto_offset_reset: Where to start reading - earliest/latest (default: latest)
  • enable_auto_commit: Auto-commit offsets (default: true)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.
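
A minimal configuration sketch (imports elided; broker addresses, topic, and group ID are placeholders):

cfg := viper.New()
cfg.Set("url", "kafka://broker1:9092,broker2:9092")
cfg.Set("queue", "orders")
cfg.Set("consumer_group", "order-processors")
cfg.Set("auto_offset_reset", "earliest")

consumer, err := kafka.NewKafkaConsumer(cfg)
if err != nil {
    log.Fatal(err)
}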

func NewKafkaProducer

func NewKafkaProducer(config *viper.Viper) (types.Producer, error)

NewKafkaProducer creates a new Apache Kafka producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Kafka URL.

Configuration options:

  • url: Kafka broker URLs (required, e.g., "kafka://broker1:9092,broker2:9092")
  • topic_prefix: Prefix to add to all topic names (optional)
  • sasl_mechanism: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 (optional)
  • sasl_username: SASL username for authentication (optional)
  • sasl_password: SASL password for authentication (optional)
  • tls_enabled: Enable TLS encryption (default: false)
  • skip_verify: Skip TLS certificate verification (default: false)
  • compression_type: Compression - none, gzip, snappy, lz4 (default: snappy)
  • required_acks: Ack mode - all, 1, 0 (default: all)
  • max_retries: Max retry attempts for failed sends (default: 3)
  • retry_backoff: Retry backoff in ms (default: 100)
  • flush_frequency: Flush interval in ms (default: 10)
  • flush_messages: Messages before flush (default: 100)
  • flush_bytes: Bytes before flush (default: 1048576)
  • partitioner: Partitioning strategy - hash, random, manual (default: hash)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
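
A minimal configuration sketch (imports elided; values are placeholders). Start() is called without error handling here because its exact signature is defined by the types.Producer interface:

cfg := viper.New()
cfg.Set("url", "kafka://broker1:9092,broker2:9092")
cfg.Set("compression_type", "gzip")
cfg.Set("required_acks", "all")

producer, err := kafka.NewKafkaProducer(cfg)
if err != nil {
    log.Fatal(err)
}
// Start the producer before publishing; handle Start's return value
// according to the types.Producer interface.
producer.Start()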

func NewKafkaTransport

func NewKafkaTransport() types.Transport

NewKafkaTransport creates a new Kafka transport implementation. The transport provides low-level Kafka operations including connection management, producer/consumer creation, and message operations.

The transport manages:

  • Kafka client connections with automatic broker discovery
  • Producer instances for publishing
  • Consumer group coordination
  • SASL authentication and TLS encryption
  • Connection health monitoring

Unlike AMQP, Kafka doesn’t use channels, so this transport manages Sarama client and producer instances directly.
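
Most applications obtain a transport indirectly via NewKafkaConsumer or NewKafkaProducer, but it can also be constructed directly:

transport := kafka.NewKafkaTransport()
// transport satisfies types.Transport; its methods cover connection
// management, producer/consumer creation, and message operations.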

func NewMessageBuilder

func NewMessageBuilder() types.MessageBuilder

NewMessageBuilder creates a new Kafka message builder. The builder provides a fluent interface for constructing Kafka messages with all supported properties and attributes.

Example:

msg := kafka.NewMessageBuilder().
    WithBody([]byte(`{"event": "order.placed"}`)).
    WithContentType("application/json").
    WithRoutingKey("order-123"). // Used as partition key
    WithHeaders(map[string]interface{}{
        "source": "order-service",
        "version": "2.0",
    }).
    Build()
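
Because the routing key is used as the partition key, messages built with the same key are routed to the same partition under the default hash partitioner, which preserves their relative ordering.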

type ConfigurationError

ConfigurationError represents a validation error with a specific field and message.

type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents multiple validation errors.

type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

func (e *ConfigurationErrors) Add(field, message string)

Add adds a new configuration error to the collection.

func (*ConfigurationErrors) Error

func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if there are any validation errors.
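
A sketch of inspecting validation failures from a constructor; the errors.As usage assumes the constructors return (or wrap) a *ConfigurationErrors, which this doc doesn't state explicitly:

if _, err := kafka.NewKafkaConsumer(cfg); err != nil {
    var cfgErrs *kafka.ConfigurationErrors
    if errors.As(err, &cfgErrs) && cfgErrs.HasErrors() {
        // Report each failed field individually.
        for _, ce := range cfgErrs.Errors {
            log.Printf("invalid %s: %s", ce.Field, ce.Message)
        }
    }
}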

Generated by gomarkdoc