Skip to content

Commit 64e04b0

Browse files
committed
roachprod: prepare centralized - use centralized from roachprod lib
This patch starts using the roachprod centralized client from the roachprod library when properly configured via environment variables. Epic: none Release note: None
1 parent 5da38cd commit 64e04b0

File tree

9 files changed

+231
-49
lines changed

9 files changed

+231
-49
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,6 +1812,7 @@ GO_TARGETS = [
18121812
"//pkg/roachpb:roachpb_test",
18131813
"//pkg/roachprod/blobfixture:blobfixture",
18141814
"//pkg/roachprod/blobfixture:blobfixture_test",
1815+
"//pkg/roachprod/centralizedapi:centralizedapi",
18151816
"//pkg/roachprod/cloud/types:types",
18161817
"//pkg/roachprod/cloud:cloud",
18171818
"//pkg/roachprod/cloud:cloud_test",

pkg/cmd/roachprod/cli/commands.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,9 +439,15 @@ hosts file.
439439
}
440440
} else {
441441
machineType := func(clusterVMs vm.List) string {
442+
if len(clusterVMs) == 0 {
443+
return ""
444+
}
442445
return clusterVMs[0].MachineType
443446
}
444447
cpuArch := func(clusterVMs vm.List) string {
448+
if len(clusterVMs) == 0 {
449+
return ""
450+
}
445451
// Display CPU architecture and family.
446452
if clusterVMs[0].CPUArch == "" {
447453
// N.B. Either a local cluster or unsupported cloud provider.
@@ -501,6 +507,9 @@ hosts file.
501507
}
502508
timeRemaining := c.LifetimeRemaining().Round(time.Second)
503509
formatTTL := func(ttl time.Duration) string {
510+
if len(c.VMs) == 0 {
511+
return color.HiRedString("")
512+
}
504513
if c.VMs[0].Preemptible {
505514
return color.HiMagentaString(ttl.String())
506515
} else {
@@ -528,7 +537,12 @@ hosts file.
528537
color.HiGreenString(""))
529538
}
530539
} else {
531-
fmt.Fprintf(tw, "\t(-)")
540+
fmt.Fprintf(tw, "\t%s\t%s\t%s\t%s\t%s\t",
541+
color.HiGreenString(""),
542+
color.HiGreenString(""),
543+
color.HiWhiteString(""),
544+
color.HiWhiteString(""),
545+
color.HiGreenString(""))
532546
}
533547
fmt.Fprintf(tw, "\n")
534548
}

pkg/roachprod/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//pkg/build",
1717
"//pkg/cli/exit",
1818
"//pkg/cmd/roachprod/grafana",
19+
"//pkg/roachprod/centralizedapi",
1920
"//pkg/roachprod/cloud",
2021
"//pkg/roachprod/cloud/types",
2122
"//pkg/roachprod/config",
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "centralizedapi",
5+
srcs = ["centralizedapi.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/roachprod/centralizedapi",
7+
visibility = ["//visibility:public"],
8+
deps = ["//pkg/cmd/roachprod-centralized/client"],
9+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package centralizedapi
7+
8+
import (
9+
"sync"
10+
11+
"github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/client"
12+
)
13+
14+
var (
15+
once sync.Once
16+
centralizedapi *client.Client
17+
)
18+
19+
// GetCentralizedAPIClient returns a singleton instance of the centralized API client.
20+
func GetCentralizedAPIClient() *client.Client {
21+
once.Do(func() {
22+
client, err := client.NewClientWithConfig(client.LoadConfigFromEnv())
23+
if err != nil {
24+
panic(err)
25+
}
26+
centralizedapi = client
27+
})
28+
return centralizedapi
29+
}

pkg/roachprod/cloud/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ go_library(
1111
visibility = ["//visibility:public"],
1212
deps = [
1313
"//pkg/cloud/amazon",
14+
"//pkg/cmd/roachprod-centralized/client",
15+
"//pkg/roachprod/centralizedapi",
1416
"//pkg/roachprod/cloud/types",
1517
"//pkg/roachprod/config",
1618
"//pkg/roachprod/logger",

pkg/roachprod/cloud/cluster_cloud.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"sort"
1212
"time"
1313

14+
roachprodcentralized "github.com/cockroachdb/cockroach/pkg/cmd/roachprod-centralized/client"
15+
"github.com/cockroachdb/cockroach/pkg/roachprod/centralizedapi"
1416
cloudcluster "github.com/cockroachdb/cockroach/pkg/roachprod/cloud/types"
1517
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
1618
"github.com/cockroachdb/cockroach/pkg/roachprod/ui"
@@ -140,6 +142,22 @@ func (c *Cloud) ListCloud(l *logger.Logger, options vm.ListOptions) error {
140142

141143
providers := append(c.providers, c.localProviders...)
142144

145+
apiClient := centralizedapi.GetCentralizedAPIClient()
146+
if apiClient.IsEnabled() {
147+
// Use the centralized roachprod service to list clusters.
148+
response, err := apiClient.ListClusters(context.Background(), l, roachprodcentralized.ListClustersOptions{})
149+
if err != nil {
150+
l.Errorf("Error listing clusters using the centralized API: %s", err)
151+
} else {
152+
c.Clusters = response.Clusters
153+
c.BadInstances = response.BadInstances
154+
155+
// Remote providers are already listed by the centralized service,
156+
// so only use local providers in the local listing.
157+
providers = c.localProviders
158+
}
159+
}
160+
143161
// List all VMs across all providers in parallel.
144162
providerVMs := make(map[string]vm.List)
145163
var g errgroup.Group
@@ -268,6 +286,14 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) (*cloudcluster.C
268286
Lifetime: opts[0].CreateOpts.Lifetime,
269287
}
270288

289+
apiClient := centralizedapi.GetCentralizedAPIClient()
290+
if apiClient.IsEnabled() {
291+
// Register the new cluster with the centralized roachprod service.
292+
if err := apiClient.RegisterCluster(context.Background(), l, c); err != nil {
293+
return nil, err
294+
}
295+
}
296+
271297
// Keep track of the total number of nodes created, as we append all cluster names
272298
// with the node count.
273299
var nodesCreated int
@@ -330,6 +356,14 @@ func CreateCluster(l *logger.Logger, opts []*ClusterCreateOpts) (*cloudcluster.C
330356
// `roachprod.Start` expects nodes/vms to be in sorted order
331357
sort.Sort(c.VMs)
332358

359+
if apiClient.IsEnabled() {
360+
// Use the centralized roachprod service to register the final state
361+
// of the cluster.
362+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
363+
return nil, err
364+
}
365+
}
366+
333367
return c, nil
334368
}
335369

@@ -370,6 +404,14 @@ func GrowCluster(l *logger.Logger, c *cloudcluster.Cluster, numNodes int) error
370404
// `roachprod.Start` expects nodes/vms to be in sorted order
371405
sort.Sort(c.VMs)
372406

407+
apiClient := centralizedapi.GetCentralizedAPIClient()
408+
if apiClient.IsEnabled() {
409+
// Register the grown cluster's updated state with the centralized roachprod service.
410+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
411+
return err
412+
}
413+
}
414+
373415
return nil
374416
}
375417

