Skip to content

Commit

Permalink
fix(nil pointer): storage class reference
Browse files Browse the repository at this point in the history
  • Loading branch information
itamar-marom committed Aug 14, 2023
1 parent 764ee9e commit c75da26
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 159 deletions.
30 changes: 16 additions & 14 deletions chart/templates/crds/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Druid is the Schema for the druids API
description: Druid is the Schema for the druids API.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
Expand Down Expand Up @@ -49,7 +49,7 @@ spec:
type: string
type: array
command:
description: Command
description: Command command for the additional container.
items:
type: string
type: array
Expand Down Expand Up @@ -215,15 +215,14 @@ spec:
type: object
type: array
image:
description: Image
description: Image Image of the additional container.
type: string
imagePullPolicy:
description: ImagePullPolicy If not present, will be taken from
top level spec
top level spec.
type: string
resources:
description: Resources Kubernetes Native resource requirements
specification.
description: Resources Kubernetes Native `resources` specification.
properties:
claims:
description: "Claims lists the names of resources, defined
Expand Down Expand Up @@ -450,7 +449,7 @@ spec:
type: object
type: object
volumeMounts:
description: VolumeMounts Kubernetes Native volume mounts specification.
description: VolumeMounts Kubernetes Native `VolumeMount` specification.
items:
description: VolumeMount describes a mounting of a Volume
within a container.
Expand Down Expand Up @@ -1748,6 +1747,7 @@ spec:
description: Image Required here or at the NodeSpec level.
type: string
imagePullPolicy:
default: IfNotPresent
description: ImagePullPolicy
type: string
imagePullSecrets:
Expand Down Expand Up @@ -1930,8 +1930,9 @@ spec:
- type
type: object
metricDimensions.json:
description: DimensionsMapPath Custom Dimension Map Path for statsd
emitter.
description: 'DimensionsMapPath Custom Dimension Map Path for statsd
emitter. stastd documentation is described in the following documentation:
https://druid.apache.org/docs/latest/development/extensions-contrib/statsd.html'
type: string
nodeSelector:
additionalProperties:
Expand Down Expand Up @@ -5025,7 +5026,8 @@ spec:
workload's pods.
type: object
podManagementPolicy:
description: PodManagementPolicy By default, it is set to "parallel"
default: Parallel
description: PodManagementPolicy
type: string
ports:
description: Ports Extra ports to be added to pod spec.
Expand Down Expand Up @@ -5219,8 +5221,7 @@ spec:
minimum: 0
type: integer
resources:
description: Resources Kubernetes Native resource requirements
specification.
description: Resources Kubernetes Native `resources` specification.
properties:
claims:
description: "Claims lists the names of resources, defined
Expand Down Expand Up @@ -8545,6 +8546,7 @@ spec:
description: PodLabels Custom labels to be populated in `Druid` pods.
type: object
podManagementPolicy:
default: Parallel
description: PodManagementPolicy
type: string
readinessProbe:
Expand Down Expand Up @@ -9554,7 +9556,7 @@ spec:
type: integer
type: object
tolerations:
description: Tolerations Kubernetes native `toleration` specification.
description: Tolerations Kubernetes native `tolerations` specification.
items:
description: The pod this Toleration is attached to tolerates any
taint that matches the triple <key,value,effect> using the matching
Expand Down Expand Up @@ -10012,7 +10014,7 @@ spec:
type: object
type: array
volumes:
description: Volumes Kubernetes Native `Volume` specification.
description: Volumes Kubernetes Native `Volumes` specification.
items:
description: Volume represents a named volume in a pod that may
be accessed by any container in the pod.
Expand Down
30 changes: 16 additions & 14 deletions config/crd/bases/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Druid is the Schema for the druids API
description: Druid is the Schema for the druids API.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
Expand Down Expand Up @@ -49,7 +49,7 @@ spec:
type: string
type: array
command:
description: Command
description: Command command for the additional container.
items:
type: string
type: array
Expand Down Expand Up @@ -215,15 +215,14 @@ spec:
type: object
type: array
image:
description: Image
description: Image Image of the additional container.
type: string
imagePullPolicy:
description: ImagePullPolicy If not present, will be taken from
top level spec
top level spec.
type: string
resources:
description: Resources Kubernetes Native resource requirements
specification.
description: Resources Kubernetes Native `resources` specification.
properties:
claims:
description: "Claims lists the names of resources, defined
Expand Down Expand Up @@ -450,7 +449,7 @@ spec:
type: object
type: object
volumeMounts:
description: VolumeMounts Kubernetes Native volume mounts specification.
description: VolumeMounts Kubernetes Native `VolumeMount` specification.
items:
description: VolumeMount describes a mounting of a Volume
within a container.
Expand Down Expand Up @@ -1748,6 +1747,7 @@ spec:
description: Image Required here or at the NodeSpec level.
type: string
imagePullPolicy:
default: IfNotPresent
description: ImagePullPolicy
type: string
imagePullSecrets:
Expand Down Expand Up @@ -1930,8 +1930,9 @@ spec:
- type
type: object
metricDimensions.json:
description: DimensionsMapPath Custom Dimension Map Path for statsd
emitter.
description: 'DimensionsMapPath Custom Dimension Map Path for statsd
emitter. stastd documentation is described in the following documentation:
https://druid.apache.org/docs/latest/development/extensions-contrib/statsd.html'
type: string
nodeSelector:
additionalProperties:
Expand Down Expand Up @@ -5025,7 +5026,8 @@ spec:
workload's pods.
type: object
podManagementPolicy:
description: PodManagementPolicy By default, it is set to "parallel"
default: Parallel
description: PodManagementPolicy
type: string
ports:
description: Ports Extra ports to be added to pod spec.
Expand Down Expand Up @@ -5219,8 +5221,7 @@ spec:
minimum: 0
type: integer
resources:
description: Resources Kubernetes Native resource requirements
specification.
description: Resources Kubernetes Native `resources` specification.
properties:
claims:
description: "Claims lists the names of resources, defined
Expand Down Expand Up @@ -8545,6 +8546,7 @@ spec:
description: PodLabels Custom labels to be populated in `Druid` pods.
type: object
podManagementPolicy:
default: Parallel
description: PodManagementPolicy
type: string
readinessProbe:
Expand Down Expand Up @@ -9554,7 +9556,7 @@ spec:
type: integer
type: object
tolerations:
description: Tolerations Kubernetes native `toleration` specification.
description: Tolerations Kubernetes native `tolerations` specification.
items:
description: The pod this Toleration is attached to tolerates any
taint that matches the triple <key,value,effect> using the matching
Expand Down Expand Up @@ -10012,7 +10014,7 @@ spec:
type: object
type: array
volumes:
description: Volumes Kubernetes Native `Volume` specification.
description: Volumes Kubernetes Native `Volumes` specification.
items:
description: Volume represents a named volume in a pod that may
be accessed by any container in the pod.
Expand Down
133 changes: 2 additions & 131 deletions controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

