Skip to content

Commit f5d5a58

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent 500719f commit f5d5a58

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,16 @@ func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtC
125125
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
126126
}
127127

128-
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
128+
originalSourceVal, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
129129
if err != nil {
130130
return "", err
131131
}
132132

133+
originalSource, ok := originalSourceVal.(string)
134+
if !ok {
135+
return "", fmt.Errorf("originalsource extension must be a string, got %T", originalSourceVal)
136+
}
137+
133138
// agent request to sync resource spec from all sources
134139
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
135140
if len(o.Topics.AgentBroadcast) == 0 {

pkg/cloudevents/generic/options/mqtt/options.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7-
"k8s.io/klog/v2"
87
"net"
98
"os"
109
"regexp"
1110
"strings"
1211
"time"
1312

13+
"k8s.io/klog/v2"
14+
1415
cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
1516
"github.com/eclipse/paho.golang/packets"
1617
"github.com/eclipse/paho.golang/paho"
@@ -235,8 +236,8 @@ func (o *MQTTOptions) GetCloudEventsProtocol(
235236

236237
opts := []cloudeventsmqtt.Option{
237238
cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID)),
238-
cloudeventsmqtt.WithDebugLogger(&PahoDebugLogger{logger: logger}),
239-
cloudeventsmqtt.WithErrorLogger(&PahoErrorLogger{logger: logger}),
239+
cloudeventsmqtt.WithDebugLogger(NewPahoDebugLogger(logger)),
240+
cloudeventsmqtt.WithErrorLogger(NewPahoErrorLogger(logger)),
240241
}
241242
opts = append(opts, clientOpts...)
242243
return cloudeventsmqtt.New(ctx, config, opts...)

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,16 @@ func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx
122122
return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
123123
}
124124

125-
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
125+
clusterNameVal, err := evtCtx.GetExtension(types.ExtensionClusterName)
126126
if err != nil {
127127
return "", err
128128
}
129129

130+
clusterName, ok := clusterNameVal.(string)
131+
if !ok {
132+
return "", fmt.Errorf("clustername extension must be a string, got %T", clusterNameVal)
133+
}
134+
130135
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
131136
// source request to get resources status from all agents
132137
if len(o.Topics.SourceBroadcast) == 0 {
@@ -138,7 +143,7 @@ func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx
138143
}
139144

140145
// source publishes spec events or status resync events
141-
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
146+
eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", clusterName, 1)
142147
return eventsTopic, nil
143148
}
144149

@@ -149,8 +154,8 @@ func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
149154
}
150155

151156
if topicSource != sourceID {
152-
return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
153-
o.Topics.AgentEvents, sourceID)
157+
return nil, fmt.Errorf("the topic source %q does not match the client sourceID %q",
158+
topicSource, sourceID)
154159
}
155160

156161
subscribe := &paho.Subscribe{

0 commit comments

Comments
 (0)