GCP Pub/Sub
Google Cloud Pub/Sub messaging with exactly-once delivery and message ordering support
gcp
Index
- func NewMessageBuilder() mqtypes.MessageBuilder
- func NewPubSubConsumer(config *viper.Viper) (types.Consumer, error)
- func NewPubSubProducer(config *viper.Viper) (types.Producer, error)
- func NewPubSubTransport() mqtypes.Transport
- type ConfigurationError
- type ConfigurationErrors
func NewMessageBuilder
NewMessageBuilder creates a new Pub/Sub message builder. The builder provides a fluent interface for constructing Pub/Sub messages with all supported properties and attributes.
Pub/Sub message limitations:
- Message body: max 10MB
- Attributes: max 100 attributes
- Attribute key: max 256 bytes
- Attribute value: max 1024 bytes
- Ordering key: max 1024 bytes
Example:
    msg := gcp.NewMessageBuilder().
        WithBody([]byte(`{"event": "user.signup", "userId": "123"}`)).
        WithContentType("application/json").
        WithCorrelationId("signup-123").
        WithRoutingKey("user-123"). // Used as ordering key
        WithHeaders(map[string]interface{}{
            "source":  "web-app",
            "version": "1.2.0",
            "region":  "us-central1",
        }).
        Build()
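The limits listed above can be checked before a message is handed to the builder. The following is a minimal sketch using only the standard library; `checkLimits` and the constant names are hypothetical and not part of the gcp package, and the 10MB body limit is assumed to mean 10,000,000 bytes (the stricter reading).

```go
package main

import (
	"errors"
	"fmt"
)

// Limits copied from the list above. Reading "10MB" as 10,000,000 bytes
// is an assumption; it is the more conservative interpretation.
const (
	maxBodyBytes        = 10_000_000
	maxAttributes       = 100
	maxAttrKeyBytes     = 256
	maxAttrValueBytes   = 1024
	maxOrderingKeyBytes = 1024
)

// checkLimits is a hypothetical helper (not part of the gcp package).
// It returns nil when the message fits within all limits, or a joined
// error listing every violated limit.
func checkLimits(body []byte, attrs map[string]string, orderingKey string) error {
	var errs []error
	if len(body) > maxBodyBytes {
		errs = append(errs, fmt.Errorf("body is %d bytes, limit is %d", len(body), maxBodyBytes))
	}
	if len(attrs) > maxAttributes {
		errs = append(errs, fmt.Errorf("%d attributes, limit is %d", len(attrs), maxAttributes))
	}
	for k, v := range attrs {
		if len(k) > maxAttrKeyBytes {
			errs = append(errs, fmt.Errorf("attribute key %q exceeds %d bytes", k, maxAttrKeyBytes))
		}
		if len(v) > maxAttrValueBytes {
			errs = append(errs, fmt.Errorf("attribute %q value exceeds %d bytes", k, maxAttrValueBytes))
		}
	}
	if len(orderingKey) > maxOrderingKeyBytes {
		errs = append(errs, fmt.Errorf("ordering key is %d bytes, limit is %d", len(orderingKey), maxOrderingKeyBytes))
	}
	// errors.Join returns nil when no errors were collected.
	return errors.Join(errs...)
}

func main() {
	attrs := map[string]string{"source": "web-app", "version": "1.2.0"}
	err := checkLimits([]byte(`{"event": "user.signup"}`), attrs, "user-123")
	fmt.Println("small message:", err)

	// An ordering key one byte over the limit.
	longKey := string(make([]byte, maxOrderingKeyBytes+1))
	err = checkLimits(nil, nil, longKey)
	fmt.Println("oversized ordering key:", err)
}
```

Collecting every violation instead of returning on the first one lets a caller report all problems with a message at once.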
func NewPubSubConsumer
NewPubSubConsumer creates a new GCP Pub/Sub consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Pub/Sub URL.
Configuration options:
- url: Pub/Sub URL (required, e.g., "pubsub://project-id/subscription-name")
- queue: Subscription name (extracted from URL if not provided)
- handler: Name of registered handler function (default: "pubsubLogger")
- project_id: GCP project ID (extracted from URL or uses default)
- credentials_file: Path to service account JSON file (optional)
- max_retries: Maximum retry attempts for failed messages (default: 50)
- max_concurrent_handlers: Max concurrent message handlers (default: 10)
- max_extension: Max message deadline extension in seconds (default: 300)
- ack_deadline: Message acknowledgment deadline in seconds (default: 60)
- enable_ordering: Enable message ordering by key (default: false)
- dead_letter_topic: Topic for undeliverable messages (optional)
- dead_letter_max_attempts: Max attempts before dead lettering (default: 5)
- batch_size: Number of messages per batch (default: 5)
- batch_timeout: Batch collection timeout in ms (default: 100)
- enable_batch_processing: Enable batch message processing (default: false)
Returns an error if configuration validation fails or transport creation fails.
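Several of the options above are derived from the URL when they are not set explicitly. The sketch below shows one way a "pubsub://project-id/subscription-name" URL could be split into a project ID and a resource name with the standard library. `parsePubSubURL` is a hypothetical helper; the package's actual parsing logic is not shown in this reference and may differ.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parsePubSubURL is a hypothetical helper (not part of the gcp package).
// It treats the URL host as the project ID and the path as the
// subscription or topic name.
func parsePubSubURL(raw string) (projectID, resource string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", fmt.Errorf("parse %q: %w", raw, err)
	}
	if u.Scheme != "pubsub" {
		return "", "", fmt.Errorf("unexpected scheme %q, want \"pubsub\"", u.Scheme)
	}
	projectID = u.Host
	resource = strings.Trim(u.Path, "/")
	if projectID == "" || resource == "" {
		return "", "", fmt.Errorf("url %q must look like pubsub://project-id/name", raw)
	}
	return projectID, resource, nil
}

func main() {
	project, sub, err := parsePubSubURL("pubsub://project-id/subscription-name")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("project:", project)
	fmt.Println("subscription:", sub)
}
```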
func NewPubSubProducer
NewPubSubProducer creates a new GCP Pub/Sub producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Pub/Sub URL.
Configuration options:
- url: Pub/Sub URL (required, e.g., "pubsub://project-id/topic-name")
- topic_prefix: Prefix to add to all topic names (optional)
- project_id: GCP project ID (extracted from URL or uses default)
- credentials_file: Path to service account JSON file (optional)
- enable_ordering: Enable message ordering by key (default: false)
- enable_message_ordering: Alternative flag for ordering (default: false)
- max_publish_delay: Max delay before batch publish in ms (default: 10)
- max_messages: Max messages per batch (default: 100)
- max_bytes: Max bytes per batch (default: 1000000, i.e. 1 MB)
- enable_compression: Enable message compression (default: false)
Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.
func NewPubSubTransport
NewPubSubTransport creates a new GCP Pub/Sub transport implementation. The transport provides low-level Pub/Sub operations including topic/subscription management, message publishing/receiving, and health monitoring.
The transport manages:
- GCP Pub/Sub client with automatic credential resolution
- Topic and subscription lifecycle management
- Message ordering and exactly-once delivery settings
- Dead letter topic configuration
- Flow control and concurrency settings
- Connection health monitoring
Pub/Sub uses gRPC for communication with automatic reconnection and retry logic built into the client library.
type ConfigurationError
ConfigurationError represents a single validation error, identifying the offending field and carrying a descriptive message.
func (*ConfigurationError) Error
type ConfigurationErrors
ConfigurationErrors represents a collection of validation errors.
func (*ConfigurationErrors) Add
Add adds a new configuration error to the collection.
func (*ConfigurationErrors) Error
func (*ConfigurationErrors) HasErrors
HasErrors returns true if there are any validation errors.
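The Add / HasErrors / Error pattern described above can be illustrated with a small stand-in. The exact fields and method signatures of ConfigurationError and ConfigurationErrors are not shown in this reference, so the sketch uses hypothetical types (`fieldError`, `fieldErrors`) and a hypothetical `validate` function to show how such a collection is typically used.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldError is a hypothetical stand-in for ConfigurationError.
type fieldError struct {
	Field   string
	Message string
}

func (e *fieldError) Error() string {
	return fmt.Sprintf("%s: %s", e.Field, e.Message)
}

// fieldErrors is a hypothetical stand-in for ConfigurationErrors.
type fieldErrors struct {
	errs []*fieldError
}

// Add appends a new validation error to the collection.
func (c *fieldErrors) Add(field, message string) {
	c.errs = append(c.errs, &fieldError{Field: field, Message: message})
}

// HasErrors reports whether any validation errors were recorded.
func (c *fieldErrors) HasErrors() bool {
	return len(c.errs) > 0
}

// Error joins all recorded errors into a single message.
func (c *fieldErrors) Error() string {
	parts := make([]string, len(c.errs))
	for i, e := range c.errs {
		parts[i] = e.Error()
	}
	return strings.Join(parts, "; ")
}

// validate checks two illustrative options and reports all failures.
// It returns an untyped nil on success so callers can compare the
// result against nil safely.
func validate(url string, ackDeadline int) error {
	var errs fieldErrors
	if url == "" {
		errs.Add("url", "is required")
	}
	if ackDeadline <= 0 {
		errs.Add("ack_deadline", "must be positive")
	}
	if errs.HasErrors() {
		return &errs
	}
	return nil
}

func main() {
	fmt.Println(validate("pubsub://project-id/subscription-name", 60))
	fmt.Println(validate("", 0))
}
```

Returning the collection only when HasErrors is true avoids handing callers a non-nil error interface that wraps an empty collection.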
Generated by gomarkdoc