Skip to content

Commit a780f83

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent a979478 commit a780f83

File tree

12 files changed

+1090
-102
lines changed

12 files changed

+1090
-102
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
1010
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
11+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
1112
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1213
)
1314

@@ -56,7 +57,7 @@ func BuildCloudEventsSourceOptions(config any,
5657
clientId, sourceId string, dataType types.CloudEventsDataType) (*options.CloudEventsSourceOptions, error) {
5758
switch config := config.(type) {
5859
case *mqtt.MQTTOptions:
59-
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
60+
return mqttv2.NewSourceOptions(config, clientId, sourceId), nil
6061
case *grpc.GRPCOptions:
6162
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6263
default:
@@ -69,7 +70,7 @@ func BuildCloudEventsAgentOptions(config any,
6970
clusterName, clientId string, dataType types.CloudEventsDataType) (*options.CloudEventsAgentOptions, error) {
7071
switch config := config.(type) {
7172
case *mqtt.MQTTOptions:
72-
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
73+
return mqttv2.NewAgentOptions(config, clusterName, clientId), nil
7374
case *grpc.GRPCOptions:
7475
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7576
default:

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59-
expectedTransportType: "*mqtt.mqttSourceTransport",
59+
expectedTransportType: "*mqtt.mqttTransport",
6060
},
6161
{
6262
name: "grpc config",

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type mqttAgentTransport struct {
2424
agentID string
2525
}
2626

27+
// Deprecated: use v2.mqtt.NewAgentOptions instead
2728
func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
2829
mqttAgentOptions := &mqttAgentTransport{
2930
MQTTOptions: *mqttOptions,
@@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
4041
}
4142

4243
func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
43-
logger := klog.FromContext(ctx)
44-
4544
topic, err := getAgentPubTopic(ctx)
4645
if err != nil {
4746
return nil, err
@@ -51,59 +50,18 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents
5150
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
5251
}
5352

54-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
55-
if err != nil {
56-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
57-
}
58-
59-
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
60-
if err != nil {
61-
return nil, err
62-
}
63-
64-
// agent request to sync resource spec from all sources
65-
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
66-
if len(o.Topics.AgentBroadcast) == 0 {
67-
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
68-
69-
// TODO after supporting multiple sources, we should list each source
70-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
71-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
72-
}
73-
74-
resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1)
75-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
76-
}
77-
78-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
53+
pubTopic, err := AgentPubTopic(ctx, &o.MQTTOptions, o.clusterName, evtCtx)
7954
if err != nil {
8055
return nil, err
8156
}
8257

83-
// agent publishes status events or spec resync events
84-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
85-
eventsTopic = replaceLast(eventsTopic, "+", topicSource)
86-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
58+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
8759
}
8860

8961
func (o *mqttAgentTransport) Connect(ctx context.Context) error {
90-
subscribe := &paho.Subscribe{
91-
Subscriptions: []paho.SubscribeOptions{
92-
{
93-
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
94-
// the future, client may need a source list, it will subscribe to each source
95-
// receiving the sources events
96-
Topic: replaceLast(o.Topics.SourceEvents, "+", o.clusterName), QoS: byte(o.SubQoS),
97-
},
98-
},
99-
}
100-
101-
// receiving status resync events from all sources
102-
if len(o.Topics.SourceBroadcast) != 0 {
103-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
104-
Topic: o.Topics.SourceBroadcast,
105-
QoS: byte(o.SubQoS),
106-
})
62+
subscribe, err := AgentSubscribe(&o.MQTTOptions, o.clusterName)
63+
if err != nil {
64+
return err
10765
}
10866

10967
protocol, err := o.GetCloudEventsProtocol(
@@ -157,3 +115,67 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error {
157115
func (o *mqttAgentTransport) ErrorChan() <-chan error {
158116
return o.errorChan
159117
}
118+
119+
func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
120+
logger := klog.FromContext(ctx)
121+
122+
ceType := evtCtx.GetType()
123+
eventType, err := types.ParseCloudEventsType(ceType)
124+
if err != nil {
125+
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
126+
}
127+
128+
originalSourceVal, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
129+
if err != nil {
130+
return "", err
131+
}
132+
133+
originalSource, ok := originalSourceVal.(string)
134+
if !ok {
135+
return "", fmt.Errorf("originalsource extension must be a string, got %T", originalSourceVal)
136+
}
137+
138+
// agent request to sync resource spec from all sources
139+
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
140+
if len(o.Topics.AgentBroadcast) == 0 {
141+
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
142+
143+
// TODO after supporting multiple sources, we should list each source
144+
return replaceLast(o.Topics.AgentEvents, "+", clusterName), nil
145+
}
146+
147+
return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil
148+
}
149+
150+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
151+
if err != nil {
152+
return "", err
153+
}
154+
155+
// agent publishes status events or spec resync events
156+
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", clusterName)
157+
return replaceLast(eventsTopic, "+", topicSource), nil
158+
}
159+
160+
func AgentSubscribe(o *MQTTOptions, clusterName string) (*paho.Subscribe, error) {
161+
subscribe := &paho.Subscribe{
162+
Subscriptions: []paho.SubscribeOptions{
163+
{
164+
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
165+
// the future, client may need a source list, it will subscribe to each source
166+
// receiving the sources events
167+
Topic: replaceLast(o.Topics.SourceEvents, "+", clusterName), QoS: byte(o.SubQoS),
168+
},
169+
},
170+
}
171+
172+
// receiving status resync events from all sources
173+
if len(o.Topics.SourceBroadcast) != 0 {
174+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
175+
Topic: o.Topics.SourceBroadcast,
176+
QoS: byte(o.SubQoS),
177+
})
178+
}
179+
180+
return subscribe, nil
181+
}

