remove threads, fix backoff #623

Merged
merged 1 commit on Sep 6, 2023
pkg/controller/queuejob/queuejob_controller_ex.go (22 changes: 18 additions & 4 deletions)
@@ -466,7 +466,7 @@ func (qjm *XController) PreemptQueueJobs() {
 			// Only back-off AWs that are in state running and not in state Failed
 			if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
 				klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace)
-				go qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
+				qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
 			}
 		}
 	}
@@ -1155,7 +1155,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 		} else {
 			dispatchFailedMessage = "Cannot find a cluster with enough resources to dispatch AppWrapper."
 			klog.V(2).Infof("[ScheduleNext] [Dispatcher Mode] %s %s\n", dispatchFailedReason, dispatchFailedMessage)
-			go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
+			qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
 		}
 	} else { // Agent Mode
 		aggqj := qjm.GetAggregatedResources(qj)
@@ -1284,7 +1284,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 				// TODO: Remove forwarded logic as a big AW will never be forwarded
 				forwarded = true
 				// should we call backoff or update etcd?
-				go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
+				qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
 			}
 		}
 		forwarded = true
@@ -1347,7 +1347,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
 			if qjm.quotaManager != nil && quotaFits {
 				qjm.quotaManager.Release(qj)
 			}
-			go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
+			qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
 		}
 	}
 	return nil
@@ -1672,6 +1672,20 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
 	}
 
 	klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status)
+	for _, cond := range newQJ.Status.Conditions {
+		if cond.Type == arbv1.AppWrapperCondBackoff {
+			// AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue.
+			// TODO: we could plug an interface here with back-off strategies for different MCAD use cases.
+			time.AfterFunc(time.Duration(cc.serverOption.BackoffTime)*time.Second, func() {
Contributor (review comment on the time.AfterFunc line above):

It'd be useful to have that logic encapsulated into an enqueueAfter method, so that it mimics the semantics of client-go's DelayingInterface.AddAfter and can potentially be reused elsewhere.
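For illustration, a minimal sketch of such a helper, assuming the receiver and field names visible in this diff (the enqueueAfter name is the reviewer's suggestion; the body below is hypothetical, not code from this PR):

// enqueueAfter re-queues an AppWrapper after the given delay, mirroring the
// semantics of client-go's workqueue.DelayingInterface.AddAfter.
// Hypothetical sketch: cc.serverOption, cc.quotaManager, and cc.enqueue are
// taken from the surrounding diff.
func (cc *XController) enqueueAfter(aw *arbv1.AppWrapper, delay time.Duration) {
	time.AfterFunc(delay, func() {
		if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
			cc.quotaManager.Release(aw)
		}
		cc.enqueue(aw)
	})
}

The new block in updateQueueJob would then reduce to a single call, e.g. cc.enqueueAfter(newQJ, time.Duration(cc.serverOption.BackoffTime)*time.Second).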

+				if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
+					cc.quotaManager.Release(newQJ)
+				}
+				cc.enqueue(newQJ)
+			})
+			return
+		}
+	}

 	// cc.eventQueue.Delete(oldObj)
 	cc.enqueue(newQJ)
 }
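The TODO in the new block suggests pluggable back-off strategies. A minimal sketch of what that interface could look like, assuming MCAD's v1beta1 AppWrapper types (every name below is hypothetical and the import path is assumed, not code from this PR):

import (
	"time"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" // path assumed
)

// BackoffStrategy decides how long a requeued AppWrapper waits before it is
// enqueued again. Hypothetical interface sketched from the TODO above.
type BackoffStrategy interface {
	NextDelay(aw *arbv1.AppWrapper) time.Duration
}

// fixedBackoff reproduces the current behavior: a constant delay configured
// through the BackoffTime server option (in seconds).
type fixedBackoff struct {
	seconds int
}

func (f fixedBackoff) NextDelay(*arbv1.AppWrapper) time.Duration {
	return time.Duration(f.seconds) * time.Second
}

With such an interface, updateQueueJob could call time.AfterFunc(strategy.NextDelay(newQJ), ...) instead of hard-coding the fixed delay.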