Skip to content

Commit e2e4ea5

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent a979478 commit e2e4ea5

File tree

11 files changed

+1074
-102
lines changed

11 files changed

+1074
-102
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
1010
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
11+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
1112
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1213
)
1314

@@ -56,7 +57,7 @@ func BuildCloudEventsSourceOptions(config any,
5657
clientId, sourceId string, dataType types.CloudEventsDataType) (*options.CloudEventsSourceOptions, error) {
5758
switch config := config.(type) {
5859
case *mqtt.MQTTOptions:
59-
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
60+
return mqttv2.NewSourceOptions(config, clientId, sourceId), nil
6061
case *grpc.GRPCOptions:
6162
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6263
default:
@@ -69,7 +70,7 @@ func BuildCloudEventsAgentOptions(config any,
6970
clusterName, clientId string, dataType types.CloudEventsDataType) (*options.CloudEventsAgentOptions, error) {
7071
switch config := config.(type) {
7172
case *mqtt.MQTTOptions:
72-
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
73+
return mqttv2.NewAgentOptions(config, clusterName, clientId), nil
7374
case *grpc.GRPCOptions:
7475
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7576
default:

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59-
expectedTransportType: "*mqtt.mqttSourceTransport",
59+
expectedTransportType: "*mqtt.mqttTransport",
6060
},
6161
{
6262
name: "grpc config",

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 63 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type mqttAgentTransport struct {
2424
agentID string
2525
}
2626

27+
// Deprecated: use v2.mqtt.NewAgentOptions instead
2728
func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
2829
mqttAgentOptions := &mqttAgentTransport{
2930
MQTTOptions: *mqttOptions,
@@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
4041
}
4142

4243
func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
43-
logger := klog.FromContext(ctx)
44-
4544
topic, err := getAgentPubTopic(ctx)
4645
if err != nil {
4746
return nil, err
@@ -51,69 +50,23 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents
5150
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
5251
}
5352

54-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
55-
if err != nil {
56-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
57-
}
58-
59-
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
60-
if err != nil {
61-
return nil, err
62-
}
63-
64-
// agent request to sync resource spec from all sources
65-
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
66-
if len(o.Topics.AgentBroadcast) == 0 {
67-
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
68-
69-
// TODO after supporting multiple sources, we should list each source
70-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
71-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
72-
}
73-
74-
resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1)
75-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
76-
}
77-
78-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
53+
pubTopic, err := AgentPubTopic(ctx, &o.MQTTOptions, o.clusterName, evtCtx)
7954
if err != nil {
8055
return nil, err
8156
}
8257

83-
// agent publishes status events or spec resync events
84-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
85-
eventsTopic = replaceLast(eventsTopic, "+", topicSource)
86-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
58+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
8759
}
8860

8961
func (o *mqttAgentTransport) Connect(ctx context.Context) error {
90-
subscribe := &paho.Subscribe{
91-
Subscriptions: []paho.SubscribeOptions{
92-
{
93-
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
94-
// the future, client may need a source list, it will subscribe to each source
95-
// receiving the sources events
96-
Topic: replaceLast(o.Topics.SourceEvents, "+", o.clusterName), QoS: byte(o.SubQoS),
97-
},
98-
},
99-
}
100-
101-
// receiving status resync events from all sources
102-
if len(o.Topics.SourceBroadcast) != 0 {
103-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
104-
Topic: o.Topics.SourceBroadcast,
105-
QoS: byte(o.SubQoS),
106-
})
107-
}
108-
10962
protocol, err := o.GetCloudEventsProtocol(
11063
ctx,
11164
fmt.Sprintf("%s-client", o.agentID),
11265
func(err error) {
11366
o.errorChan <- err
11467
},
11568
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
116-
cloudeventsmqtt.WithSubscribe(subscribe),
69+
cloudeventsmqtt.WithSubscribe(AgentSubscribe(&o.MQTTOptions, o.clusterName)),
11770
)
11871
if err != nil {
11972
return err
@@ -157,3 +110,62 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error {
157110
func (o *mqttAgentTransport) ErrorChan() <-chan error {
158111
return o.errorChan
159112
}
113+
114+
func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
115+
logger := klog.FromContext(ctx)
116+
117+
ceType := evtCtx.GetType()
118+
eventType, err := types.ParseCloudEventsType(ceType)
119+
if err != nil {
120+
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
121+
}
122+
123+
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
124+
if err != nil {
125+
return "", err
126+
}
127+
128+
// agent request to sync resource spec from all sources
129+
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
130+
if len(o.Topics.AgentBroadcast) == 0 {
131+
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
132+
133+
// TODO after supporting multiple sources, we should list each source
134+
return replaceLast(o.Topics.AgentEvents, "+", clusterName), nil
135+
}
136+
137+
return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil
138+
}
139+
140+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
141+
if err != nil {
142+
return "", err
143+
}
144+
145+
// agent publishes status events or spec resync events
146+
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", clusterName)
147+
return replaceLast(eventsTopic, "+", topicSource), nil
148+
}
149+
150+
func AgentSubscribe(o *MQTTOptions, clusterName string) *paho.Subscribe {
151+
subscribe := &paho.Subscribe{
152+
Subscriptions: []paho.SubscribeOptions{
153+
{
154+
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
155+
// the future, client may need a source list, it will subscribe to each source
156+
// receiving the sources events
157+
Topic: replaceLast(o.Topics.SourceEvents, "+", clusterName), QoS: byte(o.SubQoS),
158+
},
159+
},
160+
}
161+
162+
// receiving status resync events from all sources
163+
if len(o.Topics.SourceBroadcast) != 0 {
164+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
165+
Topic: o.Topics.SourceBroadcast,
166+
QoS: byte(o.SubQoS),
167+
})
168+
}
169+
170+
return subscribe
171+
}

