From 2bee4cda624280e92e742476bd1ebe1655492e1a Mon Sep 17 00:00:00 2001
From: Eoin Gallinagh <eoingallinagh@gmail.com>
Date: Tue, 7 Nov 2023 11:21:27 +0000
Subject: [PATCH 1/2] refactor: generic resources

---
 .../queuejob/queuejob_controller_ex.go        |  10 +-
 .../genericresource/genericresource.go        | 405 ++----------------
 .../genericresource/helper.go                 | 253 +++++++++++
 .../genericresource/utils.go                  |  40 ++
 4 files changed, 329 insertions(+), 379 deletions(-)
 create mode 100644 pkg/controller/queuejobresources/genericresource/helper.go
 create mode 100644 pkg/controller/queuejobresources/genericresource/utils.go

diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index 0b7ca80a..cc030120 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -52,7 +52,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
@@ -582,13 +581,10 @@ func (qjm *XController) getAppWrapperCompletionStatus(caw *arbv1.AppWrapper) arb
 	for i, genericItem := range caw.Spec.AggrResources.GenericItems {
 		if len(genericItem.CompletionStatus) > 0 {
 			objectName := genericItem.GenericTemplate
-			var unstruct unstructured.Unstructured
-			unstruct.Object = make(map[string]interface{})
-			var blob interface{}
-			if err := jsons.Unmarshal(objectName.Raw, &blob); err != nil {
-				klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling, err=%#v", err)
+			unstruct, err := genericresource.UnmarshalToUnstructured(objectName.Raw)
+			if err != nil {
+			    klog.Errorf("[getAppWrapperCompletionStatus] Error: %v", err)
 			}
-			unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling
 			name := ""
 			if md, ok := unstruct.Object["metadata"]; ok {
 				metadata := md.(map[string]interface{})
diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go
index b9686153..2cad7375 100644
--- a/pkg/controller/queuejobresources/genericresource/genericresource.go
+++ b/pkg/controller/queuejobresources/genericresource/genericresource.go
@@ -18,9 +18,10 @@ package genericresource
 
 import (
 	"context"
-	"encoding/json"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"k8s.io/apimachinery/pkg/api/errors"
 	"fmt"
-	"math"
 	"runtime/debug"
 	"strings"
 	"time"
@@ -28,19 +29,13 @@ import (
 	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 
 	clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
-	"k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/discovery"
-	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
-	"k8s.io/client-go/restmapper"
 )
 
 var appwrapperJobName = "appwrapper.mcad.ibm.com"
@@ -61,16 +56,6 @@ func NewAppWrapperGenericResource(config *rest.Config) *GenericResources {
 	}
 }
 
-func join(strs ...string) string {
-	var result string
-	if strs[0] == "" {
-		return strs[len(strs)-1]
-	}
-	for _, str := range strs {
-		result += str
-	}
-	return result
-}
 
 func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (genericResourceName string, groupversionkind *schema.GroupVersionKind, erro error) {
 	var err error
@@ -88,34 +73,15 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
 	namespaced := true
 	// todo:DELETEME	dd := common.KubeClient.Discovery()
 	dd := gr.clients.Discovery()
-	apigroups, err := restmapper.GetAPIGroupResources(dd)
-	if err != nil {
-		klog.Errorf("[Cleanup] Error getting API resources, err=%#v", err)
-		return name, default_gvk, err
-	}
-	ext := awr.GenericTemplate
-	restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
-	_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, default_gvk, nil)
-	if err != nil {
-		klog.Errorf("Decoding error, please check your CR! Aborting handling the resource creation, err:  `%v`", err)
-		return name, gvk, err
-	}
 
-	mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	ext := awr.GenericTemplate
+	gvk, mapping, err := getResourceMapping(dd, ext.Raw, default_gvk)
 	if err != nil {
-		klog.Errorf("mapping error from raw object: `%v`", err)
-		return name, gvk, err
+	    return name, gvk, err
 	}
 
-	// todo:DELETEME		restconfig := common.KubeConfig
-	restconfig := gr.kubeClientConfig
-	restconfig.GroupVersion = &schema.GroupVersion{
-		Group:   mapping.GroupVersionKind.Group,
-		Version: mapping.GroupVersionKind.Version,
-	}
-	dclient, err := dynamic.NewForConfig(restconfig)
+	dclient, err := createDynamicClient(gr, mapping)
 	if err != nil {
-		klog.Errorf("[Cleanup] Error creating new dynamic client, err=%#v.", err)
 		return name, gvk, err
 	}
 
@@ -140,31 +106,19 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
 		}
 	}
 
-	// Unmarshal generic item raw object
-	var unstruct unstructured.Unstructured
-	unstruct.Object = make(map[string]interface{})
-	var blob interface{}
-	if err = json.Unmarshal(ext.Raw, &blob); err != nil {
-		klog.Errorf("[Cleanup] Error unmarshalling, err=%#v", err)
-		return name, gvk, err
+	unstruct, err := UnmarshalToUnstructured(ext.Raw)
+	if err != nil {
+	    return name, gvk, err
 	}
 
-	unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling
 	namespace := aw.Namespace                       // only delete resources from AppWrapper namespace
-	if md, ok := unstruct.Object["metadata"]; ok {
 
-		metadata := md.(map[string]interface{})
-		if objectName, ok := metadata["name"]; ok {
-			name = objectName.(string)
-		}
-		if objectns, ok := metadata["namespace"]; ok {
-			if objectns.(string) != namespace {
-				err := fmt.Errorf("[Cleanup] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", objectns.(string), namespace)
-				return name, gvk, err
-			}
-		}
+	name, err = retrieveName(aw.Namespace, unstruct, "Cleanup")
+	if err != nil {
+	    return name, gvk, err
 	}
 
+
 	// Get the resource to see if it exists in the AppWrapper namespace
 	labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName())
 	inEtcd, err := dclient.Resource(rsrc).Namespace(aw.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
@@ -174,11 +128,7 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
 
 	// Check to see if object already exists in etcd, if not, create the object.
 	if inEtcd != nil || len(inEtcd.Items) > 0 {
-		newName := name
-		if len(newName) > 63 {
-			newName = newName[:63]
-		}
-
+		newName := truncateName(name)
 		err = deleteObject(namespaced, namespace, newName, rsrc, dclient)
 		if err != nil {
 			if !errors.IsNotFound(err) {
@@ -206,91 +156,36 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
 	}()
 
 	namespaced := true
+	name := ""
 	// todo:DELETEME	dd := common.KubeClient.Discovery()
 	dd := gr.clients.Discovery()
-	apigroups, err := restmapper.GetAPIGroupResources(dd)
-	if err != nil {
-		klog.Errorf("Error getting API resources, err=%#v", err)
-		return []*v1.Pod{}, err
-	}
 	ext := awr.GenericTemplate
-	restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
-	// versions := &unstructured.Unstructured{}
-	// _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, nil, versions)
-	_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, nil, nil)
-	if err != nil {
-		klog.Errorf("Decoding error, please check your CR! Aborting handling the resource creation, err:  `%v`", err)
-		return []*v1.Pod{}, err
-	}
-	mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	_, mapping, err := getResourceMapping(dd, ext.Raw, nil)
 	if err != nil {
-		klog.Errorf("mapping error from raw object: `%v`", err)
-		return []*v1.Pod{}, err
+	    return []*v1.Pod{}, err
 	}
 
-	// todo:DELETEME		restconfig := common.KubeConfig
-	restconfig := gr.kubeClientConfig
-	restconfig.GroupVersion = &schema.GroupVersion{
-		Group:   mapping.GroupVersionKind.Group,
-		Version: mapping.GroupVersionKind.Version,
-	}
-	dclient, err := dynamic.NewForConfig(restconfig)
+	dclient, err := createDynamicClient(gr, mapping)
 	if err != nil {
-		klog.Errorf("Error creating new dynamic client, err=%#v", err)
 		return []*v1.Pod{}, err
 	}
 
-	//TODO: Simplified apiresourcelist discovery, the assumption is we will always deploy namespaced objects
-	//We dont intend to install CRDs like KubeRay, Spark-Operator etc through MCAD, I think such objects are typically
-	//cluster scoped. May be for Multi-Cluster or inference use case we need such deep discovery, so for now commenting code.
-
-	// _, apiresourcelist, err := dd.ServerGroupsAndResources()
-	// if err != nil {
-	// 	if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok {
-	// 		klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err)
-	// 	} else {
-	// 		klog.Errorf("Error getting supported groups and resources, err=%#v", err)
-	// 		return []*v1.Pod{}, err
-	// 	}
-	// }
-
 	rsrc := mapping.Resource
 
-	// for _, apiresourcegroup := range apiresourcelist {
-	// 	if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
-	// 		for _, apiresource := range apiresourcegroup.APIResources {
-	// 			if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
-	// 				rsrc = mapping.Resource
-	// 				namespaced = apiresource.Namespaced
-	// 			}
-	// 		}
-	// 	}
-	// }
-	var unstruct unstructured.Unstructured
-	unstruct.Object = make(map[string]interface{})
-	var blob interface{}
-	if err = json.Unmarshal(ext.Raw, &blob); err != nil {
-		klog.Errorf("Error unmarshalling, err=%#v", err)
-		return []*v1.Pod{}, err
+	unstruct, err := UnmarshalToUnstructured(ext.Raw)
+	if err != nil {
+	    return []*v1.Pod{}, err
 	}
+
 	ownerRef := metav1.NewControllerRef(aw, appWrapperKind)
-	unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling
 	unstruct.SetOwnerReferences(append(unstruct.GetOwnerReferences(), *ownerRef))
 	namespace := aw.Namespace // only create resources in AppWrapper namespace
-	name := ""
-	if md, ok := unstruct.Object["metadata"]; ok {
 
-		metadata := md.(map[string]interface{})
-		if objectName, ok := metadata["name"]; ok {
-			name = objectName.(string)
-		}
-		if objectns, ok := metadata["namespace"]; ok {
-			if objectns.(string) != namespace {
-				err := fmt.Errorf("[SyncQueueJob] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", objectns.(string), namespace)
-				return []*v1.Pod{}, err
-			}
-		}
+	name, err = retrieveName(aw.Namespace, unstruct, "SyncQueueJob")
+	if err != nil {
+	    return []*v1.Pod{}, err
 	}
+
 	labels := map[string]string{}
 	if unstruct.GetLabels() == nil {
 		unstruct.SetLabels(labels)
@@ -316,13 +211,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
 
 	// Check to see if object already exists in etcd, if not, create the object.
 	if inEtcd == nil || len(inEtcd.Items) < 1 {
-		newName := name
-		if len(newName) > 63 {
-			newName = newName[:63]
-		}
+		newName := truncateName(name)
 		unstruct.SetName(newName)
 		//Asumption object is always namespaced
-		//Refer to comment on line 238
 		namespaced = true
 		err = createObject(namespaced, namespace, newName, rsrc, unstruct, dclient)
 		if err != nil {
@@ -335,177 +226,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
 		}
 	}
 
-	// Get the related resources of created object
-	// var thisObj *unstructured.Unstructured
-	// var err1 error
-	// if namespaced {
-	// 	thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
-	// } else {
-	// 	thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{})
-	// }
-	// if err1 != nil {
-	// 	klog.Errorf("Could not get created resource with error %v", err1)
-	// 	return []*v1.Pod{}, err1
-	// }
-	// thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind())
-
-	// podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
-	// pods := []*v1.Pod{}
-	// for _, pod := range (*podL).Items {
-	// 	parent := metav1.GetControllerOf(&pod)
-	// 	if reflect.DeepEqual(thisOwnerRef, parent) {
-	// 		pods = append(pods, &pod)
-	// 	}
-	// 	klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name)
-	// }
-	// return pods, nil
 	return []*v1.Pod{}, nil
 }
 
-// checks if object has pod template spec and add new labels
-func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map[string]string) (hasFields bool) {
-	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
-	if !isFound {
-		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec' field not found.")
-		return false
-	}
-	template, isFound, _ := unstructured.NestedMap(spec, "template")
-	if !isFound {
-		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template' field not found.")
-		return false
-	}
-
-	marshal, _ := json.Marshal(template)
-	unmarshal := v1.PodTemplateSpec{}
-	if err := json.Unmarshal(marshal, &unmarshal); err != nil {
-		klog.Warning(err)
-		return false
-	}
-	existingLabels, isFound, _ := unstructured.NestedStringMap(template, "metadata", "labels")
-	if !isFound {
-		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template.metadata.labels' field not found.")
-		return false
-	}
-	newLength := len(existingLabels) + len(labels)
-	m := make(map[string]string, newLength) // convert map[string]string into map[string]interface{}
-	for k, v := range existingLabels {
-		m[k] = v
-	}
-
-	for k, v := range labels {
-		m[k] = v
-	}
-
-	if err := unstructured.SetNestedStringMap(unstruct.Object, m, "spec", "template", "metadata", "labels"); err != nil {
-		klog.Warning(err)
-		return false
-	}
-
-	return isFound
-}
-
-// checks if object has replicas and containers field
-func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, containers []v1.Container) {
-	var unstruct unstructured.Unstructured
-	unstruct.Object = make(map[string]interface{})
-	var blob interface{}
-	if err := json.Unmarshal(obj.Raw, &blob); err != nil {
-		klog.Errorf("Error unmarshalling, err=%#v", err)
-		return false, 0, nil
-	}
-	unstruct.Object = blob.(map[string]interface{})
-	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
-	if !isFound {
-		klog.Warningf("[hasFields] No spec field found in raw object: %#v", unstruct.UnstructuredContent())
-	}
-
-	replicas, isFound, _ := unstructured.NestedFloat64(spec, "replicas")
-	// Set default to 1 if no replicas field is found (handles the case of a single pod creation without replicaset.
-	if !isFound {
-		replicas = 1
-	}
-
-	template, isFound, _ := unstructured.NestedMap(spec, "template")
-	// If spec does not contain a podtemplate, check for pod singletons
-	var subspec map[string]interface{}
-	if !isFound {
-		subspec = spec
-		klog.V(6).Infof("[hasFields] No template field found in raw object: %#v", spec)
-	} else {
-		subspec, isFound, _ = unstructured.NestedMap(template, "spec")
-	}
-
-	containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers")
-	if !isFound {
-		klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec)
-		return false, 0, nil
-	}
-	objContainers := make([]v1.Container, len(containerList))
-	for _, container := range containerList {
-		marshal, _ := json.Marshal(container)
-		unmarshal := v1.Container{}
-		_ = json.Unmarshal(marshal, &unmarshal)
-		objContainers = append(objContainers, unmarshal)
-	}
-	return isFound, replicas, objContainers
-}
-
-func createObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, unstruct unstructured.Unstructured, dclient dynamic.Interface) (erro error) {
-	var err error
-	if !namespaced {
-		res := dclient.Resource(rsrc)
-		_, err = res.Create(context.Background(), &unstruct, metav1.CreateOptions{})
-		if err != nil {
-			if errors.IsAlreadyExists(err) {
-				klog.Errorf("%v\n", err.Error())
-				return nil
-			} else {
-				klog.Errorf("Error creating the object `%v`, the error is `%v`", name, errors.ReasonForError(err))
-				return err
-			}
-		} else {
-			klog.V(4).Infof("Resource `%v` created\n", name)
-			return nil
-		}
-	} else {
-		res := dclient.Resource(rsrc).Namespace(namespace)
-		_, err = res.Create(context.Background(), &unstruct, metav1.CreateOptions{})
-		if err != nil {
-			if errors.IsAlreadyExists(err) {
-				klog.Errorf("%v\n", err.Error())
-				return nil
-			} else {
-				klog.Errorf("Error creating the object `%v`, the error is `%v`", name, errors.ReasonForError(err))
-				return err
-			}
-		} else {
-			klog.V(4).Infof("Resource `%v` created\n", name)
-			return nil
-
-		}
-	}
-}
-
-func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) error {
-	var err error
-	backGround := metav1.DeletePropagationBackground
-	delOptions := metav1.DeleteOptions{PropagationPolicy: &backGround}
-	if !namespaced {
-		res := dclient.Resource(rsrc)
-		err = res.Delete(context.Background(), name, delOptions)
-	} else {
-		res := dclient.Resource(rsrc).Namespace(namespace)
-		err = res.Delete(context.Background(), name, delOptions)
-	}
-
-	if err != nil && !errors.IsNotFound(err) {
-		klog.Errorf("[deleteObject] Error deleting the object `%v`, the error is `%v`.", name, errors.ReasonForError(err))
-		return err
-	} else {
-		klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name)
-		return nil
-	}
-}
 
 func GetListOfPodResourcesFromOneGenericItem(awr *arbv1.AppWrapperGenericResource) (resource []*clusterstateapi.Resource, er error) {
 	var podResourcesList []*clusterstateapi.Resource
@@ -573,87 +296,25 @@ func GetResources(awr *arbv1.AppWrapperGenericResource) (resource *clusterstatea
 	return totalresource, err
 }
 
-func getPodResources(pod arbv1.CustomPodResourceTemplate) (resource *clusterstateapi.Resource) {
-	replicas := pod.Replicas
-	req := clusterstateapi.NewResource(pod.Requests)
-	limit := clusterstateapi.NewResource(pod.Limits)
-	tolerance := 0.001
-
-	// Use limit if request is 0
-	if diff := math.Abs(req.MilliCPU - float64(0.0)); diff < tolerance {
-		req.MilliCPU = limit.MilliCPU
-	}
-
-	if diff := math.Abs(req.Memory - float64(0.0)); diff < tolerance {
-		req.Memory = limit.Memory
-	}
-
-	if req.GPU <= 0 {
-		req.GPU = limit.GPU
-	}
-	req.MilliCPU = req.MilliCPU * float64(replicas)
-	req.Memory = req.Memory * float64(replicas)
-	req.GPU = req.GPU * int64(replicas)
-	return req
-}
-
-func getContainerResources(container v1.Container, replicas float64) *clusterstateapi.Resource {
-	req := clusterstateapi.NewResource(container.Resources.Requests)
-	limit := clusterstateapi.NewResource(container.Resources.Limits)
-
-	tolerance := 0.001
 
-	// Use limit if request is 0
-	if diff := math.Abs(req.MilliCPU - float64(0.0)); diff < tolerance {
-		req.MilliCPU = limit.MilliCPU
-	}
-
-	if diff := math.Abs(req.Memory - float64(0.0)); diff < tolerance {
-		req.Memory = limit.Memory
-	}
 
-	if req.GPU <= 0 {
-		req.GPU = limit.GPU
-	}
 
-	req.MilliCPU = req.MilliCPU * float64(replicas)
-	req.Memory = req.Memory * float64(replicas)
-	req.GPU = req.GPU * int64(replicas)
-	return req
-}
 
 // returns status of an item present in etcd
 func (gr *GenericResources) IsItemCompleted(awgr *arbv1.AppWrapperGenericResource, namespace string, appwrapperName string, genericItemName string) (completed bool) {
 	dd := gr.clients.Discovery()
-	apigroups, err := restmapper.GetAPIGroupResources(dd)
-	if err != nil {
-		klog.Errorf("[IsItemCompleted] Error getting API resources, err=%#v", err)
-		return false
-	}
-	restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
-	_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(awgr.GenericTemplate.Raw, nil, nil)
-	if err != nil {
-		klog.Errorf("[IsItemCompleted] Decoding error, please check your CR! Aborting handling the resource creation, err:  `%v`", err)
-		return false
-	}
 
-	mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	_, mapping, err := getResourceMapping(dd, awgr.GenericTemplate.Raw, nil)
 	if err != nil {
-		klog.Errorf("[IsItemCompleted] mapping error from raw object: `%v`", err)
-		return false
+	    return false
 	}
-	restconfig := gr.kubeClientConfig
-	restconfig.GroupVersion = &schema.GroupVersion{
-		Group:   mapping.GroupVersionKind.Group,
-		Version: mapping.GroupVersionKind.Version,
-	}
-	rsrc := mapping.Resource
-	dclient, err := dynamic.NewForConfig(restconfig)
+
+	dclient, err := createDynamicClient(gr, mapping)
 	if err != nil {
-		klog.Errorf("[IsItemCompleted] Error creating new dynamic client, err %v", err)
 		return false
 	}
 
+	rsrc := mapping.Resource
 	labelSelector := fmt.Sprintf("%s=%s", appwrapperJobName, appwrapperName)
 	inEtcd, err := dclient.Resource(rsrc).Namespace(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
 	if err != nil {
diff --git a/pkg/controller/queuejobresources/genericresource/helper.go b/pkg/controller/queuejobresources/genericresource/helper.go
new file mode 100644
index 00000000..376d2f0a
--- /dev/null
+++ b/pkg/controller/queuejobresources/genericresource/helper.go
@@ -0,0 +1,253 @@
+package genericresource
+
+import (
+	"math"
+	"fmt"
+	"encoding/json"
+	"k8s.io/apimachinery/pkg/runtime"
+	"context"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
+	v1 "k8s.io/api/core/v1"
+
+	clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
+	"k8s.io/klog/v2"
+
+	"k8s.io/client-go/restmapper"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/dynamic"
+
+)
+
+func getPodResources(pod arbv1.CustomPodResourceTemplate) *clusterstateapi.Resource {
+    req := clusterstateapi.NewResource(pod.Requests)
+    limit := clusterstateapi.NewResource(pod.Limits)
+
+    calculateResources(req, limit, float64(pod.Replicas))
+
+    return req
+}
+
+func getContainerResources(container v1.Container, replicas float64) *clusterstateapi.Resource {
+    req := clusterstateapi.NewResource(container.Resources.Requests)
+    limit := clusterstateapi.NewResource(container.Resources.Limits)
+
+    calculateResources(req, limit, replicas)
+
+    return req
+}
+
+
+func calculateResources(req *clusterstateapi.Resource, limit *clusterstateapi.Resource, replicas float64) {
+    tolerance := 0.001
+
+    // Use limit if request is 0
+    if diff := math.Abs(req.MilliCPU); diff < tolerance {
+        req.MilliCPU = limit.MilliCPU
+    }
+
+    if diff := math.Abs(req.Memory); diff < tolerance {
+        req.Memory = limit.Memory
+    }
+
+    if req.GPU <= 0 {
+        req.GPU = limit.GPU
+    }
+
+    req.MilliCPU = req.MilliCPU * replicas
+    req.Memory = req.Memory * replicas
+    req.GPU = req.GPU * int64(replicas)
+}
+
+func retrieveName(awNamespace string, unstruct unstructured.Unstructured, logContext string) (string, error) {
+    var name string
+
+    if md, ok := unstruct.Object["metadata"]; ok {
+        metadata := md.(map[string]interface{})
+        if objectName, ok := metadata["name"]; ok {
+            name = objectName.(string)
+        }
+        if objectns, ok := metadata["namespace"]; ok {
+            if objectns.(string) != awNamespace {
+                return "", fmt.Errorf("[%s] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", logContext, objectns.(string), awNamespace)
+            }
+        }
+    }
+
+    return name, nil
+}
+
+func createDynamicClient(gr *GenericResources, mapping *meta.RESTMapping) (dynamic.Interface, error) {
+	restconfig := gr.kubeClientConfig
+	restconfig.GroupVersion = &schema.GroupVersion{
+		Group:   mapping.GroupVersionKind.Group,
+		Version: mapping.GroupVersionKind.Version,
+	}
+
+	dclient, err := dynamic.NewForConfig(restconfig)
+	if err != nil {
+		klog.Errorf("Error creating new dynamic client, err: %v", err)
+		return nil, err
+	}
+	return dclient, nil
+}
+
+func getResourceMapping(dd discovery.DiscoveryInterface, raw []byte, defaultGVK *schema.GroupVersionKind) (*schema.GroupVersionKind, *meta.RESTMapping, error) {
+    apigroups, err := restmapper.GetAPIGroupResources(dd)
+    if err != nil {
+        klog.Errorf("Error getting API resources, err=%#v", err)
+        return nil, nil, err
+    }
+
+    restmapper := restmapper.NewDiscoveryRESTMapper(apigroups)
+    _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(raw, defaultGVK, nil)
+    if err != nil {
+        klog.Errorf("Decoding error, please check your CR! err: `%v`", err)
+        return nil, nil, err
+    }
+
+    mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+    if err != nil {
+        klog.Errorf("Mapping error from raw object: `%v`", err)
+        return nil, nil, err
+    }
+
+    return gvk, mapping, nil
+}
+
+
+// checks if object has replicas and containers field
+func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, containers []v1.Container) {
+	unstruct, err := UnmarshalToUnstructured(obj.Raw)
+	if err != nil {
+	    return false, 0, nil
+	}
+
+	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
+	if !isFound {
+		klog.Warningf("[hasFields] No spec field found in raw object: %#v", unstruct.UnstructuredContent())
+	}
+
+	replicas, isFound, _ := unstructured.NestedFloat64(spec, "replicas")
+	// Set default to 1 if no replicas field is found (handles the case of a single pod creation without replicaset.
+	if !isFound {
+		replicas = 1
+	}
+
+	template, isFound, _ := unstructured.NestedMap(spec, "template")
+	// If spec does not contain a podtemplate, check for pod singletons
+	var subspec map[string]interface{}
+	if !isFound {
+		subspec = spec
+		klog.V(6).Infof("[hasFields] No template field found in raw object: %#v", spec)
+	} else {
+		subspec, isFound, _ = unstructured.NestedMap(template, "spec")
+	}
+
+	containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers")
+	if !isFound {
+		klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec)
+		return false, 0, nil
+	}
+	objContainers := make([]v1.Container, len(containerList))
+	for _, container := range containerList {
+		marshal, _ := json.Marshal(container)
+		unmarshal := v1.Container{}
+		_ = json.Unmarshal(marshal, &unmarshal)
+		objContainers = append(objContainers, unmarshal)
+	}
+	return isFound, replicas, objContainers
+}
+
+// checks if object has pod template spec and add new labels
+func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map[string]string) (hasFields bool) {
+	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec' field not found.")
+		return false
+	}
+	template, isFound, _ := unstructured.NestedMap(spec, "template")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template' field not found.")
+		return false
+	}
+
+	marshal, _ := json.Marshal(template)
+	unmarshal := v1.PodTemplateSpec{}
+	if err := json.Unmarshal(marshal, &unmarshal); err != nil {
+		klog.Warning(err)
+		return false
+	}
+	existingLabels, isFound, _ := unstructured.NestedStringMap(template, "metadata", "labels")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template.metadata.labels' field not found.")
+		return false
+	}
+	newLength := len(existingLabels) + len(labels)
+	m := make(map[string]string, newLength) // convert map[string]string into map[string]interface{}
+	for k, v := range existingLabels {
+		m[k] = v
+	}
+
+	for k, v := range labels {
+		m[k] = v
+	}
+
+	if err := unstructured.SetNestedStringMap(unstruct.Object, m, "spec", "template", "metadata", "labels"); err != nil {
+		klog.Warning(err)
+		return false
+	}
+
+	return isFound
+}
+
+func createObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, unstruct unstructured.Unstructured, dclient dynamic.Interface) error {
+	res := getResourceInterface(namespaced, namespace, rsrc, dclient)
+
+	_, err := res.Create(context.Background(), &unstruct, metav1.CreateOptions{})
+	if err != nil {
+		if errors.IsAlreadyExists(err) {
+			klog.Errorf("[createObject] Object `%v` already exists: %v", name, err)
+			return nil
+		}
+		klog.Errorf("[createObject] Error creating the object `%v`: %v", name, err)
+		return err
+	}
+
+	klog.V(4).Infof("[createObject] Resource `%v` created\n", name)
+	return nil
+}
+
+func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) error {
+	backGround := metav1.DeletePropagationBackground
+	delOptions := metav1.DeleteOptions{PropagationPolicy: &backGround}
+
+	res := getResourceInterface(namespaced, namespace, rsrc, dclient)
+
+	err := res.Delete(context.Background(), name, delOptions)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.V(4).Infof("[deleteObject] object `%v` not found. No action taken.", name)
+			return nil
+		}
+		klog.Errorf("[deleteObject] Error deleting the object `%v`: %v", name, err)
+		return err
+	}
+
+	klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name)
+	return nil
+}
+
+func getResourceInterface(namespaced bool, namespace string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) dynamic.ResourceInterface {
+	if namespaced {
+		return dclient.Resource(rsrc).Namespace(namespace)
+	}
+	return dclient.Resource(rsrc)
+}
diff --git a/pkg/controller/queuejobresources/genericresource/utils.go b/pkg/controller/queuejobresources/genericresource/utils.go
new file mode 100644
index 00000000..91c3f4af
--- /dev/null
+++ b/pkg/controller/queuejobresources/genericresource/utils.go
@@ -0,0 +1,40 @@
+package genericresource
+
+import (
+	"encoding/json"
+	"k8s.io/klog/v2"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+func UnmarshalToUnstructured(raw []byte) (unstruct unstructured.Unstructured, err error) {
+    unstruct.Object = make(map[string]interface{})
+    var blob interface{}
+    err = json.Unmarshal(raw, &blob)
+    if err != nil {
+        klog.Errorf("Error unmarshalling, err=%#v", err)
+        return unstruct, err
+    }
+    unstruct.Object = blob.(map[string]interface{})
+    return unstruct, nil
+}
+
+func truncateName(name string) string {
+	newName := name
+	if len(newName) > 63 {
+		newName = newName[:63]
+	}
+	return newName
+}
+
+func join(strs ...string) string {
+	var result string
+	if strs[0] == "" {
+		return strs[len(strs)-1]
+	}
+	for _, str := range strs {
+		result += str
+	}
+	return result
+}
+

From 0e6aa32ade70c8025a9ceef6aad3e83935506c88 Mon Sep 17 00:00:00 2001
From: Eoin Gallinagh <eoingallinagh@gmail.com>
Date: Mon, 20 Nov 2023 11:15:32 +0000
Subject: [PATCH 2/2] add: adjustments from feeback

---
 pkg/controller/queuejob/queuejob_controller_ex.go           | 2 +-
 .../queuejobresources/genericresource/genericresource.go    | 6 ++----
 pkg/controller/queuejobresources/genericresource/helper.go  | 4 ++--
 pkg/controller/queuejobresources/genericresource/utils.go   | 2 --
 4 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index cc030120..6061addf 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -583,7 +583,7 @@ func (qjm *XController) getAppWrapperCompletionStatus(caw *arbv1.AppWrapper) arb
 			objectName := genericItem.GenericTemplate
 			unstruct, err := genericresource.UnmarshalToUnstructured(objectName.Raw)
 			if err != nil {
-			    klog.Errorf("[getAppWrapperCompletionStatus] Error: %v", err)
+				klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling appwrapper: %v", caw.Name)
 			}
 			name := ""
 			if md, ok := unstruct.Object["metadata"]; ok {
diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go
index 2cad7375..362228f4 100644
--- a/pkg/controller/queuejobresources/genericresource/genericresource.go
+++ b/pkg/controller/queuejobresources/genericresource/genericresource.go
@@ -69,11 +69,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
 	}
 	// Default generic resource name
 	name := ""
-
 	namespaced := true
-	// todo:DELETEME	dd := common.KubeClient.Discovery()
-	dd := gr.clients.Discovery()
 
+	dd := gr.clients.Discovery()
 	ext := awr.GenericTemplate
 	gvk, mapping, err := getResourceMapping(dd, ext.Raw, default_gvk)
 	if err != nil {
@@ -157,7 +155,7 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
 
 	namespaced := true
 	name := ""
-	// todo:DELETEME	dd := common.KubeClient.Discovery()
+
 	dd := gr.clients.Discovery()
 	ext := awr.GenericTemplate
 	_, mapping, err := getResourceMapping(dd, ext.Raw, nil)
diff --git a/pkg/controller/queuejobresources/genericresource/helper.go b/pkg/controller/queuejobresources/genericresource/helper.go
index 376d2f0a..f19d9599 100644
--- a/pkg/controller/queuejobresources/genericresource/helper.go
+++ b/pkg/controller/queuejobresources/genericresource/helper.go
@@ -123,7 +123,7 @@ func getResourceMapping(dd discovery.DiscoveryInterface, raw []byte, defaultGVK
 }
 
 
-// checks if object has replicas and containers field
+// hasFields checks if obj has replicas and containers field
 func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, containers []v1.Container) {
 	unstruct, err := UnmarshalToUnstructured(obj.Raw)
 	if err != nil {
@@ -166,7 +166,7 @@ func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, conta
 	return isFound, replicas, objContainers
 }
 
-// checks if object has pod template spec and add new labels
+// addLabelsToPodTemplateField checks if unstruct has pod template spec and add new labels
 func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map[string]string) (hasFields bool) {
 	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
 	if !isFound {
diff --git a/pkg/controller/queuejobresources/genericresource/utils.go b/pkg/controller/queuejobresources/genericresource/utils.go
index 91c3f4af..a4790fed 100644
--- a/pkg/controller/queuejobresources/genericresource/utils.go
+++ b/pkg/controller/queuejobresources/genericresource/utils.go
@@ -2,7 +2,6 @@ package genericresource
 
 import (
 	"encoding/json"
-	"k8s.io/klog/v2"
 
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 )
@@ -12,7 +11,6 @@ func UnmarshalToUnstructured(raw []byte) (unstruct unstructured.Unstructured, er
     var blob interface{}
     err = json.Unmarshal(raw, &blob)
     if err != nil {
-        klog.Errorf("Error unmarshalling, err=%#v", err)
         return unstruct, err
     }
     unstruct.Object = blob.(map[string]interface{})