From 5171aa2c0376c4027036e7a1d12a42ec0baf2368 Mon Sep 17 00:00:00 2001 From: Joseph McWilliams Date: Thu, 19 Dec 2024 13:35:24 -0800 Subject: [PATCH] check compaction settings from API to compare to desired, add ability to set dynamic configurations with Druid --- apis/druid/v1alpha1/druid_types.go | 9 + apis/druid/v1alpha1/druidingestion_types.go | 17 +- apis/druid/v1alpha1/zz_generated.deepcopy.go | 19 +- .../druid.apache.org_druidingestions.yaml | 3 - chart/crds/druid.apache.org_druids.yaml | 28 ++ .../druid.apache.org_druidingestions.yaml | 3 - config/crd/bases/druid.apache.org_druids.yaml | 28 ++ controllers/druid/druid_controller.go | 11 +- controllers/druid/dynamic_config.go | 102 ++++++++ controllers/ingestion/reconciler.go | 243 ++++++------------ controllers/ingestion/reconciler_test.go | 73 ++---- e2e/configs/druid-cr.yaml | 8 + go.mod | 1 - go.sum | 1 - pkg/druidapi/druidapi.go | 135 ++++++++++ pkg/druidapi/druidapi_test.go | 56 ++++ pkg/util/util.go | 79 ++++++ pkg/util/util_test.go | 135 ++++++++++ 18 files changed, 692 insertions(+), 259 deletions(-) create mode 100644 controllers/druid/dynamic_config.go create mode 100644 pkg/druidapi/druidapi.go create mode 100644 pkg/druidapi/druidapi_test.go create mode 100644 pkg/util/util.go create mode 100644 pkg/util/util_test.go diff --git a/apis/druid/v1alpha1/druid_types.go b/apis/druid/v1alpha1/druid_types.go index 58ad0c00..9911c2a0 100644 --- a/apis/druid/v1alpha1/druid_types.go +++ b/apis/druid/v1alpha1/druid_types.go @@ -3,12 +3,14 @@ package v1alpha1 import ( "encoding/json" + druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi" appsv1 "k8s.io/api/apps/v1" autoscalev2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) // druid-operator deploys a druid cluster from given spec below, based on the spec it would create following @@ -281,6 +283,13 @@ type DruidSpec struct { // CoreSite Contents of `core-site.xml`. // +optional CoreSite string `json:"core-site.xml,omitempty"` + + // Dynamic Configurations for Druid. Applied through the dynamic configuration API. + // +optional + DynamicConfig runtime.RawExtension `json:"dynamicConfig,omitempty"` + + // +optional + Auth druidapi.Auth `json:"auth,omitempty"` } // DruidNodeSpec Specification of `Druid` Node type and its configurations. diff --git a/apis/druid/v1alpha1/druidingestion_types.go b/apis/druid/v1alpha1/druidingestion_types.go index 144a91a9..7a81d280 100644 --- a/apis/druid/v1alpha1/druidingestion_types.go +++ b/apis/druid/v1alpha1/druidingestion_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -40,7 +41,7 @@ type DruidIngestionSpec struct { // +required Ingestion IngestionSpec `json:"ingestion"` // +optional - Auth Auth `json:"auth"` + Auth druidapi.Auth `json:"auth"` } type IngestionSpec struct { @@ -72,23 +73,9 @@ type DruidIngestionStatus struct { // CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing // IngestionSpecs that are stored as JSON strings. CurrentIngestionSpec string `json:"currentIngestionSpec.json"` - CurrentCompaction runtime.RawExtension `json:"compaction,omitempty"` CurrentRules []runtime.RawExtension `json:"rules,omitempty"` } -type AuthType string - -const ( - BasicAuth AuthType = "basic-auth" -) - -type Auth struct { - // +required - Type AuthType `json:"type"` - // +required - SecretRef v1.SecretReference `json:"secretRef"` -} - // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" diff --git a/apis/druid/v1alpha1/zz_generated.deepcopy.go b/apis/druid/v1alpha1/zz_generated.deepcopy.go index 2dc3f7fd..a0982056 100644 --- a/apis/druid/v1alpha1/zz_generated.deepcopy.go +++ b/apis/druid/v1alpha1/zz_generated.deepcopy.go @@ -82,22 +82,6 @@ func (in *AdditionalContainer) DeepCopy() *AdditionalContainer { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Auth) DeepCopyInto(out *Auth) { - *out = *in - out.SecretRef = in.SecretRef -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Auth. -func (in *Auth) DeepCopy() *Auth { - if in == nil { - return nil - } - out := new(Auth) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeepStorageSpec) DeepCopyInto(out *DeepStorageSpec) { *out = *in @@ -286,7 +270,6 @@ func (in *DruidIngestionSpec) DeepCopy() *DruidIngestionSpec { func (in *DruidIngestionStatus) DeepCopyInto(out *DruidIngestionStatus) { *out = *in in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) - in.CurrentCompaction.DeepCopyInto(&out.CurrentCompaction) if in.CurrentRules != nil { in, out := &in.CurrentRules, &out.CurrentRules *out = make([]runtime.RawExtension, len(*in)) @@ -714,6 +697,8 @@ func (in *DruidSpec) DeepCopyInto(out *DruidSpec) { *out = new(DeepStorageSpec) (*in).DeepCopyInto(*out) } + in.DynamicConfig.DeepCopyInto(&out.DynamicConfig) + out.Auth = in.Auth } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DruidSpec. diff --git a/chart/crds/druid.apache.org_druidingestions.yaml b/chart/crds/druid.apache.org_druidingestions.yaml index 08679f61..70e7fe47 100644 --- a/chart/crds/druid.apache.org_druidingestions.yaml +++ b/chart/crds/druid.apache.org_druidingestions.yaml @@ -106,9 +106,6 @@ spec: type: object status: properties: - compaction: - type: object - x-kubernetes-preserve-unknown-fields: true currentIngestionSpec.json: description: |- CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing diff --git a/chart/crds/druid.apache.org_druids.yaml b/chart/crds/druid.apache.org_druids.yaml index 880a1e42..34ec4b9a 100644 --- a/chart/crds/druid.apache.org_druids.yaml +++ b/chart/crds/druid.apache.org_druids.yaml @@ -1265,6 +1265,29 @@ spec: type: array type: object type: object + auth: + properties: + secretRef: + description: |- + SecretReference represents a Secret Reference. It has enough information to retrieve secret + in any namespace + properties: + name: + description: name is unique within a namespace to reference + a secret resource. + type: string + namespace: + description: namespace defines the space within which the + secret name must be unique. + type: string + type: object + x-kubernetes-map-type: atomic + type: + type: string + required: + - secretRef + - type + type: object common.runtime.properties: description: CommonRuntimeProperties Content fo the `common.runtime.properties` configuration file. @@ -1478,6 +1501,11 @@ spec: description: DisablePVCDeletionFinalizer Whether PVCs shall be deleted on the deletion of the Druid cluster. type: boolean + dynamicConfig: + description: Dynamic Configurations for Druid. Applied through the + dynamic configuration API. + type: object + x-kubernetes-preserve-unknown-fields: true env: description: Env Environment variables for druid containers. items: diff --git a/config/crd/bases/druid.apache.org_druidingestions.yaml b/config/crd/bases/druid.apache.org_druidingestions.yaml index 08679f61..70e7fe47 100644 --- a/config/crd/bases/druid.apache.org_druidingestions.yaml +++ b/config/crd/bases/druid.apache.org_druidingestions.yaml @@ -106,9 +106,6 @@ spec: type: object status: properties: - compaction: - type: object - x-kubernetes-preserve-unknown-fields: true currentIngestionSpec.json: description: |- CurrentIngestionSpec is a string instead of RawExtension to maintain compatibility with existing diff --git a/config/crd/bases/druid.apache.org_druids.yaml b/config/crd/bases/druid.apache.org_druids.yaml index 880a1e42..34ec4b9a 100644 --- a/config/crd/bases/druid.apache.org_druids.yaml +++ b/config/crd/bases/druid.apache.org_druids.yaml @@ -1265,6 +1265,29 @@ spec: type: array type: object type: object + auth: + properties: + secretRef: + description: |- + SecretReference represents a Secret Reference. It has enough information to retrieve secret + in any namespace + properties: + name: + description: name is unique within a namespace to reference + a secret resource. + type: string + namespace: + description: namespace defines the space within which the + secret name must be unique. + type: string + type: object + x-kubernetes-map-type: atomic + type: + type: string + required: + - secretRef + - type + type: object common.runtime.properties: description: CommonRuntimeProperties Content fo the `common.runtime.properties` configuration file. @@ -1478,6 +1501,11 @@ spec: description: DisablePVCDeletionFinalizer Whether PVCs shall be deleted on the deletion of the Druid cluster. type: boolean + dynamicConfig: + description: Dynamic Configurations for Druid. Applied through the + dynamic configuration API. + type: object + x-kubernetes-preserve-unknown-fields: true env: description: Env Environment variables for druid containers. items: diff --git a/controllers/druid/druid_controller.go b/controllers/druid/druid_controller.go index 60e59d49..4959d004 100644 --- a/controllers/druid/druid_controller.go +++ b/controllers/druid/druid_controller.go @@ -72,11 +72,18 @@ func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Reque // Initialize Emit Events var emitEvent EventEmitter = EmitEventFuncs{r.Recorder} + // Deploy Druid Cluster if err := deployDruidCluster(ctx, r.Client, instance, emitEvent); err != nil { return ctrl.Result{}, err - } else { - return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil } + + // Update Druid Dynamic Configs + if err := updateDruidDynamicConfigs(ctx, r.Client, instance, emitEvent); err != nil { + return ctrl.Result{}, err + } + + // If both operations succeed, requeue after specified wait time + return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil } func (r *DruidReconciler) SetupWithManager(mgr ctrl.Manager) error { diff --git a/controllers/druid/dynamic_config.go b/controllers/druid/dynamic_config.go new file mode 100644 index 00000000..f8f510e8 --- /dev/null +++ b/controllers/druid/dynamic_config.go @@ -0,0 +1,102 @@ +package druid + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1" + druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi" + internalhttp "github.com/datainfrahq/druid-operator/pkg/http" + "github.com/datainfrahq/druid-operator/pkg/util" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// updateDruidDynamicConfigs updates the Druid cluster's dynamic configurations. +func updateDruidDynamicConfigs(ctx context.Context, client client.Client, druid *v1alpha1.Druid, emitEvent EventEmitter) error { + if druid.Spec.DynamicConfig.Size() == 0 { + return nil + } + + svcName, err := druidapi.GetRouterSvcUrl(druid.Namespace, druid.Name, client) + if err != nil { + emitEvent.EmitEventGeneric(druid, "GetRouterSvcUrlFailed", "Failed to get router service URL", err) + return err + } + + basicAuth, err := druidapi.GetAuthCreds( + ctx, + client, + druid.Spec.Auth, + ) + if err != nil { + emitEvent.EmitEventGeneric(druid, "GetAuthCredsFailed", "Failed to get authentication credentials", err) + return err + } + + // Create the HTTP client with basic authentication + httpClient := internalhttp.NewHTTPClient( + &http.Client{}, + &internalhttp.Auth{BasicAuth: basicAuth}, + ) + + // Define the URL path for dynamic configurations + dynamicConfigPath := druidapi.MakePath(svcName, "indexer", "worker") + + // Fetch current dynamic configurations + currentResp, err := httpClient.Do( + http.MethodGet, + dynamicConfigPath, + nil, + ) + if err != nil { + emitEvent.EmitEventGeneric(druid, "FetchCurrentConfigsFailed", "Failed to fetch current dynamic configurations", err) + return err + } + if currentResp.StatusCode != http.StatusOK { + err = fmt.Errorf( + "failed to retrieve current Druid dynamic configurations. Status code: %d, Response body: %s", + currentResp.StatusCode, string(currentResp.ResponseBody), + ) + emitEvent.EmitEventGeneric(druid, "FetchCurrentConfigsFailed", "Failed to fetch current dynamic configurations", err) + return err + } + + // Handle empty response body + var currentConfigsJson string + if len(currentResp.ResponseBody) == 0 { + currentConfigsJson = "{}" // Initialize as empty JSON object if response body is empty + } else { + currentConfigsJson = currentResp.ResponseBody + } + + // Compare current and desired configurations + equal, err := util.IncludesJson(currentConfigsJson, string(druid.Spec.DynamicConfig.Raw)) + if err != nil { + emitEvent.EmitEventGeneric(druid, "ConfigComparisonFailed", "Failed to compare configurations", err) + return err + } + if equal { + // Configurations are already up-to-date + emitEvent.EmitEventGeneric(druid, "ConfigsUpToDate", "Dynamic configurations are already up-to-date", nil) + return nil + } + + // Update the Druid cluster's dynamic configurations if needed + respDynamicConfigs, err := httpClient.Do( + http.MethodPost, + dynamicConfigPath, + druid.Spec.DynamicConfig.Raw, + ) + if err != nil { + emitEvent.EmitEventGeneric(druid, "UpdateConfigsFailed", "Failed to update dynamic configurations", err) + return err + } + if respDynamicConfigs.StatusCode != http.StatusOK { + return errors.New("failed to update Druid dynamic configurations") + } + + emitEvent.EmitEventGeneric(druid, "ConfigsUpdated", "Successfully updated dynamic configurations", nil) + return nil +} diff --git a/controllers/ingestion/reconciler.go b/controllers/ingestion/reconciler.go index 939fec7a..1deca6d9 100644 --- a/controllers/ingestion/reconciler.go +++ b/controllers/ingestion/reconciler.go @@ -6,14 +6,14 @@ import ( "errors" "fmt" "net/http" - "net/url" - "path" "reflect" "time" "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1" "github.com/datainfrahq/druid-operator/controllers/druid" + druidapi "github.com/datainfrahq/druid-operator/pkg/druidapi" internalhttp "github.com/datainfrahq/druid-operator/pkg/http" + "github.com/datainfrahq/druid-operator/pkg/util" "github.com/datainfrahq/operator-runtime/builder" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,30 +36,17 @@ const ( DruidIngestionControllerFinalizer = "druidingestion.datainfra.io/finalizer" ) -const ( - OperatorUserName = "OperatorUserName" - OperatorPassword = "OperatorPassword" -) -const ( - DruidRouterPort = "8088" -) - -// toJsonString marshals the given data into a JSON string. -func toJsonString(data interface{}) (string, error) { - jsonData, err := json.Marshal(data) - if err != nil { - return "", err - } - return string(jsonData), nil -} - func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIngestion) error { - basicAuth, err := r.getAuthCreds(ctx, di) + basicAuth, err := druidapi.GetAuthCreds( + ctx, + r.Client, + di.Spec.Auth, + ) if err != nil { return err } - svcName, err := r.getRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName) + svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client) if err != nil { return err } @@ -86,7 +73,7 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng } else { if controllerutil.ContainsFinalizer(di, DruidIngestionControllerFinalizer) { // our finalizer is present, so lets handle any external dependency - svcName, err := r.getRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName) + svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client) if err != nil { return err } @@ -96,7 +83,11 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng &internalhttp.Auth{BasicAuth: basicAuth}, ) - respShutDownTask, err := posthttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true), []byte{}) + respShutDownTask, err := posthttp.Do( + http.MethodPost, + getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, di.Status.TaskId, true), + []byte{}, + ) if err != nil { return err } @@ -156,7 +147,7 @@ func getSpecJson(di *v1alpha1.DruidIngestion) (string, error) { if err != nil { return "", err } - return toJsonString(specData) + return util.ToJsonString(specData) } // getRules extracts the rules from the DruidIngestion object and returns them as a slice of maps. @@ -183,7 +174,7 @@ func getRulesJson(di *v1alpha1.DruidIngestion) (string, error) { if err != nil { return "", err } - return toJsonString(rules) + return util.ToJsonString(rules) } // extractDataSourceFromSpec extracts the dataSource from the spec map @@ -233,7 +224,7 @@ func getCompactionJson(di *v1alpha1.DruidIngestion) (string, error) { if err != nil { return "", err } - return toJsonString(compaction) + return util.ToJsonString(compaction) } // UpdateCompaction updates the compaction settings for a Druid data source. @@ -242,28 +233,60 @@ func (r *DruidIngestionReconciler) UpdateCompaction( svcName string, auth internalhttp.Auth, ) (bool, error) { - // If there are no compaction settings, return true + // If there are no compaction settings, return false if di.Spec.Ingestion.Compaction.Size() == 0 { - return true, nil + return false, nil } - postHttp := internalhttp.NewHTTPClient( + httpClient := internalhttp.NewHTTPClient( &http.Client{}, &auth, ) - compactionData, err := getCompactionJson(di) + dataSource, err := getDataSource(di) + if err != nil { + return false, err + } + + // Get current compaction settings + currentResp, err := httpClient.Do( + http.MethodGet, + druidapi.MakePath(svcName, "coordinator", "config", "compaction", dataSource), + nil, + ) if err != nil { return false, err } + var currentCompactionJson string + if currentResp.StatusCode == http.StatusOK { + currentCompactionJson = string(currentResp.ResponseBody) + } else if currentResp.StatusCode == http.StatusNotFound { + // Assume no compaction settings are currently set + currentCompactionJson = "{}" + } else { + return false, fmt.Errorf("failed to retrieve current compaction settings, status code: %d", currentResp.StatusCode) + } + + desiredCompactionJson, err := getCompactionJson(di) + if err != nil { + return false, err + } + + // Compare current and desired compaction settings + if areEqual, err := util.IncludesJson(currentCompactionJson, desiredCompactionJson); err != nil { + return false, err + } else if areEqual { + // Compaction settings are already up-to-date + return false, nil + } + // Update compaction settings - respUpdateCompaction, err := postHttp.Do( + respUpdateCompaction, err := httpClient.Do( http.MethodPost, - makePath(svcName, "coordinator", "config", "compaction"), - []byte(compactionData), + druidapi.MakePath(svcName, "coordinator", "config", "compaction"), + []byte(desiredCompactionJson), ) - if err != nil { return false, err } @@ -305,7 +328,7 @@ func (r *DruidIngestionReconciler) UpdateRules( // Update rules respUpdateRules, err := postHttp.Do( http.MethodPost, - makePath(svcName, "coordinator", "rules", dataSource), + druidapi.MakePath(svcName, "coordinator", "rules", dataSource), []byte(rulesData), ) @@ -475,35 +498,30 @@ func (r *DruidIngestionReconciler) CreateOrUpdate( } - // compare the compaction state - compactionEqual := reflect.DeepEqual(di.Status.CurrentCompaction, di.Spec.Ingestion.Compaction) + compactionOk, err := r.UpdateCompaction(di, svcName, auth) + if err != nil { + return controllerutil.OperationResultNone, err + } - if !compactionEqual { - compactionOk, err := r.UpdateCompaction(di, svcName, auth) + if compactionOk { + // patch status to store the current compaction json + _, err := r.makePatchDruidIngestionStatus( + di, + di.Status.TaskId, + DruidIngestionControllerUpdateSuccess, + "compaction updated", + v1.ConditionTrue, + DruidIngestionControllerUpdateSuccess, + ) if err != nil { return controllerutil.OperationResultNone, err } - - if compactionOk { - // patch status to store the current compaction json - _, err := r.makePatchDruidIngestionStatus( - di, - di.Status.TaskId, - DruidIngestionControllerUpdateSuccess, - "compaction updated", - v1.ConditionTrue, - DruidIngestionControllerUpdateSuccess, - ) - if err != nil { - return controllerutil.OperationResultNone, err - } - build.Recorder.GenericEvent( - di, - v1.EventTypeNormal, - "compaction updated", - DruidIngestionControllerUpdateSuccess, - ) - } + build.Recorder.GenericEvent( + di, + v1.EventTypeNormal, + "compaction updated", + DruidIngestionControllerUpdateSuccess, + ) } // compare the rules state @@ -562,7 +580,6 @@ func (r *DruidIngestionReconciler) makePatchDruidIngestionStatus( in := obj.(*v1alpha1.DruidIngestion) in.Status.CurrentIngestionSpec = ingestionSpec - in.Status.CurrentCompaction = di.Spec.Ingestion.Compaction in.Status.CurrentRules = di.Spec.Ingestion.Rules in.Status.TaskId = taskId in.Status.LastUpdateTime = metav1.Time{Time: time.Now()} @@ -587,25 +604,25 @@ func getPath( case v1alpha1.NativeBatchIndexParallel: if httpMethod == http.MethodGet { // get task - return makePath(svcName, "indexer", "task", taskId) + return druidapi.MakePath(svcName, "indexer", "task", taskId) } else if httpMethod == http.MethodPost && !shutDownTask { // create or update task - return makePath(svcName, "indexer", "task") + return druidapi.MakePath(svcName, "indexer", "task") } else if shutDownTask { // shutdown task - return makePath(svcName, "indexer", "task", taskId, "shutdown") + return druidapi.MakePath(svcName, "indexer", "task", taskId, "shutdown") } case v1alpha1.HadoopIndexHadoop: case v1alpha1.Kafka: if httpMethod == http.MethodGet { // get supervisor task - return makePath(svcName, "indexer", "supervisor", taskId) + return druidapi.MakePath(svcName, "indexer", "supervisor", taskId) } else if httpMethod == http.MethodPost && !shutDownTask { // create or update supervisor task - return makePath(svcName, "indexer", "supervisor") + return druidapi.MakePath(svcName, "indexer", "supervisor") } else if shutDownTask { // shut down supervisor - return makePath(svcName, "indexer", "supervisor", taskId, "shutdown") + return druidapi.MakePath(svcName, "indexer", "supervisor", taskId, "shutdown") } case v1alpha1.Kinesis: case v1alpha1.QueryControllerSQL: @@ -616,37 +633,6 @@ func getPath( return "" } -// makePath constructs the appropriate path for the specified Druid API. -func makePath(baseURL, componentType, apiType string, additionalPaths ...string) string { - u, err := url.Parse(baseURL) - if err != nil { - fmt.Println("Error parsing URL:", err) - return "" - } - - // Construct the initial path - u.Path = path.Join("druid", componentType, "v1", apiType) - - // Append additional path components - for _, p := range additionalPaths { - u.Path = path.Join(u.Path, p) - } - - return u.String() -} - -func makeSupervisorCreateUpdateTaskPath(svcName string) string { - return svcName + "/druid/indexer/v1/supervisor" -} - -func makeSupervisorShutDownTaskPath(svcName, taskId string) string { - return svcName + "/druid/indexer/v1/supervisor/" + taskId + "/shutdown" -} - -func makeSupervisorGetTaskPath(svcName, taskId string) string { - return svcName + "/druid/indexer/v1/supervisor/" + taskId -} - type taskHolder struct { Task string `json:"task"` // tasks ID string `json:"id"` // supervisor @@ -670,67 +656,6 @@ func getTaskIdFromResponse(resp string) (string, error) { return "", errors.New("task id not found") } -func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName string) (string, error) { - listOpts := []client.ListOption{ - client.InNamespace(namespace), - client.MatchingLabels(map[string]string{ - "druid_cr": druidClusterName, - "component": "router", - }), - } - svcList := &v1.ServiceList{} - if err := r.Client.List(context.Background(), svcList, listOpts...); err != nil { - return "", err - } - var svcName string - - for range svcList.Items { - svcName = svcList.Items[0].Name - } - - if svcName == "" { - return "", errors.New("router svc discovery fail") - } - - newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + DruidRouterPort - - return newName, nil -} - -func (r *DruidIngestionReconciler) getAuthCreds(ctx context.Context, di *v1alpha1.DruidIngestion) (internalhttp.BasicAuth, error) { - druid := v1alpha1.Druid{} - // check if the mentioned druid cluster exists - if err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: di.Namespace, - Name: di.Spec.DruidClusterName, - }, - &druid, - ); err != nil { - return internalhttp.BasicAuth{}, err - } - // check if the mentioned secret exists - if di.Spec.Auth != (v1alpha1.Auth{}) { - secret := v1.Secret{} - if err := r.Client.Get(ctx, types.NamespacedName{ - Namespace: di.Spec.Auth.SecretRef.Namespace, - Name: di.Spec.Auth.SecretRef.Name, - }, - &secret, - ); err != nil { - return internalhttp.BasicAuth{}, err - } - creds := internalhttp.BasicAuth{ - UserName: string(secret.Data[OperatorUserName]), - Password: string(secret.Data[OperatorPassword]), - } - - return creds, nil - - } - - return internalhttp.BasicAuth{}, nil -} - type VerbType string type ( diff --git a/controllers/ingestion/reconciler_test.go b/controllers/ingestion/reconciler_test.go index c575d203..6699ded6 100644 --- a/controllers/ingestion/reconciler_test.go +++ b/controllers/ingestion/reconciler_test.go @@ -122,12 +122,12 @@ func TestUpdateCompaction_Success(t *testing.T) { Spec: v1alpha1.DruidIngestionSpec{ Ingestion: v1alpha1.IngestionSpec{ Spec: `{ - "spec": { - "dataSchema": { - "dataSource": "testDataSource" - } - } - }`, + "spec": { + "dataSchema": { + "dataSource": "testDataSource" + } + } + }`, Compaction: runtime.RawExtension{ Raw: []byte(`{"metricsSpec": "testMetric"}`), }, @@ -137,7 +137,15 @@ func TestUpdateCompaction_Success(t *testing.T) { // Mock HTTP server server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) + if r.Method == http.MethodGet { + // Return current compaction settings + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"metricsSpec": "currentMetric"}`)) + } else if r.Method == http.MethodPost { + // Simulate successful update + w.WriteHeader(http.StatusOK) + } })) defer server.Close() @@ -451,57 +459,6 @@ func TestGetPath(t *testing.T) { } } -func TestMakePath(t *testing.T) { - tests := []struct { - name string - baseURL string - componentType string - apiType string - additionalPaths []string - expected string - }{ - { - name: "NoAdditionalPath", - baseURL: "http://example-druid-service", - componentType: "indexer", - apiType: "task", - expected: "http://example-druid-service/druid/indexer/v1/task", - }, - { - name: "OneAdditionalPath", - baseURL: "http://example-druid-service", - componentType: "indexer", - apiType: "task", - additionalPaths: []string{"extra"}, - expected: "http://example-druid-service/druid/indexer/v1/task/extra", - }, - { - name: "MultipleAdditionalPaths", - baseURL: "http://example-druid-service", - componentType: "coordinator", - apiType: "rules", - additionalPaths: []string{"wikipedia", "history"}, - expected: "http://example-druid-service/druid/coordinator/v1/rules/wikipedia/history", - }, - { - name: "EmptyBaseURL", - baseURL: "", - componentType: "indexer", - apiType: "task", - expected: "druid/indexer/v1/task", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - actual := makePath(tt.baseURL, tt.componentType, tt.apiType, tt.additionalPaths...) - if actual != tt.expected { - t.Errorf("makePath() = %v, expected %v", actual, tt.expected) - } - }) - } -} - func TestGetSpec(t *testing.T) { tests := []struct { name string diff --git a/e2e/configs/druid-cr.yaml b/e2e/configs/druid-cr.yaml index fcf4a896..b18f8691 100644 --- a/e2e/configs/druid-cr.yaml +++ b/e2e/configs/druid-cr.yaml @@ -121,6 +121,14 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + dynamicConfig: + type: default + selectStrategy: + type: fillCapacityWithCategorySpec + workerCategorySpec: + categoryMap: {} + strong: true + autoScaler: null nodes: brokers: diff --git a/go.mod b/go.mod index be1d03a6..14ca62f7 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,6 @@ require ( github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.24.0 // indirect diff --git a/go.sum b/go.sum index 145129cf..76aaa3b6 100644 --- a/go.sum +++ b/go.sum @@ -130,7 +130,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= diff --git a/pkg/druidapi/druidapi.go b/pkg/druidapi/druidapi.go new file mode 100644 index 00000000..c932dccb --- /dev/null +++ b/pkg/druidapi/druidapi.go @@ -0,0 +1,135 @@ +package druidapi + +import ( + "context" + "errors" + "fmt" + "net/url" + "path" + + internalhttp "github.com/datainfrahq/druid-operator/pkg/http" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + DruidRouterPort = "8088" + OperatorUserName = "OperatorUserName" + OperatorPassword = "OperatorPassword" +) + +type AuthType string + +const ( + BasicAuth AuthType = "basic-auth" +) + +type Auth struct { + // +required + Type AuthType `json:"type"` + // +required + SecretRef v1.SecretReference `json:"secretRef"` +} + +// GetAuthCreds retrieves basic authentication credentials from a Kubernetes secret. +// If the Auth object is empty, it returns an empty BasicAuth object. +// Parameters: +// +// ctx: The context object. +// c: The Kubernetes client. +// auth: The Auth object containing the secret reference. +// +// Returns: +// +// BasicAuth: The basic authentication credentials. +func GetAuthCreds( + ctx context.Context, + c client.Client, + auth Auth, +) (internalhttp.BasicAuth, error) { + // Check if the mentioned secret exists + if auth != (Auth{}) { + secret := v1.Secret{} + if err := c.Get(ctx, types.NamespacedName{ + Namespace: auth.SecretRef.Namespace, + Name: auth.SecretRef.Name, + }, &secret); err != nil { + return internalhttp.BasicAuth{}, err + } + creds := internalhttp.BasicAuth{ + UserName: string(secret.Data[OperatorUserName]), + Password: string(secret.Data[OperatorPassword]), + } + + return creds, nil + } + + return internalhttp.BasicAuth{}, nil +} + +// MakePath constructs the appropriate path for the specified Druid API. +// Parameters: +// +// baseURL: The base URL of the Druid cluster. For example, http://router-svc.namespace.svc.cluster.local:8088. +// componentType: The type of Druid component. For example, "indexer". +// apiType: The type of Druid API. For example, "worker". +// additionalPaths: Additional path components to be appended to the URL. +// +// Returns: +// +// string: The constructed path. +func MakePath(baseURL, componentType, apiType string, additionalPaths ...string) string { + u, err := url.Parse(baseURL) + if err != nil { + fmt.Println("Error parsing URL:", err) + return "" + } + + // Construct the initial path + u.Path = path.Join("druid", componentType, "v1", apiType) + + // Append additional path components + for _, p := range additionalPaths { + u.Path = path.Join(u.Path, p) + } + + return u.String() +} + +// GetRouterSvcUrl retrieves the URL of the Druid router service. +// Parameters: +// +// namespace: The namespace of the Druid cluster. +// druidClusterName: The name of the Druid cluster. +// c: The Kubernetes client. +// +// Returns: +// +// string: The URL of the Druid router service. +func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (string, error) { + listOpts := []client.ListOption{ + client.InNamespace(namespace), + client.MatchingLabels(map[string]string{ + "druid_cr": druidClusterName, + "component": "router", + }), + } + svcList := &v1.ServiceList{} + if err := c.List(context.Background(), svcList, listOpts...); err != nil { + return "", err + } + var svcName string + + for range svcList.Items { + svcName = svcList.Items[0].Name + } + + if svcName == "" { + return "", errors.New("router svc discovery fail") + } + + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + DruidRouterPort + + return newName, nil +} diff --git a/pkg/druidapi/druidapi_test.go b/pkg/druidapi/druidapi_test.go new file mode 100644 index 00000000..3b94e993 --- /dev/null +++ b/pkg/druidapi/druidapi_test.go @@ -0,0 +1,56 @@ +package druidapi + +import ( + "testing" +) + +func TestMakePath(t *testing.T) { + tests := []struct { + name string + baseURL string + componentType string + apiType string + additionalPaths []string + expected string + }{ + { + name: "NoAdditionalPath", + baseURL: "http://example-druid-service", + componentType: "indexer", + apiType: "task", + expected: "http://example-druid-service/druid/indexer/v1/task", + }, + { + name: "OneAdditionalPath", + baseURL: "http://example-druid-service", + componentType: "indexer", + apiType: "task", + additionalPaths: []string{"extra"}, + expected: "http://example-druid-service/druid/indexer/v1/task/extra", + }, + { + name: "MultipleAdditionalPaths", + baseURL: "http://example-druid-service", + componentType: "coordinator", + apiType: "rules", + additionalPaths: []string{"wikipedia", "history"}, + expected: "http://example-druid-service/druid/coordinator/v1/rules/wikipedia/history", + }, + { + name: "EmptyBaseURL", + baseURL: "", + componentType: "indexer", + apiType: "task", + expected: "druid/indexer/v1/task", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := MakePath(tt.baseURL, tt.componentType, tt.apiType, tt.additionalPaths...) + if actual != tt.expected { + t.Errorf("makePath() = %v, expected %v", actual, tt.expected) + } + }) + } +} diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 00000000..b61eeab2 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,79 @@ +package util + +import ( + "encoding/json" + "fmt" + "reflect" +) + +// ToJsonString marshals the given data into a JSON string. +func ToJsonString(data interface{}) (string, error) { + jsonData, err := json.Marshal(data) + if err != nil { + return "", err + } + return string(jsonData), nil +} + +// IncludesJson checks if all key-value pairs in the desired JSON string are present in the current JSON string. +func IncludesJson(currentJson, desiredJson string) (bool, error) { + var current, desired map[string]interface{} + + // Parse the current JSON string + if err := json.Unmarshal([]byte(currentJson), ¤t); err != nil { + return false, fmt.Errorf("error parsing current JSON: %w", err) + } + + // Parse the desired JSON string + if err := json.Unmarshal([]byte(desiredJson), &desired); err != nil { + return false, fmt.Errorf("error parsing desired JSON: %w", err) + } + + // Check if all key-value pairs in desired are present in current + return includes(current, desired), nil +} + +// includes recursively checks if all key-value pairs in the desired map are present in the current map. +func includes(current, desired map[string]interface{}) bool { + for key, desiredValue := range desired { + currentValue, exists := current[key] + if !exists { + return false + } + + if !reflect.DeepEqual(desiredValue, currentValue) { + switch desiredValueTyped := desiredValue.(type) { + case map[string]interface{}: + currentValueTyped, ok := currentValue.(map[string]interface{}) + if !ok || !includes(currentValueTyped, desiredValueTyped) { + return false + } + case []interface{}: + currentValueTyped, ok := currentValue.([]interface{}) + if !ok || !sliceIncludes(currentValueTyped, desiredValueTyped) { + return false + } + default: + return false + } + } + } + return true +} + +// sliceIncludes checks if all elements of the desired slice are present in the current slice. +func sliceIncludes(current, desired []interface{}) bool { + for _, desiredItem := range desired { + found := false + for _, currentItem := range current { + if reflect.DeepEqual(desiredItem, currentItem) { + found = true + break + } + } + if !found { + return false + } + } + return true +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 00000000..30d1c046 --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,135 @@ +package util + +import ( + "testing" +) + +func TestIncludesJson(t *testing.T) { + tests := []struct { + name string + currentJson string + desiredJson string + expectedEqual bool + expectError bool + }{ + { + name: "Exact match", + currentJson: `{ + "key1": "value1", + "key2": "value2" + }`, + desiredJson: `{ + "key1": "value1", + "key2": "value2" + }`, + expectedEqual: true, + expectError: false, + }, + { + name: "Real config not matching", + currentJson: `{ + "type": "default", + "selectStrategy": { + "type": "fillCapacityWithCategorySpec", + "workerCategorySpec": { + "categoryMap": {}, + "strong": false + } + }, + "autoScaler": null + }`, + desiredJson: `{ + "type": "default", + "selectStrategy": { + "type": "fillCapacityWithCategorySpec", + "workerCategorySpec": { + "categoryMap": {}, + "strong": true + } + }, + "autoScaler": null + }`, + expectedEqual: false, + expectError: false, + }, + { + name: "Subset match with nested maps", + currentJson: `{ + "key1": "value1", + "key2": { + "nestedKey1": "nestedValue1", + "nestedKey2": "nestedValue2" + } + }`, + desiredJson: `{ + "key2": { + "nestedKey1": "nestedValue1" + } + }`, + expectedEqual: true, + expectError: false, + }, + { + name: "Mismatch with nested maps", + currentJson: `{ + "key1": "value1", + "key2": { + "nestedKey1": "nestedValue1" + } + }`, + desiredJson: `{ + "key2": { + "nestedKey2": "nestedValue2" + } + }`, + expectedEqual: false, + expectError: false, + }, + { + name: "Subset match with arrays", + currentJson: `{ + "key1": ["value1", "value2", "value3"] + }`, + desiredJson: `{ + "key1": ["value1", "value2"] + }`, + expectedEqual: true, + expectError: false, + }, + { + name: "Mismatch with arrays", + currentJson: `{ + "key1": ["value1", "value2"] + }`, + desiredJson: `{ + "key1": ["value3"] + }`, + expectedEqual: false, + expectError: false, + }, + { + name: "Invalid JSON", + currentJson: `{ + "key1": "value1" + `, + desiredJson: `{ + "key1": "value1" + }`, + expectedEqual: false, + expectError: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + equal, err := IncludesJson(test.currentJson, test.desiredJson) + if (err != nil) != test.expectError { + t.Errorf("IncludesJson() error = %v, expectError %v", err, test.expectError) + return + } + if equal != test.expectedEqual { + t.Errorf("IncludesJson() = %v, expectedEqual %v", equal, test.expectedEqual) + } + }) + } +}