pkg/cloudevents/generic/options/mqtt/logger.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mqtt
22

33
import (
44
"fmt"
5+
56
"github.com/eclipse/paho.golang/paho/log"
67
"k8s.io/klog/v2"
78
)
@@ -14,8 +15,13 @@ type PahoDebugLogger struct {
1415
logger klog.Logger
1516
}
1617

17-
var _ log.Logger = &PahoErrorLogger{}
18-
var _ log.Logger = &PahoDebugLogger{}
18+
func NewPahoErrorLogger(logger klog.Logger) log.Logger {
19+
return &PahoErrorLogger{logger: logger}
20+
}
21+
22+
func NewPahoDebugLogger(logger klog.Logger) log.Logger {
23+
return &PahoDebugLogger{logger: logger}
24+
}
1925

2026
func (l *PahoErrorLogger) Println(v ...interface{}) {
2127
l.logger.Error(fmt.Errorf("get err %s", fmt.Sprint(v...)), "MQTT error message")

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type mqttSourceTransport struct {
2323
clientID string
2424
}
2525

26+
// Deprecated: use v2.mqtt.NewSourceOptions instead
2627
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
2728
mqttSourceOptions := &mqttSourceTransport{
2829
MQTTOptions: *mqttOptions,
@@ -47,58 +48,20 @@ func (o *mqttSourceTransport) WithContext(ctx context.Context, evtCtx cloudevent
4748
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
4849
}
4950

50-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
51-
if err != nil {
52-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
53-
}
54-
55-
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
51+
pubTopic, err := SourcePubTopic(ctx, &o.MQTTOptions, o.sourceID, evtCtx)
5652
if err != nil {
5753
return nil, err
5854
}
5955

60-
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
61-
// source request to get resources status from all agents
62-
if len(o.Topics.SourceBroadcast) == 0 {
63-
return nil, fmt.Errorf("the source broadcast topic not set")
64-
}
65-
66-
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1)
67-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
68-
}
69-
70-
// source publishes spec events or status resync events
71-
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
72-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
56+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
7357
}
7458

