Skip to content

Commit d63f3bc

Browse files
committed
pubsub receive error won't be pushed to transport error channel.
Signed-off-by: morvencao <[email protected]>
1 parent f5f6a0d commit d63f3bc

File tree

2 files changed

+31
-29
lines changed

2 files changed

+31
-29
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestBuildCloudEventsAgentOptions(t *testing.T) {
144144
Timeout: 60 * time.Second,
145145
},
146146
},
147-
expectedTransportType: "*mqtt.mqttAgentTransport",
147+
expectedTransportType: "*mqtt.mqttTransport",
148148
},
149149
{
150150
name: "grpc config",

pkg/cloudevents/generic/options/v2/pubsub/transport.go

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@ package pubsub
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"strings"
8-
"sync/atomic"
97

108
"cloud.google.com/go/pubsub/v2"
119
cloudevents "github.com/cloudevents/sdk-go/v2"
@@ -29,6 +27,7 @@ type pubsubTransport struct {
2927
// cluster name, required for agent
3028
clusterName string
3129
client *pubsub.Client
30+
grpcConn *grpc.ClientConn
3231
// Publisher for spec/status updates
3332
publisher *pubsub.Publisher
3433
// Publisher for resync broadcasts
@@ -54,6 +53,7 @@ func (o *pubsubTransport) Connect(ctx context.Context) error {
5453
if err != nil {
5554
return err
5655
}
56+
o.grpcConn = pubsubConn
5757
clientOptions = append(clientOptions, option.WithGRPCConn(pubsubConn))
5858
}
5959
}
@@ -133,28 +133,27 @@ func (o *pubsubTransport) Send(ctx context.Context, evt cloudevents.Event) error
133133
}
134134

135135
func (o *pubsubTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
136-
// Use an atomic flag to ensure only one of the subscriber or resync subscriber sends an error to errorChan.
137-
// This prevents triggering client reconnect and receiver restart twice if both subscribers fail.
138-
// 0 = no error sent, 1 = error sent
139-
var errorSent atomic.Int32
136+
errChan := make(chan error)
140137

141138
// start the subscriber for spec/status updates
142-
go o.receiveFromSubscriber(ctx, o.subscriber, fn, &errorSent)
139+
go o.receiveFromSubscriber(ctx, o.subscriber, fn, errChan)
143140

144-
// start the resync subscriber for broadcast messages
145-
go o.receiveFromSubscriber(ctx, o.resyncSubscriber, fn, &errorSent)
141+
// start the resync subscriber for resync events
142+
go o.receiveFromSubscriber(ctx, o.resyncSubscriber, fn, errChan)
146143

147-
// wait for context cancellation or timeout
148-
<-ctx.Done()
149-
return ctx.Err()
144+
// Return the error from either subscriber (including context cancellation).
145+
// We return errors directly instead of writing to the transport errorChan because
146+
// Pub/Sub client has internal retry logic for transient errors. Only non-retryable
147+
// errors or context cancellation will be returned here.
148+
return <-errChan
150149
}
151150

152-
// receiveFromSubscriber handles receiving messages from a subscriber and error handling.
151+
// receiveFromSubscriber handles receiving messages from a subscriber.
153152
func (o *pubsubTransport) receiveFromSubscriber(
154153
ctx context.Context,
155154
subscriber *pubsub.Subscriber,
156155
fn options.ReceiveHandlerFn,
157-
errorSent *atomic.Int32,
156+
errChan chan<- error,
158157
) {
159158
logger := klog.FromContext(ctx)
160159
err := subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
@@ -168,26 +167,29 @@ func (o *pubsubTransport) receiveFromSubscriber(
168167
// send ACK after all receiver handlers complete.
169168
msg.Ack()
170169
})
170+
171171
// The Pub/Sub client's Receive call automatically retries on retryable errors.
172172
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
173-
// If Receive call returns an error, it's usually due to a non-retryable issue or service outage, eg, subscription not found.
174-
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
175-
if err != nil {
176-
// for normal shutdown, won't send error to error channel.
177-
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
178-
return
179-
}
180-
// use CompareAndSwap to atomically check and set the flag. Only the first subscriber wins the race.
181-
if errorSent.CompareAndSwap(0, 1) {
182-
o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
183-
} else {
184-
logger.Error(err, "subscriber is interrupted by error but error already sent, skipping...")
185-
}
173+
// If Receive returns an error, it's usually due to a non-retryable issue (e.g., subscription not found),
174+
// service outage, or context cancellation.
175+
select {
176+
case errChan <- err:
177+
default:
186178
}
187179
}
188180

189181
func (o *pubsubTransport) Close(ctx context.Context) error {
190-
return o.client.Close()
182+
var err error
183+
if o.client != nil {
184+
err = o.client.Close()
185+
}
186+
187+
if o.grpcConn != nil {
188+
_ = o.grpcConn.Close()
189+
o.grpcConn = nil
190+
}
191+
192+
return err
191193
}
192194

193195
func (o *pubsubTransport) ErrorChan() <-chan error {

0 commit comments

Comments
 (0)