From e4a0dfd7053e03f46ac1474326a9a04b77a96649 Mon Sep 17 00:00:00 2001 From: Ashish Srivastava Date: Thu, 30 Jan 2025 09:25:20 +0530 Subject: [PATCH] grpcsync : Remove OnceFunc (#8049) --- balancer_wrapper.go | 2 +- internal/grpcsync/oncefunc.go | 32 ----------- internal/grpcsync/oncefunc_test.go | 53 ------------------- internal/xds/bootstrap/tlscreds/bundle.go | 4 +- orca/producer.go | 3 +- server.go | 4 +- xds/internal/resolver/xds_resolver_test.go | 4 +- xds/internal/xdsclient/authority.go | 3 +- xds/internal/xdsclient/clientimpl.go | 2 +- xds/internal/xdsclient/pool.go | 7 ++- .../xdsclient/transport/lrs/lrs_stream.go | 3 +- 11 files changed, 15 insertions(+), 102 deletions(-) delete mode 100644 internal/grpcsync/oncefunc.go delete mode 100644 internal/grpcsync/oncefunc_test.go diff --git a/balancer_wrapper.go b/balancer_wrapper.go index c2688376ae74..d08bf82fd603 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -415,7 +415,7 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) ( } acbw.producersMu.Unlock() } - return pData.producer, grpcsync.OnceFunc(unref) + return pData.producer, sync.OnceFunc(unref) } func (acbw *acBalancerWrapper) closeProducers() { diff --git a/internal/grpcsync/oncefunc.go b/internal/grpcsync/oncefunc.go deleted file mode 100644 index 6635f7bca96d..000000000000 --- a/internal/grpcsync/oncefunc.go +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpcsync - -import ( - "sync" -) - -// OnceFunc returns a function wrapping f which ensures f is only executed -// once even if the returned function is executed multiple times. -func OnceFunc(f func()) func() { - var once sync.Once - return func() { - once.Do(f) - } -} diff --git a/internal/grpcsync/oncefunc_test.go b/internal/grpcsync/oncefunc_test.go deleted file mode 100644 index 2b0db8d3eaa3..000000000000 --- a/internal/grpcsync/oncefunc_test.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpcsync - -import ( - "sync" - "sync/atomic" - "testing" - "time" -) - -// TestOnceFunc tests that a OnceFunc is executed only once even with multiple -// simultaneous callers of it. -func (s) TestOnceFunc(t *testing.T) { - var v int32 - inc := OnceFunc(func() { atomic.AddInt32(&v, 1) }) - - const numWorkers = 100 - var wg sync.WaitGroup // Blocks until all workers have called inc. - wg.Add(numWorkers) - - block := NewEvent() // Signal to worker goroutines to call inc - - for i := 0; i < numWorkers; i++ { - go func() { - <-block.Done() // Wait for a signal. - inc() // Call the OnceFunc. - wg.Done() - }() - } - time.Sleep(time.Millisecond) // Allow goroutines to get to the block. - block.Fire() // Unblock them. - wg.Wait() // Wait for them to complete. - if v != 1 { - t.Fatalf("OnceFunc() called %v times; want 1", v) - } -} diff --git a/internal/xds/bootstrap/tlscreds/bundle.go b/internal/xds/bootstrap/tlscreds/bundle.go index 02da3dbf3496..ed90720b581f 100644 --- a/internal/xds/bootstrap/tlscreds/bundle.go +++ b/internal/xds/bootstrap/tlscreds/bundle.go @@ -27,11 +27,11 @@ import ( "errors" "fmt" "net" + "sync" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/tls/certprovider/pemfile" - "google.golang.org/grpc/internal/grpcsync" ) // bundle is an implementation of credentials.Bundle which implements mTLS @@ -81,7 +81,7 @@ func NewBundle(jd json.RawMessage) (credentials.Bundle, func(), error) { } return &bundle{ transportCredentials: &reloadingCreds{provider: provider}, - }, grpcsync.OnceFunc(func() { provider.Close() }), nil + }, sync.OnceFunc(func() { provider.Close() }), nil } func (t *bundle) TransportCredentials() credentials.TransportCredentials { diff --git a/orca/producer.go b/orca/producer.go index 4d370310a0dd..87fd374e1c3f 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -25,7 +25,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/backoff" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/orca/internal" "google.golang.org/grpc/status" @@ -85,7 +84,7 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt // If stop is called multiple times, prevent it from having any effect on // subsequent calls. - return grpcsync.OnceFunc(func() { + return sync.OnceFunc(func() { p.unregisterListener(l, opts.ReportInterval) closeFn() }) diff --git a/server.go b/server.go index c025de56fe38..1e7701975e08 100644 --- a/server.go +++ b/server.go @@ -643,7 +643,7 @@ func (s *Server) serverWorker() { // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { s.serverWorkerChannel = make(chan func()) - s.serverWorkerChannelClose = grpcsync.OnceFunc(func() { + s.serverWorkerChannelClose = sync.OnceFunc(func() { close(s.serverWorkerChannel) }) for i := uint32(0); i < s.opts.numServerWorkers; i++ { @@ -1930,7 +1930,7 @@ func (s *Server) stop(graceful bool) { s.conns = nil if s.opts.numServerWorkers > 0 { - // Closing the channel (only once, via grpcsync.OnceFunc) after all the + // Closing the channel (only once, via sync.OnceFunc) after all the // connections have been closed above ensures that there are no // goroutines executing the callback passed to st.HandleStreams (where // the channel is written to). diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 9bf5d956af8e..88b944d49369 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "testing" "time" @@ -32,7 +33,6 @@ import ( "github.com/google/uuid" "google.golang.org/grpc/codes" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/grpcsync" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" @@ -271,7 +271,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { Name: t.Name(), WatchExpiryTimeout: defaultTestTimeout, }) - return c, grpcsync.OnceFunc(func() { + return c, sync.OnceFunc(func() { close(closeCh) cancel() }), err diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index f81685a45e69..67b3d7693808 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -20,6 +20,7 @@ package xdsclient import ( "context" "fmt" + "sync" "sync/atomic" "google.golang.org/grpc/grpclog" @@ -674,7 +675,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w } func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) func() { - return grpcsync.OnceFunc(func() { + return sync.OnceFunc(func() { done := make(chan struct{}) a.xdsClientSerializer.ScheduleOr(func(context.Context) { defer close(done) diff --git a/xds/internal/xdsclient/clientimpl.go b/xds/internal/xdsclient/clientimpl.go index c1f77cd065c8..33c9befaafcd 100644 --- a/xds/internal/xdsclient/clientimpl.go +++ b/xds/internal/xdsclient/clientimpl.go @@ -342,7 +342,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in // reference to the xdsChannel. This returned function is idempotent, meaning // it can be called multiple times without any additional effect. func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() { - return grpcsync.OnceFunc(func() { + return sync.OnceFunc(func() { c.channelsMu.Lock() if c.logger.V(2) { diff --git a/xds/internal/xdsclient/pool.go b/xds/internal/xdsclient/pool.go index 37dd36364a83..f73bffb6aa51 100644 --- a/xds/internal/xdsclient/pool.go +++ b/xds/internal/xdsclient/pool.go @@ -25,7 +25,6 @@ import ( v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc/internal/backoff" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/bootstrap" ) @@ -131,7 +130,7 @@ func (p *Pool) GetClientForTesting(name string) (XDSClient, func(), error) { return nil, nil, fmt.Errorf("xds:: xDS client with name %q not found", name) } c.incrRef() - return c, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil + return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil } // SetFallbackBootstrapConfig is used to specify a bootstrap configuration @@ -193,7 +192,7 @@ func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, stre if c := p.clients[name]; c != nil { c.incrRef() - return c, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil + return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil } c, err := newClientImpl(p.config, watchExpiryTimeout, streamBackoff) @@ -208,5 +207,5 @@ func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, stre xdsClientImplCreateHook(name) logger.Infof("xDS node ID: %s", p.config.Node().GetId()) - return client, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil + return client, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil } diff --git a/xds/internal/xdsclient/transport/lrs/lrs_stream.go b/xds/internal/xdsclient/transport/lrs/lrs_stream.go index bab92bcb73cf..e33ac694df06 100644 --- a/xds/internal/xdsclient/transport/lrs/lrs_stream.go +++ b/xds/internal/xdsclient/transport/lrs/lrs_stream.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" igrpclog "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/xdsclient/load" @@ -107,7 +106,7 @@ func (lrs *StreamImpl) ReportLoad() (*load.Store, func()) { lrs.mu.Lock() defer lrs.mu.Unlock() - cleanup := grpcsync.OnceFunc(func() { + cleanup := sync.OnceFunc(func() { lrs.mu.Lock() defer lrs.mu.Unlock()