diff --git a/.travis.yml b/.travis.yml index c3124e9..cd8c3ad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: java jdk: - oraclejdk8 -sudo: false install: ./installViaTravis.sh script: ./buildViaTravis.sh env: diff --git a/build.gradle b/build.gradle index 1954c56..02cda80 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ buildscript { } } dependencies { - classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:3.5.2' + classpath 'com.netflix.nebula:gradle-netflixoss-project-plugin:5.0.0' } } @@ -58,4 +58,8 @@ subprojects { tasks.withType(Javadoc) { options.addStringOption('Xdoclint:none', '-quiet') } + + test { + maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + } } diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java index 81bf959..d462379 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java @@ -346,7 +346,7 @@ void purgeInactiveVMs(Set excludeVms) { vmCollection.remove(avm); if (avm.getCurrVMId() != null) vmIdToHostnameMap.remove(avm.getCurrVMId(), avm.getHostname()); - logger.info("Removed inactive host " + avm.getHostname()); + logger.debug("Removed inactive host " + avm.getHostname()); } } } diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java index 1637c6f..efb1a94 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java @@ -492,7 +492,7 @@ private void assignResourceSets(TaskRequest request) { } void expireLease(String leaseId) { - logger.info("Got request to expire lease on " + hostname); + logger.debug("Got request to expire lease on " + hostname); leasesToExpire.offer(leaseId); } diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java index 3483558..2b4d93b 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java @@ -17,20 +17,17 @@ package com.netflix.fenzo; import com.netflix.fenzo.common.ThreadFactoryBuilder; -import com.netflix.fenzo.plugins.NoOpScaleDownOrderEvaluator; -import com.netflix.fenzo.queues.Assignable; -import com.netflix.fenzo.queues.QueuableTask; -import com.netflix.fenzo.sla.ResAllocs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.netflix.fenzo.functions.Action1; -import com.netflix.fenzo.functions.Action2; -import com.netflix.fenzo.functions.Func1; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,6 +38,16 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import com.netflix.fenzo.functions.Action1; +import com.netflix.fenzo.functions.Action2; +import com.netflix.fenzo.functions.Func1; +import com.netflix.fenzo.plugins.NoOpScaleDownOrderEvaluator; +import com.netflix.fenzo.queues.Assignable; +import com.netflix.fenzo.queues.QueuableTask; +import com.netflix.fenzo.sla.ResAllocs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A scheduling service that you can use to optimize the assignment of tasks to hosts within a Mesos framework. * Call the {@link #scheduleOnce scheduleOnce()} method with a list of task requests and a list of new resource @@ -58,7 +65,7 @@ * {@link #getTaskUnAssigner getTaskUnAssigner()} method. These actions make the {@code TaskScheduler} keep * track of launched tasks. The {@code TaskScheduler} then makes these tracked tasks available to its * scheduling optimization functions. - * + *

* Do not call the scheduler concurrently. The scheduler assigns tasks in the order that they are received in a * particular list. It checks each task against available resources until it finds a match. *

@@ -79,34 +86,30 @@ public class TaskScheduler { */ public final static class Builder { - private Action1 leaseRejectAction=null; - private long leaseOfferExpirySecs=120; - private int maxOffersToReject=4; - private boolean rejectAllExpiredOffers=false; + private Action1 leaseRejectAction = null; + private long leaseOfferExpirySecs = 120; + private int maxOffersToReject = 4; + private boolean rejectAllExpiredOffers = false; private VMTaskFitnessCalculator fitnessCalculator = new DefaultFitnessCalculator(); - private String autoScaleByAttributeName=null; - private String autoScalerMapHostnameAttributeName=null; - private String autoScaleDownBalancedByAttributeName=null; + private String autoScaleByAttributeName = null; + private String autoScalerMapHostnameAttributeName = null; + private String autoScaleDownBalancedByAttributeName = null; private ScaleDownOrderEvaluator scaleDownOrderEvaluator; private Map weightedScaleDownConstraintEvaluators; private PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator = new DefaultPreferentialNamedConsumableResourceEvaluator(); - private Action1 autoscalerCallback=null; - private long delayAutoscaleUpBySecs=0L; - private long delayAutoscaleDownBySecs=0L; - private long disabledVmDurationInSecs =0L; - private List autoScaleRules=new ArrayList<>(); - private Func1 isFitnessGoodEnoughFunction = new Func1() { - @Override - public Boolean call(Double f) { - return f>1.0; - } - }; - private boolean disableShortfallEvaluation=false; - private Map resAllocs=null; - private boolean singleOfferMode=false; + private Action1 autoscalerCallback = null; + private long delayAutoscaleUpBySecs = 0L; + private long delayAutoscaleDownBySecs = 0L; + private long disabledVmDurationInSecs = 0L; + private List autoScaleRules = new ArrayList<>(); + private Func1 isFitnessGoodEnoughFunction = f -> f > 1.0; + private boolean disableShortfallEvaluation = false; + private Map resAllocs = null; + private boolean singleOfferMode = false; private final List schedulingEventListeners = new ArrayList<>(); private int maxConcurrent = Runtime.getRuntime().availableProcessors(); private Supplier taskBatchSizeSupplier = () -> Long.MAX_VALUE; + private Func1, List> assignableVMsEvaluator = null; /** * (Required) Call this method to establish a method that your task scheduler will call to notify you @@ -140,12 +143,14 @@ public Builder withLeaseOfferExpirySecs(long leaseOfferExpirySecs) { /** * Call this method to set the maximum number of offers to reject within a time period equal to lease expiry * seconds, set with {@code leaseOfferExpirySecs()}. Default is 4. + * * @param maxOffersToReject Maximum number of offers to reject. * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} */ public Builder withMaxOffersToReject(int maxOffersToReject) { - if(!rejectAllExpiredOffers) + if (!rejectAllExpiredOffers) { this.maxOffersToReject = maxOffersToReject; + } return this; } @@ -153,6 +158,7 @@ public Builder withMaxOffersToReject(int maxOffersToReject) { * Indicate that all offers older than the set expiry time must be rejected. By default this is set to false. * If false, Fenzo rejects a maximum number of offers set using {@link #withMaxOffersToReject(int)} per each * time period spanning the expiry time, set by {@link #withLeaseOfferExpirySecs(long)}. + * * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} */ public Builder withRejectAllExpiredOffers() { @@ -303,7 +309,7 @@ public Builder disableShortfallEvaluation() { * @param resAllocs a Map with the task group name as keys and resource allocation limits as values * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} * @see Resource Allocation - * Limits + * Limits */ public Builder withInitialResAllocs(Map resAllocs) { this.resAllocs = resAllocs; @@ -326,12 +332,15 @@ public Builder withInitialResAllocs(Map resAllocs) { * @see Autoscaling */ public Builder withAutoScaleRule(AutoScaleRule rule) { - if(autoScaleByAttributeName==null || autoScaleByAttributeName.isEmpty()) + if (autoScaleByAttributeName == null || autoScaleByAttributeName.isEmpty()) { throw new IllegalArgumentException("Auto scale by attribute name must be set before setting rules"); - if(rule.getMinIdleHostsToKeep()<1) + } + if (rule.getMinIdleHostsToKeep() < 1) { throw new IllegalArgumentException("Min Idle must be >0"); - if(rule.getMinIdleHostsToKeep()>rule.getMaxIdleHostsToKeep()) + } + if (rule.getMinIdleHostsToKeep() > rule.getMaxIdleHostsToKeep()) { throw new IllegalArgumentException("Min Idle must be <= Max Idle hosts"); + } this.autoScaleRules.add(rule); return this; } @@ -353,22 +362,24 @@ public Builder withAutoScalerCallback(Action1 callback) { * policy rules. Such scale ups can be caused by, for example, the periodic offer rejections that result in * offers coming back shortly. They can also be caused by certain environments where tasks are first scheduled * to replace existing tasks. - *

+ *

* The autoscaler takes the scale up action based on the latest scale up request value after the delay. - *

+ *

* The default is 0 secs. Ideally, you should set this to be at least two times the larger of the two values: *

+ * * @param delayAutoscaleUpBySecs Delay autoscale up actions by this many seconds. * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} * @throws IllegalArgumentException if you give negative number for {@code delayAutoscalerbySecs}. * @see Autoscaling */ public Builder withDelayAutoscaleUpBySecs(long delayAutoscaleUpBySecs) { - if(delayAutoscaleUpBySecs < 0L) + if (delayAutoscaleUpBySecs < 0L) { throw new IllegalArgumentException("Delay secs can't be negative: " + delayAutoscaleUpBySecs); + } this.delayAutoscaleUpBySecs = delayAutoscaleUpBySecs; return this; } @@ -377,19 +388,21 @@ public Builder withDelayAutoscaleUpBySecs(long delayAutoscaleUpBySecs) { * Delay the autoscale down actions to reduce unnecessary actions due to short periods of breach of scale down * policy rules. Such scale downs can be caused by, for example, certain environments where existing tasks are * removed before replacing them with new tasks. - *

+ *

* The autoscaler takes the scale down action based on the latest scale down request value after the delay. - *

+ *

* The default is 0 secs. Ideally, you should set this to be at least two times the delay before terminated * tasks are replaced successfully. + * * @param delayAutoscaleDownBySecs Delay autoscale down actions by this many seconds. * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} * @throws IllegalArgumentException if you give negative number for {@code delayAutoscalerbySecs}. * @see Autoscaling */ public Builder withDelayAutoscaleDownBySecs(long delayAutoscaleDownBySecs) { - if(delayAutoscaleDownBySecs < 0L) + if (delayAutoscaleDownBySecs < 0L) { throw new IllegalArgumentException("Delay secs can't be negative: " + delayAutoscaleDownBySecs); + } this.delayAutoscaleDownBySecs = delayAutoscaleDownBySecs; return this; } @@ -408,7 +421,7 @@ public Builder withDelayAutoscaleDownBySecs(long delayAutoscaleDownBySecs) { * @see Autoscaling */ public Builder withAutoscaleDisabledVmDurationInSecs(long disabledVmDurationInSecs) { - if(disabledVmDurationInSecs <= 0L) { + if (disabledVmDurationInSecs <= 0L) { throw new IllegalArgumentException("disabledVmDurationInSecs must be greater than 0: " + disabledVmDurationInSecs); } this.disabledVmDurationInSecs = disabledVmDurationInSecs; @@ -460,18 +473,33 @@ public Builder withTaskBatchSizeSupplier(Supplier taskBatchSizeSupplier) { return this; } + /** + * A provided function that can transform the assignable virtual machines that will be used during a scheduling + * iteration right before the scheduling iteration happens. This function is useful for global filtering and sorting + * right before the VMs are used to make scheduling decisions. Since this function blocks the scheduling loop, the expectation + * is that it returns very quickly. + * This API is experimental and subject to change. + * + * @param function that takes in a list of the current VMs and returns a list with VMs. + * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler} + */ + public Builder withAssignableVMsEvaluator(Func1, List> function) { + this.assignableVMsEvaluator = function; + return this; + } + /** * Creates a {@link TaskScheduler} based on the various builder methods you have chained. * * @return a {@code TaskScheduler} built according to the specifications you indicated */ public TaskScheduler build() { - if(scaleDownOrderEvaluator == null) { - if(weightedScaleDownConstraintEvaluators != null) { + if (scaleDownOrderEvaluator == null) { + if (weightedScaleDownConstraintEvaluators != null) { scaleDownOrderEvaluator = new NoOpScaleDownOrderEvaluator(); } } else { - if(weightedScaleDownConstraintEvaluators == null) { + if (weightedScaleDownConstraintEvaluators == null) { weightedScaleDownConstraintEvaluators = Collections.emptyMap(); } } @@ -497,7 +525,7 @@ private EvalResult(List assignmentResults, TaskAssignmentR private final AssignableVMs assignableVMs; private static final Logger logger = LoggerFactory.getLogger(TaskScheduler.class); private static final long purgeVMsIntervalSecs = 60; - private long lastVMPurgeAt=System.currentTimeMillis(); + private long lastVMPurgeAt = System.currentTimeMillis(); private final Builder builder; private final StateMonitor stateMonitor; private final SchedulingEventListener schedulingEventListener; @@ -509,10 +537,12 @@ private EvalResult(List assignmentResults, TaskAssignmentR private final TaskTracker taskTracker; private volatile boolean usingSchedulingService = false; private final String usingSchedSvcMesg = "Invalid call when using task scheduling service"; + private final Func1, List> assignableVMsEvaluator; private TaskScheduler(Builder builder) { - if(builder.leaseRejectAction ==null) + if (builder.leaseRejectAction == null) { throw new IllegalArgumentException("Lease reject action must be non-null"); + } this.builder = builder; this.maxConcurrent = builder.maxConcurrent; ThreadFactory threadFactory = ThreadFactoryBuilder.newBuilder().withNameFormat("fenzo-worker-%d").build(); @@ -524,7 +554,7 @@ private TaskScheduler(Builder builder) { assignableVMs = new AssignableVMs(taskTracker, builder.leaseRejectAction, builder.preferentialNamedConsumableResourceEvaluator, builder.leaseOfferExpirySecs, builder.maxOffersToReject, builder.autoScaleByAttributeName, builder.singleOfferMode, builder.autoScaleByAttributeName); - if(builder.autoScaleByAttributeName != null && !builder.autoScaleByAttributeName.isEmpty()) { + if (builder.autoScaleByAttributeName != null && !builder.autoScaleByAttributeName.isEmpty()) { ScaleDownConstraintExecutor scaleDownConstraintExecutor = builder.scaleDownOrderEvaluator == null ? null : new ScaleDownConstraintExecutor(builder.scaleDownOrderEvaluator, builder.weightedScaleDownConstraintEvaluators); @@ -533,24 +563,28 @@ private TaskScheduler(Builder builder) { builder.autoScaleRules, assignableVMs, builder.disableShortfallEvaluation, assignableVMs.getActiveVmGroups(), assignableVMs.getVmCollection(), scaleDownConstraintExecutor); - if(builder.autoscalerCallback != null) + if (builder.autoscalerCallback != null) { autoScaler.setCallback(builder.autoscalerCallback); - if(builder.delayAutoscaleDownBySecs > 0L) + } + if (builder.delayAutoscaleDownBySecs > 0L) { autoScaler.setDelayScaleDownBySecs(builder.delayAutoscaleDownBySecs); - if(builder.delayAutoscaleUpBySecs > 0L) + } + if (builder.delayAutoscaleUpBySecs > 0L) { autoScaler.setDelayScaleUpBySecs(builder.delayAutoscaleUpBySecs); + } if (builder.disabledVmDurationInSecs > 0L) { autoScaler.setDisabledVmDurationInSecs(builder.disabledVmDurationInSecs); } + } else { + autoScaler = null; } - else { - autoScaler=null; - } + assignableVMsEvaluator = builder.assignableVMsEvaluator == null ? avms -> avms : builder.assignableVMsEvaluator; } void checkIfShutdown() throws IllegalStateException { - if(isShutdown.get()) + if (isShutdown.get()) { throw new IllegalStateException("TaskScheduler already shutdown"); + } } /** @@ -565,8 +599,9 @@ void checkIfShutdown() throws IllegalStateException { */ public void setAutoscalerCallback(Action1 callback) throws IllegalStateException { checkIfShutdown(); - if(autoScaler==null) + if (autoScaler == null) { throw new IllegalStateException("No autoScaler setup"); + } autoScaler.setCallback(callback); } @@ -575,14 +610,14 @@ public TaskTracker getTaskTracker() { } private TaskAssignmentResult getSuccessfulResult(List results) { - double bestFitness=0.0; - TaskAssignmentResult bestResult=null; - for(int r=results.size()-1; r>=0; r--) { + double bestFitness = 0.0; + TaskAssignmentResult bestResult = null; + for (int r = results.size() - 1; r >= 0; r--) { // change to using fitness value from assignment result TaskAssignmentResult res = results.get(r); - if(res!=null && res.isSuccessful()) { - if(bestResult==null || res.getFitness()>bestFitness || - (res.getFitness()==bestFitness && res.getHostname().compareTo(bestResult.getHostname())<0)) { + if (res != null && res.isSuccessful()) { + if (bestResult == null || res.getFitness() > bestFitness || + (res.getFitness() == bestFitness && res.getHostname().compareTo(bestResult.getHostname()) < 0)) { bestFitness = res.getFitness(); bestResult = res; } @@ -600,7 +635,7 @@ private boolean isGoodEnough(TaskAssignmentResult result) { * * @return current mapping of resource allocations * @see Resource Allocation - * Limits + * Limits */ public Map getResAllocs() { return resAllocsEvaluator.getResAllocs(); @@ -611,7 +646,7 @@ public Map getResAllocs() { * * @param resAllocs the resource allocation to add or replace * @see Resource Allocation - * Limits + * Limits */ public void addOrReplaceResAllocs(ResAllocs resAllocs) { resAllocsEvaluator.replaceResAllocs(resAllocs); @@ -622,7 +657,7 @@ public void addOrReplaceResAllocs(ResAllocs resAllocs) { * * @param groupName the name of the resource allocation to remove * @see Resource Allocation - * Limits + * Limits */ public void removeResAllocs(String groupName) { resAllocsEvaluator.remResAllocs(groupName); @@ -635,8 +670,9 @@ public void removeResAllocs(String groupName) { * @see Autoscaling */ public Collection getAutoScaleRules() { - if(autoScaler==null) + if (autoScaler == null) { return Collections.emptyList(); + } return autoScaler.getRules(); } @@ -666,8 +702,9 @@ public void removeAutoScaleRule(String ruleName) { } /* package */ void setTaskToClusterAutoScalerMapGetter(Func1> getter) { - if (autoScaler != null) + if (autoScaler != null) { autoScaler.setTaskToClustersGetter(getter); + } } /* package */ AutoScaler getAutoScaler() { @@ -706,65 +743,65 @@ public void removeAutoScaleRule(String ruleName) { * plugins. The scheduling routine stops upon catching any unexpected exceptions. These exceptions are surfaced to * you in one or both of two ways. *

* If there are exceptions, the internal state of Fenzo may be corrupt with no way to undo any partial effects. * - * @param requests a list of task requests to match with resources, in their given order + * @param requests a list of task requests to match with resources, in their given order * @param newLeases new resource leases from hosts that the scheduler can use along with any previously * ununsed leases * @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries * @throws IllegalStateException if you call this method concurrently, or, if you try to add an existing lease - * again, or, if there was unexpected exception during the scheduling iteration, or, if using - * {@link TaskSchedulingService}, which will instead invoke scheduling from within. Unexpected exceptions - * can arise from uncaught exceptions in user defined plugins. It is also thrown if the scheduler has been shutdown - * via the {@link #shutdown()} method. + * again, or, if there was unexpected exception during the scheduling iteration, or, if using + * {@link TaskSchedulingService}, which will instead invoke scheduling from within. Unexpected exceptions + * can arise from uncaught exceptions in user defined plugins. It is also thrown if the scheduler has been shutdown + * via the {@link #shutdown()} method. */ public SchedulingResult scheduleOnce( List requests, List newLeases) throws IllegalStateException { - if (usingSchedulingService) + if (usingSchedulingService) { throw new IllegalStateException(usingSchedSvcMesg); + } final Iterator iterator = requests != null ? requests.iterator() : Collections.emptyIterator(); - TaskIterator taskIterator = new TaskIterator() { - @Override - public Assignable next() { - if (iterator.hasNext()) - return Assignable.success(iterator.next()); - return null; + TaskIterator taskIterator = () -> { + if (iterator.hasNext()) { + return Assignable.success(iterator.next()); } + return null; }; return scheduleOnce(taskIterator, newLeases); } /** * Variant of {@link #scheduleOnce(List, List)} that takes a task iterator instead of task list. + * * @param taskIterator Iterator for tasks to assign resources to. - * @param newLeases new resource leases from hosts that the scheduler can use along with any previously - * ununsed leases + * @param newLeases new resource leases from hosts that the scheduler can use along with any previously + * ununsed leases * @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries * @throws IllegalStateException if you call this method concurrently, or, if you try to add an existing lease - * again, or, if there was unexpected exception during the scheduling iteration. For example, unexpected exceptions - * can arise from uncaught exceptions in user defined plugins. It is also thrown if the scheduler has been shutdown - * via the {@link #shutdown()} method. + * again, or, if there was unexpected exception during the scheduling iteration. For example, unexpected exceptions + * can arise from uncaught exceptions in user defined plugins. It is also thrown if the scheduler has been shutdown + * via the {@link #shutdown()} method. */ /* package */ SchedulingResult scheduleOnce( TaskIterator taskIterator, List newLeases) throws IllegalStateException { checkIfShutdown(); - try (AutoCloseable ac = stateMonitor.enter()) { + try (AutoCloseable ignored = stateMonitor.enter()) { return doScheduling(taskIterator, newLeases); } catch (Exception e) { logger.error("Error with scheduling run: " + e.getMessage(), e); - if(e instanceof IllegalStateException) - throw (IllegalStateException)e; - else { + if (e instanceof IllegalStateException) { + throw (IllegalStateException) e; + } else { logger.warn("Unexpected exception: " + e.getMessage()); throw new IllegalStateException("Unexpected exception during scheduling run: " + e.getMessage(), e); } @@ -774,6 +811,7 @@ public Assignable next() { /** * Variant of {@link #scheduleOnce(List, List)} that should be only used to schedule a pseudo iteration as it * ignores the StateMonitor lock. + * * @param taskIterator Iterator for tasks to assign resources to. * @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries */ @@ -785,11 +823,11 @@ private SchedulingResult doScheduling(TaskIterator taskIterator, List newLeases) throws Exception { long start = System.currentTimeMillis(); final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases); - if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) { + if ((lastVMPurgeAt + purgeVMsIntervalSecs * 1000) < System.currentTimeMillis()) { lastVMPurgeAt = System.currentTimeMillis(); - logger.info("Purging inactive VMs"); + logger.debug("Purging inactive VMs"); assignableVMs.purgeInactiveVMs( // explicitly exclude VMs that have assignments - schedulingResult.getResultMap() == null? + schedulingResult.getResultMap() == null ? Collections.emptySet() : new HashSet<>(schedulingResult.getResultMap().keySet()) ); @@ -802,25 +840,28 @@ private SchedulingResult doSchedule( TaskIterator taskIterator, List newLeases) throws Exception { AtomicInteger rejectedCount = new AtomicInteger(); - List avms = assignableVMs.prepareAndGetOrderedVMs(newLeases, rejectedCount); - if(logger.isDebugEnabled()) - logger.debug("Got {} avms", avms.size()); + List originalVms = assignableVMs.prepareAndGetOrderedVMs(newLeases, rejectedCount); + List avms = assignableVMsEvaluator.call(originalVms); + if (logger.isDebugEnabled()) { + logger.debug("Original VMs: {}", originalVms); + logger.debug("VMs: {}", avms); + } + List inactiveAVMs = assignableVMs.getInactiveVMs(); - if(logger.isDebugEnabled()) - logger.debug("Found {} VMs with non-zero offers to assign from", avms.size()); final boolean hasResAllocs = resAllocsEvaluator.prepare(); //logger.info("Got " + avms.size() + " AVMs to schedule on"); - int totalNumAllocations=0; + int totalNumAllocations = 0; Set failedTasksForAutoScaler = new HashSet<>(); Map resultMap = new HashMap<>(avms.size()); final SchedulingResult schedulingResult = new SchedulingResult(resultMap); long taskBatchSize = builder.taskBatchSizeSupplier.get(); long tasksIterationCount = 0; - if(avms.isEmpty()) { + if (avms.isEmpty()) { while (true) { final Assignable taskOrFailure = taskIterator.next(); - if (taskOrFailure == null) + if (taskOrFailure == null) { break; + } failedTasksForAutoScaler.add(taskOrFailure.getTask()); } } else { @@ -831,102 +872,106 @@ private SchedulingResult doSchedule( break; } final Assignable taskOrFailure = taskIterator.next(); - if(logger.isDebugEnabled()) - logger.debug("TaskSched: task=" + (taskOrFailure == null? "null" : taskOrFailure.getTask().getId())); - if (taskOrFailure == null) + if (logger.isDebugEnabled()) { + logger.debug("TaskSched: task=" + (taskOrFailure == null ? "null" : taskOrFailure.getTask().getId())); + } + if (taskOrFailure == null) { break; - if(taskOrFailure.hasFailure()) { + } + if (taskOrFailure.hasFailure()) { schedulingResult.addFailures( taskOrFailure.getTask(), Collections.singletonList(new TaskAssignmentResult( - assignableVMs.getDummyVM(), - taskOrFailure.getTask(), - false, - Collections.singletonList(taskOrFailure.getAssignmentFailure()), - null, - 0 - ) - )); + assignableVMs.getDummyVM(), + taskOrFailure.getTask(), + false, + Collections.singletonList(taskOrFailure.getAssignmentFailure()), + null, + 0 + ) + )); continue; } TaskRequest task = taskOrFailure.getTask(); failedTasksForAutoScaler.add(task); - if(hasResAllocs) { - if(resAllocsEvaluator.taskGroupFailed(task.taskGroupName())) { - if(logger.isDebugEnabled()) + if (hasResAllocs) { + if (resAllocsEvaluator.taskGroupFailed(task.taskGroupName())) { + if (logger.isDebugEnabled()) { logger.debug("Resource allocation limits reached for task: " + task.getId()); + } continue; } final AssignmentFailure resAllocsFailure = resAllocsEvaluator.hasResAllocs(task); - if(resAllocsFailure != null) { + if (resAllocsFailure != null) { final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), task, false, Collections.singletonList(resAllocsFailure), null, 0.0)); schedulingResult.addFailures(task, failures); failedTasksForAutoScaler.remove(task); // don't scale up for resAllocs failures - if(logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Resource allocation limit reached for task " + task.getId() + ": " + resAllocsFailure); + } continue; } } final AssignmentFailure maxResourceFailure = assignableVMs.getFailedMaxResource(null, task); - if(maxResourceFailure != null) { + if (maxResourceFailure != null) { final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), task, false, Collections.singletonList(maxResourceFailure), null, 0.0)); schedulingResult.addFailures(task, failures); - if(logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Task {}: maxResource failure: {}", task.getId(), maxResourceFailure); + } continue; } // create batches of VMs to evaluate assignments concurrently across the batches final BlockingQueue virtualMachines = new ArrayBlockingQueue<>(avms.size(), false, avms); - int nThreads = (int)Math.ceil((double)avms.size()/ PARALLEL_SCHED_EVAL_MIN_BATCH_SIZE); + int nThreads = (int) Math.ceil((double) avms.size() / PARALLEL_SCHED_EVAL_MIN_BATCH_SIZE); List> futures = new ArrayList<>(); - if(logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Launching {} threads for evaluating assignments for task {}", nThreads, task.getId()); - for(int b = 0; b() { - @Override - public EvalResult call() throws Exception { - return evalAssignments(task, virtualMachines); - } - })); + } + for (int b = 0; b < nThreads && b < maxConcurrent; b++) { + futures.add(executorService.submit(() -> evalAssignments(task, virtualMachines))); } List results = new ArrayList<>(); List bestResults = new ArrayList<>(); - for(Future f: futures) { + for (Future f : futures) { try { EvalResult evalResult = f.get(); - if(evalResult.exception!=null) { + if (evalResult.exception != null) { logger.warn("Error during concurrent task assignment eval - " + evalResult.exception.getMessage(), evalResult.exception); schedulingResult.addException(evalResult.exception); - } - else { + } else { results.add(evalResult); bestResults.add(evalResult.result); - if(logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Task {}: best result so far: {}", task.getId(), evalResult.result); + } totalNumAllocations += evalResult.numAllocationTrials; } - } catch (InterruptedException|ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { logger.error("Unexpected during concurrent task assignment eval - " + e.getMessage(), e); } } - if(!schedulingResult.getExceptions().isEmpty()) + if (!schedulingResult.getExceptions().isEmpty()) { break; + } TaskAssignmentResult successfulResult = getSuccessfulResult(bestResults); List failures = new ArrayList<>(); - if(successfulResult == null) { - if(logger.isDebugEnabled()) + if (successfulResult == null) { + if (logger.isDebugEnabled()) { logger.debug("Task {}: no successful results", task.getId()); - for(EvalResult er: results) + } + for (EvalResult er : results) { failures.addAll(er.assignmentResults); + } schedulingResult.addFailures(task, failures); - } - else { - if(logger.isDebugEnabled()) + } else { + if (logger.isDebugEnabled()) { logger.debug("Task {}: found successful assignment on host {}", task.getId(), successfulResult.getHostname()); + } successfulResult.assignResult(); tasksIterationCount++; failedTasksForAutoScaler.remove(task); @@ -938,13 +983,14 @@ public EvalResult call() throws Exception { } } List idleResourcesList = new ArrayList<>(); - if(schedulingResult.getExceptions().isEmpty()) { + if (schedulingResult.getExceptions().isEmpty()) { List expirableLeases = new ArrayList<>(); for (AssignableVirtualMachine avm : avms) { VMAssignmentResult assignmentResult = avm.resetAndGetSuccessfullyAssignedRequests(); if (assignmentResult == null) { - if (!avm.hasPreviouslyAssignedTasks()) + if (!avm.hasPreviouslyAssignedTasks()) { idleResourcesList.add(avm.getCurrTotalLease()); + } expirableLeases.add(avm.getCurrTotalLease()); } else { resultMap.put(avm.getHostname(), assignmentResult); @@ -958,9 +1004,10 @@ public EvalResult call() throws Exception { .collect(Collectors.toList()); rejectedCount.addAndGet(assignableVMs.removeLimitedLeases(expirableLeases)); - final AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, idleInactiveAVMs, failedTasksForAutoScaler); - if (autoScaler != null) + if (autoScaler != null) { + AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, idleInactiveAVMs, failedTasksForAutoScaler); autoScaler.doAutoscale(autoScalerInput); + } } schedulingResult.setLeasesAdded(newLeases.size()); schedulingResult.setLeasesRejected(rejectedCount.get()); @@ -971,7 +1018,7 @@ public EvalResult call() throws Exception { } /* package */ Map> createPseudoHosts(Map groupCounts) { - return assignableVMs.createPseudoHosts(groupCounts, autoScaler == null? name -> null : autoScaler::getRule); + return assignableVMs.createPseudoHosts(groupCounts, autoScaler == null ? name -> null : autoScaler::getRule); } /* package */ void removePseudoHosts(Map> hostsMap) { @@ -988,21 +1035,22 @@ public EvalResult call() throws Exception { * information. Scheduling runs are blocked around the lock. * * @return a Map of state information with the hostname as the key and a Map of resource state as the value. - * The resource state Map contains a resource as the key and a two element Double array - the first - * element of which contains the amount of the resource used and the second element contains the - * amount still available (available does not include used). - * @see How to Learn Which Resources Are Available on Which Hosts + * The resource state Map contains a resource as the key and a two element Double array - the first + * element of which contains the amount of the resource used and the second element contains the + * amount still available (available does not include used). * @throws IllegalStateException if called concurrently with {@link #scheduleOnce(List, List)} or if called when - * using a {@link TaskSchedulingService}. + * using a {@link TaskSchedulingService}. + * @see How to Learn Which Resources Are Available on Which Hosts */ public Map> getResourceStatus() throws IllegalStateException { - if (usingSchedulingService) + if (usingSchedulingService) { throw new IllegalStateException(usingSchedSvcMesg); + } return getResourceStatusIntl(); } /* package */ Map> getResourceStatusIntl() { - try (AutoCloseable ac = stateMonitor.enter()) { + try (AutoCloseable ignored = stateMonitor.enter()) { return assignableVMs.getResourceStatus(); } catch (Exception e) { logger.error("Unexpected error from state monitor: " + e.getMessage()); @@ -1017,20 +1065,20 @@ public Map> getResourceStatus() throws Illegal * * @return a list containing the current state of all known VMs * @throws IllegalStateException if called concurrently with {@link #scheduleOnce(List, List)} or if called when - * using a {@link TaskSchedulingService}. + * using a {@link TaskSchedulingService}. * @see How to Learn the Amount of Resources Currently Available on Particular Hosts */ public List getVmCurrentStates() throws IllegalStateException { - if (usingSchedulingService) + if (usingSchedulingService) { throw new IllegalStateException(usingSchedSvcMesg); + } return getVmCurrentStatesIntl(); } /* package */ List getVmCurrentStatesIntl() throws IllegalStateException { - try (AutoCloseable ac = stateMonitor.enter()) { + try (AutoCloseable ignored = stateMonitor.enter()) { return assignableVMs.getVmCurrentStates(); - } - catch (Exception e) { + } catch (Exception e) { logger.error("Unexpected error from state monitor: " + e.getMessage(), e); throw new IllegalStateException(e); } @@ -1041,31 +1089,31 @@ private EvalResult evalAssignments(TaskRequest task, BlockingQueue buf = new ArrayList<>(N); List results = new ArrayList<>(); - while(true) { + while (true) { buf.clear(); int n = virtualMachines.drainTo(buf, N); - if(n == 0) + if (n == 0) { return new EvalResult(results, getSuccessfulResult(results), results.size(), null); - for(int m=0; m * In addition, in your framework's task completion callback that you supply to Mesos, you must call your * task scheduler's {@link #getTaskUnAssigner() getTaskUnassigner().call()} method to notify Fenzo that the @@ -1140,21 +1190,19 @@ public void expireAllLeases() throws IllegalStateException { * @throws IllegalStateException if the scheduler is shutdown via the {@link #isShutdown} method. */ public Action2 getTaskAssigner() throws IllegalStateException { - if (usingSchedulingService) + if (usingSchedulingService) { throw new IllegalStateException(usingSchedSvcMesg); + } return getTaskAssignerIntl(); } /* package */Action2 getTaskAssignerIntl() throws IllegalStateException { - return new Action2() { - @Override - public void call(TaskRequest request, String hostname) { - try (AutoCloseable ac = stateMonitor.enter()) { - assignableVMs.setTaskAssigned(request, hostname); - } catch (Exception e) { - logger.error("Unexpected error from state monitor: " + e.getMessage(), e); - throw new IllegalStateException(e); - } + return (request, hostname) -> { + try (AutoCloseable ignored = stateMonitor.enter()) { + assignableVMs.setTaskAssigned(request, hostname); + } catch (Exception e) { + logger.error("Unexpected error from state monitor: " + e.getMessage(), e); + throw new IllegalStateException(e); } }; } @@ -1184,46 +1232,38 @@ public void call(TaskRequest request, String hostname) { * @throws IllegalStateException if the scheduler is shutdown via the {@link #isShutdown} method. */ public Action2 getTaskUnAssigner() throws IllegalStateException { - return new Action2() { - @Override - public void call(String taskId, String hostname) { - assignableVMs.unAssignTask(taskId, hostname); - } - }; + return assignableVMs::unAssignTask; } /** - * Disable the virtual machine with the specified hostname. If the scheduler is not yet aware of the host - * with that hostname, it creates a new object for it, and therefore your disabling of it will be remembered - * when offers that concern that host come in later. The scheduler will not use disabled hosts for + * Disable the virtual machine with the specified hostname. The scheduler will not use disabled hosts for * allocating resources to tasks. * - * @param hostname the name of the host to disable + * @param hostname the name of the host to disable * @param durationMillis the length of time, starting from now, in milliseconds, during which the host will - * be disabled + * be disabled * @throws IllegalStateException if the scheduler is shutdown via the {@link #isShutdown} method. */ public void disableVM(String hostname, long durationMillis) throws IllegalStateException { - logger.info("Disable VM " + hostname + " for " + durationMillis + " millis"); - assignableVMs.disableUntil(hostname, System.currentTimeMillis()+durationMillis); + logger.debug("Disable VM " + hostname + " for " + durationMillis + " millis"); + assignableVMs.disableUntil(hostname, System.currentTimeMillis() + durationMillis); } /** - * Disable the virtual machine with the specified ID. If the scheduler is not yet aware of the host with - * that hostname, it creates a new object for it, and therefore your disabling of it will be remembered when - * offers that concern that host come in later. The scheduler will not use disabled hosts for allocating + * Disable the virtual machine with the specified ID. The scheduler will not use disabled hosts for allocating * resources to tasks. * - * @param vmID the ID of the host to disable + * @param vmID the ID of the host to disable * @param durationMillis the length of time, starting from now, in milliseconds, during which the host will - * be disabled + * be disabled * @return {@code true} if the ID matches a known VM, {@code false} otherwise. * @throws IllegalStateException if the scheduler is shutdown via the {@link #isShutdown} method. */ public boolean disableVMByVMId(String vmID, long durationMillis) throws IllegalStateException { final String hostname = assignableVMs.getHostnameFromVMId(vmID); - if(hostname == null) + if (hostname == null) { return false; + } disableVM(hostname, durationMillis); return true; } @@ -1236,7 +1276,7 @@ public boolean disableVMByVMId(String vmID, long durationMillis) throws IllegalS * @throws IllegalStateException if the scheduler is shutdown via the {@link #isShutdown} method. */ public void enableVM(String hostname) throws IllegalStateException { - logger.info("Enabling VM " + hostname); + logger.debug("Enabling VM " + hostname); assignableVMs.enableVM(hostname); } @@ -1258,7 +1298,7 @@ public void setActiveVmGroupAttributeName(String attributeName) { * to be enabled. * * @param vmGroups a list of VM group names that the scheduler is to consider to be enabled, or {@code null} - * if the scheduler is to consider every group to be enabled + * if the scheduler is to consider every group to be enabled */ public void setActiveVmGroups(List vmGroups) { assignableVMs.setActiveVmGroups(vmGroups); @@ -1268,10 +1308,11 @@ public void setActiveVmGroups(List vmGroups) { * Mark task scheduler as shutdown and shutdown any thread pool executors created. */ public void shutdown() { - if(isShutdown.compareAndSet(false, true)) { + if (isShutdown.compareAndSet(false, true)) { executorService.shutdown(); - if(autoScaler != null) + if (autoScaler != null) { autoScaler.shutdown(); + } } } } diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java index 1f9c21f..49a254a 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java @@ -136,12 +136,7 @@ private TaskSchedulingService(Builder builder) { * new leases. */ public void start() { - executorService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - TaskSchedulingService.this.scheduleOnce(); - } - }, 0, loopIntervalMillis, TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay(TaskSchedulingService.this::scheduleOnce, 0, loopIntervalMillis, TimeUnit.MILLISECONDS); } /** diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/Tier.java b/fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/Tier.java index 6fd2da5..a7beef6 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/Tier.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/Tier.java @@ -300,7 +300,7 @@ public void setTotalResources(Map totalResourcesMap) { for (QueueBucket b : sortedBuckets.getSortedList()) { b.setTotalResources(tierResources); } - logger.info("Re-sorting buckets in tier " + tierNumber + " after totals changed"); + logger.debug("Re-sorting buckets in tier " + tierNumber + " after totals changed"); sortedBuckets.resort(); } } diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/BinPackingSchedulerTests.java b/fenzo-core/src/test/java/com/netflix/fenzo/BinPackingSchedulerTests.java index 36b71a7..7f4d75a 100644 --- a/fenzo-core/src/test/java/com/netflix/fenzo/BinPackingSchedulerTests.java +++ b/fenzo-core/src/test/java/com/netflix/fenzo/BinPackingSchedulerTests.java @@ -45,12 +45,7 @@ private TaskScheduler getScheduler(VMTaskFitnessCalculator fitnessCalculator) { return new TaskScheduler.Builder() .withFitnessCalculator(fitnessCalculator) .withLeaseOfferExpirySecs(1000000) - .withLeaseRejectAction(new Action1() { - @Override - public void call(VirtualMachineLease virtualMachineLease) { - logger.info("Rejecting lease on " + virtualMachineLease.hostname()); - } - }) + .withLeaseRejectAction(virtualMachineLease -> logger.info("Rejecting lease on " + virtualMachineLease.hostname())) .build(); } diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java index 7932ebb..5a29f2b 100644 --- a/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java +++ b/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java @@ -16,23 +16,35 @@ package com.netflix.fenzo; -import com.netflix.fenzo.functions.Action0; -import com.netflix.fenzo.functions.Action1; -import com.netflix.fenzo.plugins.BinPackingFitnessCalculators; -import com.netflix.fenzo.queues.*; -import com.netflix.fenzo.queues.tiered.QueuableTaskProvider; -import org.junit.Assert; -import org.junit.Test; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import com.netflix.fenzo.functions.Action0; +import com.netflix.fenzo.functions.Action1; +import com.netflix.fenzo.functions.Func1; +import com.netflix.fenzo.plugins.BinPackingFitnessCalculators; +import com.netflix.fenzo.queues.QAttributes; +import com.netflix.fenzo.queues.QueuableTask; +import com.netflix.fenzo.queues.TaskQueue; +import com.netflix.fenzo.queues.TaskQueueException; +import com.netflix.fenzo.queues.TaskQueueMultiException; +import com.netflix.fenzo.queues.TaskQueues; +import com.netflix.fenzo.queues.tiered.QueuableTaskProvider; +import org.junit.Assert; +import org.junit.Test; public class TaskSchedulingServiceTest { @@ -52,11 +64,8 @@ private TaskSchedulingService getSchedulingService(TaskQueue queue, TaskSchedule .withTaskQueue(queue) .withLoopIntervalMillis(loopMillis) .withMaxDelayMillis(maxDelayMillis) - .withPreSchedulingLoopHook(new Action0() { - @Override - public void call() { - //System.out.println("Pre-scheduling hook"); - } + .withPreSchedulingLoopHook(() -> { + //System.out.println("Pre-scheduling hook"); }) .withSchedulingResultCallback(resultCallback) .withTaskScheduler(scheduler) @@ -64,15 +73,15 @@ public void call() { } public TaskScheduler getScheduler() { + return getScheduler(avms -> avms); + } + + public TaskScheduler getScheduler(Func1, List> assignableVMsEvaluator) { return new TaskScheduler.Builder() .withLeaseOfferExpirySecs(1000000) - .withLeaseRejectAction(new Action1() { - @Override - public void call(VirtualMachineLease virtualMachineLease) { - System.out.println("Rejecting offer on host " + virtualMachineLease.hostname()); - } - }) + .withLeaseRejectAction(virtualMachineLease -> System.out.println("Rejecting offer on host " + virtualMachineLease.hostname())) .withFitnessCalculator(BinPackingFitnessCalculators.cpuMemBinPacker) + .withAssignableVMsEvaluator(assignableVMsEvaluator) .build(); } @@ -88,15 +97,12 @@ private void testOneTaskInternal(QueuableTask queuableTask, Action0 action) thro final CountDownLatch latch = new CountDownLatch(1); TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); - if (schedulingResult.getResultMap().size() > 0) { - //System.out.println("Assignment on host " + schedulingResult.getResultMap().values().iterator().next().getHostname()); - latch.countDown(); - scheduler.shutdown(); - } + Action1 resultCallback = schedulingResult -> { + //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); + if (schedulingResult.getResultMap().size() > 0) { + //System.out.println("Assignment on host " + schedulingResult.getResultMap().values().iterator().next().getHostname()); + latch.countDown(); + scheduler.shutdown(); } }; final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 1000L, resultCallback); @@ -144,30 +150,27 @@ public void testMultipleTaskAssignments() throws Exception { final CountDownLatch latch = new CountDownLatch(numTasks); final TaskScheduler scheduler = getScheduler(); final AtomicReference ref = new AtomicReference<>(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); - if (!schedulingResult.getExceptions().isEmpty()) { - Assert.fail(schedulingResult.getExceptions().get(0).getMessage()); - } - else if (schedulingResult.getResultMap().size() > 0) { - final VMAssignmentResult vmAssignmentResult = schedulingResult.getResultMap().values().iterator().next(); + Action1 resultCallback = schedulingResult -> { + //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); + if (!schedulingResult.getExceptions().isEmpty()) { + Assert.fail(schedulingResult.getExceptions().get(0).getMessage()); + } + else if (schedulingResult.getResultMap().size() > 0) { + final VMAssignmentResult vmAssignmentResult = schedulingResult.getResultMap().values().iterator().next(); // System.out.println("Assignment on host " + vmAssignmentResult.getHostname() + // " with " + vmAssignmentResult.getTasksAssigned().size() + " tasks" // ); - for (TaskAssignmentResult r: vmAssignmentResult.getTasksAssigned()) { - latch.countDown(); - } - ref.get().addLeases( - Collections.singletonList(LeaseProvider.getConsumedLease(vmAssignmentResult)) - ); + for (TaskAssignmentResult r: vmAssignmentResult.getTasksAssigned()) { + latch.countDown(); } - else { - final Map> failures = schedulingResult.getFailures(); - if (!failures.isEmpty()) { - Assert.fail(failures.values().iterator().next().iterator().next().toString()); - } + ref.get().addLeases( + Collections.singletonList(LeaseProvider.getConsumedLease(vmAssignmentResult)) + ); + } + else { + final Map> failures = schedulingResult.getFailures(); + if (!failures.isEmpty()) { + Assert.fail(failures.values().iterator().next().iterator().next().toString()); } } }; @@ -189,25 +192,22 @@ public void testOrderedAssignments() throws Exception { TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); final BlockingQueue assignmentResults = new LinkedBlockingQueue<>(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - final Map resultMap = schedulingResult.getResultMap(); - if (!resultMap.isEmpty()) { - for (VMAssignmentResult r: resultMap.values()) { - for (TaskAssignmentResult t: r.getTasksAssigned()) { - assignmentResults.offer((QueuableTask)t.getRequest()); - //System.out.println("******* Assignment for task " + t.getTaskId()); - } + Action1 resultCallback = schedulingResult -> { + final Map resultMap = schedulingResult.getResultMap(); + if (!resultMap.isEmpty()) { + for (VMAssignmentResult r: resultMap.values()) { + for (TaskAssignmentResult t: r.getTasksAssigned()) { + assignmentResults.offer((QueuableTask)t.getRequest()); + //System.out.println("******* Assignment for task " + t.getTaskId()); } } + } // final Map> failures = schedulingResult.getFailures(); // if (!failures.isEmpty()) { // for (Map.Entry> entry: failures.entrySet()) { // System.out.println("****** failures for task " + entry.getKey().getId()); // } // } - } }; final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 50L, resultCallback); // First, fill 4 VMs, each with 8 cores, with A using 15 cores, B using 6 cores, and C using 11 cores, with @@ -265,25 +265,22 @@ public void testMultiTierAllocation() throws Exception { TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); final BlockingQueue assignmentResults = new LinkedBlockingQueue<>(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - final Map resultMap = schedulingResult.getResultMap(); - if (!resultMap.isEmpty()) { - for (VMAssignmentResult r: resultMap.values()) { - for (TaskAssignmentResult t: r.getTasksAssigned()) { - assignmentResults.offer((QueuableTask)t.getRequest()); - //System.out.println("******* Assignment for task " + t.getTaskId()); - } + Action1 resultCallback = schedulingResult -> { + final Map resultMap = schedulingResult.getResultMap(); + if (!resultMap.isEmpty()) { + for (VMAssignmentResult r: resultMap.values()) { + for (TaskAssignmentResult t: r.getTasksAssigned()) { + assignmentResults.offer((QueuableTask)t.getRequest()); + //System.out.println("******* Assignment for task " + t.getTaskId()); } } + } // final Map> failures = schedulingResult.getFailures(); // if (!failures.isEmpty()) { // for (Map.Entry> entry: failures.entrySet()) { // System.out.println("****** failures for task " + entry.getKey().getId()); // } // } - } }; final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 50L, resultCallback); // fill 4 hosts with tasks from A (tier 0) and tasks from D1 (tier 1) @@ -313,17 +310,14 @@ public void call(SchedulingResult schedulingResult) { Assert.assertEquals(tier1bktA.getBucketName(), task.getQAttributes().getBucketName()); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference>> ref = new AtomicReference<>(); - schedulingService.requestAllTasks(new Action1>>() { - @Override - public void call(Map> stateCollectionMap) { - //System.out.println("**************** Got tasks collection"); - final Collection tasks = stateCollectionMap.get(TaskQueue.TaskState.QUEUED); - //System.out.println("********* size=" + tasks.size()); + schedulingService.requestAllTasks(stateCollectionMap -> { + //System.out.println("**************** Got tasks collection"); + final Collection tasks = stateCollectionMap.get(TaskQueue.TaskState.QUEUED); + //System.out.println("********* size=" + tasks.size()); // if (!tasks.isEmpty()) // System.out.println("******** bucket: " + tasks.iterator().next().getQAttributes().getBucketName()); - ref.set(stateCollectionMap); - latch.countDown(); - } + ref.set(stateCollectionMap); + latch.countDown(); }); if (!latch.await(1000, TimeUnit.MILLISECONDS)) Assert.fail("Time out waiting for tasks collection"); @@ -341,25 +335,22 @@ public void testMultiResAllocation() throws Exception { final TaskScheduler scheduler = getScheduler(); final BlockingQueue assignmentResults = new LinkedBlockingQueue<>(); final AtomicReference ref = new AtomicReference<>(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - final Map resultMap = schedulingResult.getResultMap(); - if (!resultMap.isEmpty()) { - for (VMAssignmentResult r: resultMap.values()) { - for (TaskAssignmentResult t: r.getTasksAssigned()) { - assignmentResults.offer((QueuableTask)t.getRequest()); - } - ref.get().addLeases(Collections.singletonList(LeaseProvider.getConsumedLease(r))); + Action1 resultCallback = schedulingResult -> { + final Map resultMap = schedulingResult.getResultMap(); + if (!resultMap.isEmpty()) { + for (VMAssignmentResult r: resultMap.values()) { + for (TaskAssignmentResult t: r.getTasksAssigned()) { + assignmentResults.offer((QueuableTask)t.getRequest()); } + ref.get().addLeases(Collections.singletonList(LeaseProvider.getConsumedLease(r))); } + } // final Map> failures = schedulingResult.getFailures(); // if (!failures.isEmpty()) { // for (Map.Entry> entry: failures.entrySet()) { // System.out.println("****** failures for task " + entry.getKey().getId()); // } // } - } }; final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 50L, resultCallback); ref.set(schedulingService); @@ -405,15 +396,12 @@ public void call(SchedulingResult schedulingResult) { } final AtomicReference bucketRef = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); - schedulingService.requestAllTasks(new Action1>>() { - @Override - public void call(Map> stateCollectionMap) { - final Collection tasks = stateCollectionMap.get(TaskQueue.TaskState.QUEUED); - if (tasks != null && !tasks.isEmpty()) { - for (QueuableTask t : tasks) - bucketRef.set(t.getQAttributes().getBucketName()); - latch.countDown(); - } + schedulingService.requestAllTasks(stateCollectionMap -> { + final Collection tasks = stateCollectionMap.get(TaskQueue.TaskState.QUEUED); + if (tasks != null && !tasks.isEmpty()) { + for (QueuableTask t : tasks) + bucketRef.set(t.getQAttributes().getBucketName()); + latch.countDown(); } }); if (!latch.await(2000, TimeUnit.MILLISECONDS)) @@ -427,14 +415,11 @@ public void testRemoveFromQueue() throws Exception { final CountDownLatch latch = new CountDownLatch(1); TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); - if (schedulingResult.getResultMap().size() > 0) { - //System.out.println("Assignment on host " + schedulingResult.getResultMap().values().iterator().next().getHostname()); - latch.countDown(); - } + Action1 resultCallback = schedulingResult -> { + //System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results"); + if (schedulingResult.getResultMap().size() > 0) { + //System.out.println("Assignment on host " + schedulingResult.getResultMap().values().iterator().next().getHostname()); + latch.countDown(); } }; final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 100L, 200L, resultCallback); @@ -447,15 +432,12 @@ public void call(SchedulingResult schedulingResult) { Assert.fail("Did not assign resources in time"); final CountDownLatch latch2 = new CountDownLatch(1); final AtomicBoolean found = new AtomicBoolean(); - schedulingService.requestVmCurrentStates(new Action1>() { - @Override - public void call(List states) { - for (VirtualMachineCurrentState s: states) { - for (TaskRequest t: s.getRunningTasks()) { - if (t.getId().equals(task.getId())) { - found.set(true); - latch2.countDown(); - } + schedulingService.requestVmCurrentStates(states -> { + for (VirtualMachineCurrentState s: states) { + for (TaskRequest t: s.getRunningTasks()) { + if (t.getId().equals(task.getId())) { + found.set(true); + latch2.countDown(); } } } @@ -467,19 +449,16 @@ public void call(List states) { schedulingService.removeTask(task.getId(), task.getQAttributes(), leases.get(0).hostname()); found.set(false); final CountDownLatch latch3 = new CountDownLatch(1); - schedulingService.requestVmCurrentStates(new Action1>() { - @Override - public void call(List states) { - for (VirtualMachineCurrentState s: states) { - for (TaskRequest t: s.getRunningTasks()) { - if (t.getId().equals(task.getId())) { - found.set(true); - latch3.countDown(); - } + schedulingService.requestVmCurrentStates(states -> { + for (VirtualMachineCurrentState s: states) { + for (TaskRequest t: s.getRunningTasks()) { + if (t.getId().equals(task.getId())) { + found.set(true); + latch3.countDown(); } } - latch3.countDown(); } + latch3.countDown(); }); if (!latch3.await(5, TimeUnit.SECONDS)) { Assert.fail("Timeout waiting for vm states"); @@ -493,11 +472,8 @@ public void testMaxSchedIterDelay() throws Exception { TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); queue.queueTask(QueuableTaskProvider.wrapTask(tier1bktA, TaskRequestProvider.getTaskRequest(1, 100, 1))); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - // no-op - } + Action1 resultCallback = schedulingResult -> { + // no-op }; final long maxDelay = 500L; final long loopMillis = 50L; @@ -536,11 +512,8 @@ public void call(SchedulingResult schedulingResult) { public void testInitWithPrevRunningTasks() throws Exception { TaskQueue queue = TaskQueues.createTieredQueue(2); final TaskScheduler scheduler = getScheduler(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - // no-op - } + Action1 resultCallback = schedulingResult -> { + // no-op }; final long maxDelay = 500L; final long loopMillis = 50L; @@ -554,15 +527,12 @@ public void call(SchedulingResult schedulingResult) { final AtomicReference ref = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); schedulingService.requestVmCurrentStates( - new Action1>() { - @Override - public void call(List states) { - if (states != null && !states.isEmpty()) { - final VirtualMachineCurrentState state = states.iterator().next(); - ref.set(state.getHostname()); - } - latch.countDown(); + states -> { + if (states != null && !states.isEmpty()) { + final VirtualMachineCurrentState state = states.iterator().next(); + ref.set(state.getHostname()); } + latch.countDown(); } ); if (!latch.await(maxDelay * 2, TimeUnit.MILLISECONDS)) { @@ -582,26 +552,23 @@ public void testLargeTasksToInitInRunningState() throws Exception { final CountDownLatch latch = new CountDownLatch(6); final AtomicReference> ref = new AtomicReference<>(); final AtomicBoolean printFailures = new AtomicBoolean(); - Action1 resultCallback = new Action1() { - @Override - public void call(SchedulingResult schedulingResult) { - final List exceptions = schedulingResult.getExceptions(); - if (exceptions != null && !exceptions.isEmpty()) - ref.set(exceptions); - else if (!schedulingResult.getResultMap().isEmpty()) - System.out.println("#Assignments: " + schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size()); - else if(printFailures.get()) { - final Map> failures = schedulingResult.getFailures(); - if (!failures.isEmpty()) { - for (Map.Entry> entry: failures.entrySet()) { - System.out.println(" Failure for " + entry.getKey().getId() + ":"); - for(TaskAssignmentResult r: entry.getValue()) - System.out.println(" " + r.toString()); - } + Action1 resultCallback = schedulingResult -> { + final List exceptions = schedulingResult.getExceptions(); + if (exceptions != null && !exceptions.isEmpty()) + ref.set(exceptions); + else if (!schedulingResult.getResultMap().isEmpty()) + System.out.println("#Assignments: " + schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size()); + else if(printFailures.get()) { + final Map> failures = schedulingResult.getFailures(); + if (!failures.isEmpty()) { + for (Map.Entry> entry: failures.entrySet()) { + System.out.println(" Failure for " + entry.getKey().getId() + ":"); + for(TaskAssignmentResult r: entry.getValue()) + System.out.println(" " + r.toString()); } } - latch.countDown(); } + latch.countDown(); }; final long maxDelay = 100L; final long loopMillis = 20L; @@ -681,13 +648,36 @@ else if (!resultMap.isEmpty()) { schedulingService.shutdown(); } - private void setupTaskGetter(TaskSchedulingService schedulingService, final AtomicLong gotTasksAt, final CountDownLatch latch) throws TaskQueueException { - schedulingService.requestAllTasks(new Action1>>() { - @Override - public void call(Map> stateCollectionMap) { - gotTasksAt.set(System.currentTimeMillis()); + @Test + public void testAssignableVMsEvaluator() throws Exception { + int numVms = 10; + int numTasks = 5; + long loopMillis = 100; + TaskQueue queue = TaskQueues.createTieredQueue(2); + final CountDownLatch latch = new CountDownLatch(1); + final TaskScheduler scheduler = getScheduler(avms -> avms.stream().limit(1).collect(Collectors.toList())); + final AtomicReference ref = new AtomicReference<>(); + Action1 resultCallback = schedulingResult -> { + if (schedulingResult.getTotalVMsCount() == numVms && schedulingResult.getFailures().size() == numTasks - 1) { latch.countDown(); } + }; + final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, loopMillis, resultCallback); + ref.set(schedulingService); + schedulingService.start(); + schedulingService.addLeases(LeaseProvider.getLeases(numVms, 1, 4000, 1, 10)); + for (int i = 0; i < numTasks; i++) { + queue.queueTask(QueuableTaskProvider.wrapTask(tier1bktA, TaskRequestProvider.getTaskRequest(1, 1000, 1))); + } + if (!latch.await(loopMillis * 10, TimeUnit.MILLISECONDS)) { + Assert.fail("Latch timed out without having a successful scheduling result"); + } + } + + private void setupTaskGetter(TaskSchedulingService schedulingService, final AtomicLong gotTasksAt, final CountDownLatch latch) throws TaskQueueException { + schedulingService.requestAllTasks(stateCollectionMap -> { + gotTasksAt.set(System.currentTimeMillis()); + latch.countDown(); }); } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index c97a8bd..c44b679 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 01abdbe..69e3a96 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Thu Mar 12 15:36:03 PDT 2015 +#Thu May 17 12:49:38 PDT 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.13-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip