Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Kubernetes V1Controller Cleanup #3752

Merged
merged 3 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ static String Megabytes(ByteAmount amount) {
return String.format("%sMi", Long.toString(amount.asMegabytes()));
}

static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}

static class V1ControllerUtils<T> {
private static final Logger LOG = Logger.getLogger(V1Controller.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public class V1Controller extends KubernetesController {
private final AppsV1Api appsClient;
private final CoreV1Api coreClient;

/**
* Configures the Kubernetes API Application and Core communications clients.
* @param configuration <code>topology</code> configurations.
* @param runtimeConfiguration Kubernetes runtime configurations.
*/
V1Controller(Config configuration, Config runtimeConfiguration) {
super(configuration, runtimeConfiguration);

Expand All @@ -119,6 +124,11 @@ public class V1Controller extends KubernetesController {
}
}

/**
* Configures all components required by a <code>topology</code> and submits it to the Kubernetes scheduler.
* @param packingPlan Used to configure the StatefulSets <code>Resource</code>s and replica count.
* @return Success indicator.
*/
@Override
boolean submit(PackingPlan packingPlan) {
final String topologyName = getTopologyName();
Expand Down Expand Up @@ -160,6 +170,15 @@ boolean submit(PackingPlan packingPlan) {
return true;
}

/**
* Shuts down a <code>topology</code> by deleting all associated resources.
* <ul>
* <li><code>Persistent Volume Claims</code> added by the <code>topology</code>.</li>
* <li><code>StatefulSet</code> for both the <code>Executors</code> and <code>Manager</code>.</li>
* <li>Headless <code>Service</code> which facilitates communication between all Pods.</li>
* </ul>
* @return Success indicator.
*/
@Override
boolean killTopology() {
removePersistentVolumeClaims();
Expand All @@ -186,6 +205,11 @@ boolean restart(int shardId) {
return true;
}

/**
* Adds a specified number of Pods to a <code>topology</code>'s <code>Executors</code>.
* @param containersToAdd Set of containers to be added.
* @return The passed in <code>Packing Plan</code>.
*/
@Override
public Set<PackingPlan.ContainerPlan>
addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
Expand All @@ -210,6 +234,10 @@ boolean restart(int shardId) {
return containersToAdd;
}

/**
* Removes a specified number of Pods from a <code>topology</code>'s <code>Executors</code>.
* @param containersToRemove Set of containers to be removed.
*/
@Override
public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
final V1StatefulSet statefulSet;
Expand All @@ -232,6 +260,12 @@ public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove)
}
}

/**
* Performs an in-place update of the replica count for a <code>topology</code>. This allows the
* <code>topology</code> Pod count to be scaled up or down.
* @param replicas The new number of Pod replicas required.
* @throws ApiException in the event there is a failure patching the StatefulSet.
*/
private void patchStatefulSetReplicas(int replicas) throws ApiException {
final String body =
String.format(KubernetesConstants.JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
Expand All @@ -253,11 +287,20 @@ private void patchStatefulSetReplicas(int replicas) throws ApiException {
appsClient.getApiClient());
}

/**
* Retrieves the <code>Executors</code> StatefulSet configurations for the Kubernetes cluster.
* @return <code>Executors</code> StatefulSet configurations.
* @throws ApiException in the event there is a failure retrieving the StatefulSet.
*/
V1StatefulSet getStatefulSet() throws ApiException {
return appsClient.readNamespacedStatefulSet(getStatefulSetName(true), getNamespace(),
null, null, null);
}

/**
* Deletes the headless <code>Service</code> for a <code>topology</code>'s <code>Executors</code>
* and <code>Manager</code> using the <code>topology</code>'s name.
*/
void deleteService() {
try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
getNamespace(), null, null, 0, null,
Expand Down Expand Up @@ -293,6 +336,10 @@ void deleteService() {
+ "] in namespace [" + getNamespace() + "] is deleted.");
}

/**
* Deletes the StatefulSets for a <code>topology</code>'s <code>Executors</code> and <code>Manager</code>
* using <code>Label</code>s.
*/
void deleteStatefulSets() {
try (Response response = appsClient.deleteCollectionNamespacedStatefulSetCall(getNamespace(),
null, null, null, null, null, createTopologySelectorLabels(), null, null, null, null, null,
Expand Down Expand Up @@ -383,6 +430,10 @@ protected static String setShardIdEnvironmentVariableCommand(boolean isExecutor)
return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
}

/**
* Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
* @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
*/
private V1Service createTopologyService() {
final String topologyName = getTopologyName();

Expand Down Expand Up @@ -483,14 +534,26 @@ private V1StatefulSet createStatefulSet(Resource containerResource, int numberOf
return statefulSet;
}

/**
* Extracts general Pod <code>Annotation</code>s from configurations.
* @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod.
*/
private Map<String, String> getPodAnnotations() {
return KubernetesContext.getPodAnnotations(getConfiguration());
}

/**
* Extracts <code>Service Annotations</code> for configurations.
* @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
*/
private Map<String, String> getServiceAnnotations() {
return KubernetesContext.getServiceAnnotations(getConfiguration());
}

/**
* Generates <code>Label</code>s to indicate Prometheus scraping and the exposed port.
* @return Key-value pairs of Prometheus <code>Annotation</code>s to be added to the Pod.
*/
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
Expand All @@ -500,13 +563,24 @@ private Map<String, String> getPrometheusAnnotations() {
return annotations;
}

/**
* Generates the <code>heron</code> and <code>topology</code> name <code>Match Label</code>s.
* @param topologyName Name of the <code>topology</code>.
* @return Key-value pairs of <code>Match Label</code>s to be added to the Pod.
*/
private Map<String, String> getPodMatchLabels(String topologyName) {
final Map<String, String> labels = new HashMap<>();
labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
return labels;
}

/**
* Extracts <code>Label</code>s from configurations, generates the <code>heron</code> and
* <code>topology</code> name <code>Label</code>s.
* @param topologyName Name of the <code>topology</code>.
* @return Key-value pairs of <code>Label</code>s to be added to the Pod.
*/
private Map<String, String> getPodLabels(String topologyName) {
final Map<String, String> labels = new HashMap<>();
labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
Expand All @@ -515,6 +589,10 @@ private Map<String, String> getPodLabels(String topologyName) {
return labels;
}

/**
* Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
* @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
*/
private Map<String, String> getServiceLabels() {
return KubernetesContext.getServiceLabels(getConfiguration());
}
Expand Down Expand Up @@ -639,6 +717,10 @@ protected void addVolumesIfPresent(final V1PodSpec spec) {
}
}

/**
* Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod.
* @param podSpec <code>Pod Spec</code> to add secrets to.
*/
private void mountSecretsAsVolumes(V1PodSpec podSpec) {
final Config config = getConfiguration();
final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
Expand Down Expand Up @@ -705,7 +787,7 @@ private void configureHeronContainer(Resource resource, int numberOfInstances,
* @param container The <code>container</code> to be configured.
* @param configuration The <code>Config</code> object to check if a resource request needs to be set.
* @param resource User defined resources limits from input.
* @param isExecutor
* @param isExecutor Flag to indicate configuration for an <code>executor</code> or <code>manager</code>.
*/
@VisibleForTesting
protected void configureContainerResources(final V1Container container,
Expand All @@ -730,7 +812,7 @@ protected void configureContainerResources(final V1Container container,
// with precedence [1] CLI, [2] Config.
final Map<String, Quantity> limits = resourceRequirements.getLimits();
final Quantity limitCPU = limitsCLI.getOrDefault(KubernetesConstants.CPU,
Quantity.fromString(Double.toString(roundDecimal(resource.getCpu(), 3))));
Quantity.fromString(Double.toString(KubernetesUtils.roundDecimal(resource.getCpu(), 3))));
final Quantity limitMEMORY = limitsCLI.getOrDefault(KubernetesConstants.MEMORY,
Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));

Expand Down Expand Up @@ -907,6 +989,10 @@ protected void mountVolumeIfPresent(final V1Container container) {
}
}

/**
* Adds <code>Secret Key</code> references to a <code>container</code>.
* @param container <code>container</code> to be configured.
*/
private void setSecretKeyRefs(V1Container container) {
final Config config = getConfiguration();
final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
Expand All @@ -930,11 +1016,6 @@ private void setSecretKeyRefs(V1Container container) {
}
}

public static double roundDecimal(double value, int places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}

/**
* Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
* The loaded text is then parsed into a usable <code>Pod Template</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,11 @@ public void testConfigureContainerResources() {
final Quantity defaultRAM = Quantity.fromString(
KubernetesUtils.Megabytes(resourceDefault.getRam()));
final Quantity defaultCPU = Quantity.fromString(
Double.toString(V1Controller.roundDecimal(resourceDefault.getCpu(), 3)));
Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3)));
final Quantity customRAM = Quantity.fromString(
KubernetesUtils.Megabytes(resourceCustom.getRam()));
final Quantity customCPU = Quantity.fromString(
Double.toString(V1Controller.roundDecimal(resourceCustom.getCpu(), 3)));
Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3)));
final Quantity customDisk = Quantity.fromString(
KubernetesUtils.Megabytes(resourceCustom.getDisk()));

Expand Down