Skip to content

Commit a045f24

Browse files
author
Klaus Ma
authored
Merge pull request #45 from volcano-sh/feature/support-admission-test
Add admission e2e test
2 parents fc95697 + 03134d0 commit a045f24

File tree

13 files changed

+169
-39
lines changed

13 files changed

+169
-39
lines changed

Makefile

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
BIN_DIR=_output/bin
2-
export IMAGE=volcano
3-
export TAG = 1.0
2+
IMAGE=volcano
3+
TAG = 0.1
4+
5+
.EXPORT_ALL_VARIABLES:
46

57
all: controllers scheduler cli admission
68

hack/run-e2e-kind.sh

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export VK_ROOT=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/..
44
export VK_BIN=${VK_ROOT}/_output/bin
55
export LOG_LEVEL=3
66
export SHOW_VOLCANO_LOGS=${SHOW_VOLCANO_LOGS:-1}
7+
export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-1}
78

89
if [[ "${CLUSTER_NAME}xxx" != "xxx" ]];then
910
export CLUSTER_CONTEXT="--name ${CLUSTER_NAME}"
@@ -61,7 +62,7 @@ function install-volcano {
6162
helm gen-admission-secret --service integration-admission-service --namespace kube-system
6263

6364
echo "Install volcano chart"
64-
helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG}
65+
helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG}
6566
}
6667

6768
function uninstall-volcano {
@@ -98,8 +99,9 @@ Disable displaying volcano component logs:
9899
exit 0
99100
fi
100101

101-
102-
trap cleanup EXIT
102+
if [[ $CLEANUP_CLUSTER -eq 1 ]]; then
103+
trap cleanup EXIT
104+
fi
103105

104106

105107
kind-up-cluster

installer/chart/volcano/config/kube-batch.conf

+2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ tiers:
33
- plugins:
44
- name: priority
55
- name: gang
6+
- name: conformance
67
- plugins:
78
- name: drf
89
- name: predicates
910
- name: proportion
11+
- name: nodeorder

pkg/admission/admit_job.go

-5
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,6 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
6969
taskNames := map[string]string{}
7070
var totalReplicas int32
7171

72-
if job.Status.Version != 0 {
73-
reviewResponse.Allowed = false
74-
return fmt.Sprintf("Job Version is used internally, should not be specified.")
75-
}
76-
7772
if len(job.Spec.Tasks) == 0 {
7873
reviewResponse.Allowed = false
7974
return fmt.Sprintf("No task specified in job spec")

pkg/admission/mutate_job.go

-11
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7272

7373
func createPatch(job v1alpha1.Job) ([]byte, error) {
7474
var patch []patchOperation
75-
patch = append(patch, mutateJobVersion(job.Status, "/status")...)
7675
patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...)
7776
patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...)
7877

@@ -96,16 +95,6 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat
9695
return patch
9796
}
9897

99-
func mutateJobVersion(jobStatus v1alpha1.JobStatus, basePath string) (patch []patchOperation) {
100-
jobStatus.Version = 1
101-
patch = append(patch, patchOperation{
102-
Op: "replace",
103-
Path: basePath,
104-
Value: jobStatus,
105-
})
106-
return patch
107-
}
108-
10998
func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) {
11099
if len(metadata.Annotations) == 0 {
111100
metadata.Annotations = make(map[string]string)

pkg/apis/batch/v1alpha1/job.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ type JobStatus struct {
228228
// +optional
229229
Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"`
230230
//Current version of job
231-
Version int32
231+
Version int32 `json:"version,omitempty" protobuf:"bytes,8,opt,name=version"`
232232
}
233233

234234
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

pkg/controllers/job/cache/cache.go

+7
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) {
231231
}
232232

233233
func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
234+
jc.Lock()
235+
defer jc.Unlock()
236+
234237
var taskReplicas, completed int32
235238

236239
jobInfo, found := jc.jobs[jobKey]
@@ -244,6 +247,10 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
244247
return false
245248
}
246249

