Kafka
Apache Kafka event streaming implementation with consumer groups, partitions, and SASL/TLS support
kafka
Index
- func AcknowledgeMessage(ctx context.Context, message types.Message) error
- func NacknowledgeMessage(ctx context.Context, message types.Message) error
- func NewKafkaConsumer(config *viper.Viper) (types.Consumer, error)
- func NewKafkaProducer(config *viper.Viper) (types.Producer, error)
- func NewKafkaTransport() types.Transport
- func NewMessageBuilder() types.MessageBuilder
- type ConfigurationError
- type ConfigurationErrors
func AcknowledgeMessage
    func AcknowledgeMessage(ctx context.Context, message types.Message) error
AcknowledgeMessage lets handlers acknowledge messages with retry logic. It is exported so that handlers can use the same acknowledgment retry mechanism.
func NacknowledgeMessage
    func NacknowledgeMessage(ctx context.Context, message types.Message) error
NacknowledgeMessage lets handlers negatively acknowledge (nack) messages with retry logic. It is exported so that handlers can use the same nack retry mechanism.
func NewKafkaConsumer
    func NewKafkaConsumer(config *viper.Viper) (types.Consumer, error)
NewKafkaConsumer creates a new Apache Kafka consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Kafka URL.
Configuration options:
- url: Kafka broker URLs (required, e.g., “kafka://broker1:9092,broker2:9092”)
- queue: Topic name to consume from (required)
- consumer_group: Consumer group ID (default: “mqutils-default-group”)
- handler: Name of registered handler function (default: “kafkaLogger”)
- sasl_mechanism: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 (optional)
- sasl_username: SASL username for authentication (optional)
- sasl_password: SASL password for authentication (optional)
- tls_enabled: Enable TLS encryption (default: false)
- skip_verify: Skip TLS certificate verification (default: false)
- retry_topic: Topic for message retries (optional)
- retry_max_retries: Max retry attempts (default: 50)
- session_timeout: Session timeout in ms (default: 30000)
- heartbeat_interval: Heartbeat interval in ms (default: 3000)
- auto_offset_reset: Where to start reading - earliest/latest (default: latest)
- enable_auto_commit: Auto-commit offsets (default: true)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
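To make the defaults above concrete, here is a minimal sketch that overlays user settings on the documented consumer defaults and checks the two required options. It uses a plain map rather than Viper so that it runs with the standard library alone. `resolveConsumerConfig` is a hypothetical helper, and the topic and group names are made up; the package's actual validation is not shown in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerDefaults mirrors the default values listed in the option table.
var consumerDefaults = map[string]any{
	"consumer_group":          "mqutils-default-group",
	"handler":                 "kafkaLogger",
	"tls_enabled":             false,
	"skip_verify":             false,
	"retry_max_retries":       50,
	"session_timeout":         30000, // ms
	"heartbeat_interval":      3000,  // ms
	"auto_offset_reset":       "latest",
	"enable_auto_commit":      true,
	"batch_size":              5,
	"batch_timeout":           100, // ms
	"enable_batch_processing": false,
}

// resolveConsumerConfig (hypothetical) overlays user settings on the
// defaults and rejects configurations that lack a required option.
func resolveConsumerConfig(user map[string]any) (map[string]any, error) {
	out := make(map[string]any, len(consumerDefaults)+len(user))
	for k, v := range consumerDefaults {
		out[k] = v
	}
	for k, v := range user {
		out[k] = v
	}
	for _, key := range []string{"url", "queue"} {
		if s, _ := out[key].(string); s == "" {
			return nil, errors.New("missing required option: " + key)
		}
	}
	return out, nil
}

func main() {
	cfg, err := resolveConsumerConfig(map[string]any{
		"url":            "kafka://broker1:9092,broker2:9092",
		"queue":          "orders",        // hypothetical topic name
		"consumer_group": "order-workers", // hypothetical group ID
	})
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg["consumer_group"], cfg["auto_offset_reset"], cfg["batch_size"])
}
```

With Viper, the same keys would presumably be set on the `*viper.Viper` passed to NewKafkaConsumer, for example through its `Set` method.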
func NewKafkaProducer
    func NewKafkaProducer(config *viper.Viper) (types.Producer, error)
NewKafkaProducer creates a new Apache Kafka producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Kafka URL.
Configuration options:
- url: Kafka broker URLs (required, e.g., “kafka://broker1:9092,broker2:9092”)
- topic_prefix: Prefix to add to all topic names (optional)
- sasl_mechanism: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 (optional)
- sasl_username: SASL username for authentication (optional)
- sasl_password: SASL password for authentication (optional)
- tls_enabled: Enable TLS encryption (default: false)
- skip_verify: Skip TLS certificate verification (default: false)
- compression_type: Compression - none, gzip, snappy, lz4 (default: snappy)
- required_acks: Ack mode - all, 1, 0 (default: all)
- max_retries: Max retry attempts for failed sends (default: 3)
- retry_backoff: Retry backoff in ms (default: 100)
- flush_frequency: Flush interval in ms (default: 10)
- flush_messages: Messages before flush (default: 100)
- flush_bytes: Bytes before flush (default: 1048576)
- partitioner: Partitioning strategy - hash, random, manual (default: hash)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
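The default `hash` partitioner means that messages with the same key land on the same partition, which preserves per-key ordering. The sketch below illustrates the idea with 32-bit FNV-1a from the standard library. `partitionFor` is a hypothetical function, and the choice of hash is an assumption; the partitioner this package actually uses may compute a different partition number for the same key.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of n partitions using 32-bit FNV-1a.
// Assumption: the real partitioner may use a different hash function.
func partitionFor(key string, n int) int {
	if n < 1 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	// The repeated key maps to the same partition both times.
	for _, key := range []string{"order-123", "order-123", "order-456"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 6))
	}
}
```

By contrast, a `random` strategy spreads load without regard to the key, and `manual` leaves the partition choice to the caller.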
func NewKafkaTransport
    func NewKafkaTransport() types.Transport
NewKafkaTransport creates a new Kafka transport implementation. The transport provides low-level Kafka operations including connection management, producer/consumer creation, and message operations.
The transport manages:
- Kafka client connections with automatic broker discovery
- Producer instances for publishing
- Consumer group coordination
- SASL authentication and TLS encryption
- Connection health monitoring
Unlike AMQP, Kafka doesn’t use channels, so this transport manages Sarama client and producer instances directly.
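Before a transport can connect, the `kafka://broker1:9092,broker2:9092` form used by the `url` option has to be turned into a list of broker addresses. The sketch below shows one plausible way to do that. `parseBrokers` is a hypothetical helper, not the package's parser, and the real implementation may accept other forms.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const scheme = "kafka://"

// parseBrokers (hypothetical) strips the kafka:// scheme and splits the
// remainder on commas, skipping empty entries.
func parseBrokers(rawURL string) ([]string, error) {
	if !strings.HasPrefix(rawURL, scheme) {
		return nil, errors.New("url must start with " + scheme)
	}
	var brokers []string
	for _, addr := range strings.Split(strings.TrimPrefix(rawURL, scheme), ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	if len(brokers) == 0 {
		return nil, errors.New("url contains no broker addresses")
	}
	return brokers, nil
}

func main() {
	brokers, err := parseBrokers("kafka://broker1:9092,broker2:9092")
	fmt.Println(brokers, err)
}
```

Listing several brokers only provides bootstrap redundancy; the remaining cluster members are found through the automatic broker discovery mentioned above.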
func NewMessageBuilder
    func NewMessageBuilder() types.MessageBuilder
NewMessageBuilder creates a new Kafka message builder. The builder provides a fluent interface for constructing Kafka messages with all supported properties and attributes.
Example:
    msg := kafka.NewMessageBuilder().
        WithBody([]byte(`{"event": "order.placed"}`)).
        WithContentType("application/json").
        WithRoutingKey("order-123"). // Used as partition key
        WithHeaders(map[string]interface{}{
            "source":  "order-service",
            "version": "2.0",
        }).
        Build()
type ConfigurationError
ConfigurationError represents a validation error for a specific field, with an accompanying message.
func (*ConfigurationError) Error
type ConfigurationErrors
ConfigurationErrors represents a collection of validation errors.
func (*ConfigurationErrors) Add
Add adds a new configuration error to the collection.
func (*ConfigurationErrors) Error
func (*ConfigurationErrors) HasErrors
HasErrors returns true if there are any validation errors.
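The pattern behind these two types, collecting every validation failure before reporting, can be sketched as follows. The type names `fieldError` and `fieldErrors`, their fields, the `Add(field, message string)` signature, and the error string format are all assumptions made for this example; the source does not show the real definitions.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldError is a hypothetical analogue of ConfigurationError.
type fieldError struct {
	Field   string
	Message string
}

func (e *fieldError) Error() string {
	return fmt.Sprintf("%s: %s", e.Field, e.Message)
}

// fieldErrors is a hypothetical analogue of ConfigurationErrors.
type fieldErrors struct {
	errs []*fieldError
}

// Add appends one validation failure to the collection.
func (e *fieldErrors) Add(field, message string) {
	e.errs = append(e.errs, &fieldError{Field: field, Message: message})
}

// HasErrors reports whether any failure has been recorded.
func (e *fieldErrors) HasErrors() bool { return len(e.errs) > 0 }

// Error joins all recorded failures into a single message.
func (e *fieldErrors) Error() string {
	parts := make([]string, len(e.errs))
	for i, fe := range e.errs {
		parts[i] = fe.Error()
	}
	return strings.Join(parts, "; ")
}

// validate checks every field before returning, so the caller sees all
// problems at once instead of fixing them one by one.
func validate(url, queue string) error {
	errs := &fieldErrors{}
	if url == "" {
		errs.Add("url", "is required")
	}
	if queue == "" {
		errs.Add("queue", "is required")
	}
	if errs.HasErrors() {
		return errs
	}
	return nil // explicit nil avoids returning a non-nil interface holding an empty collection
}

func main() {
	fmt.Println(validate("", ""))
	fmt.Println(validate("kafka://broker1:9092", "orders"))
}
```

Checking HasErrors before returning matters in Go: returning the empty collection directly would give callers a non-nil `error` value even when validation passed.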
Generated by gomarkdoc