Skip to content

Commit a51c902

Browse files
committed
roachprod: prepare centralized - use centralized from roachprod lib
This patch starts using the roachprod centralized client from the roachprod library when properly configured via environment variables. Epic: none Release note: None
1 parent e195dd7 commit a51c902

File tree

9 files changed

+231
-49
lines changed

9 files changed

+231
-49
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,6 +1812,7 @@ GO_TARGETS = [
18121812
"//pkg/roachpb:roachpb_test",
18131813
"//pkg/roachprod/blobfixture:blobfixture",
18141814
"//pkg/roachprod/blobfixture:blobfixture_test",
1815+
"//pkg/roachprod/centralizedapi:centralizedapi",
18151816
"//pkg/roachprod/cloud/types:types",
18161817
"//pkg/roachprod/cloud:cloud",
18171818
"//pkg/roachprod/cloud:cloud_test",

pkg/cmd/roachprod/cli/commands.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,15 @@ hosts file.
439439
}
440440
} else {
441441
machineType := func(clusterVMs vm.List) string {
442+
if len(clusterVMs) == 0 {
443+
return ""
444+
}
442445
return clusterVMs[0].MachineType
443446
}
444447
cpuArch := func(clusterVMs vm.List) string {
448+
if len(clusterVMs) == 0 {
449+
return ""
450+
}
445451
// Display CPU architecture and family.
446452
if clusterVMs[0].CPUArch == "" {
447453
// N.B. Either a local cluster or unsupported cloud provider.
@@ -501,6 +507,9 @@ hosts file.
501507
}
502508
timeRemaining := c.LifetimeRemaining().Round(time.Second)
503509
formatTTL := func(ttl time.Duration) string {
510+
if len(c.VMs) == 0 {
511+
return color.HiRedString("")
512+
}
504513
if c.VMs[0].Preemptible {
505514
return color.HiMagentaString(ttl.String())
506515
} else {
@@ -528,7 +537,12 @@ hosts file.
528537
color.HiGreenString(""))
529538
}
530539
} else {
531-
fmt.Fprintf(tw, "\t(-)")
540+
fmt.Fprintf(tw, "\t%s\t%s\t%s\t%s\t%s\t",
541+
color.HiGreenString(""),
542+
color.HiGreenString(""),
543+
color.HiWhiteString(""),
544+
color.HiWhiteString(""),
545+
color.HiGreenString(""))
532546
}
533547
fmt.Fprintf(tw, "\n")
534548
}

pkg/roachprod/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//pkg/build",
1717
"//pkg/cli/exit",
1818
"//pkg/cmd/roachprod/grafana",
19+
"//pkg/roachprod/centralizedapi",
1920
"//pkg/roachprod/cloud",
2021
"//pkg/roachprod/cloud/types",
2122
"//pkg/roachprod/config",
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "centralizedapi",
5+
srcs = ["centralizedapi.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/roachprod/centralizedapi",
7+
visibility = ["//visibility:public"],
8+
deps = ["//pkg/cmd/roachprod-centralized/client"],
9+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package centralizedapi
7+
8+
import (
9+
"sync"
10+
11+
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/client"
12+
)
13+
14+
var (
15+
once sync.Once
16+
centralizedapi *client.Client
17+
)
18+
19+
// GetCentralizedAPIClient returns a singleton instance of the centralized API client.
20+
func GetCentralizedAPIClient() *client.Client {
21+
once.Do(func() {
22+
client, err := client.NewClientWithConfig(client.LoadConfigFromEnv())
23+
if err != nil {
24+
panic(err)
25+
}
26+
centralizedapi = client
27+
})
28+
return centralizedapi
29+
}

pkg/roachprod/cloud/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ go_library(
1111
visibility = ["//visibility:public"],
1212
deps = [
1313
"//pkg/cloud/amazon",
14+
"//pkg/cmd/roachprod-centralized/client",
15+
"//pkg/roachprod/centralizedapi",
1416
"//pkg/roachprod/cloud/types",
1517
"//pkg/roachprod/config",
1618
"//pkg/roachprod/logger",

pkg/roachprod/cloud/cluster_cloud.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"sort"
1212
"time"
1313

14+
roachprodcentralized "github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/client"
15+
"github.com/cockroachdb/cockroach/pkg/roachprod/centralizedapi"
1416
cloudcluster "github.com/cockroachdb/cockroach/pkg/roachprod/cloud/types"
1517
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
1618
"github.com/cockroachdb/cockroach/pkg/roachprod/ui"
@@ -140,6 +142,22 @@ func (c *Cloud) ListCloud(l *logger.Logger, options vm.ListOptions) error {
140142

141143
providers := append(c.providers, c.localProviders...)
142144

145+
apiClient := centralizedapi.GetCentralizedAPIClient()
146+
if apiClient.IsEnabled() {
147+
// Use the centralized roachprod service to list clusters.
148+
response, err := apiClient.ListClusters(context.Background(), l, roachprodcentralized.ListClustersOptions{})
149+
if err != nil {
150+
l.Errorf("Error listing clusters using the centralized API: %s", err)
151+
} else {
152+
c.Clusters = response.Clusters
153+
c.BadInstances = response.BadInstances
154+
155+
// Remote providers are already listed by the centralized service,
156+
// so only use local providers in the local listing.
157+
providers = c.localProviders
158+
}
159+
}
160+
143161
// List all VMs across all providers in parallel.
144162
providerVMs := make(map[string]vm.List)
145163
var g errgroup.Group
@@ -276,6 +294,14 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) (*cloudcluster.C
276294
Lifetime: opts[0].CreateOpts.Lifetime,
277295
}
278296

297+
apiClient := centralizedapi.GetCentralizedAPIClient()
298+
if apiClient.IsEnabled() {
299+
// Register the new cluster with the centralized roachprod service.
300+
if err := apiClient.RegisterCluster(context.Background(), l, c); err != nil {
301+
return nil, err
302+
}
303+
}
304+
279305
// Keep track of the total number of nodes created, as we append all cluster names
280306
// with the node count.
281307
var nodesCreated int
@@ -338,6 +364,14 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) (*cloudcluster.C
338364
// `roachprod.Start` expects nodes/vms to be in sorted order
339365
sort.Sort(c.VMs)
340366

367+
if apiClient.IsEnabled() {
368+
// Use the centralized roachprod service to register the final state
369+
// of the cluster.
370+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
371+
return nil, err
372+
}
373+
}
374+
341375
return c, nil
342376
}
343377

@@ -378,6 +412,14 @@ func GrowCluster(l *logger.Logger, c *cloudcluster.Cluster, numNodes int) error
378412
// `roachprod.Start` expects nodes/vms to be in sorted order
379413
sort.Sort(c.VMs)
380414

415+
apiClient := centralizedapi.GetCentralizedAPIClient()
416+
if apiClient.IsEnabled() {
417+
// Use the centralized roachprod service to create the cluster.
418+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
419+
return err
420+
}
421+
}
422+
381423
return nil
382424
}
383425

