Skip to content

Commit

Permalink
K8s/Dashboards: Delegate large objects to blob store (grafana#94943)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryantxu authored Nov 9, 2024
1 parent b1c5aa0 commit c0de407
Show file tree
Hide file tree
Showing 20 changed files with 442 additions and 81 deletions.
3 changes: 2 additions & 1 deletion pkg/apimachinery/utils/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ func (m *grafanaMetaAccessor) GetBlob() *BlobInfo {
func (m *grafanaMetaAccessor) SetBlob(info *BlobInfo) {
if info == nil {
m.SetAnnotation(AnnoKeyBlob, "") // delete
} else {
m.SetAnnotation(AnnoKeyBlob, info.String())
}
m.SetAnnotation(AnnoKeyBlob, info.String())
}

func (m *grafanaMetaAccessor) GetFolder() string {
Expand Down
52 changes: 52 additions & 0 deletions pkg/registry/apis/dashboard/large.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package dashboard

import (
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/runtime"

commonV0 "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
)

func newDashboardLargeObjectSupport() *apistore.BasicLargeObjectSupport {
return &apistore.BasicLargeObjectSupport{
TheGroupResource: dashboard.DashboardResourceInfo.GroupResource(),

// byte size, while testing lets do almost everything (10bytes)
ThresholdSize: 10,

// 10mb -- we should check what the largest ones are... might be bigger
MaxByteSize: 10 * 1024 * 1024,

ReduceSpec: func(obj runtime.Object) error {
dash, ok := obj.(*dashboard.Dashboard)
if !ok {
return fmt.Errorf("expected dashboard")
}
old := dash.Spec.Object
spec := commonV0.Unstructured{Object: make(map[string]any)}
dash.Spec = spec
dash.SetManagedFields(nil) // this could be bigger than the object!

keep := []string{"title", "description", "schemaVersion"}
for _, k := range keep {
v, ok := old[k]
if ok {
spec.Object[k] = v
}
}
return nil
},

RebuildSpec: func(obj runtime.Object, blob []byte) error {
dash, ok := obj.(*dashboard.Dashboard)
if !ok {
return fmt.Errorf("expected dashboard")
}
return json.Unmarshal(blob, &dash.Spec)
},
}
}
60 changes: 60 additions & 0 deletions pkg/registry/apis/dashboard/large_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dashboard

import (
"encoding/json"
"os"
"testing"

"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
)

func TestLargeDashboardSupport(t *testing.T) {
devdash := "../../../../devenv/dev-dashboards/all-panels.json"

// nolint:gosec
// We can ignore the gosec G304 warning because this is a test with hardcoded input values
f, err := os.ReadFile(devdash)
require.NoError(t, err)

dash := &dashboard.Dashboard{
ObjectMeta: v1.ObjectMeta{
Name: "test",
Namespace: "test",
},
}
err = json.Unmarshal(f, &dash.Spec)
require.NoError(t, err)

expectedPanelCount := 19
panels, found, err := unstructured.NestedSlice(dash.Spec.Object, "panels")
require.NoError(t, err)
require.True(t, found)
require.Len(t, panels, expectedPanelCount)

largeObject := newDashboardLargeObjectSupport()

// Convert the dashboard to a small value
err = largeObject.ReduceSpec(dash)
require.NoError(t, err)

small, err := json.MarshalIndent(&dash.Spec, "", " ")
require.NoError(t, err)
require.JSONEq(t, `{
"schemaVersion": 33,
"title": "Panel tests - All panels"
}`, string(small))

// Now make it big again
err = largeObject.RebuildSpec(dash, f)
require.NoError(t, err)

// check that all panels exist again
panels, found, err = unstructured.NestedSlice(dash.Spec.Object, "panels")
require.NoError(t, err)
require.True(t, found)
require.Len(t, panels, expectedPanelCount)
}
10 changes: 0 additions & 10 deletions pkg/registry/apis/dashboard/legacy_storage.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package dashboard

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
Expand Down Expand Up @@ -48,16 +46,8 @@ func (s *dashboardStorage) newStore(scheme *runtime.Scheme, defaultOptsGetter ge
return nil, err
}
client := resource.NewLocalResourceClient(server)
// This is needed as the apistore doesn't allow any core grafana dependencies. We extract the needed features
// to a map, to check them in the apistore itself.
features := make(map[string]any)
if s.features.IsEnabled(context.Background(), featuremgmt.FlagUnifiedStorageBigObjectsSupport) {
features[featuremgmt.FlagUnifiedStorageBigObjectsSupport] = struct{}{}
}
optsGetter := apistore.NewRESTOptionsGetterForClient(client,
defaultOpts.StorageConfig.Config,
features,
)

return grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter)
}
11 changes: 10 additions & 1 deletion pkg/registry/apis/dashboard/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver
return err
}

