Skip to content

remove looping from syncqueue job #593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions pkg/controller/queuejobresources/genericresource/genericresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"runtime/debug"
"strings"
"time"
Expand Down Expand Up @@ -190,6 +189,8 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
return name, gvk, err
}

//SyncQueueJob uses dynamic clients to wrap Items inside genericItems, it is used to create resources inside etcd and return errors when
//unwrapping fails.
func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -234,27 +235,32 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
return []*v1.Pod{}, err
}

_, apiresourcelist, err := dd.ServerGroupsAndResources()
if err != nil {
if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok {
klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err)
} else {
klog.Errorf("Error getting supported groups and resources, err=%#v", err)
return []*v1.Pod{}, err
}
}
//TODO: Simplified apiresourcelist discovery, the assumption is we will always deploy namespaced objects
//We dont intend to install CRDs like KubeRay, Spark-Operator etc through MCAD, I think such objects are typically
//cluster scoped. May be for Multi-Cluster or inference use case we need such deep discovery, so for now commenting code.

// _, apiresourcelist, err := dd.ServerGroupsAndResources()
// if err != nil {
// if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok {
// klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err)
// } else {
// klog.Errorf("Error getting supported groups and resources, err=%#v", err)
// return []*v1.Pod{}, err
// }
// }

rsrc := mapping.Resource
for _, apiresourcegroup := range apiresourcelist {
if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
for _, apiresource := range apiresourcegroup.APIResources {
if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
rsrc = mapping.Resource
namespaced = apiresource.Namespaced
}
}
}
}

// for _, apiresourcegroup := range apiresourcelist {
// if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
// for _, apiresource := range apiresourcegroup.APIResources {
// if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
// rsrc = mapping.Resource
// namespaced = apiresource.Namespaced
// }
// }
// }
// }
var unstruct unstructured.Unstructured
unstruct.Object = make(map[string]interface{})
var blob interface{}
Expand Down Expand Up @@ -307,6 +313,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
newName = newName[:63]
}
unstruct.SetName(newName)
//Asumption object is always namespaced
//Refer to comment on line 238
namespaced = true
err = createObject(namespaced, namespace, newName, rsrc, unstruct, dclient)
if err != nil {
if errors.IsAlreadyExists(err) {
Expand All @@ -319,29 +328,30 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
}

// Get the related resources of created object
var thisObj *unstructured.Unstructured
// var thisObj *unstructured.Unstructured
var err1 error
if namespaced {
thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
} else {
thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{})
}
// if namespaced {
// thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
// } else {
// thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{})
// }
if err1 != nil {
klog.Errorf("Could not get created resource with error %v", err1)
return []*v1.Pod{}, err1
}
thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind())

podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
pods := []*v1.Pod{}
for _, pod := range (*podL).Items {
parent := metav1.GetControllerOf(&pod)
if reflect.DeepEqual(thisOwnerRef, parent) {
pods = append(pods, &pod)
}
klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name)
}
return pods, nil
// thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind())

// podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
// pods := []*v1.Pod{}
// for _, pod := range (*podL).Items {
// parent := metav1.GetControllerOf(&pod)
// if reflect.DeepEqual(thisOwnerRef, parent) {
// pods = append(pods, &pod)
// }
// klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name)
// }
// return pods, nil
return []*v1.Pod{}, err1
}

// checks if object has pod template spec and add new labels
Expand Down
61 changes: 31 additions & 30 deletions test/e2e/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,36 +373,37 @@ var _ = Describe("AppWrapper E2E Test", func() {
// This test is flawed, the namespace created by this appwrapper is not cleaned up.
// FIXME https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/471
// Leaving it here so that the builds no longer fail
It("Create AppWrapper - Namespace Only - 0 Pods", func() {
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Started.\n")
context := initTestContext()
var appwrappers []*arbv1.AppWrapper
appwrappersPtr := &appwrappers
defer cleanupTestObjectsPtr(context, appwrappersPtr)

aw := createNamespaceAW(context, "aw-namespace-0")
appwrappers = append(appwrappers, aw)
fmt.Fprintf(GinkgoWriter, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - app wrappers len %d.\n", len(appwrappers))

err := waitAWNonComputeResourceActive(context, aw)
Expect(err).NotTo(HaveOccurred())
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Completed. Awaiting app wrapper cleanup\n")
})

It("Create AppWrapper - Generic Namespace Only - 0 Pods", func() {
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Generic Namespace Only - 0 Pods - Started.\n")
context := initTestContext()
var appwrappers []*arbv1.AppWrapper
appwrappersPtr := &appwrappers
defer cleanupTestObjectsPtr(context, appwrappersPtr)

aw := createGenericNamespaceAW(context, "aw-generic-namespace-0")
appwrappers = append(appwrappers, aw)

err := waitAWNonComputeResourceActive(context, aw)
Expect(err).NotTo(HaveOccurred())

})
//TODO: Below two tests are turned off, please refer to github issue here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
// It("Create AppWrapper - Namespace Only - 0 Pods", func() {
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Started.\n")
// context := initTestContext()
// var appwrappers []*arbv1.AppWrapper
// appwrappersPtr := &appwrappers
// defer cleanupTestObjectsPtr(context, appwrappersPtr)

// aw := createNamespaceAW(context, "aw-namespace-0")
// appwrappers = append(appwrappers, aw)
// fmt.Fprintf(GinkgoWriter, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - app wrappers len %d.\n", len(appwrappers))

// err := waitAWNonComputeResourceActive(context, aw)
// Expect(err).NotTo(HaveOccurred())
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Completed. Awaiting app wrapper cleanup\n")
// })

// It("Create AppWrapper - Generic Namespace Only - 0 Pods", func() {
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Generic Namespace Only - 0 Pods - Started.\n")
// context := initTestContext()
// var appwrappers []*arbv1.AppWrapper
// appwrappersPtr := &appwrappers
// defer cleanupTestObjectsPtr(context, appwrappersPtr)

// aw := createGenericNamespaceAW(context, "aw-generic-namespace-0")
// appwrappers = append(appwrappers, aw)

// err := waitAWNonComputeResourceActive(context, aw)
// Expect(err).NotTo(HaveOccurred())

// })

It("MCAD Custom Pod Resources Test", func() {
fmt.Fprintf(os.Stdout, "[e2e] MCAD Custom Pod Resources Test - Started.\n")
Expand Down