Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

ENI fitness #173

Merged
merged 6 commits into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private static class HostDisablePair {
private final Map<String, Map<VMResource, Double>> maxResourcesMap;
private final Map<VMResource, Double> totalResourcesMap;
private final VMRejectLimiter vmRejectLimiter;
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, "", null, 0L, null) {
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, null, "", null, 0L, null) {
@Override
void assignResult(TaskAssignmentResult result) {
throw new UnsupportedOperationException();
Expand All @@ -88,11 +88,12 @@ void assignResult(TaskAssignmentResult result) {
private final BlockingQueue<String> unknownLeaseIdsToExpire = new LinkedBlockingQueue<>();

AssignableVMs(TaskTracker taskTracker, Action1<VirtualMachineLease> leaseRejectAction,
PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
long leaseOfferExpirySecs, int maxOffersToReject,
String attrNameToGroupMaxResources, boolean singleLeaseMode, String autoScaleByAttributeName) {
this.taskTracker = taskTracker;
vmCollection = new VMCollection(
hostname -> new AssignableVirtualMachine(vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
hostname -> new AssignableVirtualMachine(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname,
leaseRejectAction, leaseOfferExpirySecs, taskTracker, singleLeaseMode),
autoScaleByAttributeName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
}
}

private final PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator;
private final Map<String, VirtualMachineLease> leasesMap;
private final BlockingQueue<String> workersToUnAssign;
private final BlockingQueue<String> leasesToExpire;
Expand Down Expand Up @@ -140,17 +141,20 @@ public ResAsgmntResult(List<AssignmentFailure> failures, double fitness) {
private boolean firstLeaseAdded=false;
private final List<TaskRequest> consumedResourcesToAssign = new ArrayList<>();

public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
ConcurrentMap<String, String> vmIdToHostnameMap,
ConcurrentMap<String, String> leaseIdToHostnameMap,
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
long leaseOfferExpirySecs, TaskTracker taskTracker) {
this(vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
this(preferentialNamedConsumableResourceEvaluator, vmIdToHostnameMap, leaseIdToHostnameMap, hostname, leaseRejectAction, leaseOfferExpirySecs, taskTracker, false);
}

public AssignableVirtualMachine(ConcurrentMap<String, String> vmIdToHostnameMap,
public AssignableVirtualMachine(PreferentialNamedConsumableResourceEvaluator preferentialNamedConsumableResourceEvaluator,
ConcurrentMap<String, String> vmIdToHostnameMap,
ConcurrentMap<String, String> leaseIdToHostnameMap,
String hostname, Action1<VirtualMachineLease> leaseRejectAction,
long leaseOfferExpirySecs, TaskTracker taskTracker, boolean singleLeaseMode) {
this.preferentialNamedConsumableResourceEvaluator = preferentialNamedConsumableResourceEvaluator;
this.vmIdToHostnameMap = vmIdToHostnameMap;
this.leaseIdToHostnameMap = leaseIdToHostnameMap;
this.hostname = hostname;
Expand Down Expand Up @@ -221,7 +225,7 @@ private void addToAvailableResources(VirtualMachineLease l) {
int val0 = Integer.parseInt(val0Str);
int val1 = Integer.parseInt(val1Str);
final PreferentialNamedConsumableResourceSet crs =
new PreferentialNamedConsumableResourceSet(name, val0, val1);
new PreferentialNamedConsumableResourceSet(hostname, name, val0, val1);
final Iterator<TaskRequest> iterator = consumedResourcesToAssign.iterator();
while(iterator.hasNext()) {
TaskRequest request = iterator.next();
Expand Down Expand Up @@ -786,7 +790,7 @@ private ResAsgmntResult evalAndGetResourceAssignmentFailures(TaskRequest request
for (Map.Entry<String, PreferentialNamedConsumableResourceSet> entry : resourceSets.entrySet()) {
if (!requestedNamedResNames.isEmpty())
requestedNamedResNames.remove(entry.getKey());
final double fitness = entry.getValue().getFitness(request);
final double fitness = entry.getValue().getFitness(request, preferentialNamedConsumableResourceEvaluator);
if (fitness == 0.0) {
AssignmentFailure failure = new AssignmentFailure(VMResource.ResourceSet, 0.0, 0.0, 0.0,
"ResourceSet " + entry.getValue().getName() + " unavailable"
Expand Down Expand Up @@ -951,7 +955,7 @@ void assignResult(TaskAssignmentResult result) {
result.addPort(currPortRanges.consumeNextPort());
}
for(Map.Entry<String, PreferentialNamedConsumableResourceSet> entry: resourceSets.entrySet()) {
result.addResourceSet(entry.getValue().consume(result.getRequest()));
result.addResourceSet(entry.getValue().consume(result.getRequest(), preferentialNamedConsumableResourceEvaluator));
}
if(!taskTracker.addAssignedTask(result.getRequest(), this))
logger.error("Unexpected to re-add task to assigned state, id=" + result.getRequest().getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.netflix.fenzo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

class CompositeSchedulingEventListener implements SchedulingEventListener {

private static final Logger logger = LoggerFactory.getLogger(CompositeSchedulingEventListener.class);

private final List<SchedulingEventListener> listeners;

private CompositeSchedulingEventListener(Collection<SchedulingEventListener> listeners) {
this.listeners = new ArrayList<>(listeners);
}

@Override
public void onScheduleStart() {
safely(SchedulingEventListener::onScheduleStart);
}

@Override
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
safely(listener -> listener.onAssignment(taskAssignmentResult));
}

@Override
public void onScheduleFinish() {
safely(SchedulingEventListener::onScheduleFinish);
}

private void safely(Consumer<SchedulingEventListener> action) {
listeners.forEach(listener -> {
try {
action.accept(listener);
} catch (Exception e) {
logger.warn("Scheduling event dispatching error: {} -> {}", listener.getClass().getSimpleName(), e.getMessage());
if (logger.isDebugEnabled()) {
logger.debug("Details", e);
}
}
});
}

static SchedulingEventListener of(Collection<SchedulingEventListener> listeners) {
if (listeners.isEmpty()) {
return NoOpSchedulingEventListener.INSTANCE;
}
return new CompositeSchedulingEventListener(listeners);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.netflix.fenzo;

/**
* Default {@link PreferentialNamedConsumableResourceEvaluator} implementation.
*/
public class DefaultPreferentialNamedConsumableResourceEvaluator implements PreferentialNamedConsumableResourceEvaluator {

public static final PreferentialNamedConsumableResourceEvaluator INSTANCE = new DefaultPreferentialNamedConsumableResourceEvaluator();

@Override
public double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit) {
// unassigned: 0.0 indicates no fitness, so return 0.5, which is less than the case of assigned with 0 sub-resources
return 0.5 / (subResourcesLimit + 1);
}

@Override
public double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit) {
return Math.min(1.0, (subResourcesUsed + subResourcesNeeded + 1.0) / (subResourcesLimit + 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.fenzo;

class NoOpSchedulingEventListener implements SchedulingEventListener {

static final SchedulingEventListener INSTANCE = new NoOpSchedulingEventListener();

@Override
public void onScheduleStart() {
}

@Override
public void onAssignment(TaskAssignmentResult taskAssignmentResult) {
}

@Override
public void onScheduleFinish() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.netflix.fenzo;

import com.netflix.fenzo.PreferentialNamedConsumableResourceSet.PreferentialNamedConsumableResource;

/**
* Evaluator for {@link PreferentialNamedConsumableResource} selection process. Given an agent with matching
* ENI slot (either empty or with a matching name), this evaluator computes the fitness score.
* A custom implementation can provide fitness calculators augmented with additional information not available to
* Fenzo for making best placement decision.
*
* <h1>Example</h1>
* {@link PreferentialNamedConsumableResource} can be used to model AWS ENI interfaces together with IP and security
* group assignments. To minimize number of AWS API calls and to improve efficiency, it is beneficial to place a task
* on an agent which has ENI profile with matching security group profile so the ENI can be reused. Or if a task
* is terminated, but agent releases its resources lazily, they can be reused by another task with a matching profile.
*/
public interface PreferentialNamedConsumableResourceEvaluator {

/**
* Provide fitness score for an idle consumable resource.
*
* @param hostname hostname of an agent
* @param resourceName name to be associated with a resource with the given index
* @param index a consumable resource index
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
* @param subResourcesLimit a total amount of sub-resources available
* @return fitness score
*/
double evaluateIdle(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesLimit);

/**
* Provide fitness score for a consumable resource that is already associated with some tasks. These tasks and
* the current one having profiles so can share the resource.
*
* @param hostname hostname of an agent
* @param resourceName name associated with a resource with the given index
* @param index a consumable resource index
* @param subResourcesNeeded an amount of sub-resources required by a scheduled task
* @param subResourcesUsed an amount of sub-resources already used by other tasks
* @param subResourcesLimit a total amount of sub-resources available
* @return fitness score
*/
double evaluate(String hostname, String resourceName, int index, double subResourcesNeeded, double subResourcesUsed, double subResourcesLimit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -92,14 +90,16 @@ public double getFitness() {

public static class PreferentialNamedConsumableResource {
private final double maxFitness;
private final String hostname;
private final int index;
private final String attrName;
private String resName=null;
private final int limit;
private final Map<String, TaskRequest.NamedResourceSetRequest> usageBy;
private int usedSubResources=0;

PreferentialNamedConsumableResource(int i, String attrName, int limit) {
PreferentialNamedConsumableResource(String hostname, int i, String attrName, int limit) {
this.hostname = hostname;
this.index = i;
this.attrName = attrName;
this.limit = limit;
Expand Down Expand Up @@ -131,19 +131,41 @@ public Map<String, TaskRequest.NamedResourceSetRequest> getUsageBy() {
return usedSubResources;
}

double getFitness(TaskRequest request) {
String r = getResNameVal(attrName, request);
if(resName == null)
return 0.5 / maxFitness; // unassigned: 0.0 indicates no fitness, so return 0.5, which is less than
// the case of assigned with 0 sub-resources
if(!resName.equals(r))
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null
? null
: request.getCustomNamedResources().get(attrName);

// This particular resource type is not requested. We assign to it virtual resource name 'CustomResAbsentKey',
// and request 0 sub-resources.
if(setRequest == null) {
if(resName == null) {
return evaluator.evaluateIdle(hostname, CustomResAbsentKey, index, 0, limit);
}
if(resName.equals(CustomResAbsentKey)) {
return evaluator.evaluate(hostname, CustomResAbsentKey, index, 0, usedSubResources, limit);
}
return 0.0;
final TaskRequest.NamedResourceSetRequest setRequest = request.getCustomNamedResources()==null?
null : request.getCustomNamedResources().get(attrName);
double subResNeed = setRequest==null? 0.0 : setRequest.getNumSubResources();
if(usedSubResources + subResNeed > limit)
}

double subResNeed = setRequest.getNumSubResources();

// Resource not assigned yet to any task
if(resName == null) {
if(subResNeed > limit) {
return 0.0;
}
return evaluator.evaluateIdle(hostname, setRequest.getResValue(), index, subResNeed, limit);
}

// Resource assigned different name than requested
if(!resName.equals(setRequest.getResValue())) {
return 0.0;
}
if(usedSubResources + subResNeed > limit) {
return 0.0;
return Math.min(1.0, usedSubResources + subResNeed + 1.0 / maxFitness);
}
return evaluator.evaluate(hostname, setRequest.getResValue(), index, subResNeed, usedSubResources, limit);
}

void consume(TaskRequest request) {
Expand Down Expand Up @@ -190,11 +212,11 @@ boolean release(TaskRequest request) {
private final String name;
private final List<PreferentialNamedConsumableResource> usageBy;

public PreferentialNamedConsumableResourceSet(String name, int val0, int val1) {
public PreferentialNamedConsumableResourceSet(String hostname, String name, int val0, int val1) {
this.name = name;
usageBy = new ArrayList<>(val0);
for(int i=0; i<val0; i++)
usageBy.add(new PreferentialNamedConsumableResource(i, name, val1));
usageBy.add(new PreferentialNamedConsumableResource(hostname, i, name, val1));
}

public String getName() {
Expand All @@ -209,8 +231,8 @@ public String getName() {
// return false;
// }

ConsumeResult consume(TaskRequest request) {
return consumeIntl(request, false);
ConsumeResult consume(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
return consumeIntl(request, false, evaluator);
}

void assign(TaskRequest request) {
Expand All @@ -233,15 +255,15 @@ void assign(TaskRequest request) {
}

// returns 0.0 for no fitness at all, or <=1.0 for fitness
double getFitness(TaskRequest request) {
return consumeIntl(request, true).fitness;
double getFitness(TaskRequest request, PreferentialNamedConsumableResourceEvaluator evaluator) {
return consumeIntl(request, true, evaluator).fitness;
}

private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume) {
private ConsumeResult consumeIntl(TaskRequest request, boolean skipConsume, PreferentialNamedConsumableResourceEvaluator evaluator) {
PreferentialNamedConsumableResource best = null;
double bestFitness=0.0;
for(PreferentialNamedConsumableResource r: usageBy) {
double f = r.getFitness(request);
double f = r.getFitness(request, evaluator);
if(f == 0.0)
continue;
if(bestFitness < f) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.netflix.fenzo;

/**
* A callback API providing notification about Fenzo task placement decisions during the scheduling process.
*/
public interface SchedulingEventListener {

/**
* Called before a new scheduling iteration is started.
*/
void onScheduleStart();

/**
* Called when a new task placement decision is made (a task gets resources allocated on a server).
*
* @param taskAssignmentResult task assignment result
*/
void onAssignment(TaskAssignmentResult taskAssignmentResult);

/**
* Called when the scheduling iteration completes.
*/
void onScheduleFinish();
}
Loading