// Split dashboards when they are large
var largeObjects apistore.LargeObjectSupport
if b.legacy.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageBigObjectsSupport) {
largeObjects = newDashboardLargeObjectSupport()
opts.StorageOptions(dash.GroupResource(), apistore.StorageOptions{
LargeObjectSupport: largeObjects,
})
}

storage := map[string]rest.Storage{}
storage[dash.StoragePath()] = legacyStore
storage[dash.StoragePath("history")] = apistore.NewHistoryConnector(
Expand All @@ -157,7 +166,7 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver
}

// Register the DTO endpoint that will consolidate all dashboard bits
storage[dash.StoragePath("dto")], err = newDTOConnector(storage[dash.StoragePath()], b)
storage[dash.StoragePath("dto")], err = newDTOConnector(storage[dash.StoragePath()], largeObjects, b)
if err != nil {
return err
}
Expand Down
27 changes: 20 additions & 7 deletions pkg/registry/apis/dashboard/sub_dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"net/http"
"strconv"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"

"github.com/grafana/authlib/claims"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
Expand All @@ -17,27 +21,27 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
)

// The DTO returns everything the UI needs in a single request
type DTOConnector struct {
getter rest.Getter
legacy legacy.DashboardAccess
unified resource.ResourceClient
largeObjects apistore.LargeObjectSupport
accessControl accesscontrol.AccessControl
log log.Logger
}

