Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
53 changes: 37 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module open-cluster-management.io/sdk-go
go 1.24.0

require (
cloud.google.com/go/pubsub/v2 v2.3.0
github.com/bwmarrin/snowflake v0.3.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20250922144431-372892d7c84d
github.com/cloudevents/sdk-go/v2 v2.16.2
Expand All @@ -24,9 +25,10 @@ require (
github.com/prometheus/client_model v0.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.11.1
golang.org/x/oauth2 v0.27.0
google.golang.org/grpc v1.68.1
google.golang.org/protobuf v1.36.5
golang.org/x/oauth2 v0.32.0
google.golang.org/api v0.255.0
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.33.2
k8s.io/apimachinery v0.33.2
Expand All @@ -41,21 +43,30 @@ require (
)

require (
cel.dev/expr v0.23.1 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cel.dev/expr v0.24.0 // indirect
cloud.google.com/go v0.121.6 // indirect
cloud.google.com/go/auth v0.17.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -71,20 +82,30 @@ require (
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stoewer/go-strcase v1.3.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.einride.tech/aip v0.73.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/term v0.34.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.12.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/term v0.36.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
140 changes: 106 additions & 34 deletions go.sum

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions pkg/cloudevents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Currently, the CloudEvents options supports the following protocols/drivers:

- [MQTT Protocol/Driver](./generic/options/mqtt)
- [gRPC Protocol/Driver](./generic/options/grpc)
- [Pub/Sub Protocol/Driver](./generic/options/pubsub)

To create CloudEvents source/agent options for these supported protocols/drivers, developers need to provide configuration specific to the protocol/driver. The configuration format resembles the kubeconfig for the Kubernetes client-go but has a different schema.

Expand Down Expand Up @@ -147,6 +148,47 @@ clientKeyFile: /certs/client.key

For detailed configuration options for the gRPC driver, refer to the [gRPC driver options package](https://github.com/open-cluster-management-io/sdk-go/blob/00a94671ced1c17d2ca2b5fad2f4baab282a7d3c/pkg/cloudevents/generic/options/grpc/options.go#L30-L40).

### Pub/Sub Protocol/Driver

Here's an example of a YAML configuration for the Google Cloud Pub/Sub protocol for a source:

```yaml
projectID: my-project
endpoint: https://pubsub.us-east1.googleapis.com # optional, leave empty for global, or set a regional URL.
credentialsFile: /path/to/credentials.json
topics:
sourceEvents: projects/my-project/topics/sourceevents
sourceBroadcast: projects/my-project/topics/sourcebroadcast
subscriptions:
agentEvents: projects/my-project/subscriptions/agentevents-source1
agentBroadcast: projects/my-project/subscriptions/agentbroadcast-source1
```

And here's an example configuration for an agent:

```yaml
projectID: my-project
endpoint: https://pubsub.us-east1.googleapis.com # optional, leave empty for global, or set a regional URL.
credentialsFile: /path/to/credentials.json
topics:
agentEvents: projects/my-project/topics/agentevents
agentBroadcast: projects/my-project/topics/agentbroadcast
subscriptions:
sourceEvents: projects/my-project/subscriptions/sourceevents-cluster1
sourceBroadcast: projects/my-project/subscriptions/sourcebroadcast-cluster1
```

**Note**: The Pub/Sub protocol uses separate topics and subscriptions for different event types:
- **Source** uses `sourceEvents`/`sourceBroadcast` topics to publish events and `agentEvents`/`agentBroadcast` subscriptions to receive events from agents
- **Agent** uses `agentEvents`/`agentBroadcast` topics to publish events and `sourceEvents`/`sourceBroadcast` subscriptions to receive events from sources
- Both `sourceBroadcast` and `agentBroadcast` channels are used for resync requests
- **All topics and subscriptions must be created before running the source and agent**
- The subscription for `agentEvents` is a filtered subscription with filter `attributes."ce-clustername"="<clustername>"`
- The subscription for `sourceEvents` is a filtered subscription with filter `attributes."ce-originalsource"="<sourceID>"`
- The subscriptions for `agentBroadcast` and `sourceBroadcast` are subscriptions without filters, allowing broadcast messages to reach all subscribers for resync events.

For detailed configuration options for the Pub/Sub driver, refer to the [Pub/Sub driver options package](./generic/options/pubsub).

## Work Clients

### Building a ManifestWorkSourceClient on the hub cluster with SourceLocalWatcherStore
Expand Down
2 changes: 2 additions & 0 deletions pkg/cloudevents/clients/options/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type GenericClientOptions[T generic.ResourceObject] struct {
//
// GRPCOptions (*grpc.GRPCOptions): builds a generic cloudevents client with GRPC
//
// PubSubOptions (*pubsub.PubSubOptions): builds a generic cloudevents client with PubSub
//
// - codec, the codec for resource
//
// - clientID, the client ID for generic cloudevents client.
Expand Down
5 changes: 3 additions & 2 deletions pkg/cloudevents/constants/constants.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package constants

const (
ConfigTypeMQTT = "mqtt"
ConfigTypeGRPC = "grpc"
ConfigTypeMQTT = "mqtt"
ConfigTypeGRPC = "grpc"
ConfigTypePubSub = "pubsub"
)

// GRPCSubscriptionIDKey is the key for the gRPC subscription ID.
Expand Down
5 changes: 4 additions & 1 deletion pkg/cloudevents/doc/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The [`CloudEventsOptions`](../generic/options/options.go) interface defines an o
For each event protocol, it needs to implement the `CloudEventsSourceOptions` and `CloudEventsAgentOptions`, the
`CloudEventsSourceOptions` will be used for building cloudevents source client and the `CloudEventsAgentOptions` will be used for building cloudevents agent client.

Currently, the MQTT and gRPC are implemented.
Currently, the MQTT, gRPC and PubSub are implemented.

```mermaid
classDiagram
Expand Down Expand Up @@ -44,6 +44,9 @@ CloudEventsAgentOptions <|.. mqttAgentOptions

CloudEventsSourceOptions <|.. grpcSourceOptions
CloudEventsAgentOptions <|.. grpcAgentOptions

CloudEventsSourceOptions <|.. pubsubSourceOptions
CloudEventsAgentOptions <|.. pubsubAgentOptions
```

### CloudEventsClient Interface
Expand Down
13 changes: 13 additions & 0 deletions pkg/cloudevents/generic/options/builder/optionsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/pubsub"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

Expand All @@ -23,6 +24,7 @@ type ConfigLoader struct {
// Available configuration types:
// - mqtt
// - grpc
// - pubsub
func NewConfigLoader(configType, configPath string) *ConfigLoader {
return &ConfigLoader{
configType: configType,
Expand All @@ -47,6 +49,13 @@ func (l *ConfigLoader) LoadConfig() (string, any, error) {
}

return grpcOptions.Dialer.URL, grpcOptions, nil
case constants.ConfigTypePubSub:
pubsubOptions, err := pubsub.BuildPubSubOptionsFromFlags(l.configPath)
if err != nil {
return "", nil, err
}

return "", pubsubOptions, nil
}

return "", nil, fmt.Errorf("unsupported config type %s", l.configType)
Expand All @@ -60,6 +69,8 @@ func BuildCloudEventsSourceOptions(config any,
return mqttv2.NewSourceOptions(config, clientId, sourceId), nil
case *grpc.GRPCOptions:
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
case *pubsub.PubSubOptions:
return pubsub.NewSourceOptions(config, sourceId), nil
default:
return nil, fmt.Errorf("unsupported client configuration type %T", config)
}
Expand All @@ -73,6 +84,8 @@ func BuildCloudEventsAgentOptions(config any,
return mqttv2.NewAgentOptions(config, clusterName, clientId), nil
case *grpc.GRPCOptions:
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
case *pubsub.PubSubOptions:
return pubsub.NewAgentOptions(config, clusterName, clientId), nil
default:
return nil, fmt.Errorf("unsupported client configuration type %T", config)
}
Expand Down
Loading