pkg/cloudevents/generic/options/mqtt/logger.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mqtt
22

33
import (
44
"fmt"
5+
56
"github.com/eclipse/paho.golang/paho/log"
67
"k8s.io/klog/v2"
78
)
@@ -14,8 +15,13 @@ type PahoDebugLogger struct {
1415
logger klog.Logger
1516
}
1617

17-
var _ log.Logger = &PahoErrorLogger{}
18-
var _ log.Logger = &PahoDebugLogger{}
18+
func NewPahoErrorLogger(logger klog.Logger) log.Logger {
19+
return &PahoErrorLogger{logger: logger}
20+
}
21+
22+
func NewPahoDebugLogger(logger klog.Logger) log.Logger {
23+
return &PahoDebugLogger{logger: logger}
24+
}
1925

2026
func (l *PahoErrorLogger) Println(v ...interface{}) {
2127
l.logger.Error(fmt.Errorf("get err %s", fmt.Sprint(v...)), "MQTT error message")

pkg/cloudevents/generic/options/mqtt/options.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7-
"k8s.io/klog/v2"
87
"net"
98
"os"
109
"regexp"
1110
"strings"
1211
"time"
1312

13+
"k8s.io/klog/v2"
14+
1415
cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
1516
"github.com/eclipse/paho.golang/packets"
1617
"github.com/eclipse/paho.golang/paho"
@@ -235,8 +236,8 @@ func (o *MQTTOptions) GetCloudEventsProtocol(
235236

236237
opts := []cloudeventsmqtt.Option{
237238
cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID)),
238-
cloudeventsmqtt.WithDebugLogger(&PahoDebugLogger{logger: logger}),
239-
cloudeventsmqtt.WithErrorLogger(&PahoErrorLogger{logger: logger}),
239+
cloudeventsmqtt.WithDebugLogger(NewPahoDebugLogger(logger)),
240+
cloudeventsmqtt.WithErrorLogger(NewPahoErrorLogger(logger)),
240241
}
241242
opts = append(opts, clientOpts...)
242243
return cloudeventsmqtt.New(ctx, config, opts...)

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 66 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type mqttSourceTransport struct {
2323
clientID string
2424
}
2525

