GCP Pub/Sub

Google Cloud Pub/Sub messaging with exactly-once delivery and message ordering support

gcp

1
import "gitlab.com/digitalxero/mqutils/gcp"

Index

func NewMessageBuilder

1
func NewMessageBuilder() mqtypes.MessageBuilder

NewMessageBuilder creates a new Pub/Sub message builder. The builder provides a fluent interface for constructing Pub/Sub messages with all supported properties and attributes.

Pub/Sub message limitations:

  • Message body: max 10MB
  • Attributes: max 100 attributes
  • Attribute key: max 256 bytes
  • Attribute value: max 1024 bytes
  • Ordering key: max 1024 bytes

Example:

msg := gcp.NewMessageBuilder().
    WithBody([]byte(`{"event": "user.signup", "userId": "123"}`)).
    WithContentType("application/json").
    WithCorrelationId("signup-123").
    WithRoutingKey("user-123"). // Used as ordering key
    WithHeaders(map[string]interface{}{
        "source": "web-app",
        "version": "1.2.0",
        "region": "us-central1",
    }).
    Build()

func NewPubSubConsumer

1
func NewPubSubConsumer(config *viper.Viper) (types.Consumer, error)

NewPubSubConsumer creates a new GCP Pub/Sub consumer with the provided configuration. This function is typically called by mqutils.NewConsumer when it detects a Pub/Sub URL.

Configuration options:

  • url: Pub/Sub URL (required, e.g., “pubsub://project-id/subscription-name”)
  • queue: Subscription name (extracted from URL if not provided)
  • handler: Name of registered handler function (default: “pubsubLogger”)
  • project_id: GCP project ID (extracted from URL or uses default)
  • credentials_file: Path to service account JSON file (optional)
  • max_retries: Maximum retry attempts for failed messages (default: 50)
  • max_concurrent_handlers: Max concurrent message handlers (default: 10)
  • max_extension: Max message deadline extension in seconds (default: 300)
  • ack_deadline: Message acknowledgment deadline in seconds (default: 60)
  • enable_ordering: Enable message ordering by key (default: false)
  • dead_letter_topic: Topic for undeliverable messages (optional)
  • dead_letter_max_attempts: Max attempts before dead lettering (default: 5)
  • batch_size: Number of messages per batch (default: 5)
  • batch_timeout: Batch collection timeout in ms (default: 100)
  • enable_batch_processing: Enable batch message processing (default: false)

Returns an error if configuration validation fails or transport creation fails.

func NewPubSubProducer

1
func NewPubSubProducer(config *viper.Viper) (types.Producer, error)

NewPubSubProducer creates a new GCP Pub/Sub producer with the provided configuration. This function is typically called by mqutils.NewProducer when it detects a Pub/Sub URL.

Configuration options:

  • url: Pub/Sub URL (required, e.g., “pubsub://project-id/topic-name”)
  • topic_prefix: Prefix to add to all topic names (optional)
  • project_id: GCP project ID (extracted from URL or uses default)
  • credentials_file: Path to service account JSON file (optional)
  • enable_ordering: Enable message ordering by key (default: false)
  • enable_message_ordering: Alternative flag for ordering (default: false)
  • max_publish_delay: Max delay before batch publish in ms (default: 10)
  • max_messages: Max messages per batch (default: 100)
  • max_bytes: Max bytes per batch (default: 1000000/1MB)
  • enable_compression: Enable message compression (default: false)

Returns an error if configuration validation fails or transport creation fails. The producer must be started with Start() before publishing messages.

func NewPubSubTransport

1
func NewPubSubTransport() mqtypes.Transport

NewPubSubTransport creates a new GCP Pub/Sub transport implementation. The transport provides low-level Pub/Sub operations including topic/subscription management, message publishing/receiving, and health monitoring.

The transport manages:

  • GCP Pub/Sub client with automatic credential resolution
  • Topic and subscription lifecycle management
  • Message ordering and exactly-once delivery settings
  • Dead letter topic configuration
  • Flow control and concurrency settings
  • Connection health monitoring

Pub/Sub uses gRPC for communication with automatic reconnection and retry logic built into the client library.

type ConfigurationError

ConfigurationError represents a validation error with specific field and message

1
2
3
4
type ConfigurationError struct {
    Field   string
    Message string
}

func (*ConfigurationError) Error

1
func (e *ConfigurationError) Error() string

type ConfigurationErrors

ConfigurationErrors represents multiple validation errors

1
2
3
type ConfigurationErrors struct {
    Errors []*ConfigurationError
}

func (*ConfigurationErrors) Add

1
func (e *ConfigurationErrors) Add(field, message string)

Add adds a new configuration error to the collection

func (*ConfigurationErrors) Error

1
func (e *ConfigurationErrors) Error() string

func (*ConfigurationErrors) HasErrors

1
func (e *ConfigurationErrors) HasErrors() bool

HasErrors returns true if there are any validation errors

Generated by gomarkdoc