@skeeey
Member

@skeeey skeeey commented Nov 19, 2025

Summary

Related issue(s)

Fixes #

Summary by CodeRabbit

  • New Features

    • v2 MQTT transport for CloudEvents with improved lifecycle, publish/subscribe handling, and graceful shutdown.
    • v2 constructors for agent and source MQTT options; legacy wrappers marked deprecated.
    • Helpers to compute publish topics and build subscription options.
    • New logger constructor functions for MQTT client logging.
  • Refactor

    • Switched call sites to the v2 MQTT API to centralize topic/subscription logic and simplify option construction.
  • Tests

    • Added comprehensive end-to-end MQTT tests covering connect, send/receive, reconnect, and error paths.

@openshift-ci openshift-ci bot requested review from deads2k and qiujian16 November 19, 2025 06:22
@coderabbitai

coderabbitai bot commented Nov 19, 2025

Walkthrough

Replaces v1 MQTT CloudEvents constructors with v2 variants; extracts topic/subscription logic into helpers; adds an internal v2 MQTT transport with lifecycle, concurrency, and message plumbing; introduces v2 transport tests; provides deprecated v1 wrappers; adds Paho logger constructors; updates integration tests to use v2 constructors.

Changes

Cohort / File(s) | Change Summary
Builder API migration (pkg/cloudevents/generic/options/builder/optionsbuilder.go, pkg/cloudevents/generic/options/builder/optionsbuilder_test.go) | Builders and tests updated to call v2 constructors (mqttv2.NewSourceOptions, mqttv2.NewAgentOptions) and to expect v2 transport types.
Legacy MQTT helpers & loggers (pkg/cloudevents/generic/options/mqtt/sourceoptions.go, pkg/cloudevents/generic/options/mqtt/agentoptions.go, pkg/cloudevents/generic/options/mqtt/logger.go, pkg/cloudevents/generic/options/mqtt/options.go) | Added deprecated NewSourceOptions/NewAgentOptions wrappers; extracted topic/subscription logic into helpers (SourcePubTopic, SourceSubscribe, AgentPubTopic, AgentSubscribe); refactored WithContext/Connect to delegate; added NewPahoErrorLogger/NewPahoDebugLogger constructors and replaced inline logger struct literals.
V2 MQTT options, constructors (pkg/cloudevents/generic/options/v2/mqtt/options.go) | New v2 constructors NewAgentOptions and NewSourceOptions that build CloudEvents transport instances with topic builders and subscription getters and set Agent/Source identifiers.
V2 MQTT transport implementation (pkg/cloudevents/generic/options/v2/mqtt/transport.go) | New internal mqttTransport implementing Connect/Close/Send/Subscribe/Receive/ErrorChan with MQTT-Paho client management, message channels, subscription wiring, and guarded shutdown.
V2 MQTT transport tests (pkg/cloudevents/generic/options/v2/mqtt/transport_test.go) | New end-to-end tests (in-process broker) covering connect/reconnect, send/receive, subscribe semantics, error paths, cancellation, and edge cases.
Integration tests / call-site updates (test/integration/cloudevents/certrotation_mqtt_test.go, test/integration/cloudevents/cloudevents_resync_test.go, test/integration/cloudevents/cloudevetns_mqtt_test.go) | Imports changed to mqttv2; call sites updated to mqttv2.NewAgentOptions / mqttv2.NewSourceOptions; one test adjusted to return (options, constants.ConfigTypeMQTT).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • New transport and tests require careful review of concurrency, channel lifecycle, reconnection, and error propagation.
  • Refactored topic/subscription helpers must be validated for behavioral parity with prior logic (resync/all-clusters handling, broadcast vs. agent vs. source topics).
  • Pay extra attention to:
    • Topic derivation and validation rules in SourcePubTopic/AgentPubTopic.
    • Subscription construction and client-source matching in SourceSubscribe/AgentSubscribe.
    • Race conditions and double-close/double-subscribe handling in mqttTransport (Connect/Subscribe/Receive/Close).
    • Test timing/flakiness and broker setup/teardown in transport tests.

Suggested reviewers

  • deads2k
  • qiujian16

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name | Status | Explanation | Resolution
Description check | ⚠️ Warning | The PR description is incomplete. It contains only the template boilerplate with empty Summary and Related issue sections, providing no actual implementation details or context about the mqtt transport v2 changes. | Complete the Summary section with details about the mqtt transport v2 implementation, and fill in the Related issue(s) section if applicable. Include key implementation details and the motivation for these changes.
Docstring Coverage | ⚠️ Warning | Docstring coverage is 61.76%, which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title ':seedling: mqtt transport v2' is specific and directly reflects the main change: introducing a new MQTT transport v2 implementation. It accurately summarizes the primary focus of the changeset.
@skeeey
Member Author

skeeey commented Nov 19, 2025

/hold

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (5)
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1)

40-59: Updated transport type assertion matches v2 mqttTransport

Changing expectedTransportType to *mqtt.mqttTransport correctly reflects the unified v2 MQTT transport implementation. If you ever want to decouple from concrete type names, you could instead assert expected behavior or use interface/type assertions rather than reflect.Type.String(), but this is fine as-is.
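
As a rough illustration of the alternative mentioned above, the sketch below checks a concrete transport type with a type assertion instead of comparing a reflect.Type.String() result against a hard-coded name. The Transport interface and both concrete types are hypothetical stand-ins, not the SDK's real types. A type assertion on an unexported type only compiles inside the package that declares it, so this form may not apply to a test living in a different package from mqttTransport.

```go
package main

import "fmt"

// Transport is a hypothetical stand-in for a CloudEvents transport interface.
type Transport interface {
	Close() error
}

// mqttTransport and grpcTransport are hypothetical concrete implementations.
type mqttTransport struct{}

func (*mqttTransport) Close() error { return nil }

type grpcTransport struct{}

func (*grpcTransport) Close() error { return nil }

// isMQTTTransport uses a type assertion, so renaming the package or type
// is caught by the compiler rather than by a string mismatch at test time.
func isMQTTTransport(t Transport) bool {
	_, ok := t.(*mqttTransport)
	return ok
}

func main() {
	fmt.Println(isMQTTTransport(&mqttTransport{}))
	fmt.Println(isMQTTTransport(&grpcTransport{}))
}
```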

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

50-118: Transport tests are thorough; consider tightening invalid‑event coverage

The in‑process broker tests give good confidence in connect/reconnect, send/receive, load handling, and error paths (no connect, double subscribe, no subscribe, close/cancel). Two small, non‑blocking nits:

  • TestTransportReceiveInvalidEvent currently never publishes an actually malformed MQTT payload, so it only verifies that the handler isn’t called when nothing arrives. If you want to assert the invalid‑event branch, consider sending a non‑CloudEvents message on the subscribed topic.
  • createTestMQTTOptions’s topic parameter is unused; you could drop it or wire it into MQTTOptions.Topics to avoid confusion.

These are nice‑to‑have cleanups; the test suite is otherwise solid.

Also applies to: 239-316, 318-401, 570-607

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)

118-142: Minor improvements for SourcePubTopic: error message and extension typing

The logic for choosing between broadcast vs. events topics looks consistent, but there are a couple of polish points:

  • The error on parse failure uses %s with eventType, which is a struct pointer and may not render meaningfully; using the raw type string would be clearer.
  • clusterName is an interface{} from GetExtension; using it directly in %s and topic substitution relies on it always being string-typed.

You could tighten this up as:

 func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
-	eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
+	eventTypeStr := evtCtx.GetType()
+	eventType, err := types.ParseCloudEventsType(eventTypeStr)
 	if err != nil {
-		return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
+		return "", fmt.Errorf("unsupported event type %q: %w", eventTypeStr, err)
 	}
 
-	clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
+	clusterNameVal, err := evtCtx.GetExtension(types.ExtensionClusterName)
 	if err != nil {
 		return "", err
 	}
+
+	clusterName, ok := clusterNameVal.(string)
+	if !ok {
+		return "", fmt.Errorf("clustername extension is not a string: %T", clusterNameVal)
+	}
@@
-	if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
+	if eventType.Action == types.ResyncRequestAction && clusterName == string(types.ClusterAll) {
@@
-	eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1)
+	eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", clusterName, 1)

This keeps behavior but makes failures easier to diagnose and guards against unexpected extension types.

Please confirm in the existing code/tests how ExtensionClusterName is populated (always as string) to ensure the stricter type assertion doesn’t break current producers.


144-172: SourceSubscribe extraction is good; tweak error message for clarity

The helper nicely centralizes AgentEvents validation and optional AgentBroadcast subscription. One small improvement: the error message uses the full topic string instead of the parsed source, which can be confusing.

Consider:

 func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
 	topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
 	if err != nil {
 		return nil, err
 	}
 
 	if topicSource != sourceID {
-		return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
-			o.Topics.AgentEvents, sourceID)
+		return nil, fmt.Errorf("the topic source %q does not match the client sourceID %q",
+			topicSource, sourceID)
 	}
@@
 	if len(o.Topics.AgentBroadcast) != 0 {
@@
 	return subscribe, nil
 }

Behavior is unchanged; diagnostics become more precise.

pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

114-147: AgentPubTopic logic matches design; same minor concerns as source side

The function correctly:

  • Parses the CloudEvents type.
  • Uses originalsource extension to distinguish resync-from-all-sources vs per-source events.
  • Falls back to AgentEvents when AgentBroadcast is unset, and uses replaceLast to inject clusterName.
  • For normal events, derives topic by replacing the last + with clusterName and then the remaining + with the concrete source.

Two small improvements:

  • The parse error uses %s with a struct pointer; emitting the raw type string would be clearer.
  • originalSource is interface{}; relying on == types.SourceAll and downstream replaces assumes it is always string-typed.

You could mirror the tighter pattern from the source helper:

 func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
 	logger := klog.FromContext(ctx)
 
-	eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
+	eventTypeStr := evtCtx.GetType()
+	eventType, err := types.ParseCloudEventsType(eventTypeStr)
 	if err != nil {
-		return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
+		return "", fmt.Errorf("unsupported event type %q: %w", eventTypeStr, err)
 	}
 
-	originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
+	originalSourceVal, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
 	if err != nil {
 		return "", err
 	}
+
+	originalSource, ok := originalSourceVal.(string)
+	if !ok {
+		return "", fmt.Errorf("originalsource extension is not a string: %T", originalSourceVal)
+	}
@@
-	if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
+	if eventType.Action == types.ResyncRequestAction && originalSource == string(types.SourceAll) {
@@
 		return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil

This keeps the topic behavior but gives better diagnostics and guards unexpected extension types.

Please double-check how ExtensionOriginalSource is populated today (type and values) so that the added type assertion and comparison against SourceAll won’t surprise existing producers.
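
For readers unfamiliar with the topic derivation described in this comment (replace the last + with the cluster name, then the remaining + with the concrete source), here is a minimal sketch. The replaceLast body is a hypothetical reimplementation and the topic pattern is made up for illustration; the real helper and topic layout may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// replaceLast replaces the last occurrence of old in s with repl.
// Hypothetical reimplementation; the SDK's helper may behave differently.
func replaceLast(s, old, repl string) string {
	i := strings.LastIndex(s, old)
	if i < 0 {
		return s
	}
	return s[:i] + repl + s[i+len(old):]
}

// agentEventsTopic fills a pattern that contains two "+" wildcards:
// the last one receives the cluster name, the remaining one the source.
func agentEventsTopic(pattern, source, clusterName string) string {
	t := replaceLast(pattern, "+", clusterName)
	return strings.Replace(t, "+", source, 1)
}

func main() {
	// The pattern below is an assumed example, not taken from the PR.
	fmt.Println(agentEventsTopic("sources/+/clusters/+/status", "source1", "cluster1"))
	// prints "sources/source1/clusters/cluster1/status"
}
```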

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a979478 and 2afa7af.

📒 Files selected for processing (7)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
🧬 Code graph analysis (6)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/options.go (4)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (3)
  • NewAgentOptions (28-41)
  • AgentPubTopic (114-147)
  • AgentSubscribe (149-170)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventsAgentOptions (76-89)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3)
  • NewSourceOptions (27-39)
  • SourcePubTopic (118-142)
  • SourceSubscribe (144-172)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
pkg/cloudevents/generic/options/builder/optionsbuilder.go (3)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)
  • NewSourceOptions (27-39)
pkg/cloudevents/generic/options/v2/mqtt/options.go (2)
  • NewSourceOptions (30-44)
  • NewAgentOptions (13-28)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)
  • NewAgentOptions (28-41)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (9)
pkg/cloudevents/generic/options/builder/optionsbuilder.go (1)

3-13: Builder correctly migrated to v2 MQTT options

The switch from mqtt.New*Options to mqttv2.New*Options for both source and agent builders is consistent with the new v2 constructors and preserves parameter ordering (clientId / sourceId / clusterName). GRPC handling and error paths remain unchanged, so the builder behavior stays compatible while routing to the new transport.

Also applies to: 55-79

pkg/cloudevents/generic/options/v2/mqtt/options.go (1)

13-44: v2 MQTT options constructors mirror v1 semantics with shared transport

NewAgentOptions and NewSourceOptions correctly wire the new mqttTransport with the existing topic/subscription helpers and set the expected identity fields (AgentID, ClusterName, SourceID). This keeps the external CloudEvents*Options shape stable while moving to the v2 transport.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3)

26-39: Deprecation wrapper for NewSourceOptions looks good

Keeping the v1 constructor with a clear deprecation note while delegating to mqttSourceTransport is fine and preserves existing callers without behavior change.


41-57: WithContext refactor to SourcePubTopic is appropriate

Delegating topic resolution to SourcePubTopic after honoring any pre-set topic from getSourcePubTopic keeps behavior clear and centralizes topic logic. The context wiring via cloudeventscontext.WithTopic remains unchanged.

Given this depends on cloudevents.EventContext and cloudeventscontext.WithTopic, please re-run existing source-side MQTT tests to confirm no regressions with the refactor.


59-73: Connect now using SourceSubscribe helper is a clean extraction

Moving subscription construction and validation into SourceSubscribe simplifies Connect and keeps the GetCloudEventsProtocol call readable. Error propagation is preserved.

Since SourceSubscribe is new public API, verify any external users that manually constructed paho.Subscribe before are not relying on older internals, and that topic configs still match the expectations of the underlying MQTT/Paho client.

pkg/cloudevents/generic/options/mqtt/agentoptions.go (4)

27-41: Agent v1 constructor deprecation is consistent with source side

The deprecation comment and wrapper NewAgentOptions keep the v1 path available while nudging callers toward the v2 API; the returned CloudEventsAgentOptions is wired identically to the existing transport.


43-59: WithContext delegation to AgentPubTopic is sensible

The method still respects any pre-set topic (getAgentPubTopic) and otherwise centralizes topic derivation in AgentPubTopic, which simplifies the transport and keeps all MQTT-topic logic in one place.

Since this path is CloudEvents-driven, please ensure existing agent-side tests that assert specific MQTT topics still pass with the helper-based implementation.


61-70: Connect using AgentSubscribe keeps subscription wiring focused

Using AgentSubscribe in the WithSubscribe option removes duplication and matches the source-side pattern. There’s no added error path here, which is fine given AgentSubscribe currently only does deterministic string manipulation.

Because this touches the integration between the MQTT Paho client and the CloudEvents MQTT protocol, please re-run agent MQTT integration tests to validate subscriptions still reach all expected topics.


149-169: AgentSubscribe helper cleanly encapsulates subscription setup

The helper correctly:

  • Subscribes to the source events topic with clusterName substituted into the last +, preserving any source wildcards or IDs earlier in the path.
  • Optionally appends SourceBroadcast when configured, matching the “broadcasts optional for MQTT” design.

No functional issues spotted here.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)

20-45: Consider buffering errorChan for more resilient error delivery

With the current unbuffered errorChan and non‑blocking send in OnClientError, errors are only delivered when a receiver is concurrently waiting; transient gaps in readers will cause errors to be dropped immediately. A small buffer (e.g. 16) preserves the non‑blocking behavior while tolerating brief backpressure from consumers.

 func newTransport(clientID string, opts *mqtt.MQTTOptions, pubTopicGetter pubTopicGetter, subscribeGetter subscribeGetter) *mqttTransport {
 	return &mqttTransport{
 		opts:            opts,
 		clientID:        clientID,
-		errorChan:       make(chan error),
+		// Small buffer to decouple internal error reporting from consumer speed.
+		errorChan:       make(chan error, 16),
 		getPublishTopic: pubTopicGetter,
 		getSubscribe:    subscribeGetter,
 	}
 }

47-86: Define behavior for multiple Connect calls on the same transport

Connect does not check or reuse an existing t.client, so calling it twice without an intervening Close will create a new client/connection and leak the previous one. Either explicitly guard (return an error if already connected) or proactively disconnect/replace the existing client, and document the intended semantics.

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (3)

26-48: Simplify allowAllHook.Provides for clarity

Provides only needs to match two hook types; using bytes.Contains on a 2‑byte slice is harder to read than direct comparisons. A simple boolean expression is clearer and avoids unnecessary allocations.

 func (h *allowAllHook) Provides(b byte) bool {
-	return bytes.Contains([]byte{
-		mochimqtt.OnConnectAuthenticate,
-		mochimqtt.OnACLCheck,
-	}, []byte{b})
+	return b == mochimqtt.OnConnectAuthenticate ||
+		b == mochimqtt.OnACLCheck
 }

50-85: Broker readiness: fixed sleep may be flaky on slow environments

setupTestBroker waits for the broker to be ready using a fixed time.Sleep(100 * time.Millisecond). On slower or heavily loaded CI, this can be marginal and lead to intermittent connection failures. Consider polling for readiness (e.g., attempting a short client dial in a loop with a timeout) instead of a fixed sleep.


576-613: TestTransportReceiveInvalidEvent name doesn’t match current behavior

This test currently never injects malformed MQTT payloads; it only checks that the handler is not called when no messages arrive. Either rename the test to reflect its actual behavior or extend it to publish a deliberately invalid MQTT message to exercise the invalid event logging/skip path.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2afa7af and 28f45ca.

📒 Files selected for processing (2)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (2)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (2)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (1)

117-189: Shutdown flow via closeChan and avoiding msgChan closes looks good

Using closeChan as the sole shutdown signal (and no longer closing msgChan) removes the prior send‑on‑closed‑channel race between AddOnPublishReceived and Close. The Receive loop correctly exits on ctx.Done() or closeChan, while still draining msgChan when active.

Also applies to: 195-225

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

119-653: Good end‑to‑end coverage of the MQTT transport lifecycle

The tests exercise connect/reconnect, send, subscribe/receive (including concurrent and high‑volume scenarios), error paths (no connect, no subscribe), double subscribe/close, context cancellation, and the error channel accessor. This gives strong confidence in the new transport behavior.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)

41-41: Consider increasing errorChan buffer size to match past review recommendation.

The past review suggested a buffer size of 16 for errorChan, but the current implementation uses size 1. While this is safe due to the non-blocking send in OnClientError (lines 61-66), a larger buffer would reduce the likelihood of dropping error notifications during error bursts.

Apply this diff if you want to match the original suggestion:

-        errorChan:       make(chan error, 1),
+        errorChan:       make(chan error, 16),

Alternatively, document why size 1 is sufficient for your use case.


80-81: Consider making msgChan buffer size configurable.

The TODO comment indicates this is a known enhancement. If you expect high message throughput or want to avoid backpressure in AddOnPublishReceived (line 136), making this configurable would be beneficial.

Do you want me to open an issue to track making the buffer size configurable through MQTTOptions?


176-179: Optional cleanup: Unreachable code branch.

Since Close no longer closes msgChan (correctly avoiding the send-on-closed-channel race from the past review), the !ok check here will never trigger. While this defensive code is harmless, it's technically dead code and could be simplified to:

         case m, ok := <-t.msgChan:
-            if !ok {
-                return nil
-            }
             evt, err := binding.ToEvent(ctx, cloudeventsmqtt.NewMessage(m))

Or retain it as defensive programming against future changes. Either way, the current behavior is correct.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28f45ca and 6395632.

📒 Files selected for processing (4)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • test/integration/cloudevents/cloudevetns_mqtt_test.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (2)
test/integration/cloudevents/cloudevetns_mqtt_test.go (1)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (11)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (6)

57-66: LGTM! Non-blocking error callback correctly addresses past review.

The OnClientError callback now uses a buffered errorChan with a select-default pattern, preventing Paho internal goroutines from blocking on error reporting. The fallback log on the default case ensures errors aren't silently ignored when the buffer is full.


88-115: LGTM! Send implementation is correct.

The method properly validates connection state, retrieves the publish topic, converts the CloudEvent to an MQTT message, and publishes with appropriate error handling. The mutex ensures thread-safe client access.


134-143: LGTM! Intentional backpressure design prevents message loss.

The AddOnPublishReceived callback intentionally blocks when msgChan is full (no default case), applying backpressure to the MQTT client to ensure no messages are dropped. The closeChan check prevents indefinite blocking during shutdown. This design prioritizes reliability over throughput, which is appropriate for CloudEvents delivery.
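The backpressure behaviour described above can be sketched in isolation as follows. The deliver function and the string payload are assumptions made for illustration; the transport's actual callback works on MQTT publish packets.

```go
package main

import "fmt"

// deliver (hypothetical) blocks until msg is queued on msgCh or closeCh is
// closed. There is deliberately no default case, so a full msgCh applies
// backpressure to the caller instead of dropping the message.
func deliver(msgCh chan<- string, closeCh <-chan struct{}, msg string) bool {
	select {
	case msgCh <- msg:
		return true
	case <-closeCh:
		return false // shutting down: give up instead of blocking forever
	}
}

func main() {
	msgCh := make(chan string, 1)
	closeCh := make(chan struct{})

	fmt.Println(deliver(msgCh, closeCh, "a")) // queued in the buffer

	// With the buffer full and closeCh closed, only the shutdown case is ready.
	close(closeCh)
	fmt.Println(deliver(msgCh, closeCh, "b"))
}
```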


191-193: LGTM! Proper read-only channel accessor.


195-218: LGTM! Close correctly avoids race condition from past review.

The implementation correctly addresses the past review concern by not closing msgChan, preventing the send-on-closed-channel race. Only closeChan is closed to signal shutdown, which is checked by both AddOnPublishReceived (line 138) and Receive (line 174). The double-close guard is appropriate defensive programming.
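One common way to write a double-close guard of this kind is sketched below. The closer type is hypothetical and is not the PR's mqttTransport; it only shows how a mutex plus a select on the channel makes Close safe to call repeatedly.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is an illustrative type holding only the shutdown signal.
type closer struct {
	mu      sync.Mutex
	closeCh chan struct{}
}

// Close closes closeCh at most once. It returns true only for the call that
// actually closed the channel, so repeated calls are harmless.
func (c *closer) Close() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.closeCh:
		return false // already closed
	default:
		close(c.closeCh)
		return true
	}
}

func main() {
	c := &closer{closeCh: make(chan struct{})}
	fmt.Println(c.Close()) // closes the channel
	fmt.Println(c.Close()) // no-op, no panic
}
```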


220-225: LGTM! Thread-safe accessor pattern.

test/integration/cloudevents/cloudevents_resync_test.go (3)

7-7: LGTM!

The import additions are correct. The time import is properly added and the mqttv2 alias for the v2 MQTT options package follows the convention used across other test files.

Also applies to: 24-24


60-64: LGTM!

The source client correctly migrates to v2 MQTT options using mqttv2.NewSourceOptions. The function parameters remain consistent and the change aligns with the v2 API migration.


56-56: The review comment is based on an incorrect understanding of the architecture.

The code is correct. The system is designed to accept v1 *mqtt.MQTTOptions at the API layer (via util.NewMQTTAgentOptions), which the builder then internally converts to v2 options via mqttv2.NewAgentOptions() (line 73 in optionsbuilder.go). Both the source and agent end up with v2 options after internal conversion. This is the intentional design pattern, not a sign of incomplete migration or v1/v2 interoperability testing.

Likely an incorrect or invalid review comment.

test/integration/cloudevents/certrotation_mqtt_test.go (1)

13-13: LGTM!

The migration from mqtt.NewAgentOptions to mqttv2.NewAgentOptions is correct. The import addition and function call update properly transition the agent options construction to the v2 API while maintaining the same parameters and behavior.

Also applies to: 22-22

test/integration/cloudevents/cloudevetns_mqtt_test.go (1)

11-11: LGTM!

The migration to v2 MQTT source options is clean and correct. The import change and function call update from mqtt.NewSourceOptions to mqttv2.NewSourceOptions properly transition to the v2 API while maintaining the same parameters.

Also applies to: 18-22

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (3)

26-49: Broker test helper is good, but startup/shutdown are a bit brittle

The in-process MochiMQTT setup is nice and self-contained, but two things could reduce flakiness and surprises:

  • Startup readiness: a fixed time.Sleep(100 * time.Millisecond) after Serve() may be marginal on slower CI; consider a small retry loop that dials the broker or uses a readiness signal instead of a hard sleep.
  • Fixed port: using a hard-coded 127.0.0.1:11883 can conflict with other local brokers. If feasible, consider making the port configurable for tests or using an ephemeral port and wiring it into MQTTDialer.

These are not blockers, but tightening them would make the suite more resilient across environments.

Also applies to: 51-90


225-245: Direct string comparison on error messages is brittle

Several tests assert exact error strings (e.g., "transport not connected", "transport has already subscribed", "transport not subscribed"). This tightly couples tests to the precise wording and makes future refactors (wrapping with context, changing phrasing, using %w, etc.) painful.

If you control the implementation API, consider:

  • Defining sentinel errors (exported or at least package-level) and using errors.Is in tests, or
  • Loosening assertions to check for a key substring / condition instead of full err.Error() equality.

Not urgent, but worth considering to keep tests from being overly fragile.

Also applies to: 414-428, 430-458, 635-658


581-618: “Invalid event” test currently doesn’t exercise invalid input

TestTransportReceiveInvalidEvent sets up a subscriber and a receive loop, then sleeps and asserts the handler wasn’t called. As written, no MQTT messages—valid or invalid—are ever published, so this only verifies the idle path (no traffic ⇒ no handler invocations), not the “invalid event” handling logic.

If the intent is to cover conversion/validation failures, consider explicitly injecting a malformed MQTT message (or a message that your transport should reject as an invalid CloudEvent) and asserting that:

  • The handler is not called, and
  • The transport fails gracefully (e.g., maybe logs or sends something on the error channel, depending on design).

That would align the behavior with the test name and comment.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6395632 and 40066f5.

📒 Files selected for processing (1)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
🧬 Code graph analysis (1)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (2)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (2)

92-122: Core transport flow and load tests look solid

The helper constructors plus the connect/reconnect/send and no-loss tests form a coherent coverage story: they exercise basic lifecycle, concurrent receive, and high-volume publish/subscribe with per-message IDs and atomics. The patterns (contexts, defers, sync.Map, atomic counters) look correct and idiomatic for test code.

No changes needed here from my side.

Also applies to: 124-223, 247-412


327-412: Good coverage for no-loss behavior under load

TestTransportNoMessageLoss is a useful end-to-end check: it stresses send/receive with many messages, uses IDs to verify uniqueness and completeness, and waits with a bounded deadline. The use of sync.Map and atomic.Int32 avoids races cleanly.

Looks good as-is.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

114-147: Tighten error reporting and keep helper semantics focused

The helper logic for resync vs regular events and optional broadcast topics is reasonable and matches the documented topic patterns, but a couple of details can be improved:

  • The error from ParseCloudEventsType logs %s with eventType, which may be nil and is less informative than the raw type string.
  • The fallback when AgentBroadcast is empty intentionally uses AgentEvents with only the cluster placeholder filled, which effectively behaves as a best-effort, single-topic path and not a true “all sources” broadcast; that’s fine given the TODO, but worth keeping in mind.

You can make the error message clearer without changing behavior:

 func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) {
@@
-	eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
-	if err != nil {
-		return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
-	}
+	eventTypeStr := evtCtx.GetType()
+	eventType, err := types.ParseCloudEventsType(eventTypeStr)
+	if err != nil {
+		return "", fmt.Errorf("unsupported event type %q: %v", eventTypeStr, err)
+	}

This preserves all routing behavior but produces clearer diagnostics when the event type is malformed.
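To see why the raw string is the more useful value to print, here is a small standalone sketch. The eventType struct and its String method are assumptions standing in for the parsed type; the fmt package documents that a panic caused by a nil receiver in a String method is printed as <nil>.

```go
package main

import "fmt"

// eventType is a hypothetical stand-in for the parsed CloudEvents type.
type eventType struct {
	Action string
}

// String has a value receiver, so calling it through a nil *eventType panics;
// fmt recovers from that panic and prints "<nil>" instead.
func (e eventType) String() string {
	return e.Action
}

// describe returns the message built from the (nil) parsed value and the
// message built from the raw type string, for comparison.
func describe(raw string, parsed *eventType) (string, string) {
	return fmt.Sprintf("unsupported event type %s", parsed),
		fmt.Sprintf("unsupported event type %q", raw)
}

func main() {
	fromParsed, fromRaw := describe("not-a-valid-type", nil)
	fmt.Println(fromParsed) // unsupported event type <nil>
	fmt.Println(fromRaw)    // unsupported event type "not-a-valid-type"
}
```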

Also applies to: 149-170

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

26-90: Solid end-to-end coverage; consider minor flakiness hardening

The in-process MochiMQTT broker plus helper wiring gives good end-to-end coverage of connect/reconnect, send/receive, and shutdown behavior. Two minor robustness points you might optionally address:

  • setupTestBroker relies on fixed sleeps (100ms for startup, 200ms before shutdown) and a fixed port (127.0.0.1:11883), which can cause flakes on slow CI or when the port is already in use. If this becomes an issue, consider a readiness probe (e.g., a small MQTT client ping) and/or choosing a free port dynamically.
  • Helpers like createTestTransport are plain functions, so t.Helper() is not available to them unless they accept a *testing.T; documenting that they’re test-only utilities can help future maintainers.

Overall, the scenarios exercised here align well with the v2 transport’s intended behavior.

Also applies to: 92-123, 124-658

pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)

37-45: Error channel behavior is safe now; consider a slightly larger buffer

Using a buffered errorChan and a non-blocking send in OnClientError avoids the deadlock risk that existed with an unbuffered channel and keeps Paho’s internals from being blocked by slow consumers. With a buffer size of 1, any error that arrives while another is still queued is dropped from the channel; if you expect bursts of errors, you might want a slightly larger buffer or a configurable size:

 func newTransport(clientID string, opts *mqtt.MQTTOptions, pubTopicGetter pubTopicGetter, subscribeGetter subscribeGetter) *mqttTransport {
 	return &mqttTransport{
 		opts:            opts,
 		clientID:        clientID,
-		errorChan:       make(chan error, 1),
+		// Small buffer to avoid blocking Paho callbacks; size can be tuned if needed.
+		errorChan:       make(chan error, 16),

The ErrorChan accessor correctly exposes a receive-only view, so this change is internal-only.

Also applies to: 47-86, 191-193


88-157: Transport lifecycle and concurrency look correct; locking is conservative but safe

The send/subscribe/receive/close lifecycle is coherent:

  • Send and Subscribe correctly guard on t.client == nil and t.subscribed and return clear errors ("transport not connected", "transport has already subscribed"), which are exercised in tests.
  • AddOnPublishReceived now only checks closeChan (never closing msgChan), avoiding the earlier send-on-closed-channel race; Receive exits cleanly on ctx.Done, closeChan close, or msgChan close (the latter doesn’t currently happen).
  • Close closes only closeChan (idempotently), resets subscribed, and calls Disconnect, which aligns with the tests for double-close and close-while-receiving.

The use of mu around Send, Subscribe, Connect, and Close is conservative but acceptable; it serializes those operations, which is usually fine for control-plane traffic. If you later need higher publish concurrency, you could narrow the critical sections in Send to just the connection checks and topic resolution, releasing the lock before Publish, but that’s not required for correctness right now.
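If publish concurrency ever becomes a concern, the narrowing could look roughly like the sketch below. The publisher interface, transport struct, setClient method, and recorder fake are all hypothetical stand-ins rather than the PR's types, and the sketch assumes the underlying client's Publish is itself safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the MQTT client.
type publisher interface {
	Publish(topic string, payload []byte) error
}

// transport is an illustrative type, not the PR's mqttTransport.
type transport struct {
	mu     sync.RWMutex
	client publisher
}

// setClient installs (or clears) the client under the write lock.
func (t *transport) setClient(c publisher) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.client = c
}

// Send holds the read lock only while snapshotting the client, then publishes
// without holding any lock so concurrent Sends do not serialize on the mutex.
func (t *transport) Send(topic string, payload []byte) error {
	t.mu.RLock()
	c := t.client
	t.mu.RUnlock()

	if c == nil {
		return errors.New("transport not connected")
	}
	return c.Publish(topic, payload)
}

// recorder is a fake publisher that counts published messages.
type recorder struct {
	mu    sync.Mutex
	count int
}

func (r *recorder) Publish(topic string, payload []byte) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
	return nil
}

func main() {
	tr := &transport{}
	fmt.Println(tr.Send("demo/topic", nil)) // not connected yet

	rec := &recorder{}
	tr.setClient(rec)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = tr.Send("demo/topic", []byte("payload"))
		}()
	}
	wg.Wait()
	fmt.Println(rec.count)
}
```

The cost of this design is that a Send may race with Close and publish on a client that is being disconnected, so the publish error path has to tolerate that.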

Also applies to: 159-189, 195-218, 220-225

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)

118-142: Clarify error messages in SourcePubTopic/SourceSubscribe

The routing behavior in SourcePubTopic (broadcast vs per-cluster events) and the validation guard in SourceSubscribe both look correct and align with the documented topic formats. A couple of small tweaks would improve debuggability:

  • As in the agent helper, ParseCloudEventsType error reporting should use the raw type string, not the parsed struct pointer.
  • The mismatch error in SourceSubscribe labels the value as “topic source” but prints the full topic string instead of the parsed topicSource.

You can address both with minimal changes:

 func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) {
-	eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
-	if err != nil {
-		return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
-	}
+	eventTypeStr := evtCtx.GetType()
+	eventType, err := types.ParseCloudEventsType(eventTypeStr)
+	if err != nil {
+		return "", fmt.Errorf("unsupported event type %q: %v", eventTypeStr, err)
+	}
@@
 func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) {
 	topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
@@
 	if topicSource != sourceID {
-		return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
-			o.Topics.AgentEvents, sourceID)
+		return nil, fmt.Errorf("the topic source %q does not match client sourceID %q",
+			topicSource, sourceID)
 	}

These changes keep behavior the same while producing clearer diagnostics when something is misconfigured.

Also applies to: 144-172

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 40066f5 and 6343644.

📒 Files selected for processing (10)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (4)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (2)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

27-41: v1 agent options deprecation and wiring look consistent with v2 path

NewAgentOptions is clearly marked deprecated and still returns CloudEventsAgentOptions backed by mqttAgentTransport. WithContext correctly prefers an explicit topic from getAgentPubTopic and otherwise defers to AgentPubTopic, while Connect now centralizes subscription construction via AgentSubscribe. This keeps v1 behavior intact while enabling v2 reuse of the helpers.

Also applies to: 43-59, 61-70

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)

26-39: v1 source options deprecation and helper wiring are consistent

NewSourceOptions is marked deprecated in favor of v2.mqtt.NewSourceOptions but still returns a fully wired CloudEventsSourceOptions. WithContext now cleanly delegates topic derivation to SourcePubTopic, and Connect uses SourceSubscribe to build the subscription set, which keeps topic logic centralized and reusable by the v2 wiring.

Also applies to: 41-57, 59-84

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

50-412: Strong coverage for v2 MQTT transport; consider tightening the “invalid event” test

The new MochiMQTT-based tests give good confidence in connect/reconnect, send, subscribe/receive, shutdown, and error-path behaviors—nice work.

One minor point: TestTransportReceiveInvalidEvent currently never publishes any malformed MQTT payloads, so it effectively just asserts that the handler is not called when no messages arrive. If you want to truly exercise the binding.ToEvent error path, consider publishing a deliberately invalid CloudEvents payload on the subscribed topic (e.g., non-JSON bytes or a message missing required CloudEvents attributes) and confirming that:

  • Receive does not call the handler, and
  • the test still completes cleanly under the existing timeout.

This would better align the test name with its behavior and validate the invalid-event handling logic more directly.

Also applies to: 581-618

pkg/cloudevents/generic/options/v2/mqtt/transport.go (1)

20-45: Internal v2 MQTT transport design and concurrency/shutdown behavior look solid

The new mqttTransport is generally well-structured:

  • RWMutex usage cleanly separates control-plane operations (Connect, Subscribe, Close) from the data-plane receive loop.
  • closeChan is now the sole shutdown signal; msgChan remains open, which avoids the previous send-on-closed-channel race while still letting Receive exit via ctx.Done() or closeChan.
  • The buffered msgChan and AddOnPublishReceived callback correctly favor backpressure over silent drops, only discarding messages when shutting down.
  • ErrorChan is decoupled from Paho internals via a small buffer and non-blocking send with a log fallback, which avoids deadlocks if callers lag in draining errors.
  • Tests in transport_test.go exercise connect/reconnect, double close, close-while-receiving, context cancellation, and missing-connect/subscribe error paths and all match the implementation.

One small robustness improvement you might consider: defensively guard against misconfiguration of opts / opts.Dialer in Connect to fail fast with a clear error instead of a nil-pointer panic, for example:

func (t *mqttTransport) Connect(ctx context.Context) error {
    t.mu.Lock()
    defer t.mu.Unlock()

    logger := klog.FromContext(ctx)
+   if t.opts == nil || t.opts.Dialer == nil {
+       return fmt.Errorf("mqtt transport misconfigured: Dialer must be non-nil")
+   }

    tcpConn, err := t.opts.Dialer.Dial()
    if err != nil {
        return err
    }
    ...
}

This is optional since callers are expected to construct MQTTOptions correctly, but it can make configuration errors easier to diagnose.

Also applies to: 47-89, 91-160, 162-221, 223-228

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6343644 and d49d12a.

📒 Files selected for processing (11)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/logger.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (4)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (4)
pkg/cloudevents/generic/options/mqtt/logger.go (1)

18-24: Constructors for Paho loggers look correct

The new NewPahoErrorLogger / NewPahoDebugLogger helpers cleanly wrap the existing logger types and return the log.Logger interface, keeping call sites simple. No issues from an API or behavior perspective.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)

26-39: Deprecation wrapper preserves behavior while steering users to v2

NewSourceOptions correctly wraps mqttSourceTransport and marks the v1 constructor as deprecated while preserving existing behavior, which is important for compatibility during the v1→v2 migration.


51-57: Fix incorrect fmt.Errorf format and slightly tighten error reporting in topic helpers

  1. In SourcePubTopic, the error formatting is incorrect:
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
    return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
}

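eventType is a *types.CloudEventsType and is expected to be nil whenever err is non-nil, so formatting it with %s does not panic (fmt recovers from nil receivers and reports verb mismatches inline) but prints a placeholder instead of the offending type string. Use the raw type string instead: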

- eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
- if err != nil {
-     return "", fmt.Errorf("unsupported event type %s, %v", eventType, err)
- }
+ ceType := evtCtx.GetType()
+ eventType, err := types.ParseCloudEventsType(ceType)
+ if err != nil {
+     return "", fmt.Errorf("unsupported event type %q, %v", ceType, err)
+ }
  2. In SourceSubscribe, the mismatch error message currently prints the whole topic string rather than the parsed topicSource, which can be confusing:
if topicSource != sourceID {
    return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
        o.Topics.AgentEvents, sourceID)
}

Consider using topicSource so the message reflects the actual parsed source:

- return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
-     o.Topics.AgentEvents, sourceID)
+ return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
+     topicSource, sourceID)

These changes make the error for a malformed CloudEvent type report the offending type string and make the source-mismatch diagnostic clearer, while keeping the overall topic logic intact.

Also applies to: 118-142, 144-172

⛔ Skipped due to learnings
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

27-41: Agent v1 wrapper aligns with v2 migration and preserves existing semantics

NewAgentOptions correctly wires mqttAgentTransport into CloudEventsAgentOptions and is clearly marked deprecated, which keeps existing consumers working while nudging new code toward v2.mqtt.NewAgentOptions.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

114-148: Error formatting is correct; topic derivation logic is sound.

The error formatting issue flagged in the previous review has been resolved. Lines 117-121 now correctly capture ceType first and use it with %q in the error message, avoiding the pointer formatting bug.

The topic derivation handles resync broadcasts (with fallback) and normal events appropriately. The logger is used only when falling back (line 131), which is appropriate for informational context.

🧹 Nitpick comments (4)
pkg/cloudevents/generic/options/mqtt/logger.go (1)

18-24: Constructor helpers look good; consider adding GoDoc comments

The new constructors are straightforward wrappers around the existing logger types and keep behavior unchanged. To keep exported symbols consistent with Go style tools, consider adding brief doc comments for NewPahoErrorLogger and NewPahoDebugLogger.

pkg/cloudevents/generic/options/v2/mqtt/transport.go (1)

37-45: Buffered error channel and non‑blocking error callback look sound; consider documenting semantics

The constructor now creates a buffered errorChan and the client error callback uses a non‑blocking send with fallback logging. This avoids blocking internal client goroutines while still surfacing errors, which is a good trade‑off. It would help callers if ErrorChan() (or the type) documented that delivery is best‑effort and errors may be dropped when the buffer is full.

Also applies to: 57-66

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

581-618: TestTransportReceiveInvalidEvent doesn’t exercise the invalid‑event path

This test just starts Receive and waits; no MQTT publishes are sent, so the handler never runs regardless of whether the invalid‑event branch in Receive works. If you want to validate that branch (where an invalid event is logged and skipped), consider injecting a deliberately malformed publish via a separate client or transport instance, or otherwise feeding a non‑CloudEvent payload onto the subscribed topic.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)

118-143: Topic derivation logic is sound; consider explicit type assertion.

The error formatting correctly uses ceType with %q (the bug from previous reviews has been fixed). The topic derivation handles resync-all broadcasts and normal events appropriately.

Minor: Line 141 uses fmt.Sprintf("%s", clusterName) to convert the interface{} returned by GetExtension to string. While safe, an explicit type assertion would be clearer:

clusterNameStr, ok := clusterName.(string)
if !ok {
    return "", fmt.Errorf("clustername extension is not a string")
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d49d12a and e2e4ea5.

📒 Files selected for processing (11)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/logger.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (5)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/v2/mqtt/options.go (4)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (3)
  • NewAgentOptions (28-41)
  • AgentPubTopic (114-148)
  • AgentSubscribe (150-171)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventsAgentOptions (76-89)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3)
  • NewSourceOptions (27-39)
  • SourcePubTopic (118-143)
  • SourceSubscribe (145-173)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (6)
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1)

59-59: Updated transport type expectation matches v2 mqttTransport

The expectedTransportType change to *mqtt.mqttTransport aligns with the new v2 MQTT transport returned by BuildCloudEventsSourceOptions; the reflect-based assertion still does the right thing here.

pkg/cloudevents/generic/options/v2/mqtt/transport.go (1)

162-221: Receive/Close coordination cleanly avoids send‑on‑closed‑channel races

The Receive loop exits on ctx.Done() or closeChan, and the Close method now only closes closeChan (leaving msgChan open) while resetting subscribed and disconnecting the client. Combined with Subscribe’s use of getCloseChan() in the publish callback, this avoids the previous risk of send on closed channel while still unblocking receivers on shutdown. The lifecycle behavior here looks correct.
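
The coordination described above can be sketched roughly as follows. This is a simplified stand-in, not the PR's code: the type `transport`, the method `deliver`, and the use of `sync.Once` are all assumptions chosen to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// transport is a hypothetical, stripped-down stand-in for the reviewed type.
type transport struct {
	msgCh     chan string
	closeCh   chan struct{}
	closeOnce sync.Once
}

func newTransport() *transport {
	return &transport{
		msgCh:   make(chan string, 1),
		closeCh: make(chan struct{}),
	}
}

// deliver mimics a publish callback: it hands the message to the receiver,
// or gives up once the transport has been closed.
func (t *transport) deliver(m string) error {
	select {
	case t.msgCh <- m:
		return nil
	case <-t.closeCh:
		return errors.New("transport closed")
	}
}

// Close only closes closeCh. msgCh is never closed, so a late deliver
// cannot panic with "send on closed channel".
func (t *transport) Close() {
	t.closeOnce.Do(func() { close(t.closeCh) })
}

func main() {
	t := newTransport()
	fmt.Println(t.deliver("a")) // buffer has room
	t.Close()
	t.Close()                   // second call is a no-op
	fmt.Println(t.deliver("b")) // buffer is full and closeCh is closed
}
```

The key point is that only the signalling channel is ever closed; the data channel is simply abandoned and left to the garbage collector.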

pkg/cloudevents/generic/options/v2/mqtt/options.go (1)

13-44: v2 options correctly reuse v1 topic/subscription helpers to preserve semantics

NewAgentOptions and NewSourceOptions delegate topic and subscription calculation to the existing mqtt.AgentPubTopic/AgentSubscribe and mqtt.SourcePubTopic/SourceSubscribe helpers while swapping in the new shared transport. That keeps the MQTT topic layout and broadcast/resync behavior consistent with the v1 options, while modernizing the transport plumbing. Looks correct. Based on learnings

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)

26-39: LGTM: Clear deprecation with migration path.

The deprecation comment correctly directs users to the v2 API while maintaining backward compatibility.


145-173: LGTM: Good defensive validation of topic configuration.

The helper correctly validates that the AgentEvents topic source matches the sourceID before building the subscription. The optional AgentBroadcast handling is appropriate for MQTT (where broadcasts are optional, unlike PubSub).

pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

27-41: LGTM: Clear deprecation with migration path.

The deprecation comment correctly directs users to the v2 API while maintaining backward compatibility.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

119-176: Agent topic and subscription helpers look consistent; consider future validation

AgentPubTopic correctly:

  • Parses the CloudEvents type using ParseCloudEventsType.
  • Special-cases resync-all requests (ResyncRequestAction + SourceAll) to use AgentBroadcast when configured, with a fallback to AgentEvents.
  • Derives the concrete AgentEvents topic by first substituting cluster name and then source via getSourceFromEventsTopic.

AgentSubscribe now matches the v2 pattern by returning (*paho.Subscribe, error) and wiring both SourceEvents and optional SourceBroadcast subscriptions, but it currently never returns a non‑nil error.

For API consistency with SourceSubscribe and to catch misconfigured SourceEvents topics early, consider adding basic validation here too (e.g., validating the topic structure against clusterName or expected wildcards), and only then returning subscribe, nil. This mirrors an earlier recommendation and can be done later without breaking callers since the error path is already handled.

🧹 Nitpick comments (7)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)

118-173: Source topic/subscription helpers are coherent; minor nits on usage and error text

  • SourcePubTopic:

    • Parses the CloudEvents type and checks for ResyncRequestAction.
    • Uses the clustername extension to distinguish “resync all agents” (ClusterAll) from per-cluster events.
    • For resync-all, it correctly requires SourceBroadcast and substitutes sourceID into the broadcast topic.
    • For normal events, it substitutes clusterName into SourceEvents.

    Note: in the normal path, sourceID is currently unused. This is fine as long as o.Topics.SourceEvents is configured to already encode the source ID (e.g., sources/<sourceID>/clusters/+/sourceevents). If callers ever rely on SourceEvents containing a + for the source segment as well, you’d want to incorporate sourceID there too to avoid publishing to wildcard topics. Worth double‑checking your current topic configuration before changing anything.

  • SourceSubscribe:

    • Extracts topicSource from AgentEvents and verifies it matches sourceID, which is a good early guardrail.
    • Builds subscriptions for AgentEvents plus optional AgentBroadcast.

    Tiny nit: the error message uses o.Topics.AgentEvents in place of topicSource:

    return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q",
        o.Topics.AgentEvents, sourceID)

    Consider reporting topicSource (and optionally the full topic) so the message matches the “topic source” wording and is more actionable when debugging misconfigurations.
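
To make the SourcePubTopic wildcard concern above concrete, here is a small sketch. The helpers `fillTopic` and `checkPublishTopic` are hypothetical and are not part of the PR; the sketch assumes the substitution replaces the first `+` level, which may differ from the real helper.

```go
package main

import (
	"fmt"
	"strings"
)

// fillTopic is a hypothetical helper that replaces the first "+" level
// in pattern with value.
func fillTopic(pattern, value string) string {
	return strings.Replace(pattern, "+", value, 1)
}

// checkPublishTopic rejects topics that still contain an MQTT wildcard,
// since wildcards are only valid in subscriptions, not in publishes.
func checkPublishTopic(topic string) error {
	if strings.ContainsAny(topic, "+#") {
		return fmt.Errorf("publish topic %q still contains a wildcard", topic)
	}
	return nil
}

func main() {
	// Source ID already encoded in the configured topic: substitution is complete.
	ok := fillTopic("sources/source1/clusters/+/sourceevents", "cluster1")
	fmt.Println(ok, checkPublishTopic(ok))

	// Source segment is also a wildcard: the cluster name lands in the wrong
	// level and one wildcard remains.
	bad := fillTopic("sources/+/clusters/+/sourceevents", "cluster1")
	fmt.Println(bad, checkPublishTopic(bad))
}
```

A guard like `checkPublishTopic` would surface this kind of misconfiguration at publish time rather than at the broker.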

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (2)

21-103: MochiMQTT test broker setup is reasonable; watch for timing flakiness

The in-process broker with allowAllHook, fixed host/port, and small sleeps (100ms before readiness, 200ms before shutdown) is pragmatic for package tests and the comments explain the shutdown delay. As long as tests in this package don’t use t.Parallel, this should remain reliable; if you later parallelize, you may want to move to per-test random ports or a single shared broker to avoid bind conflicts.


581-618: TestTransportReceiveInvalidEvent doesn’t currently create invalid events

This test verifies that the handler is never called, but no publisher injects malformed MQTT payloads or non‑CloudEvents messages on topic, so in practice it’s just asserting “no messages arrive”. If you really want to exercise invalid-event handling in the transport, consider adding a raw MQTT publisher that sends an invalid payload on the subscribed topic and then asserting the handler is not invoked.

pkg/cloudevents/generic/options/v2/mqtt/transport.go (4)

37-45: Consider increasing or making the error channel buffer configurable.

The errorChan buffer size of 1 is very small. If multiple errors occur in quick succession (e.g., connection issues, publish failures), the OnClientError callback will drop errors via the default case (line 64), and callers will only see the first error. While I understand from the previous review discussion that immediate reconnection is intended, a slightly larger buffer (e.g., 8-16) would give callers a better chance to observe error patterns without significantly delaying reconnection logic.

Consider this adjustment:

 func newTransport(clientID string, opts *mqtt.MQTTOptions, pubTopicGetter pubTopicGetter, subscribeGetter subscribeGetter) *mqttTransport {
     return &mqttTransport{
         opts:            opts,
         clientID:        clientID,
-        errorChan:       make(chan error, 1),
+        errorChan:       make(chan error, 8), // small buffer for error bursts while still enabling fast reconnection
         getPublishTopic: pubTopicGetter,
         getSubscribe:    subscribeGetter,
     }
 }

Alternatively, make the buffer size configurable via MQTTOptions if different use cases have different error-handling needs.


91-118: Consider using RWMutex read lock for Send to improve concurrency.

The Send method holds an exclusive lock (lines 92-93) during the entire publish operation, including network I/O at line 113. This serializes all concurrent sends and can become a bottleneck under moderate to high load.

Since Send only reads t.client and t.opts (both stable after Connect), you could use t.mu.RLock() instead of Lock(), reserving the write lock for Connect and Close. This would allow concurrent sends while still protecting against mid-send reconnection.

 func (t *mqttTransport) Send(ctx context.Context, evt cloudevents.Event) error {
-    t.mu.Lock()
-    defer t.mu.Unlock()
+    t.mu.RLock()
+    defer t.mu.RUnlock()

     if t.client == nil {
         return fmt.Errorf("transport not connected")
     }

This optimization is safe because Connect/Close use write locks and will block until all in-flight sends complete.
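
For readers who want to see the locking split in isolation, a self-contained sketch follows. The `client` type and the no-op publish are placeholders (assumptions), since the real transport publishes through the Paho client.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// client is a placeholder for the real MQTT client.
type client struct{ name string }

type transport struct {
	mu     sync.RWMutex
	client *client
}

// Connect takes the write lock because it replaces t.client.
func (t *transport) Connect(name string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.client = &client{name: name}
}

// Close takes the write lock and therefore waits for in-flight sends.
func (t *transport) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.client = nil
}

// Send only reads t.client, so a read lock lets sends run concurrently.
func (t *transport) Send(payload string) error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if t.client == nil {
		return errors.New("transport not connected")
	}
	_ = payload // a real transport would publish here
	return nil
}

func main() {
	t := &transport{}
	fmt.Println(t.Send("x")) // fails: not connected yet

	t.Connect("demo")
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = t.Send("x") // these may overlap under RLock
		}()
	}
	wg.Wait()
	t.Close()
}
```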


137-146: Document the blocking backpressure behavior for operators.

The AddOnPublishReceived callback intentionally blocks (line 145 comment: "will block until channel has space") when msgChan is full to prevent message loss. While this design choice prioritizes reliability, it means that a slow Receive consumer can stall Paho's internal goroutines, potentially affecting the entire MQTT client (e.g., heartbeat processing).

Consider adding user-facing documentation (e.g., in the package or type doc comment) that describes this backpressure behavior and advises operators to ensure Receive handlers are fast or offload work to separate goroutines.

Optionally, you could add a bounded timeout in the select to prevent indefinite blocking:

select {
case t.msgChan <- m.Packet:
    return true, nil
case <-t.getCloseChan():
    return false, fmt.Errorf("transport closed")
case <-time.After(5 * time.Second): // or configurable
    return false, fmt.Errorf("message buffer full, dropping message")
}

However, this would sacrifice the no-message-loss guarantee.


179-182: The !ok check on line 180 is unreachable.

Since Close (lines 198-221) no longer closes t.msgChan (as addressed in the previous review), the channel-closed condition !ok at line 180 will never be true. The loop already exits via closeChan (lines 177-178) during shutdown.

You can simplify lines 179-182 to:

-        case m, ok := <-t.msgChan:
-            if !ok {
-                return nil
-            }
+        case m := <-t.msgChan:
             evt, err := binding.ToEvent(ctx, cloudeventsmqtt.NewMessage(m))

This makes the shutdown path more explicit (only via closeChan or ctx.Done()).

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e2e4ea5 and ba09378.

📒 Files selected for processing (11)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/logger.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (5)
pkg/cloudevents/generic/options/v2/mqtt/options.go (4)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (3)
  • NewAgentOptions (28-41)
  • AgentPubTopic (119-153)
  • AgentSubscribe (155-176)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventsAgentOptions (76-89)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3)
  • NewSourceOptions (27-39)
  • SourcePubTopic (118-143)
  • SourceSubscribe (145-173)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: verify
  • GitHub Check: unit
🔇 Additional comments (10)
pkg/cloudevents/generic/options/mqtt/logger.go (1)

18-24: New Paho logger constructors look correct

NewPahoErrorLogger and NewPahoDebugLogger simply wrap klog.Logger into the existing logger types and return the log.Logger interface; no behavior change and wiring is clear.

pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)

27-40: Deprecated v1 NewAgentOptions wrapper is safe and self‑contained

The v1 NewAgentOptions wrapper still constructs a mqttAgentTransport and returns CloudEventsAgentOptions without extra side effects; the deprecation comment correctly points callers at v2.mqtt.NewAgentOptions.


43-65: WithContext/Connect refactor cleanly delegates to helpers

WithContext now prefers any pre-set topic from getAgentPubTopic, otherwise falls back to AgentPubTopic, and Connect now calls AgentSubscribe and propagates its error. This keeps the transport methods thin and centralizes topic/subscription logic without changing observable behavior.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)

26-39: Deprecated v1 NewSourceOptions wrapper is straightforward

The v1 NewSourceOptions continues to create an internal mqttSourceTransport and wrap it into CloudEventsSourceOptions. The deprecation comment clearly points callers towards the v2 constructor, and there’s no behavior change beyond the refactor.


41-84: Source transport now cleanly delegates topic/subscribe logic

WithContext and Connect now delegate topic computation and subscription construction to SourcePubTopic and SourceSubscribe, which keeps the transport methods simpler and aligns with how the agent side is structured. Error propagation remains straightforward.

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

124-412: Transport lifecycle and load tests provide solid coverage

The tests around connect/disconnect, reconnect, send, subscribe/receive (including the no-loss loop with 100 messages) exercise the v2 transport’s happy paths and some load behavior end‑to‑end against a real broker. The use of context.WithTimeout, atomic counters, and sync.Map is appropriate and should catch most regressions in basic messaging semantics.

pkg/cloudevents/generic/options/v2/mqtt/options.go (1)

13-44: v2 constructors cleanly reuse v1 MQTT helpers via newTransport

NewAgentOptions and NewSourceOptions in v2 wire newTransport with:

  • A client ID ("%s-client" for agents, clientID for sources),
  • The shared *mqtt.MQTTOptions,
  • A pub-topic function that delegates to mqtt.AgentPubTopic / mqtt.SourcePubTopic,
  • A subscribe function that delegates to mqtt.AgentSubscribe / mqtt.SourceSubscribe.

This keeps all MQTT topic semantics in the existing helpers while centralizing transport lifecycle in the v2 implementation, and the returned CloudEvents*Options correctly expose AgentID, ClusterName, and SourceID.

pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)

1-18: LGTM!

The package structure, imports, and type aliases are well-organized. The pubTopicGetter and subscribeGetter function types provide clean abstraction for topic resolution and subscription configuration.


47-89: LGTM!

The Connect method properly handles connection establishment with good lifecycle management:

  • The OnClientError callback now uses non-blocking send with fallback logging (addresses previous review concern)
  • Channel initialization in Connect (lines 82, 84) correctly supports reconnection cycles
  • The TODO comment on line 83 appropriately flags that msgChan buffer size should be configurable

194-228: LGTM!

The ErrorChan, Close, and getCloseChan methods are correctly implemented:

  • Close properly guards against double-close of closeChan (lines 209-215)
  • msgChan is intentionally left open (previous review addressed the send-on-closed-channel race)
  • getCloseChan uses RLock for safe concurrent reads
  • The approach of not closing msgChan and relying solely on closeChan for shutdown signaling is the correct pattern here
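
A rough sketch of the guarded-close and reconnect cycle described in these points, under the assumption that the guard is a `select` with a `default` branch taken under the write lock. The type `lifecycle` and the helper `isClosed` are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type lifecycle struct {
	mu      sync.RWMutex
	closeCh chan struct{}
}

// Connect installs a fresh channel so the value can be reused after Close.
func (l *lifecycle) Connect() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.closeCh = make(chan struct{})
}

// Close closes closeCh at most once per Connect.
func (l *lifecycle) Close() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.closeCh == nil {
		return // never connected
	}
	select {
	case <-l.closeCh:
		// already closed; closing again would panic
	default:
		close(l.closeCh)
	}
}

// getCloseChan returns the current channel under a read lock.
func (l *lifecycle) getCloseChan() <-chan struct{} {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.closeCh
}

// isClosed reports whether the current channel has been closed.
func (l *lifecycle) isClosed() bool {
	select {
	case <-l.getCloseChan():
		return true
	default:
		return false
	}
}

func main() {
	l := &lifecycle{}
	l.Connect()
	fmt.Println(l.isClosed()) // false
	l.Close()
	l.Close() // second call is harmless
	fmt.Println(l.isClosed()) // true
	l.Connect()               // reconnect installs a new, open channel
	fmt.Println(l.isClosed()) // false
}
```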

@skeeey
Member Author

skeeey commented Nov 20, 2025

/unhold

@skeeey
Member Author

skeeey commented Nov 20, 2025

/assign @qiujian16

}

func (t *mqttTransport) Send(ctx context.Context, evt cloudevents.Event) error {
t.mu.Lock()
Member

Why Lock instead of RLock? Do we want to avoid concurrent sends?

Member Author

@skeeey skeeey Nov 20, 2025

I think concurrent sends are allowed, so this should use RLock instead.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

155-175: AgentSubscribe’s error return is never used; consider adding validation or simplifying the API

AgentSubscribe now returns (*paho.Subscribe, error), but the current implementation always returns nil error and performs no topic validation:

func AgentSubscribe(o *MQTTOptions, clusterName string) (*paho.Subscribe, error) {
    subscribe := &paho.Subscribe{ ... }
    if len(o.Topics.SourceBroadcast) != 0 {
        // append
    }
    return subscribe, nil
}

Given SourceSubscribe does validate topics and can fail, you may want to either:

  • Add analogous validation of o.Topics.SourceEvents here (e.g. verify structure matches clusterName and expectations), or
  • Drop the error result again to keep the API honest if you don’t plan to validate.

Right now the signature suggests failure is possible but it never happens, which can surprise callers and hides misconfiguration opportunities.

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/mqtt/logger.go (1)

18-20: LGTM! Constructor pattern improves dependency injection.

The constructor correctly wraps the klog.Logger and explicitly returns the log.Logger interface, ensuring compile-time verification of interface compliance.

Consider adding a godoc comment since this is an exported function:

+// NewPahoErrorLogger creates a new Paho error logger that wraps the provided klog.Logger.
 func NewPahoErrorLogger(logger klog.Logger) log.Logger {
 	return &PahoErrorLogger{logger: logger}
 }
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

50-90: Broker harness is solid; just note fixed port and sleep-based readiness

The in-process MochiMQTT broker with allowAllHook and a TCP listener works well for exercising the transport. A couple of optional hardening ideas:

  • Using a dynamically allocated port (or at least making testBrokerHost overrideable) would reduce the chance of port conflicts in CI.
  • Replacing the fixed time.Sleep waits with a simple readiness loop (e.g. retrying a TCP dial until success or timeout) would make the tests less timing-sensitive.

Not required, but worth considering if you see intermittent flakes in slower environments.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba09378 and 500719f.

📒 Files selected for processing (11)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/logger.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
🧬 Code graph analysis (4)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (69-78)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (69-78)
  • MQTTDialer (31-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (1)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (1)

20-221: Transport lifecycle and concurrency look sound; ErrorChan behavior is well-contained and follows established patterns

The new mqttTransport implementation correctly:

  • Guards mutable state (client, subscribed, channels) with an RWMutex, allowing concurrent Send/Receive while serializing Connect/Subscribe/Close.
  • Avoids the previous send-on-closed-channel race by only closing closeChan and leaving msgChan open, with Receive exiting on ctx.Done or closeChan.
  • Uses a small buffered errorChan with a non-blocking send in OnClientError. This is the correct pattern: paho.golang invokes callbacks from internal goroutines where blocking would stall the client (and can trigger "pingresp not received" disconnection errors). A buffered channel with non-blocking send (and logging on overflow) is the established Go idiom for forwarding library callbacks to application code without risking client stall.

As long as users follow the expected sequence (Connect → Subscribe → Receive/Send, and stop using the transport after Close), the lifecycle and concurrency story here is solid.
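
For readers unfamiliar with the non-blocking forwarding pattern referenced above, here is a minimal sketch. It is not the code from transport.go: the `errorForwarder` type, `newErrorForwarder`, `onClientError`, and `errDemo` are hypothetical names chosen for illustration, and the buffer size of 1 is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a placeholder error used only by this sketch.
var errDemo = errors.New("demo client error")

// errorForwarder is a hypothetical stand-in for the transport's error plumbing.
type errorForwarder struct {
	errorChan chan error
}

// newErrorForwarder creates a forwarder whose channel buffers up to size errors.
func newErrorForwarder(size int) *errorForwarder {
	return &errorForwarder{errorChan: make(chan error, size)}
}

// onClientError forwards err without ever blocking the caller.
// It reports false when the buffer is full and the error was dropped.
func (f *errorForwarder) onClientError(err error) bool {
	select {
	case f.errorChan <- err:
		return true
	default:
		// A real callback would log the dropped error here.
		return false
	}
}

func main() {
	f := newErrorForwarder(1)
	fmt.Println(f.onClientError(errDemo)) // queued
	fmt.Println(f.onClientError(errDemo)) // buffer full, dropped
	fmt.Println(<-f.errorChan)
}
```

The `default` branch is what keeps a library goroutine from stalling when no one is draining the channel; the trade-off is that errors can be dropped under a burst, which is why logging on overflow matters.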

Signed-off-by: Wei Liu <[email protected]>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (5)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)

27-40: Deprecation comment is clear but could be more precise about the path

The deprecation note is good for steering callers to v2. Consider spelling out the actual import path (pkg/cloudevents/generic/options/v2/mqtt.NewAgentOptions) to avoid ambiguity for users skimming docs.

pkg/cloudevents/generic/options/mqtt/logger.go (1)

18-24: Logger constructors are correct and match Paho log.Logger expectations

NewPahoErrorLogger / NewPahoDebugLogger cleanly wrap klog.Logger and return the Paho log.Logger interface, which is now used from the MQTT options and v2 transport. No functional issues here; optional doc comments could be added later if desired.

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)

26-90: Test harness is solid; consider marking setup helper for better diagnostics

The MochiMQTT-based setupTestBroker plus createTestMQTTOptions/createTestTransport give good isolation and make the v2 transport testable without external brokers. To improve failure diagnostics, you could add t.Helper() at the start of setupTestBroker so errors inside it are reported against the calling test rather than the helper itself.

pkg/cloudevents/generic/options/v2/mqtt/options.go (1)

3-11: Import alias mqtt may be confusing in a package mqtt

Using mqtt as the alias for the v1 package in a file whose own package is also mqtt makes it harder to visually distinguish v1 helpers from local symbols. Consider renaming the import alias to something like mqttv1 for clarity; behavior stays the same, but it’s easier to read.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)

59-84: Connect now reuses SourceSubscribe, reducing duplication

Switching Connect to call SourceSubscribe(&o.MQTTOptions, o.sourceID) centralizes subscription construction and validation (including the sourceID/topicSource check and optional AgentBroadcast subscription). The rest of the connection setup is unchanged, so this is a straightforward, low-risk refactor.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 500719f and a780f83.

📒 Files selected for processing (12)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (3 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1 hunks)
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/mqtt/logger.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/options.go (2 hunks)
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/options.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1 hunks)
  • test/integration/cloudevents/certrotation_mqtt_test.go (2 hunks)
  • test/integration/cloudevents/cloudevents_resync_test.go (3 hunks)
  • test/integration/cloudevents/cloudevetns_mqtt_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
  • test/integration/cloudevents/cloudevetns_mqtt_test.go
  • pkg/cloudevents/generic/options/mqtt/agentoptions.go
  • pkg/cloudevents/generic/options/mqtt/sourceoptions.go
  • pkg/cloudevents/generic/options/v2/mqtt/options.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • test/integration/cloudevents/cloudevents_resync_test.go
  • pkg/cloudevents/generic/options/mqtt/options.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
  • test/integration/cloudevents/certrotation_mqtt_test.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/v2/mqtt/transport.go
  • pkg/cloudevents/generic/options/v2/mqtt/transport_test.go
🧬 Code graph analysis (9)
test/integration/cloudevents/cloudevetns_mqtt_test.go (3)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)
  • NewSourceOptions (27-39)
pkg/cloudevents/generic/options/v2/mqtt/options.go (1)
  • NewSourceOptions (30-44)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (70-79)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionOriginalSource (77-77)
  • ResyncRequestAction (41-41)
  • SourceAll (22-22)
  • Topics (112-148)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (2)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (70-79)
pkg/cloudevents/generic/types/types.go (5)
  • ParseCloudEventsType (209-230)
  • ExtensionClusterName (74-74)
  • ResyncRequestAction (41-41)
  • ClusterAll (18-18)
  • Topics (112-148)
pkg/cloudevents/generic/options/v2/mqtt/options.go (4)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (3)
  • NewAgentOptions (28-41)
  • AgentPubTopic (119-158)
  • AgentSubscribe (160-181)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (70-79)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventsAgentOptions (76-89)
  • CloudEventsSourceOptions (62-73)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (3)
  • NewSourceOptions (27-39)
  • SourcePubTopic (118-148)
  • SourceSubscribe (150-178)
pkg/cloudevents/generic/options/v2/mqtt/transport.go (3)
pkg/cloudevents/generic/options/mqtt/options.go (1)
  • MQTTOptions (70-79)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/options.go (1)
  • ReceiveHandlerFn (13-13)
test/integration/cloudevents/cloudevents_resync_test.go (2)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)
  • NewSourceOptions (27-39)
pkg/cloudevents/generic/options/v2/mqtt/options.go (1)
  • NewSourceOptions (30-44)
pkg/cloudevents/generic/options/mqtt/options.go (1)
pkg/cloudevents/generic/options/mqtt/logger.go (2)
  • NewPahoDebugLogger (22-24)
  • NewPahoErrorLogger (18-20)
pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (1)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (70-79)
  • MQTTDialer (32-38)
test/integration/cloudevents/certrotation_mqtt_test.go (2)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)
  • NewAgentOptions (28-41)
pkg/cloudevents/generic/options/v2/mqtt/options.go (1)
  • NewAgentOptions (13-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (18)
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (1)

59-59: LGTM - Test expectation correctly updated for v2 MQTT transport.

The updated expected transport type *mqtt.mqttTransport is correct. The implementation chain (BuildCloudEventsSourceOptions → mqttv2.NewSourceOptions → newTransport) creates a *mqttTransport instance from the pkg/cloudevents/generic/options/v2/mqtt package, which declares package mqtt. Go's reflect output will display this as *mqtt.mqttTransport, matching the test expectation exactly.
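
The point about the rendered type name can be illustrated with the standard library alone. The sketch below uses `text/template` and `html/template`, which have different import paths but both declare `package template`; the helper names (`typeName`, `textTypeName`, `htmlTypeName`) are hypothetical and unrelated to the PR's code.

```go
package main

import (
	"fmt"
	htmltemplate "html/template"
	texttemplate "text/template"
)

// typeName renders the dynamic type of v the way %T (and reflect) would.
func typeName(v interface{}) string {
	return fmt.Sprintf("%T", v)
}

// textTypeName and htmlTypeName render a template value from two packages
// with different import paths that both declare `package template`.
func textTypeName() string { return typeName(texttemplate.New("a")) }
func htmlTypeName() string { return typeName(htmltemplate.New("b")) }

func main() {
	fmt.Println(textTypeName()) // *template.Template
	fmt.Println(htmlTypeName()) // *template.Template
}
```

Because the rendered name is qualified by the declared package name rather than the import path or a local alias, a v1 and a v2 package that both declare `package mqtt` cannot be told apart by this string alone.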

pkg/cloudevents/generic/options/mqtt/agentoptions.go (2)

43-59: WithContext refactor cleanly delegates topic selection to AgentPubTopic

The split between explicit context override (getAgentPubTopic) and computed topic (AgentPubTopic) is clean and preserves existing override behavior while centralizing topic logic in one helper. No issues from a transport perspective.


119-181: AgentPubTopic / AgentSubscribe logic looks consistent with topic contract

  • CloudEvents type parsing and error messages are safe and informative.
  • originalsource extension handling is type-safe and fails fast on unexpected types.
  • Resync broadcast path correctly prefers AgentBroadcast with a reasonable fallback to AgentEvents (with logging) when unset.
  • Agent events topics are composed via replaceLast / getSourceFromEventsTopic in a way that matches the documented AgentEvents patterns, including $share variants.
  • AgentSubscribe builds the expected SourceEvents subscription and conditionally adds SourceBroadcast.

I don’t see correctness or concurrency risks in these helpers.

test/integration/cloudevents/cloudevents_resync_test.go (1)

7-8: Migration to mqttv2.NewSourceOptions is consistent and minimal-risk

Switching the resync test to mqttv2.NewSourceOptions while leaving the rest of the flow unchanged keeps the test semantics the same but exercises the new v2 transport. The added time import and existing Eventually timeouts look appropriate for this integration scenario.

Also applies to: 24-25, 56-67

test/integration/cloudevents/cloudevetns_mqtt_test.go (1)

11-22: MQTT pub/sub test correctly switched to v2 source options

GetMQTTSourceOptions now delegates to mqttv2.NewSourceOptions while preserving the existing return signature and config type. This keeps the higher-level test contract unchanged and moves coverage onto the v2 transport.

test/integration/cloudevents/certrotation_mqtt_test.go (1)

13-23: Cert-rotation MQTT test now properly exercises v2 agent options

Using mqttv2.NewAgentOptions with the existing TLS-configured *mqtt.MQTTOptions keeps the certificate rotation behavior intact while shifting the transport under test to the v2 implementation. No behavioral regressions evident from the wiring.

pkg/cloudevents/generic/options/mqtt/options.go (1)

13-14: Using Paho logger constructors improves encapsulation and consistency

Swapping the inline &PahoDebugLogger{} / &PahoErrorLogger{} instantiations for NewPahoDebugLogger / NewPahoErrorLogger makes it easier to evolve logger behavior in one place and keeps v1/v2 transports aligned. The context-derived klog logger is still correctly threaded through.

Also applies to: 238-241

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go (2)

124-412: End-to-end send/subscribe/no-loss tests provide strong coverage

The connect/disconnect, reconnect, pub/sub, and no-message-loss tests exercise the critical paths of the v2 transport under load and concurrency (including atomic counters and sync.Map for received IDs). The timeouts and polling loops look reasonable for CI while still being strict enough to catch regressions in delivery guarantees.


460-531: Lifecycle and error-path tests match the transport’s contract

Tests for subscribe/send without connect, double-subscribe, close-while-receiving, double-close, context cancellation, invalid-event handling (no handler invocation), ErrorChan stability, and receive-without-subscribe all align with the behavior exposed in the v2 transport implementation. These should prevent most accidental contract breaks going forward.

Also applies to: 533-579, 581-618, 620-658

pkg/cloudevents/generic/options/v2/mqtt/transport.go (4)

37-45: Connect lifecycle and error propagation look correct and non-blocking

  • newTransport sets up a dedicated errorChan used by OnClientError with a non-blocking send and fallback logging, which avoids stalling Paho goroutines.
  • Connect dials via the shared MQTTOptions.Dialer, configures loggers, checks connAck.ReasonCode, and (re)initializes closeChan and a buffered msgChan suitable for reconnect cycles.

The locking and initialization order here look safe for the connect/reconnect patterns your tests cover.

Also applies to: 57-69, 73-89


91-160: Send/Subscribe paths align with expected transport contract and avoid common races

  • Send guards on t.client != nil under RLock and routes through the injected getPublishTopic before using WritePubMessage and Publish, which cleanly separates topic derivation from transport mechanics.
  • Subscribe enforces “connect before subscribe” and “no double subscribe”, then registers an AddOnPublishReceived callback that either enqueues on msgChan or exits on closeChan without ever sending on a closed channel.
  • Using getCloseChan() inside the callback plus a buffered msgChan helps avoid deadlocks while still back-pressuring when the receiver can’t keep up.

This matches the expectations set by the v2 tests and avoids the send-on-closed-channel race that existed in an earlier iteration.
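
A minimal sketch of the "enqueue or give up on close" shape described above, assuming string payloads in place of MQTT publish packets. `enqueue` is a hypothetical helper, not the callback registered in transport.go.

```go
package main

import "fmt"

// enqueue tries to hand msg to the receiver. It blocks while msgChan is full
// (back-pressure) and returns false once closeChan has been closed.
// msgChan itself is never closed, so this can never send on a closed channel.
func enqueue(msgChan chan<- string, closeChan <-chan struct{}, msg string) bool {
	select {
	case msgChan <- msg:
		return true
	case <-closeChan:
		return false
	}
}

func main() {
	msgChan := make(chan string, 1)
	closeChan := make(chan struct{})

	fmt.Println(enqueue(msgChan, closeChan, "first")) // buffered

	close(closeChan)
	// The buffer is full, so only the close branch is ready.
	fmt.Println(enqueue(msgChan, closeChan, "second"))
}
```

Note that when both branches are ready, `select` picks one at random, so a message may still be enqueued just after close; that is harmless here because the channel stays open and the receiver has already stopped.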


162-191: Receive loop correctly respects subscription state and shutdown signals

The blocking Receive:

  • Refuses to start unless subscribed is true.
  • Exits cleanly on ctx.Done(), on closeChan closure, or when msgChan is drained/closed.
  • Logs and skips malformed events instead of failing the loop, then calls the handler with a value copy of the event.

This is a solid contract for callers and matches how the tests exercise cancellation and close behavior.
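
The loop contract above can be sketched as follows. This is an illustration under assumptions, not the actual Receive implementation: messages are plain strings, an empty string stands in for a malformed event, and returning `ctx.Err()` on cancellation is a choice made for the sketch. `receive`, `demo`, and `errNotSubscribed` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errNotSubscribed = errors.New("transport is not subscribed")

// receive drains msgChan until ctx is cancelled, closeChan is closed,
// or msgChan is closed. Malformed messages are skipped, not fatal.
func receive(ctx context.Context, subscribed bool, msgChan <-chan string,
	closeChan <-chan struct{}, handle func(string)) error {
	if !subscribed {
		return errNotSubscribed
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-closeChan:
			return nil
		case msg, ok := <-msgChan:
			if !ok {
				return nil
			}
			if msg == "" { // stand-in for a decode failure
				continue
			}
			handle(msg)
		}
	}
}

// demo feeds two valid messages and one malformed message through receive.
func demo() ([]string, error) {
	msgChan := make(chan string, 3)
	msgChan <- "a"
	msgChan <- ""
	msgChan <- "b"
	close(msgChan)

	var got []string
	err := receive(context.Background(), true, msgChan, make(chan struct{}),
		func(m string) { got = append(got, m) })
	return got, err
}

func main() {
	got, err := demo()
	fmt.Println(got, err)
}
```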


194-221: Close and getCloseChan provide safe, idempotent shutdown

Close:

  • Is guarded by mu and no-ops if client is nil.
  • Closes closeChan exactly once, clears the subscribed flag, and then calls Disconnect, which tests already tolerate being called multiple times.
  • getCloseChan wraps access to closeChan under RLock, which is used consistently from the callback and Receive loop.

This design avoids double-close panics and cooperates with the receive/subscription logic as intended.
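
One way to get a close-once guarantee of this kind is sketched below. It is an assumption-laden illustration: the `transport` struct here tracks a `connected` flag instead of a real Paho client, and clearing that flag on the first Close is this sketch's way of making the second call a no-op, which may differ from how transport.go does it.

```go
package main

import (
	"fmt"
	"sync"
)

// transport is a hypothetical, stripped-down stand-in for the v2 transport.
type transport struct {
	mu          sync.RWMutex
	connected   bool
	subscribed  bool
	closeChan   chan struct{}
	disconnects int // counts simulated Disconnect calls
}

func newConnectedTransport() *transport {
	return &transport{connected: true, subscribed: true, closeChan: make(chan struct{})}
}

// Close is safe to call more than once: only the first call closes closeChan.
func (t *transport) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.connected {
		return
	}
	close(t.closeChan)
	t.subscribed = false
	t.connected = false
	t.disconnects++ // a real transport would disconnect the client here
}

// getCloseChan reads closeChan under the read lock.
func (t *transport) getCloseChan() <-chan struct{} {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.closeChan
}

// isClosed reports whether closeChan has been closed, without blocking.
func (t *transport) isClosed() bool {
	select {
	case <-t.getCloseChan():
		return true
	default:
		return false
	}
}

func main() {
	tr := newConnectedTransport()
	tr.Close()
	tr.Close() // no panic: second call returns early
	fmt.Println(tr.isClosed(), tr.disconnects)
}
```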

Also applies to: 223-228

pkg/cloudevents/generic/options/v2/mqtt/options.go (1)

13-44: v2 constructors correctly centralize topic/subscribe wiring on shared helpers

Both NewAgentOptions and NewSourceOptions delegate topic resolution and subscription construction to the existing v1 helpers (AgentPubTopic/AgentSubscribe and SourcePubTopic/SourceSubscribe) via newTransport, so MQTT topic semantics stay consistent while the transport internals move to v2. The client IDs and option plumbing also mirror the v1 constructors’ intent, so this looks safe and coherent.

pkg/cloudevents/generic/options/mqtt/sourceoptions.go (4)

26-39: Deprecation marker for v1 NewSourceOptions is clear

The deprecation comment cleanly points callers toward the v2 constructor while keeping the v1 API intact for compatibility. No functional issues here.


41-57: WithContext refactor preserves behavior and adds override hook

WithContext now:

  • Respects an existing topic from getSourcePubTopic(ctx) when present, and
  • Falls back to SourcePubTopic for the computed topic otherwise.

This preserves the original behavior while enabling an explicit topic override path via context, which is a reasonable extension.
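
The override-then-fallback flow can be sketched with a context value. Everything named here (`pubTopicKey`, `withPubTopic`, `resolveTopic`, `resolveDemo`, and the topic strings) is hypothetical; the real code uses getSourcePubTopic and SourcePubTopic, whose signatures are not shown in this review.

```go
package main

import (
	"context"
	"fmt"
)

// pubTopicKey is an unexported key type, so other packages cannot collide with it.
type pubTopicKey struct{}

// withPubTopic stores an explicit publish topic in the context.
func withPubTopic(ctx context.Context, topic string) context.Context {
	return context.WithValue(ctx, pubTopicKey{}, topic)
}

// resolveTopic prefers a topic already present in ctx and otherwise
// falls back to the computed one.
func resolveTopic(ctx context.Context, compute func() (string, error)) (string, error) {
	if topic, ok := ctx.Value(pubTopicKey{}).(string); ok && topic != "" {
		return topic, nil
	}
	return compute()
}

// resolveDemo resolves a topic with an optional override ("" means none).
func resolveDemo(override string) (string, error) {
	ctx := context.Background()
	if override != "" {
		ctx = withPubTopic(ctx, override)
	}
	return resolveTopic(ctx, func() (string, error) {
		return "computed/topic", nil
	})
}

func main() {
	fmt.Println(resolveDemo(""))
	fmt.Println(resolveDemo("explicit/topic"))
}
```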


118-148: SourcePubTopic helper cleanly encodes resync vs normal publish paths

SourcePubTopic:

  • Parses the CloudEvents type and validates it,
  • Safely type-asserts the clustername extension to string and fails fast on mismatches, and
  • Distinguishes between resync-all (ResyncRequestAction + ClusterAll) using SourceBroadcast and normal spec/status events using SourceEvents.

The explicit error when SourceBroadcast is unset keeps MQTT’s “broadcast is optional” contract while making resync-all misconfiguration obvious. This matches the documented types.Topics semantics and looks correct.
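
The branching described above can be sketched roughly as follows. This is not the SourcePubTopic implementation: the constants, the `topics` struct, the `replaceLast` signature, and the topic pattern in `main` are all assumptions made for illustration, and the CloudEvents type parsing step is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Hypothetical stand-ins for ResyncRequestAction and ClusterAll.
const (
	resyncRequest = "resync_request"
	clusterAll    = "all"
)

// topics mirrors only the two fields this sketch needs.
type topics struct {
	SourceEvents    string
	SourceBroadcast string // optional for MQTT
}

// replaceLast replaces the last occurrence of old in s (assumed signature).
func replaceLast(s, old, repl string) string {
	i := strings.LastIndex(s, old)
	if i < 0 {
		return s
	}
	return s[:i] + repl + s[i+len(old):]
}

// sourcePubTopic picks the broadcast topic for resync-all and the
// per-cluster events topic otherwise.
func sourcePubTopic(action string, clusterExt interface{}, tp topics) (string, error) {
	cluster, ok := clusterExt.(string)
	if !ok {
		return "", fmt.Errorf("clustername extension has unexpected type %T", clusterExt)
	}
	if action == resyncRequest && cluster == clusterAll {
		if tp.SourceBroadcast == "" {
			return "", errors.New("source broadcast topic is not configured")
		}
		return tp.SourceBroadcast, nil
	}
	return replaceLast(tp.SourceEvents, "+", cluster), nil
}

func main() {
	tp := topics{SourceEvents: "sources/source1/clusters/+/sourceevents"}
	fmt.Println(sourcePubTopic("create_request", "cluster1", tp))
	fmt.Println(sourcePubTopic(resyncRequest, clusterAll, tp))
}
```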

Based on learnings


150-178: SourceSubscribe correctly validates topic/source alignment and handles optional broadcasts

SourceSubscribe:

  • Parses the source segment from AgentEvents and ensures it matches sourceID, returning a clear error if misconfigured, and
  • Always subscribes to AgentEvents while adding AgentBroadcast only when configured, which is appropriate for MQTT where broadcasts are optional.

The tightened error message using topicSource makes debugging easier without changing behavior.

Based on learnings

@qiujian16
Member

/approve
/lgtm

@openshift-ci openshift-ci bot added the lgtm label Nov 20, 2025
@openshift-ci

openshift-ci bot commented Nov 20, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: qiujian16, skeeey

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@openshift-merge-bot openshift-merge-bot bot merged commit eb2d8ba into open-cluster-management-io:main Nov 20, 2025
12 checks passed
@skeeey skeeey deleted the mqtt-v2 branch November 20, 2025 07:16