1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
| package main
import (
"context"
"fmt"
"log"
"time"
"github.com/spf13/viper"
"gitlab.com/digitalxero/mqutils"
"gitlab.com/digitalxero/mqutils/types"
_ "gitlab.com/digitalxero/mqutils/amqp"
)
// requestResponseExample demonstrates a request-response exchange: it starts
// the responder in a background goroutine and then issues a single request
// from the calling goroutine.
func requestResponseExample() {
	// The responder runs concurrently; nothing here waits for it to finish.
	go func() {
		setupResponder()
	}()

	// Fixed pause to give the responder time to start before the request is
	// sent. This is a simple delay, not a readiness signal.
	const responderStartupDelay = time.Second
	time.Sleep(responderStartupDelay)

	sendRequest()
}
// setupResponder registers the "request_handler" message handler, builds a
// consumer configured for amqp://localhost:5672/requests, and runs it.
//
// For every message the handler logs the body and, when the message carries a
// reply-to destination, publishes "Processed: <body>" back to that destination
// using the message's own correlation ID. Messages without a reply-to are
// only logged.
//
// The process is terminated via log.Fatal if the consumer cannot be created.
// An error returned by consumer.Run is logged and the function returns.
func setupResponder() {
	// Register request handler
	types.RegisterHandler("request_handler", func(ctx context.Context, msg types.Message) error {
		log.Printf("Processing request: %s", string(msg.Body()))

		// Nowhere to send a response: treat the message as handled.
		if msg.ReplyTo() == "" {
			return nil
		}

		// Process request and send response
		reply := fmt.Sprintf("Processed: %s", string(msg.Body()))
		return msg.Publisher().Publish(ctx,
			msg.CorrelationId(), // correlation ID (copied from the request)
			"",                  // exchange (use default)
			msg.ReplyTo(),       // routing key (reply destination)
			"text/plain",        // content type
			[]byte(reply),       // body
		)
	})

	// Create configuration for responder. The "handler" value matches the
	// name registered above.
	config := viper.New()
	config.Set("url", "amqp://localhost:5672/requests")
	config.Set("handler", "request_handler")

	// A single context is shared by consumer construction and Run; declaring
	// it a second time with := in the same scope does not compile.
	ctx := context.Background()

	consumer, err := mqutils.NewConsumer(ctx, config)
	if err != nil {
		log.Fatal(err)
	}

	if err := consumer.Run(ctx); err != nil {
		log.Printf("Responder error: %v", err)
	}
}
func sendRequest() {
// Register response handler
types.RegisterHandler("response_handler", func(ctx context.Context, msg types.Message) error {
log.Printf("Received response: %s", string(msg.Body()))
return nil
})
// Create configuration for requester
config := viper.New()
config.Set("url", "amqp://localhost:5672/responses")
config.Set("handler", "response_handler")
ctx := context.Background()
consumer, err := mqutils.NewConsumer(ctx, config)
if err != nil {
log.Fatal(err)
}
publisher := consumer.Publisher()
// Send request
err = publisher.Publish(
context.Background(),
"req-123", // correlation ID
"", // exchange (use default)
"requests", // routing key (destination)
"text/plain", // content type
[]byte("Hello, please process this") // body
)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := consumer.Run(ctx); err != nil {
log.Printf("Requester error: %v", err)
}
}
|