250+
if jobInfo.Job == nil {
251+
return false
252+
}
253+
247254
for _, task := range jobInfo.Job.Spec.Tasks {
248255
if task.Name == taskName {
249256
taskReplicas = task.Replicas

pkg/controllers/job/job_controller_handler.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
7878
return
7979
}
8080

81+
if err := cc.cache.Update(newJob); err != nil {
82+
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
83+
newJob.Namespace, newJob.Name, err)
84+
}
85+
8186
//NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
8287
// For Job status, it's used internally and always been updated via our controller.
8388
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
8489
glog.Infof("Job update event is ignored since no update in 'Spec'.")
8590
return
8691
}
8792

88-
if err := cc.cache.Update(newJob); err != nil {
89-
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
90-
newJob.Namespace, newJob.Name, err)
91-
}
92-
9393
req := apis.Request{
9494
Namespace: newJob.Namespace,
9595
JobName: newJob.Name,

pkg/controllers/job/job_controller_util.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
166166
}
167167

168168
//For all the requests triggered from discarded job resources will perform sync action instead
169-
if req.JobVersion > 0 && req.JobVersion < job.Status.Version {
169+
if req.JobVersion < job.Status.Version {
170170
glog.Infof("Request %s is outdated, will perform sync instead.", req)
171171
return vkv1.SyncJobAction
172172
}

pkg/scheduler/actions/preempt/preempt.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,9 @@ func preempt(
232232
}
233233
preempted.Add(preemptee.Resreq)
234234
// If reclaimed enough resources, break loop to avoid Sub panic.
235-
if resreq.LessEqual(preemptee.Resreq) {
235+
if resreq.LessEqual(preempted) {
236236
break
237237
}
238-
resreq.Sub(preemptee.Resreq)
239238
}
240239

241240
metrics.RegisterPreemptionAttempts()

pkg/scheduler/actions/reclaim/reclaim.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
166166
}
167167
reclaimed.Add(reclaimee.Resreq)
168168
// If reclaimed enough resources, break loop to avoid Sub panic.
169-
if resreq.LessEqual(reclaimee.Resreq) {
169+
if resreq.LessEqual(reclaimed) {
170170
break
171171
}
172-
resreq.Sub(reclaimee.Resreq)
173172
}
174173

175174
glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",