@@ -401,6 +443,15 @@ func ShrinkCluster(l *logger.Logger, c *cloudcluster.Cluster, numNodes int) erro
401443

402444
// Update the list of VMs in the cluster.
403445
c.VMs = c.VMs[:len(c.VMs)-numNodes]
446+
447+
apiClient := centralizedapi.GetCentralizedAPIClient()
448+
if apiClient.IsEnabled() {
449+
// Register the shrunk cluster's updated state with the centralized roachprod service.
450+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
451+
return err
452+
}
453+
}
454+
404455
return nil
405456
}
406457

@@ -419,7 +470,10 @@ func DestroyCluster(l *logger.Logger, c *cloudcluster.Cluster) error {
419470
publicRecords = append(publicRecords, v.PublicDNS)
420471
}
421472
dnsErr := vm.FanOutDNS(c.VMs, func(p vm.DNSProvider, vms vm.List) error {
422-
publicRecordsErr := p.DeletePublicRecordsByName(context.Background(), publicRecords...)
473+
var publicRecordsErr error
474+
if !centralizedapi.GetCentralizedAPIClient().IsEnabled() {
475+
publicRecordsErr = p.DeletePublicRecordsByName(context.Background(), publicRecords...)
476+
}
423477
srvRecordsErr := p.DeleteSRVRecordsBySubdomain(context.Background(), c.Name)
424478
return errors.CombineErrors(publicRecordsErr, srvRecordsErr)
425479
})
@@ -436,6 +490,15 @@ func DestroyCluster(l *logger.Logger, c *cloudcluster.Cluster) error {
436490
return p.Delete(l, vms)
437491
})
438492
stopSpinner()
493+
494+
if clusterErr == nil {
495+
apiClient := centralizedapi.GetCentralizedAPIClient()
496+
if apiClient.IsEnabled() {
497+
if err := apiClient.RegisterClusterDelete(context.Background(), l, c.Name); err != nil {
498+
l.Printf("WARNING: failed to delete cluster %s from centralized service: %s", c.Name, err)
499+
}
500+
}
501+
}
439502
return errors.CombineErrors(dnsErr, clusterErr)
440503
}
441504

@@ -450,5 +513,13 @@ func ExtendCluster(l *logger.Logger, c *cloudcluster.Cluster, extension time.Dur
450513
return err
451514
}
452515
c.Lifetime = newLifetime
516+
517+
apiClient := centralizedapi.GetCentralizedAPIClient()
518+
if apiClient.IsEnabled() {
519+
// Register the cluster's extended lifetime with the centralized roachprod service.
520+
if err := apiClient.RegisterClusterUpdate(context.Background(), l, c); err != nil {
521+
return err
522+
}
523+
}
453524
return nil
454525
}

pkg/roachprod/clusters_cache.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,11 @@ func LoadClusters() error {
159159
}
160160

161161
if len(c.VMs) == 0 {
162-
return errors.Errorf("found no VMs in %s", clusterFilename(name))
162+
// When roachprod-centralized is available, clusters are registered empty
163+
// and updated later when the cluster is actually created to avoid race conditions
164+
// between creation and background cluster syncs.
165+
// Ignore empty clusters for now.
166+
continue
163167
}
164168
if shouldIgnoreCluster(c) {
165169
continue

0 commit comments

Comments
 (0)