diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java index f4c46c1..81bf959 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java @@ -77,7 +77,7 @@ private static class HostDisablePair { private final Map> maxResourcesMap; private final Map totalResourcesMap; private final VMRejectLimiter vmRejectLimiter; - private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, "", null, 0L, null) { + private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, null, "", null, 0L, null) { @Override void assignResult(TaskAssignmentResult result) { throw new UnsupportedOperationException(); @@ -88,11 +88,12 @@ void assignResult(TaskAssignmentResult result) { private final BlockingQueue unknownLeaseIdsToExpire = new LinkedBlockingQueue<>(); AssignableVMs(TaskTracker taskTracker, Action1 leaseRejectAction, + PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator, long leaseOfferExpirySecs, int maxOffersToReject, String attrNameToGroupMaxResources, boolean singleLeaseMode, String autoScaleByAttributeName) { this.taskTracker = taskTracker; vmCollection = new VMCollection( - hostname -> new AssignableVirtualMachine(vmIdToHostnameMap, leaseIdToHostnameMap, hostname, + hostname -> new AssignableVirtualMachine(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, singleLeaseMode), autoScaleByAttributeName ); diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java index d39fb8d..1637c6f 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java @@ -99,6 +99,7 @@ public ResAsgmntResult(List failures, double fitness) { } } + private final PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator; private final Map leasesMap; private final BlockingQueue workersToUnAssign; private final BlockingQueue leasesToExpire; @@ -140,17 +141,20 @@ public ResAsgmntResult(List failures, double fitness) { private boolean firstLeaseAdded=false; private final List consumedResourcesToAssign = new ArrayList<>(); - public AssignableVirtualMachine(ConcurrentMap vmIdToHostnameMap, + public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator, + ConcurrentMap vmIdToHostnameMap, ConcurrentMap leaseIdToHostnameMap, String hostname, Action1 leaseRejectAction, long leaseOfferExpirySecs, TaskTracker taskTracker) { - this(vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false); + this(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false); } - public AssignableVirtualMachine(ConcurrentMap vmIdToHostnameMap, + public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator, + ConcurrentMap vmIdToHostnameMap, ConcurrentMap leaseIdToHostnameMap, String hostname, Action1 leaseRejectAction, long leaseOfferExpirySecs, TaskTracker taskTracker, boolean singleLeaseMode) { + this.preferentialNamedConsumableResourceEvaluator = preferentialNamedConsumableResourceEvaluator; this.vmIdToHostnameMap = vmIdToHostnameMap; this.leaseIdToHostnameMap = leaseIdToHostnameMap; this.hostname = hostname; @@ -221,7 +225,7 @@ private void addToAvailableResources(VirtualMachineLease l) { int val0 = Integer.parseInt(val0Str); int val1 = Integer.parseInt(val1Str); final PreferentialNamedConsumableResourceSet crs = - new PreferentialNamedConsumableResourceSet(name, val0, val1); + new PreferentialNamedConsumableResourceSet(hostname, name, val0, val1); final Iterator iterator = consumedResourcesToAssign.iterator(); while(iterator.hasNext()) { TaskRequest request = iterator.next(); @@ -786,7 +790,7 @@ private ResAsgmntResult evalAndGetResourceAssignmentFailures(TaskRequest request for (Map.Entry entry : resourceSets.entrySet()) { if (!requestedNamedResNames.isEmpty()) requestedNamedResNames.remove(entry.getKey()); - final double fitness = entry.getValue().getFitness(request); + final double fitness = entry.getValue().getFitness(request, preferentialNamedConsumableResourceEvaluator); if (fitness == 0.0) { AssignmentFailure failure = new AssignmentFailure(VMResource.ResourceSet, 0.0, 0.0, 0.0, "ResourceSet " + entry.getValue().getName() + " unavailable" @@ -951,7 +955,7 @@ void assignResult(TaskAssignmentResult result) { result.addPort(currPortRanges.consumeNextPort()); } for(Map.Entry entry: resourceSets.entrySet()) { - result.addResourceSet(entry.getValue().consume(result.getRequest())); + result.addResourceSet(entry.getValue().consume(result.getRequest(), preferentialNamedConsumableResourceEvaluator)); } if(!taskTracker.addAssignedTask(result.getRequest(), this)) logger.error("Unexpected to re-add task to assigned state, id=" + result.getRequest().getId()); diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/CompositeSchedulingEventListener.java b/fenzo-core/src/main/java/com/netflix/fenzo/CompositeSchedulingEventListener.java new file mode 100644 index 0000000..3c285eb --- /dev/null +++ b/fenzo-core/src/main/java/com/netflix/fenzo/CompositeSchedulingEventListener.java @@ -0,0 +1,55 @@ +package com.netflix.fenzo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; + +class CompositeSchedulingEventListener implements SchedulingEventListener { + + private static final Logger logger = LoggerFactory.getLogger(CompositeSchedulingEventListener.class); + + private final List listeners; + + private CompositeSchedulingEventListener(Collection listeners) { + this.listeners = new ArrayList<>(listeners); + } + + @Override + public void onScheduleStart() { + safely(SchedulingEventListener::onScheduleStart); + } + + @Override + public void onAssignment(TaskAssignmentResult taskAssignmentResult) { + safely(listener -> listener.onAssignment(taskAssignmentResult)); + } + + @Override + public void onScheduleFinish() { + safely(SchedulingEventListener::onScheduleFinish); + } + + private void safely(Consumer action) { + listeners.forEach(listener -> { + try { + action.accept(listener); + } catch (Exception e) { + logger.warn("Scheduling event dispatching error: {} -> {}", listener.getClass().getSimpleName(), e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug("Details", e); + } + } + }); + } + + static SchedulingEventListener of(Collection listeners) { + if (listeners.isEmpty()) { + return NoOpSchedulingEventListener.INSTANCE; + } + return new CompositeSchedulingEventListener(listeners); + } +} diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/DefaultPreferentialNamedConsumableResourceEvaluator.java b/fenzo-core/src/main/java/com/netflix/fenzo/DefaultPreferentialNamedConsumableResourceEvaluator.java new file mode 100644 index 0000000..bba8f4c --- /dev/null +++ b/fenzo-core/src/main/java/com/netflix/fenzo/DefaultPreferentialNamedConsumableResourceEvaluator.java @@ -0,0 +1,20 @@ +package com.netflix.fenzo; + +/** + * Default {@link PreferentialNamedConsumableResourceEvaluator} implementation. + */ +public class DefaultPreferentialNamedConsumableResourceEvaluator implements PreferentialNamedConsumableResourceEvaluator { + + public static final PreferentialNamedConsumableResourceEvaluator INSTANCE = new DefaultPreferentialNamedConsumableResourceEvaluator(); + + @Override + public double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit) { + // unassigned: 0.0 indicates no fitness, so return 0.5, which is less than the case of assigned with 0 sub-resources + return 0.5 / (subResourcesLimit + 1); + } + + @Override + public double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit) { + return Math.min(1.0, (subResourcesUsed + subResourcesNeeded + 1.0) / (subResourcesLimit + 1)); + } +} diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/NoOpSchedulingEventListener.java b/fenzo-core/src/main/java/com/netflix/fenzo/NoOpSchedulingEventListener.java new file mode 100644 index 0000000..6679402 --- /dev/null +++ b/fenzo-core/src/main/java/com/netflix/fenzo/NoOpSchedulingEventListener.java @@ -0,0 +1,18 @@ +package com.netflix.fenzo; + +class NoOpSchedulingEventListener implements SchedulingEventListener { + + static final SchedulingEventListener INSTANCE = new NoOpSchedulingEventListener(); + + @Override + public void onScheduleStart() { + } + + @Override + public void onAssignment(TaskAssignmentResult taskAssignmentResult) { + } + + @Override + public void onScheduleFinish() { + } +} diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceEvaluator.java b/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceEvaluator.java new file mode 100644 index 0000000..da7e942 --- /dev/null +++ b/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceEvaluator.java @@ -0,0 +1,44 @@ +package com.netflix.fenzo; + +import com.netflix.fenzo.PreferentialNamedConsumableResourceSet.PreferentialNamedConsumableResource; + +/** + * Evaluator for {@link PreferentialNamedConsumableResource} selection process. Given an agent with matching + * ENI slot (either empty or with a matching name), this evaluator computes the fitness score. + * A custom implementation can provide fitness calculators augmented with additional information not available to + * Fenzo for making best placement decision. + * + *

Example

+ * {@link PreferentialNamedConsumableResource} can be used to model AWS ENI interfaces together with IP and security + * group assignments. To minimize number of AWS API calls and to improve efficiency, it is beneficial to place a task + * on an agent which has ENI profile with matching security group profile so the ENI can be reused. Or if a task + * is terminated, but agent releases its resources lazily, they can be reused by another task with a matching profile. + */ +public interface PreferentialNamedConsumableResourceEvaluator { + + /** + * Provide fitness score for an idle consumable resource. + * + * @param hostname hostname of an agent + * @param resourceName name to be associated with a resource with the given index + * @param index a consumable resource index + * @param subResourcesNeeded an amount of sub-resources required by a scheduled task + * @param subResourcesLimit a total amount of sub-resources available + * @return fitness score + */ + double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit); + + /** + * Provide fitness score for a consumable resource that is already associated with some tasks. These tasks and + * the current one having profiles so can share the resource. + * + * @param hostname hostname of an agent + * @param resourceName name associated with a resource with the given index + * @param index a consumable resource index + * @param subResourcesNeeded an amount of sub-resources required by a scheduled task + * @param subResourcesUsed an amount of sub-resources already used by other tasks + * @param subResourcesLimit a total amount of sub-resources available + * @return fitness score + */ + double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit); +} diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceSet.java b/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceSet.java index 801a18d..762033e 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceSet.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/PreferentialNamedConsumableResourceSet.java @@ -19,8 +19,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -92,6 +90,7 @@ public double getFitness() { public static class PreferentialNamedConsumableResource { private final double maxFitness; + private final String hostname; private final int index; private final String attrName; private String resName=null; @@ -99,7 +98,8 @@ public static class PreferentialNamedConsumableResource { private final Map usageBy; private int usedSubResources=0; - PreferentialNamedConsumableResource(int i, String attrName, int limit) { + PreferentialNamedConsumableResource(String hostname, int i, String attrName, int limit) { + this.hostname = hostname; this.index = i; this.attrName = attrName; this.limit = limit; @@ -131,19 +131,41 @@ public Map getUsageBy() { return usedSubResources; } - double getFitness(TaskRequest request) { - String r = getResNameVal(attrName, request); - if(resName == null) - return 0.5 / maxFitness; // unassigned: 0.0 indicates no fitness, so return 0.5, which is less than - // the case of assigned with 0 sub-resources - if(!resName.equals(r)) + double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) { + TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null + ? null + : request.getCustomNamedResources().get(attrName); + + // This particular resource type is not requested. We assign to it virtual resource name 'CustomResAbsentKey', + // and request 0 sub-resources. + if(setRequest == null) { + if(resName == null) { + return evaluator.evaluateIdle(hostname, CustomResAbsentKey, index, 0, limit); + } + if(resName.equals(CustomResAbsentKey)) { + return evaluator.evaluate(hostname, CustomResAbsentKey, index, 0, usedSubResources, limit); + } return 0.0; - final TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null? - null : request.getCustomNamedResources().get(attrName); - double subResNeed = setRequest==null? 0.0 : setRequest.getNumSubResources(); - if(usedSubResources + subResNeed > limit) + } + + double subResNeed = setRequest.getNumSubResources(); + + // Resource not assigned yet to any task + if(resName == null) { + if(subResNeed > limit) { + return 0.0; + } + return evaluator.evaluateIdle(hostname, setRequest.getResValue(), index, subResNeed, limit); + } + + // Resource assigned different name than requested + if(!resName.equals(setRequest.getResValue())) { + return 0.0; + } + if(usedSubResources + subResNeed > limit) { return 0.0; - return Math.min(1.0, usedSubResources + subResNeed + 1.0 / maxFitness); + } + return evaluator.evaluate(hostname, setRequest.getResValue(), index, subResNeed, usedSubResources, limit); } void consume(TaskRequest request) { @@ -190,11 +212,11 @@ boolean release(TaskRequest request) { private final String name; private final List usageBy; - public PreferentialNamedConsumableResourceSet(String name, int val0, int val1) { + public PreferentialNamedConsumableResourceSet(String hostname, String name, int val0, int val1) { this.name = name; usageBy = new ArrayList<>(val0); for(int i=0; i weightedScaleDownConstraintEvaluators; + private PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator = new DefaultPreferentialNamedConsumableResourceEvaluator(); private Action1 autoscalerCallback=null; private long delayAutoscaleUpBySecs=0L; private long delayAutoscaleDownBySecs=0L; @@ -100,6 +101,8 @@ public Boolean call(Double f) { private boolean disableShortfallEvaluation=false; private Map resAllocs=null; private boolean singleOfferMode=false; + private final List schedulingEventListeners = new ArrayList<>(); + private int maxConcurrent = Runtime.getRuntime().availableProcessors(); /** * (Required) Call this method to establish a method that your task scheduler will call to notify you @@ -168,6 +171,11 @@ public Builder withFitnessCalculator(VMTaskFitnessCalculator fitnessCalculator) return this; } + public Builder withSchedulingEventListener(SchedulingEventListener schedulingEventListener) { + this.schedulingEventListeners.add(schedulingEventListener); + return this; + } + /** * Call this method to indicate which host attribute you want your task scheduler to use in order to * distinguish which hosts are in which autoscaling groups. You must call this method before you call @@ -237,6 +245,11 @@ public Builder withWeightedScaleDownConstraintEvaluators(Map assignmentResults, TaskAssignmentR private long lastVMPurgeAt=System.currentTimeMillis(); private final Builder builder; private final StateMonitor stateMonitor; + private final SchedulingEventListener schedulingEventListener; private final AutoScaler autoScaler; - private final int EXEC_SVC_THREADS=Runtime.getRuntime().availableProcessors(); - private final ExecutorService executorService = Executors.newFixedThreadPool(EXEC_SVC_THREADS); + private final int maxConcurrent; + private final ExecutorService executorService; private final AtomicBoolean isShutdown = new AtomicBoolean(); private final ResAllocsEvaluater resAllocsEvaluator; private final TaskTracker taskTracker; @@ -467,10 +496,13 @@ private TaskScheduler(Builder builder) { if(builder.leaseRejectAction ==null) throw new IllegalArgumentException("Lease reject action must be non-null"); this.builder = builder; + this.maxConcurrent = builder.maxConcurrent; + this.executorService = Executors.newFixedThreadPool(maxConcurrent); this.stateMonitor = new StateMonitor(); + this.schedulingEventListener = CompositeSchedulingEventListener.of(builder.schedulingEventListeners); taskTracker = new TaskTracker(); resAllocsEvaluator = new ResAllocsEvaluater(taskTracker, builder.resAllocs); - assignableVMs = new AssignableVMs(taskTracker, builder.leaseRejectAction, + assignableVMs = new AssignableVMs(taskTracker, builder.leaseRejectAction, builder.preferentialNamedConsumableResourceEvaluator, builder.leaseOfferExpirySecs, builder.maxOffersToReject, builder.autoScaleByAttributeName, builder.singleOfferMode, builder.autoScaleByAttributeName); if(builder.autoScaleByAttributeName != null && !builder.autoScaleByAttributeName.isEmpty()) { @@ -771,107 +803,113 @@ private SchedulingResult doSchedule( failedTasksForAutoScaler.add(taskOrFailure.getTask()); } } else { - while (true) { - final Assignable taskOrFailure = taskIterator.next(); - if(logger.isDebugEnabled()) - logger.debug("TaskSched: task=" + (taskOrFailure == null? "null" : taskOrFailure.getTask().getId())); - if (taskOrFailure == null) - break; - if(taskOrFailure.hasFailure()) { - schedulingResult.addFailures( - taskOrFailure.getTask(), - Collections.singletonList(new TaskAssignmentResult( - assignableVMs.getDummyVM(), - taskOrFailure.getTask(), - false, - Collections.singletonList(taskOrFailure.getAssignmentFailure()), - null, - 0 - ) - )); - continue; - } - TaskRequest task = taskOrFailure.getTask(); - failedTasksForAutoScaler.add(task); - if(hasResAllocs) { - if(resAllocsEvaluator.taskGroupFailed(task.taskGroupName())) { - if(logger.isDebugEnabled()) - logger.debug("Resource allocation limits reached for task: " + task.getId()); + schedulingEventListener.onScheduleStart(); + try { + while (true) { + final Assignable taskOrFailure = taskIterator.next(); + if(logger.isDebugEnabled()) + logger.debug("TaskSched: task=" + (taskOrFailure == null? "null" : taskOrFailure.getTask().getId())); + if (taskOrFailure == null) + break; + if(taskOrFailure.hasFailure()) { + schedulingResult.addFailures( + taskOrFailure.getTask(), + Collections.singletonList(new TaskAssignmentResult( + assignableVMs.getDummyVM(), + taskOrFailure.getTask(), + false, + Collections.singletonList(taskOrFailure.getAssignmentFailure()), + null, + 0 + ) + )); continue; } - final AssignmentFailure resAllocsFailure = resAllocsEvaluator.hasResAllocs(task); - if(resAllocsFailure != null) { - final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), - task, false, Collections.singletonList(resAllocsFailure), null, 0.0)); + TaskRequest task = taskOrFailure.getTask(); + failedTasksForAutoScaler.add(task); + if(hasResAllocs) { + if(resAllocsEvaluator.taskGroupFailed(task.taskGroupName())) { + if(logger.isDebugEnabled()) + logger.debug("Resource allocation limits reached for task: " + task.getId()); + continue; + } + final AssignmentFailure resAllocsFailure = resAllocsEvaluator.hasResAllocs(task); + if(resAllocsFailure != null) { + final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), + task, false, Collections.singletonList(resAllocsFailure), null, 0.0)); + schedulingResult.addFailures(task, failures); + failedTasksForAutoScaler.remove(task); // don't scale up for resAllocs failures + if(logger.isDebugEnabled()) + logger.debug("Resource allocation limit reached for task " + task.getId() + ": " + resAllocsFailure); + continue; + } + } + final AssignmentFailure maxResourceFailure = assignableVMs.getFailedMaxResource(null, task); + if(maxResourceFailure != null) { + final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), task, false, + Collections.singletonList(maxResourceFailure), null, 0.0)); schedulingResult.addFailures(task, failures); - failedTasksForAutoScaler.remove(task); // don't scale up for resAllocs failures if(logger.isDebugEnabled()) - logger.debug("Resource allocation limit reached for task " + task.getId() + ": " + resAllocsFailure); + logger.debug("Task {}: maxResource failure: {}", task.getId(), maxResourceFailure); continue; } - } - final AssignmentFailure maxResourceFailure = assignableVMs.getFailedMaxResource(null, task); - if(maxResourceFailure != null) { - final List failures = Collections.singletonList(new TaskAssignmentResult(assignableVMs.getDummyVM(), task, false, - Collections.singletonList(maxResourceFailure), null, 0.0)); - schedulingResult.addFailures(task, failures); + // create batches of VMs to evaluate assignments concurrently across the batches + final BlockingQueue virtualMachines = new ArrayBlockingQueue<>(avms.size(), false, avms); + int nThreads = (int)Math.ceil((double)avms.size()/ PARALLEL_SCHED_EVAL_MIN_BATCH_SIZE); + List> futures = new ArrayList<>(); if(logger.isDebugEnabled()) - logger.debug("Task {}: maxResource failure: {}", task.getId(), maxResourceFailure); - continue; - } - // create batches of VMs to evaluate assignments concurrently across the batches - final BlockingQueue virtualMachines = new ArrayBlockingQueue<>(avms.size(), false, avms); - int nThreads = (int)Math.ceil((double)avms.size()/ PARALLEL_SCHED_EVAL_MIN_BATCH_SIZE); - List> futures = new ArrayList<>(); - if(logger.isDebugEnabled()) - logger.debug("Launching {} threads for evaluating assignments for task {}", nThreads, task.getId()); - for(int b=0; b() { - @Override - public EvalResult call() throws Exception { - return evalAssignments(task, virtualMachines); - } - })); - } - List results = new ArrayList<>(); - List bestResults = new ArrayList<>(); - for(Future f: futures) { - try { - EvalResult evalResult = f.get(); - if(evalResult.exception!=null) { - logger.warn("Error during concurrent task assignment eval - " + evalResult.exception.getMessage(), - evalResult.exception); - schedulingResult.addException(evalResult.exception); - } - else { - results.add(evalResult); - bestResults.add(evalResult.result); - if(logger.isDebugEnabled()) - logger.debug("Task {}: best result so far: {}", task.getId(), evalResult.result); - totalNumAllocations += evalResult.numAllocationTrials; + logger.debug("Launching {} threads for evaluating assignments for task {}", nThreads, task.getId()); + for(int b = 0; b() { + @Override + public EvalResult call() throws Exception { + return evalAssignments(task, virtualMachines); + } + })); + } + List results = new ArrayList<>(); + List bestResults = new ArrayList<>(); + for(Future f: futures) { + try { + EvalResult evalResult = f.get(); + if(evalResult.exception!=null) { + logger.warn("Error during concurrent task assignment eval - " + evalResult.exception.getMessage(), + evalResult.exception); + schedulingResult.addException(evalResult.exception); + } + else { + results.add(evalResult); + bestResults.add(evalResult.result); + if(logger.isDebugEnabled()) + logger.debug("Task {}: best result so far: {}", task.getId(), evalResult.result); + totalNumAllocations += evalResult.numAllocationTrials; + } + } catch (InterruptedException|ExecutionException e) { + logger.error("Unexpected during concurrent task assignment eval - " + e.getMessage(), e); } - } catch (InterruptedException|ExecutionException e) { - logger.error("Unexpected during concurrent task assignment eval - " + e.getMessage(), e); + } + if(!schedulingResult.getExceptions().isEmpty()) + break; + TaskAssignmentResult successfulResult = getSuccessfulResult(bestResults); + List failures = new ArrayList<>(); + if(successfulResult == null) { + if(logger.isDebugEnabled()) + logger.debug("Task {}: no successful results", task.getId()); + for(EvalResult er: results) + failures.addAll(er.assignmentResults); + schedulingResult.addFailures(task, failures); + } + else { + if(logger.isDebugEnabled()) + logger.debug("Task {}: found successful assignment on host {}", task.getId(), + successfulResult.getHostname()); + successfulResult.assignResult(); + failedTasksForAutoScaler.remove(task); + schedulingEventListener.onAssignment(successfulResult); } } - if(!schedulingResult.getExceptions().isEmpty()) - break; - TaskAssignmentResult successfulResult = getSuccessfulResult(bestResults); - List failures = new ArrayList<>(); - if(successfulResult == null) { - if(logger.isDebugEnabled()) - logger.debug("Task {}: no successful results", task.getId()); - for(EvalResult er: results) - failures.addAll(er.assignmentResults); - schedulingResult.addFailures(task, failures); - } - else { - if(logger.isDebugEnabled()) - logger.debug("Task {}: found successful assignment on host {}", task.getId(), - successfulResult.getHostname()); - successfulResult.assignResult(); - failedTasksForAutoScaler.remove(task); - } + } finally { + schedulingEventListener.onScheduleFinish(); } } List idleResourcesList = new ArrayList<>(); diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/VMCollectionTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/VMCollectionTest.java index fb38b67..fd151a0 100644 --- a/fenzo-core/src/test/java/com/netflix/fenzo/VMCollectionTest.java +++ b/fenzo-core/src/test/java/com/netflix/fenzo/VMCollectionTest.java @@ -130,6 +130,7 @@ private VMCollection createVmCollection(ConcurrentMap vmIdTohost Action1 leaseRejectAction = l -> {}; return new VMCollection(s -> { AssignableVirtualMachine avm = new AssignableVirtualMachine( + DefaultPreferentialNamedConsumableResourceEvaluator.INSTANCE, vmIdTohostNames, leasesToHostnames, s,