test/e2e/admission.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
Copyright 2019 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package e2e
18+
19+
import (
20+
. "github.com/onsi/ginkgo"
21+
. "github.com/onsi/gomega"
22+
"k8s.io/apimachinery/pkg/api/errors"
23+
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
24+
)
25+
26+
var _ = Describe("Job E2E Test: Test Admission service", func() {
27+
It("Duplicated Task Name", func() {
28+
jobName := "job-duplicated"
29+
namespace := "test"
30+
context := initTestContext()
31+
defer cleanupTestContext(context)
32+
rep := clusterSize(context, oneCPU)
33+
34+
_, err := createJobInner(context, &jobSpec{
35+
namespace: namespace,
36+
name: jobName,
37+
tasks: []taskSpec{
38+
{
39+
img: defaultNginxImage,
40+
req: oneCPU,
41+
min: rep,
42+
rep: rep,
43+
name: "duplicated",
44+
},
45+
{
46+
img: defaultNginxImage,
47+
req: oneCPU,
48+
min: rep,
49+
rep: rep,
50+
name: "duplicated",
51+
},
52+
},
53+
})
54+
Expect(err).To(HaveOccurred())
55+
stError, ok := err.(*errors.StatusError)
56+
Expect(ok).To(Equal(true))
57+
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
58+
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated task name"))
59+
})
60+
61+
It("Duplicated Policy Event", func() {
62+
jobName := "job-policy-duplicated"
63+
namespace := "test"
64+
context := initTestContext()
65+
defer cleanupTestContext(context)
66+
rep := clusterSize(context, oneCPU)
67+
68+
_, err := createJobInner(context, &jobSpec{
69+
namespace: namespace,
70+
name: jobName,
71+
tasks: []taskSpec{
72+
{
73+
img: defaultNginxImage,
74+
req: oneCPU,
75+
min: rep,
76+
rep: rep,
77+
name: "taskname",
78+
},
79+
},
80+
policies: []v1alpha1.LifecyclePolicy{
81+
{
82+
Event: v1alpha1.PodFailedEvent,
83+
Action: v1alpha1.AbortJobAction,
84+
},
85+
{
86+
Event: v1alpha1.PodFailedEvent,
87+
Action: v1alpha1.RestartJobAction,
88+
},
89+
},
90+
})
91+
Expect(err).To(HaveOccurred())
92+
stError, ok := err.(*errors.StatusError)
93+
Expect(ok).To(Equal(true))
94+
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
95+
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies"))
96+
})
97+
98+
It("Min Available illegal", func() {
99+
jobName := "job-min-illegal"
100+
namespace := "test"
101+
context := initTestContext()
102+
defer cleanupTestContext(context)
103+
rep := clusterSize(context, oneCPU)
104+
105+
_, err := createJobInner(context, &jobSpec{
106+
min: rep * 2,
107+
namespace: namespace,
108+
name: jobName,
109+
tasks: []taskSpec{
110+
{
111+
img: defaultNginxImage,
112+
req: oneCPU,
113+
min: rep,
114+
rep: rep,
115+
name: "taskname",
116+
},
117+
},
118+
})
119+
Expect(err).To(HaveOccurred())
120+
stError, ok := err.(*errors.StatusError)
121+
Expect(ok).To(Equal(true))
122+
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
123+
Expect(stError.ErrStatus.Message).To(ContainSubstring("'minAvailable' should not be greater than total replicas in tasks"))
124+
})
125+
})

test/e2e/util.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,11 @@ func cleanupTestContext(cxt *context) {
221221
Expect(err).NotTo(HaveOccurred())
222222

223223
// Wait for namespace deleted.
224-
err = wait.Poll(100*time.Millisecond, oneMinute, namespaceNotExist(cxt))
224+
err = wait.Poll(100*time.Millisecond, twoMinute, namespaceNotExist(cxt))
225225
Expect(err).NotTo(HaveOccurred())
226226

227227
// Wait for queues deleted
228-
err = wait.Poll(100*time.Millisecond, oneMinute, queueNotExist(cxt))
228+
err = wait.Poll(100*time.Millisecond, twoMinute, queueNotExist(cxt))
229229
Expect(err).NotTo(HaveOccurred())
230230
}
231231

@@ -294,6 +294,7 @@ type jobSpec struct {
294294
queue string
295295
tasks []taskSpec
296296
policies []vkv1.LifecyclePolicy
297+
min int32
297298
}
298299

299300
func getNS(context *context, job *jobSpec) string {
@@ -311,6 +312,14 @@ func getNS(context *context, job *jobSpec) string {
311312
}
312313

313314
func createJob(context *context, jobSpec *jobSpec) *vkv1.Job {
315+
316+
job, err := createJobInner(context, jobSpec)
317+
Expect(err).NotTo(HaveOccurred())
318+
319+
return job
320+
}
321+
322+
func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) {
314323
ns := getNS(context, jobSpec)
315324

316325
job := &vkv1.Job{
@@ -366,12 +375,13 @@ func createJob(context *context, jobSpec *jobSpec) *vkv1.Job {
366375
min += task.min
367376
}
368377

369-
job.Spec.MinAvailable = min
370-
371-
job, err := context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job)
372-
Expect(err).NotTo(HaveOccurred())
378+
if jobSpec.min > 0 {
379+
job.Spec.MinAvailable = jobSpec.min
380+
} else {
381+
job.Spec.MinAvailable = min
382+
}
373383

374-
return job
384+
return context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job)
375385
}
376386

377387
func taskPhase(ctx *context, job *vkv1.Job, phase []v1.PodPhase, taskNum int) wait.ConditionFunc {

0 commit comments

Comments
 (0)