7559
func (o *mqttSourceTransport) Connect(ctx context.Context) error {
76-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
60+
subscribe, err := SourceSubscribe(&o.MQTTOptions, o.sourceID)
7761
if err != nil {
7862
return err
7963
}
8064

81-
if topicSource != o.sourceID {
82-
return fmt.Errorf("the topic source %q does not match with the client sourceID %q",
83-
o.Topics.AgentEvents, o.sourceID)
84-
}
85-
86-
subscribe := &paho.Subscribe{
87-
Subscriptions: []paho.SubscribeOptions{
88-
{
89-
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
90-
},
91-
},
92-
}
93-
94-
if len(o.Topics.AgentBroadcast) != 0 {
95-
// receiving spec resync events from all agents
96-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
97-
Topic: o.Topics.AgentBroadcast,
98-
QoS: byte(o.SubQoS),
99-
})
100-
}
101-
10265
protocol, err := o.GetCloudEventsProtocol(
10366
ctx,
10467
o.clientID,
@@ -151,3 +114,60 @@ func (o *mqttSourceTransport) Close(ctx context.Context) error {
151114
func (o *mqttSourceTransport) ErrorChan() <-chan error {
152115
return o.errorChan
153116
}
117+
118+
func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
119+
ceType := evtCtx.GetType()
120+
eventType, err := types.ParseCloudEventsType(ceType)
121+
if err != nil {
122+
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
123+
}
124+
125+
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
126+
if err != nil {
127+
return "", err
128+
}
129+
130+
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
131+
// source request to get resources status from all agents
132+
if len(o.Topics.SourceBroadcast) == 0 {
133+
return "", fmt.Errorf("the source broadcast topic not set")
134+
}
135+
136+
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", sourceID, 1)
137+
return resyncTopic, nil
138+
}
139+
140+
// source publishes spec events or status resync events
141+
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
142+
return eventsTopic, nil
143+
}
144+
145+
func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
146+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
if topicSource != sourceID {
152+
return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
153+
o.Topics.AgentEvents, sourceID)
154+
}
155+
156+
subscribe := &paho.Subscribe{
157+
Subscriptions: []paho.SubscribeOptions{
158+
{
159+
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
160+
},
161+
},
162+
}
163+
164+
if len(o.Topics.AgentBroadcast) != 0 {
165+
// receiving spec resync events from all agents
166+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
167+
Topic: o.Topics.AgentBroadcast,
168+
QoS: byte(o.SubQoS),
169+
})
170+
}
171+
172+
return subscribe, nil
173+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package mqtt
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
v2 "github.com/cloudevents/sdk-go/v2"
8+
"github.com/eclipse/paho.golang/paho"
9+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
11+
)
12+
13+
func NewAgentOptions(opts *mqtt.MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
14+
return &options.CloudEventsAgentOptions{
15+
CloudEventsTransport: newTransport(
16+
fmt.Sprintf("%s-client", agentID),
17+
opts,
18+
func(ctx context.Context, e v2.Event) (string, error) {
19+
return mqtt.AgentPubTopic(ctx, opts, clusterName, e.Context)
20+
},
21+
func() (*paho.Subscribe, error) {
22+
return mqtt.AgentSubscribe(opts, clusterName), nil
23+
},
24+
),
25+
AgentID: agentID,
26+
ClusterName: clusterName,
27+
}
28+
}
29+
30+
func NewSourceOptions(opts *mqtt.MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
31+
return &options.CloudEventsSourceOptions{
32+
CloudEventsTransport: newTransport(
33+
clientID,
34+
opts,
35+
func(ctx context.Context, e v2.Event) (string, error) {
36+
return mqtt.SourcePubTopic(ctx, opts, sourceID, e.Context)
37+
},
38+
func() (*paho.Subscribe, error) {
39+
return mqtt.SourceSubscribe(opts, sourceID)
40+
},
41+
),
42+
SourceID: sourceID,
43+
}
44+
}

0 commit comments

Comments
 (0)