func newDTOConnector(dash rest.Storage, builder *DashboardsAPIBuilder) (rest.Storage, error) {
func newDTOConnector(dash rest.Storage, largeObjects apistore.LargeObjectSupport, builder *DashboardsAPIBuilder) (rest.Storage, error) {
ok := false
v := &DTOConnector{
legacy: builder.legacy.access,
accessControl: builder.accessControl,
unified: builder.unified,
largeObjects: largeObjects,
log: builder.log,
}
v.getter, ok = dash.(rest.Getter)
Expand Down Expand Up @@ -86,7 +90,7 @@ func (r *DTOConnector) Connect(ctx context.Context, name string, opts runtime.Ob
return nil, err
}

rawobj, err := r.getter.Get(ctx, name, &v1.GetOptions{})
rawobj, err := r.getter.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,8 +141,17 @@ func (r *DTOConnector) Connect(ctx context.Context, name string, opts runtime.Ob

// Check for blob info
blobInfo := obj.GetBlob()
if blobInfo != nil {
fmt.Printf("TODO, load full blob from storage %+v\n", blobInfo)
if blobInfo != nil && r.largeObjects != nil {
gr := r.largeObjects.GroupResource()
err = r.largeObjects.Reconstruct(ctx, &resource.ResourceKey{
Group: gr.Group,
Resource: gr.Resource,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}, r.unified, obj)
if err != nil {
return nil, err
}
}

access.Slug = slugify.Slugify(dash.Spec.GetNestedString("title"))
Expand Down
5 changes: 3 additions & 2 deletions pkg/services/apiserver/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/stretchr/testify/require"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericapiserver "k8s.io/apiserver/pkg/server"
Expand All @@ -17,6 +16,8 @@ import (
"k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
aggregatoropenapi "k8s.io/kube-aggregator/pkg/generated/openapi"

"github.com/grafana/grafana/pkg/storage/unified/apistore"
)

// TestAggregatorPostStartHooks tests that the kube-aggregator server has the expected default post start hooks enabled.
Expand All @@ -41,7 +42,7 @@ func TestAggregatorPostStartHooks(t *testing.T) {
cfg.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute)

// override the RESTOptionsGetter to use the in memory storage options
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(*storagebackend.NewDefaultConfig("memory", nil), make(map[string]any))
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(*storagebackend.NewDefaultConfig("memory", nil))
require.NoError(t, err)
cfg.GenericConfig.RESTOptionsGetter = restOptionsGetter

Expand Down
2 changes: 2 additions & 0 deletions pkg/services/apiserver/builder/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/kube-openapi/pkg/spec3"

grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
)

// TODO: this (or something like it) belongs in grafana-app-sdk,
Expand Down Expand Up @@ -64,6 +65,7 @@ type APIGroupOptions struct {
OptsGetter generic.RESTOptionsGetter
DualWriteBuilder grafanarest.DualWriteBuilder
MetricsRegister prometheus.Registerer
StorageOptions apistore.StorageOptionsRegister
}

// Builders that implement OpenAPIPostProcessor are given a chance to modify the schema directly
Expand Down
6 changes: 4 additions & 2 deletions pkg/services/apiserver/builder/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"regexp"
"time"

"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -25,6 +24,8 @@ import (
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/common"

"github.com/grafana/grafana/pkg/storage/unified/apistore"

"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
Expand Down Expand Up @@ -167,7 +168,7 @@ func InstallAPIs(
namespaceMapper request.NamespaceMapper,
kvStore grafanarest.NamespacedKVStore,
serverLock ServerLockService,
features featuremgmt.FeatureToggles,
optsregister apistore.StorageOptionsRegister,
) error {
// dual writing is only enabled when the storage type is not legacy.
// this is needed to support setting a default RESTOptionsGetter for new APIs that don't
Expand Down Expand Up @@ -243,6 +244,7 @@ func InstallAPIs(
OptsGetter: optsGetter,
DualWriteBuilder: dualWrite,
MetricsRegister: reg,
StorageOptions: optsregister,
}); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/apiserver/options/grafana-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (o *GrafanaAggregatorOptions) ApplyTo(aggregatorConfig *aggregatorapiserver
return err
}
// override the RESTOptionsGetter to use the in memory storage options
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig, make(map[string]any))
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/apiserver/options/kube-aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (o *KubeAggregatorOptions) ApplyTo(aggregatorConfig *aggregatorapiserver.Co
return err
}
// override the RESTOptionsGetter to use the in memory storage options
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig, make(map[string]any))
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig)
if err != nil {
return err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/services/apiserver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"net/http"
"path"

"github.com/grafana/dskit/services"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -19,6 +17,8 @@ import (
"k8s.io/client-go/tools/clientcmd"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"

"github.com/grafana/dskit/services"
"github.com/grafana/grafana-plugin-sdk-go/backend"
dataplaneaggregator "github.com/grafana/grafana/pkg/aggregator/apiserver"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/apimachinery/identity"
Expand Down Expand Up @@ -295,6 +295,8 @@ func (s *service) start(ctx context.Context) error {
serverConfig.LoopbackClientConfig.Transport = transport
serverConfig.LoopbackClientConfig.TLSClientConfig = clientrest.TLSClientConfig{}

var optsregister apistore.StorageOptionsRegister

if o.StorageOptions.StorageType == grafanaapiserveroptions.StorageTypeEtcd {
if err := o.RecommendedOptions.Etcd.Validate(); len(err) > 0 {
return err[0]
Expand All @@ -303,14 +305,11 @@ func (s *service) start(ctx context.Context) error {
return err
}
} else {
// This is needed as the apistore doesn't allow any core grafana dependencies.
features := make(map[string]any)
if s.features.IsEnabled(context.Background(), featuremgmt.FlagUnifiedStorageBigObjectsSupport) {
features[featuremgmt.FlagUnifiedStorageBigObjectsSupport] = struct{}{}
}
getter := apistore.NewRESTOptionsGetterForClient(s.unified, o.RecommendedOptions.Etcd.StorageConfig)
optsregister = getter.RegisterOptions

// Use unified storage client
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(
s.unified, o.RecommendedOptions.Etcd.StorageConfig, features)
serverConfig.Config.RESTOptionsGetter = getter
}

// Add OpenAPI specs for each group+version
Expand All @@ -337,7 +336,9 @@ func (s *service) start(ctx context.Context) error {
// Install the API group+version
err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
// Required for the dual writer initialization
s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService, s.features,
s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"),
s.serverLockService,
optsregister,
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit c0de407

Please sign in to comment.