Skip to content

deleted AWs should not be added to event queue #568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,12 @@ func larger(a, b string) bool {
return a > b // Equal length, lexicographic order
}

//When an AW is deleted, do not add such AWs to the event queue.
//AW can never be brought back when it is deleted by external client, so do not bother adding it to event queue.
//There will be a scenario, where an AW is in middle to dispatch and it is deleted. at that point when such an
//AW is added to etcd a conflict error will be raised. This is cause the current AW to be skipped.
//If there are large number of delete's will informer miss few delete events is under question for this simplification.
//For 1K AW all of them are delete from the system, and the next resubmitted AW begins processing after in less than 2 mins
func (cc *XController) deleteQueueJob(obj interface{}) {
qj, ok := obj.(*arbv1.AppWrapper)
if !ok {
Expand All @@ -1628,12 +1634,22 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
} else {
accessor.SetDeletionTimestamp(&current_ts)
}
klog.V(3).Infof("[Informer-deleteQJ] %s enqueue deletion, deletion ts = %v", qj.Name, qj.GetDeletionTimestamp())
//Remove stale copy
cc.eventQueue.Delete(qj)
cc.qjqueue.Delete(qj)
//Add fresh copy
cc.eventQueue.Add(qj)
// validate that app wraper has not been marked for deletion by the infomer's delete handler
if qj.DeletionTimestamp != nil {
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name)
// cleanup resources for running job, ignoring errors
if err00 := cc.Cleanup(context.Background(), qj); err00 != nil {
klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00)
}
// empty finalizers and delete the queuejob again
if accessor, err00 := meta.Accessor(qj); err00 == nil {
accessor.SetFinalizers(nil)
}
// we delete the job from the queue if it is there, ignoring errors
cc.qjqueue.Delete(qj)
cc.eventQueue.Delete(qj)
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name)
}
}

func (cc *XController) enqueue(obj interface{}) error {
Expand Down Expand Up @@ -1767,22 +1783,6 @@ func (cc *XController) worker() {
}

func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) error {
// validate that app wraper has not been marked for deletion by the infomer's delete handler
if qj.DeletionTimestamp != nil {
klog.V(3).Infof("[syncQueueJob] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name)
// cleanup resources for running job, ignoring errors
if err00 := cc.Cleanup(ctx, qj); err00 != nil {
klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00)
}
// empty finalizers and delete the queuejob again
if accessor, err00 := meta.Accessor(qj); err00 == nil {
accessor.SetFinalizers(nil)
}
// we delete the job from the queue if it is there, ignoring errors
cc.qjqueue.Delete(qj)
klog.V(3).Infof("[syncQueueJob] AW job=%s/%s deleted.", qj.Namespace, qj.Name)
return nil
}
cacheAWJob, err := cc.getAppWrapper(qj.Namespace, qj.Name, "[syncQueueJob] get fresh appwrapper ")
if err != nil {
if apierrors.IsNotFound(err) {
Expand Down