Skip to content

Commit 6343644

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent a979478 commit 6343644

File tree

10 files changed

+1061
-100
lines changed

10 files changed

+1061
-100
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
1010
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
11+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
1112
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1213
)
1314

@@ -56,7 +57,7 @@ func BuildCloudEventsSourceOptions(config any,
5657
clientId, sourceId string, dataType types.CloudEventsDataType) (*options.CloudEventsSourceOptions, error) {
5758
switch config := config.(type) {
5859
case *mqtt.MQTTOptions:
59-
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
60+
return mqttv2.NewSourceOptions(config, clientId, sourceId), nil
6061
case *grpc.GRPCOptions:
6162
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6263
default:
@@ -69,7 +70,7 @@ func BuildCloudEventsAgentOptions(config any,
6970
clusterName, clientId string, dataType types.CloudEventsDataType) (*options.CloudEventsAgentOptions, error) {
7071
switch config := config.(type) {
7172
case *mqtt.MQTTOptions:
72-
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
73+
return mqttv2.NewAgentOptions(config, clusterName, clientId), nil
7374
case *grpc.GRPCOptions:
7475
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7576
default:

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59-
expectedTransportType: "*mqtt.mqttSourceTransport",
59+
expectedTransportType: "*mqtt.mqttTransport",
6060
},
6161
{
6262
name: "grpc config",

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 62 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type mqttAgentTransport struct {
2424
agentID string
2525
}
2626

27+
// Deprecated: use v2.mqtt.NewAgentOptions instead
2728
func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
2829
mqttAgentOptions := &mqttAgentTransport{
2930
MQTTOptions: *mqttOptions,
@@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
4041
}
4142

4243
func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
43-
logger := klog.FromContext(ctx)
44-
4544
topic, err := getAgentPubTopic(ctx)
4645
if err != nil {
4746
return nil, err
@@ -51,69 +50,23 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents
5150
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
5251
}
5352

54-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
55-
if err != nil {
56-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
57-
}
58-
59-
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
60-
if err != nil {
61-
return nil, err
62-
}
63-
64-
// agent request to sync resource spec from all sources
65-
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
66-
if len(o.Topics.AgentBroadcast) == 0 {
67-
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
68-
69-
// TODO after supporting multiple sources, we should list each source
70-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
71-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
72-
}
73-
74-
resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1)
75-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
76-
}
77-
78-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
53+
pubTopic, err := AgentPubTopic(ctx, &o.MQTTOptions, o.clusterName, evtCtx)
7954
if err != nil {
8055
return nil, err
8156
}
8257

83-
// agent publishes status events or spec resync events
84-
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName)
85-
eventsTopic = replaceLast(eventsTopic, "+", topicSource)
86-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
58+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
8759
}
8860

8961
func (o *mqttAgentTransport) Connect(ctx context.Context) error {
90-
subscribe := &paho.Subscribe{
91-
Subscriptions: []paho.SubscribeOptions{
92-
{
93-
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
94-
// the future, client may need a source list, it will subscribe to each source
95-
// receiving the sources events
96-
Topic: replaceLast(o.Topics.SourceEvents, "+", o.clusterName), QoS: byte(o.SubQoS),
97-
},
98-
},
99-
}
100-
101-
// receiving status resync events from all sources
102-
if len(o.Topics.SourceBroadcast) != 0 {
103-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
104-
Topic: o.Topics.SourceBroadcast,
105-
QoS: byte(o.SubQoS),
106-
})
107-
}
108-
10962
protocol, err := o.GetCloudEventsProtocol(
11063
ctx,
11164
fmt.Sprintf("%s-client", o.agentID),
11265
func(err error) {
11366
o.errorChan <- err
11467
},
11568
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
116-
cloudeventsmqtt.WithSubscribe(subscribe),
69+
cloudeventsmqtt.WithSubscribe(AgentSubscribe(&o.MQTTOptions, o.clusterName)),
11770
)
11871
if err != nil {
11972
return err
@@ -157,3 +110,61 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error {
157110
func (o *mqttAgentTransport) ErrorChan() <-chan error {
158111
return o.errorChan
159112
}
113+
114+
func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
115+
logger := klog.FromContext(ctx)
116+
117+
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
118+
if err != nil {
119+
return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
120+
}
121+
122+
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
123+
if err != nil {
124+
return "", err
125+
}
126+
127+
// agent request to sync resource spec from all sources
128+
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
129+
if len(o.Topics.AgentBroadcast) == 0 {
130+
logger.Info("the agent broadcast topic not set, fall back to the agent events topic")
131+
132+
// TODO after supporting multiple sources, we should list each source
133+
return replaceLast(o.Topics.AgentEvents, "+", clusterName), nil
134+
}
135+
136+
return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil
137+
}
138+
139+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
140+
if err != nil {
141+
return "", err
142+
}
143+
144+
// agent publishes status events or spec resync events
145+
eventsTopic := replaceLast(o.Topics.AgentEvents, "+", clusterName)
146+
return replaceLast(eventsTopic, "+", topicSource), nil
147+
}
148+
149+
func AgentSubscribe(o *MQTTOptions, clusterName string) *paho.Subscribe {
150+
subscribe := &paho.Subscribe{
151+
Subscriptions: []paho.SubscribeOptions{
152+
{
153+
// TODO support multiple sources, currently the client require the source events topic has a sourceID, in
154+
// the future, client may need a source list, it will subscribe to each source
155+
// receiving the sources events
156+
Topic: replaceLast(o.Topics.SourceEvents, "+", clusterName), QoS: byte(o.SubQoS),
157+
},
158+
},
159+
}
160+
161+
// receiving status resync events from all sources
162+
if len(o.Topics.SourceBroadcast) != 0 {
163+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
164+
Topic: o.Topics.SourceBroadcast,
165+
QoS: byte(o.SubQoS),
166+
})
167+
}
168+
169+
return subscribe
170+
}

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type mqttSourceTransport struct {
2323
clientID string
2424
}
2525