26+
// Deprecated: use v2.mqtt.NewSourceOptions instead
2627
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
2728
mqttSourceOptions := &mqttSourceTransport{
2829
MQTTOptions: *mqttOptions,
@@ -47,58 +48,20 @@ func (o *mqttSourceTransport) WithContext(ctx context.Context, evtCtx cloudevent
4748
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
4849
}
4950

50-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
51-
if err != nil {
52-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
53-
}
54-
55-
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
51+
pubTopic, err := SourcePubTopic(ctx, &o.MQTTOptions, o.sourceID, evtCtx)
5652
if err != nil {
5753
return nil, err
5854
}
5955

60-
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
61-
// source request to get resources status from all agents
62-
if len(o.Topics.SourceBroadcast) == 0 {
63-
return nil, fmt.Errorf("the source broadcast topic not set")
64-
}
65-
66-
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1)
67-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
68-
}
69-
70-
// source publishes spec events or status resync events
71-
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
72-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
56+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
7357
}
7458

7559
func (o *mqttSourceTransport) Connect(ctx context.Context) error {
76-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
60+
subscribe, err := SourceSubscribe(&o.MQTTOptions, o.sourceID)
7761
if err != nil {
7862
return err
7963
}
8064

81-
if topicSource != o.sourceID {
82-
return fmt.Errorf("the topic source %q does not match with the client sourceID %q",
83-
o.Topics.AgentEvents, o.sourceID)
84-
}
85-
86-
subscribe := &paho.Subscribe{
87-
Subscriptions: []paho.SubscribeOptions{
88-
{
89-
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
90-
},
91-
},
92-
}
93-
94-
if len(o.Topics.AgentBroadcast) != 0 {
95-
// receiving spec resync events from all agents
96-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
97-
Topic: o.Topics.AgentBroadcast,
98-
QoS: byte(o.SubQoS),
99-
})
100-
}
101-
10265
protocol, err := o.GetCloudEventsProtocol(
10366
ctx,
10467
o.clientID,
@@ -151,3 +114,65 @@ func (o *mqttSourceTransport) Close(ctx context.Context) error {
151114
func (o *mqttSourceTransport) ErrorChan() <-chan error {
152115
return o.errorChan
153116
}
117+
118+
func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
119+
ceType := evtCtx.GetType()
120+
eventType, err := types.ParseCloudEventsType(ceType)
121+
if err != nil {
122+
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
123+
}
124+
125+
clusterNameVal, err := evtCtx.GetExtension(types.ExtensionClusterName)
126+
if err != nil {
127+
return "", err
128+
}
129+
130+
clusterName, ok := clusterNameVal.(string)
131+
if !ok {
132+
return "", fmt.Errorf("clustername extension must be a string, got %T", clusterNameVal)
133+
}
134+
135+
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
136+
// source request to get resources status from all agents
137+
if len(o.Topics.SourceBroadcast) == 0 {
138+
return "", fmt.Errorf("the source broadcast topic not set")
139+
}
140+
141+
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", sourceID, 1)
142+
return resyncTopic, nil
143+
}
144+
145+
// source publishes spec events or status resync events
146+
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", clusterName, 1)
147+
return eventsTopic, nil
148+
}
149+
150+
func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
151+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
if topicSource != sourceID {
157+
return nil, fmt.Errorf("the topic source %q does not match the client sourceID %q",
158+
topicSource, sourceID)
159+
}
160+
161+
subscribe := &paho.Subscribe{
162+
Subscriptions: []paho.SubscribeOptions{
163+
{
164+
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
165+
},
166+
},
167+
}
168+
169+
if len(o.Topics.AgentBroadcast) != 0 {
170+
// receiving spec resync events from all agents
171+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
172+
Topic: o.Topics.AgentBroadcast,
173+
QoS: byte(o.SubQoS),
174+
})
175+
}
176+
177+
return subscribe, nil
178+
}

0 commit comments

Comments
 (0)