autoscalev2 "k8s.io/api/autoscaling/v2"
networkingv1 "k8s.io/api/networking/v1"
storage "k8s.io/api/storage/v1"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -189,11 +188,8 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui
// Ignore for the first iteration ie cluster creation, else get sts shall unnecessary log errors.

if m.Generation > 1 && m.Spec.ScalePvcSts {
if isVolumeExpansionEnabled(ctx, sdk, m, &nodeSpec, emitEvents) {
err := scalePVCForSts(ctx, sdk, &nodeSpec, nodeSpecUniqueStr, m, emitEvents)
if err != nil {
return err
}
if err := expandStatefulSetVolumes(ctx, sdk, m, &nodeSpec, emitEvents, nodeSpecUniqueStr); err != nil {
return err
}
}

Expand Down Expand Up @@ -809,116 +805,6 @@ func isObjFullyDeployed(ctx context.Context, sdk client.Client, nodeSpec v1alpha
return false, nil
}

// scalePVCForSts shall expand the sts volumeclaimtemplates size as well as N no of pvc supported by the sts.
// NOTE: To be called only if generation > 1
func scalePVCForSts(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error {

getSTSList, err := readers.List(ctx, sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return nil
}

// Dont proceed unless all statefulsets are up and running.
// This can cause the go routine to panic

for _, sts := range getSTSList {
if sts.(*appsv1.StatefulSet).Status.Replicas != sts.(*appsv1.StatefulSet).Status.ReadyReplicas {
return nil
}
}

// return nil, in case return err the program halts since sts would not be able
// we would like the operator to create sts.
sts, err := readers.Get(ctx, sdk, nodeSpecUniqueStr, drd, func() object { return &appsv1.StatefulSet{} }, emitEvent)
if err != nil {
return nil
}

pvcLabels := map[string]string{
"component": nodeSpec.NodeType,
}

pvcList, err := readers.List(ctx, sdk, drd, pvcLabels, emitEvent, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return nil
}

desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize := getVolumeClaimTemplateSizes(sts, nodeSpec, pvcList)

// current number of PVC can't be less than desired number of pvc
if len(pvcSize) < len(desVolumeClaimTemplateSize) {
return nil
}

// iterate over array for matching each index in desVolumeClaimTemplateSize, currVolumeClaimTemplateSize and pvcSize
for i := range desVolumeClaimTemplateSize {

// Validate Request, shrinking of pvc not supported
// desired size cant be less than current size
// in that case re-create sts/pvc which is a user execute manual step

desiredSize, _ := desVolumeClaimTemplateSize[i].AsInt64()
currentSize, _ := currVolumeClaimTemplateSize[i].AsInt64()

if desiredSize < currentSize {
e := fmt.Errorf("Request for Shrinking of sts pvc size [sts:%s] in [namespace:%s] is not Supported", sts.(*appsv1.StatefulSet).Name, sts.(*appsv1.StatefulSet).Namespace)
logger.Error(e, e.Error(), "name", drd.Name, "namespace", drd.Namespace)
emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeFail", "", err)
return e
}

// In case size dont match and dessize > currsize, delete the sts using casacde=false / propagation policy set to orphan
// The operator on next reconcile shall create the sts with latest changes
if desiredSize != currentSize {
msg := fmt.Sprintf("Detected Change in VolumeClaimTemplate Sizes for Statefuleset [%s] in Namespace [%s], desVolumeClaimTemplateSize: [%s], currVolumeClaimTemplateSize: [%s]\n, deleteing STS [%s] with casacde=false]", sts.(*appsv1.StatefulSet).Name, sts.(*appsv1.StatefulSet).Namespace, desVolumeClaimTemplateSize[i].String(), currVolumeClaimTemplateSize[i].String(), sts.(*appsv1.StatefulSet).Name)
logger.Info(msg)
emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeDetected", msg, nil)

if err := writers.Delete(ctx, sdk, drd, sts, emitEvent, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
return err
} else {
msg := fmt.Sprintf("[StatefuleSet:%s] successfully deleted with casacde=false", sts.(*appsv1.StatefulSet).Name)
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
emitEvent.EmitEventGeneric(drd, "DruidOperatorStsOrphaned", msg, nil)
}

}

// In case size dont match, patch the pvc with the desiredsize from druid CR
for p := range pvcSize {
pSize, _ := pvcSize[p].AsInt64()
if desiredSize != pSize {
// use deepcopy
patch := client.MergeFrom(pvcList[p].(*v1.PersistentVolumeClaim).DeepCopy())
pvcList[p].(*v1.PersistentVolumeClaim).Spec.Resources.Requests[v1.ResourceStorage] = desVolumeClaimTemplateSize[i]
if err := writers.Patch(ctx, sdk, drd, pvcList[p].(*v1.PersistentVolumeClaim), false, patch, emitEvent); err != nil {
return err
} else {
msg := fmt.Sprintf("[PVC:%s] successfully Patched with [Size:%s]", pvcList[p].(*v1.PersistentVolumeClaim).Name, desVolumeClaimTemplateSize[i].String())
logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
}
}
}

}

return nil
}

// desVolumeClaimTemplateSize: the druid CR holds this value for a sts volumeclaimtemplate
// currVolumeClaimTemplateSize: the sts owned by druid CR holds this value in volumeclaimtemplate
// pvcSize: the pvc referenced by the sts holds this value
Expand All @@ -941,21 +827,6 @@ func getVolumeClaimTemplateSizes(sts object, nodeSpec *v1alpha1.DruidNodeSpec, p

}

func isVolumeExpansionEnabled(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool {

for _, nodeVCT := range nodeSpec.VolumeClaimTemplates {
sc, err := readers.Get(ctx, sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return &storage.StorageClass{} }, emitEvent)
if err != nil {
return false
}

if sc.(*storage.StorageClass).AllowVolumeExpansion != boolFalse() {
return true
}
}
return false
}

func stringifyForLogging(obj object, drd *v1alpha1.Druid) string {
if bytes, err := json.Marshal(obj); err != nil {
logger.Error(err, err.Error(), fmt.Sprintf("Failed to serialize [%s:%s]", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName()), "name", drd.Name, "namespace", drd.Namespace)
Expand Down
Loading

0 comments on commit c75da26

Please sign in to comment.