@@ -24,6 +24,7 @@ type mqttAgentTransport struct {
2424 agentID string
2525}
2626
27+ // Deprecated: use v2.mqtt.NewAgentOptions instead
2728func NewAgentOptions (mqttOptions * MQTTOptions , clusterName , agentID string ) * options.CloudEventsAgentOptions {
2829 mqttAgentOptions := & mqttAgentTransport {
2930 MQTTOptions : * mqttOptions ,
@@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt
4041}
4142
4243func (o * mqttAgentTransport ) WithContext (ctx context.Context , evtCtx cloudevents.EventContext ) (context.Context , error ) {
43- logger := klog .FromContext (ctx )
44-
4544 topic , err := getAgentPubTopic (ctx )
4645 if err != nil {
4746 return nil , err
@@ -51,59 +50,18 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents
5150 return cloudeventscontext .WithTopic (ctx , string (* topic )), nil
5251 }
5352
54- eventType , err := types .ParseCloudEventsType (evtCtx .GetType ())
55- if err != nil {
56- return nil , fmt .Errorf ("unsupported event type %s, %v" , eventType , err )
57- }
58-
59- originalSource , err := evtCtx .GetExtension (types .ExtensionOriginalSource )
60- if err != nil {
61- return nil , err
62- }
63-
64- // agent request to sync resource spec from all sources
65- if eventType .Action == types .ResyncRequestAction && originalSource == types .SourceAll {
66- if len (o .Topics .AgentBroadcast ) == 0 {
67- logger .Info ("the agent broadcast topic not set, fall back to the agent events topic" )
68-
69- // TODO after supporting multiple sources, we should list each source
70- eventsTopic := replaceLast (o .Topics .AgentEvents , "+" , o .clusterName )
71- return cloudeventscontext .WithTopic (ctx , eventsTopic ), nil
72- }
73-
74- resyncTopic := strings .Replace (o .Topics .AgentBroadcast , "+" , o .clusterName , 1 )
75- return cloudeventscontext .WithTopic (ctx , resyncTopic ), nil
76- }
77-
78- topicSource , err := getSourceFromEventsTopic (o .Topics .AgentEvents )
53+ pubTopic , err := AgentPubTopic (ctx , & o .MQTTOptions , o .clusterName , evtCtx )
7954 if err != nil {
8055 return nil , err
8156 }
8257
83- // agent publishes status events or spec resync events
84- eventsTopic := replaceLast (o .Topics .AgentEvents , "+" , o .clusterName )
85- eventsTopic = replaceLast (eventsTopic , "+" , topicSource )
86- return cloudeventscontext .WithTopic (ctx , eventsTopic ), nil
58+ return cloudeventscontext .WithTopic (ctx , pubTopic ), nil
8759}
8860
8961func (o * mqttAgentTransport ) Connect (ctx context.Context ) error {
90- subscribe := & paho.Subscribe {
91- Subscriptions : []paho.SubscribeOptions {
92- {
93- // TODO support multiple sources, currently the client require the source events topic has a sourceID, in
94- // the future, client may need a source list, it will subscribe to each source
95- // receiving the sources events
96- Topic : replaceLast (o .Topics .SourceEvents , "+" , o .clusterName ), QoS : byte (o .SubQoS ),
97- },
98- },
99- }
100-
101- // receiving status resync events from all sources
102- if len (o .Topics .SourceBroadcast ) != 0 {
103- subscribe .Subscriptions = append (subscribe .Subscriptions , paho.SubscribeOptions {
104- Topic : o .Topics .SourceBroadcast ,
105- QoS : byte (o .SubQoS ),
106- })
62+ subscribe , err := AgentSubscribe (& o .MQTTOptions , o .clusterName )
63+ if err != nil {
64+ return err
10765 }
10866
10967 protocol , err := o .GetCloudEventsProtocol (
@@ -157,3 +115,62 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error {
157115func (o * mqttAgentTransport ) ErrorChan () <- chan error {
158116 return o .errorChan
159117}
118+
119+ func AgentPubTopic (ctx context.Context , o * MQTTOptions , clusterName string , evtCtx cloudevents.EventContext ) (string , error ) {
120+ logger := klog .FromContext (ctx )
121+
122+ ceType := evtCtx .GetType ()
123+ eventType , err := types .ParseCloudEventsType (ceType )
124+ if err != nil {
125+ return "" , fmt .Errorf ("unsupported event type %q, %v" , ceType , err )
126+ }
127+
128+ originalSource , err := evtCtx .GetExtension (types .ExtensionOriginalSource )
129+ if err != nil {
130+ return "" , err
131+ }
132+
133+ // agent request to sync resource spec from all sources
134+ if eventType .Action == types .ResyncRequestAction && originalSource == types .SourceAll {
135+ if len (o .Topics .AgentBroadcast ) == 0 {
136+ logger .Info ("the agent broadcast topic not set, fall back to the agent events topic" )
137+
138+ // TODO after supporting multiple sources, we should list each source
139+ return replaceLast (o .Topics .AgentEvents , "+" , clusterName ), nil
140+ }
141+
142+ return strings .Replace (o .Topics .AgentBroadcast , "+" , clusterName , 1 ), nil
143+ }
144+
145+ topicSource , err := getSourceFromEventsTopic (o .Topics .AgentEvents )
146+ if err != nil {
147+ return "" , err
148+ }
149+
150+ // agent publishes status events or spec resync events
151+ eventsTopic := replaceLast (o .Topics .AgentEvents , "+" , clusterName )
152+ return replaceLast (eventsTopic , "+" , topicSource ), nil
153+ }
154+
155+ func AgentSubscribe (o * MQTTOptions , clusterName string ) (* paho.Subscribe , error ) {
156+ subscribe := & paho.Subscribe {
157+ Subscriptions : []paho.SubscribeOptions {
158+ {
159+ // TODO support multiple sources, currently the client require the source events topic has a sourceID, in
160+ // the future, client may need a source list, it will subscribe to each source
161+ // receiving the sources events
162+ Topic : replaceLast (o .Topics .SourceEvents , "+" , clusterName ), QoS : byte (o .SubQoS ),
163+ },
164+ },
165+ }
166+
167+ // receiving status resync events from all sources
168+ if len (o .Topics .SourceBroadcast ) != 0 {
169+ subscribe .Subscriptions = append (subscribe .Subscriptions , paho.SubscribeOptions {
170+ Topic : o .Topics .SourceBroadcast ,
171+ QoS : byte (o .SubQoS ),
172+ })
173+ }
174+
175+ return subscribe , nil
176+ }
0 commit comments