26+
// Deprecated: use v2.mqtt.NewSourceOptions instead
2627
func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
2728
mqttSourceOptions := &mqttSourceTransport{
2829
MQTTOptions: *mqttOptions,
@@ -47,58 +48,20 @@ func (o *mqttSourceTransport) WithContext(ctx context.Context, evtCtx cloudevent
4748
return cloudeventscontext.WithTopic(ctx, string(*topic)), nil
4849
}
4950

50-
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
51-
if err != nil {
52-
return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err)
53-
}
54-
55-
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
51+
pubTopic, err := SourcePubTopic(ctx, &o.MQTTOptions, o.sourceID, evtCtx)
5652
if err != nil {
5753
return nil, err
5854
}
5955

60-
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
61-
// source request to get resources status from all agents
62-
if len(o.Topics.SourceBroadcast) == 0 {
63-
return nil, fmt.Errorf("the source broadcast topic not set")
64-
}
65-
66-
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1)
67-
return cloudeventscontext.WithTopic(ctx, resyncTopic), nil
68-
}
69-
70-
// source publishes spec events or status resync events
71-
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
72-
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
56+
return cloudeventscontext.WithTopic(ctx, pubTopic), nil
7357
}
7458

7559
func (o *mqttSourceTransport) Connect(ctx context.Context) error {
76-
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
60+
subscribe, err := SourceSubscribe(&o.MQTTOptions, o.sourceID)
7761
if err != nil {
7862
return err
7963
}
8064

81-
if topicSource != o.sourceID {
82-
return fmt.Errorf("the topic source %q does not match with the client sourceID %q",
83-
o.Topics.AgentEvents, o.sourceID)
84-
}
85-
86-
subscribe := &paho.Subscribe{
87-
Subscriptions: []paho.SubscribeOptions{
88-
{
89-
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
90-
},
91-
},
92-
}
93-
94-
if len(o.Topics.AgentBroadcast) != 0 {
95-
// receiving spec resync events from all agents
96-
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
97-
Topic: o.Topics.AgentBroadcast,
98-
QoS: byte(o.SubQoS),
99-
})
100-
}
101-
10265
protocol, err := o.GetCloudEventsProtocol(
10366
ctx,
10467
o.clientID,
@@ -151,3 +114,59 @@ func (o *mqttSourceTransport) Close(ctx context.Context) error {
151114
func (o *mqttSourceTransport) ErrorChan() <-chan error {
152115
return o.errorChan
153116
}
117+
118+
func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
119+
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
120+
if err != nil {
121+
return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
122+
}
123+
124+
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
125+
if err != nil {
126+
return "", err
127+
}
128+
129+
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
130+
// source request to get resources status from all agents
131+
if len(o.Topics.SourceBroadcast) == 0 {
132+
return "", fmt.Errorf("the source broadcast topic not set")
133+
}
134+
135+
resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", sourceID, 1)
136+
return resyncTopic, nil
137+
}
138+
139+
// source publishes spec events or status resync events
140+
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
141+
return eventsTopic, nil
142+
}
143+
144+
func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
145+
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
if topicSource != sourceID {
151+
return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
152+
o.Topics.AgentEvents, sourceID)
153+
}
154+
155+
subscribe := &paho.Subscribe{
156+
Subscriptions: []paho.SubscribeOptions{
157+
{
158+
Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS),
159+
},
160+
},
161+
}
162+
163+
if len(o.Topics.AgentBroadcast) != 0 {
164+
// receiving spec resync events from all agents
165+
subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{
166+
Topic: o.Topics.AgentBroadcast,
167+
QoS: byte(o.SubQoS),
168+
})
169+
}
170+
171+
return subscribe, nil
172+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package mqtt
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
v2 "github.com/cloudevents/sdk-go/v2"
8+
"github.com/eclipse/paho.golang/paho"
9+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
11+
)
12+
13+
func NewAgentOptions(opts *mqtt.MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions {
14+
return &options.CloudEventsAgentOptions{
15+
CloudEventsTransport: newTransport(
16+
fmt.Sprintf("%s-client", agentID),
17+
opts,
18+
func(ctx context.Context, e v2.Event) (string, error) {
19+
return mqtt.AgentPubTopic(ctx, opts, clusterName, e.Context)
20+
},
21+
func() (*paho.Subscribe, error) {
22+
return mqtt.AgentSubscribe(opts, clusterName), nil
23+
},
24+
),
25+
AgentID: agentID,
26+
ClusterName: clusterName,
27+
}
28+
}
29+
30+
func NewSourceOptions(opts *mqtt.MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions {
31+
return &options.CloudEventsSourceOptions{
32+
CloudEventsTransport: newTransport(
33+
clientID,
34+
opts,
35+
func(ctx context.Context, e v2.Event) (string, error) {
36+
return mqtt.SourcePubTopic(ctx, opts, sourceID, e.Context)
37+
},
38+
func() (*paho.Subscribe, error) {
39+
return mqtt.SourceSubscribe(opts, sourceID)
40+
},
41+
),
42+
SourceID: sourceID,
43+
}
44+
}

0 commit comments

Comments
 (0)