@@ -409,6 +451,15 @@ func ShrinkCluster(l *logger.Logger, c *cloudcluster.Cluster, numNodes int) erro
409451

410452
// Update the list of VMs in the cluster.
411453
c.VMs = c.VMs[:len(c.VMs)-numNodes]
454+
455+
apiClient := centralizedapi.GetCentralizedAPIClient()
456+
if apiClient.IsEnabled() {
457+
// Use the centralized roachprod service to create the cluster.
458+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
459+
return err
460+
}
461+
}
462+
412463
return nil
413464
}
414465

@@ -427,7 +478,10 @@ func DestroyCluster(l *logger.Logger, c *cloudcluster.Cluster) error {
427478
publicRecords = append(publicRecords, v.PublicDNS)
428479
}
429480
dnsErr := vm.FanOutDNS(c.VMs, func(p vm.DNSProvider, vms vm.List) error {
430-
publicRecordsErr := p.DeletePublicRecordsByName(context.Background(), publicRecords...)
481+
var publicRecordsErr error
482+
if !centralizedapi.GetCentralizedAPIClient().IsEnabled() {
483+
publicRecordsErr = p.DeletePublicRecordsByName(context.Background(), publicRecords...)
484+
}
431485
srvRecordsErr := p.DeleteSRVRecordsBySubdomain(context.Background(), c.Name)
432486
return errors.CombineErrors(publicRecordsErr, srvRecordsErr)
433487
})
@@ -444,6 +498,15 @@ func DestroyCluster(l *logger.Logger, c *cloudcluster.Cluster) error {
444498
return p.Delete(l, vms)
445499
})
446500
stopSpinner()
501+
502+
if clusterErr == nil {
503+
apiClient := centralizedapi.GetCentralizedAPIClient()
504+
if apiClient.IsEnabled() {
505+
if err := apiClient.RegisterClusterDelete(context.Background(), l, c.Name); err != nil {
506+
l.Printf("WARNING: failed to delete cluster %s from centralized service: %s", c.Name, err)
507+
}
508+
}
509+
}
447510
return errors.CombineErrors(dnsErr, clusterErr)
448511
}
449512

@@ -458,5 +521,13 @@ func ExtendCluster(l *logger.Logger, c *cloudcluster.Cluster, extension time.Dur
458521
return err
459522
}
460523
c.Lifetime = newLifetime
524+
525+
apiClient := centralizedapi.GetCentralizedAPIClient()
526+
if apiClient.IsEnabled() {
527+
// Use the centralized roachprod service to create the cluster.
528+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
529+
return err
530+
}
531+
}
461532
return nil
462533
}

pkg/roachprod/clusters_cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,11 @@ func LoadClusters() error {
159159
}
160160

161161
if len(c.VMs) == 0 {
162-
return errors.Errorf("found no VMs in %s", clusterFilename(name))
162+
// When roachprod-centralized is available, clusters are registered empty
163+
// and updated later when the cluster is actually created to avoid race conditions
164+
// in creation and background clusters syncs.
165+
// Ignore empty clusters for now.
166+
continue
163167
}
164168
if shouldIgnoreCluster(c) {
165169
continue

0 commit comments

Comments
 (0)