Skip to content

Commit ab29f5d

Browse files
nguyent8ryannedolan
authored andcommitted
Create pipeline element status tables (#135)
* Create pipeline element status tables These new tables supplement additional information about pipeline elements based on their K8s statuses.
1 parent ed1b4de commit ab29f5d

File tree

13 files changed

+377
-39
lines changed

13 files changed

+377
-39
lines changed

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMetadata.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,25 @@
1212
/** Built-in K8s metadata tables */
1313
public class K8sMetadata extends AbstractSchema {
1414

15-
private final HoptimatorConnection connection;
1615
private final Map<String, Table> tableMap = new HashMap<>();
1716
private final K8sDatabaseTable databaseTable;
1817
private final K8sEngineTable engineTable;
1918
private final K8sPipelineTable pipelineTable;
2019
private final K8sViewTable viewTable;
2120

2221
public K8sMetadata(HoptimatorConnection connection, K8sContext context) {
23-
this.connection = connection;
22+
K8sPipelineElementApi pipelineElementApi = new K8sPipelineElementApi(context);
23+
K8sPipelineElementMapApi pipelineElementMapApi = new K8sPipelineElementMapApi(pipelineElementApi);
24+
2425
this.engineTable = new K8sEngineTable(context);
2526
this.databaseTable = new K8sDatabaseTable(context, engineTable);
2627
this.pipelineTable = new K8sPipelineTable(context);
2728
this.viewTable = new K8sViewTable(connection, context);
2829
tableMap.put("DATABASES", databaseTable);
2930
tableMap.put("ENGINES", engineTable);
3031
tableMap.put("PIPELINES", pipelineTable);
32+
tableMap.put("PIPELINE_ELEMENTS", new K8sPipelineElementTable(pipelineElementApi));
33+
tableMap.put("PIPELINE_ELEMENT_MAP", new K8sPipelineElementMapTable(pipelineElementMapApi));
3134
tableMap.put("VIEWS", viewTable);
3235
}
3336

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
import java.util.HashSet;
4+
import java.util.List;
5+
import java.util.Set;
6+
import java.util.stream.Collectors;
7+
8+
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
9+
import com.linkedin.hoptimator.k8s.status.K8sPipelineElementStatus;
10+
11+
/** Represents a pipeline element status and its associated pipelines. */
12+
public class K8sPipelineElement {
13+
private String name;
14+
private final K8sPipelineElementStatus status;
15+
private final Set<V1alpha1Pipeline> pipelines = new HashSet<>();
16+
17+
public K8sPipelineElement(V1alpha1Pipeline pipeline, K8sPipelineElementStatus status) {
18+
this.name = status.getName();
19+
this.status = status;
20+
this.pipelines.add(pipeline);
21+
}
22+
23+
public String name() {
24+
return name;
25+
}
26+
27+
public K8sPipelineElementStatus status() {
28+
return status;
29+
}
30+
31+
public void addPipeline(V1alpha1Pipeline pipeline) {
32+
pipelines.add(pipeline);
33+
}
34+
35+
public List<String> pipelineNames() {
36+
return pipelines.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toList());
37+
}
38+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
import java.sql.SQLException;
4+
import java.util.Collection;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
10+
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
11+
import com.linkedin.hoptimator.k8s.status.K8sPipelineElementStatus;
12+
import com.linkedin.hoptimator.k8s.status.K8sPipelineElementStatusEstimator;
13+
import com.linkedin.hoptimator.util.Api;
14+
15+
/** Provides all pipeline elements in a {@link com.linkedin.hoptimator.k8s.K8sContext} instance. */
16+
class K8sPipelineElementApi implements Api<K8sPipelineElement> {
17+
private final K8sContext context;
18+
19+
K8sPipelineElementApi(K8sContext context) {
20+
this.context = context;
21+
}
22+
23+
private Collection<K8sPipelineElement> discoverAllElements(K8sContext context) throws SQLException {
24+
final K8sApi<V1alpha1Pipeline, V1alpha1PipelineList> pipelineApi = new K8sApi<>(context, K8sApiEndpoints.PIPELINES);
25+
Collection<V1alpha1Pipeline> pipelines = pipelineApi.list();
26+
27+
Map<String, K8sPipelineElement> elements = new HashMap<>();
28+
K8sPipelineElementStatusEstimator statusEstimator = new K8sPipelineElementStatusEstimator(context);
29+
for (V1alpha1Pipeline pipeline : pipelines) {
30+
List<K8sPipelineElementStatus> elementStatuses = statusEstimator.estimateStatuses(pipeline);
31+
for (K8sPipelineElementStatus elementStatus : elementStatuses) {
32+
String key = elementStatus.getName();
33+
if (!elements.containsKey(key)) {
34+
elements.put(key, new K8sPipelineElement(pipeline, elementStatus));
35+
}
36+
elements.get(key).addPipeline(pipeline);
37+
}
38+
}
39+
return elements.values();
40+
}
41+
42+
/**
43+
* Lists all pipeline elements in the context.
44+
*/
45+
@Override
46+
public Collection<K8sPipelineElement> list() throws SQLException {
47+
return discoverAllElements(context);
48+
}
49+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
import java.sql.SQLException;
4+
import java.util.Collection;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.Stream;
7+
8+
import com.linkedin.hoptimator.util.Api;
9+
10+
/** Provides the n:m mapping information between pipelines and their elements. */
11+
class K8sPipelineElementMapApi implements Api<K8sPipelineElementMapEntry> {
12+
13+
private final K8sPipelineElementApi pipelineElementApi;
14+
15+
K8sPipelineElementMapApi(K8sPipelineElementApi pipelineElementApi) {
16+
this.pipelineElementApi = pipelineElementApi;
17+
}
18+
19+
/**
20+
* Lists all n:m mapping information between pipelines and their elements.
21+
*/
22+
@Override
23+
public Collection<K8sPipelineElementMapEntry> list() throws SQLException {
24+
return pipelineElementApi.list()
25+
.stream()
26+
.flatMap(K8sPipelineElementMapApi::mapEntriesFromElement)
27+
.collect(Collectors.toList());
28+
}
29+
30+
private static Stream<K8sPipelineElementMapEntry> mapEntriesFromElement(K8sPipelineElement element) {
31+
String elementName = element.status().getName();
32+
return element.pipelineNames()
33+
.stream()
34+
.map(pipelineName -> new K8sPipelineElementMapEntry(elementName, pipelineName));
35+
}
36+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
/** Maps an element name to the name of one of its pipelines. */
4+
class K8sPipelineElementMapEntry {
5+
private String elementName;
6+
private String pipelineName;
7+
8+
K8sPipelineElementMapEntry(String elementName, String pipelineName) {
9+
this.elementName = elementName;
10+
this.pipelineName = pipelineName;
11+
}
12+
13+
public String elementName() {
14+
return elementName;
15+
}
16+
17+
public String pipelineName() {
18+
return pipelineName;
19+
}
20+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
import org.apache.calcite.schema.Schema;
4+
5+
import com.linkedin.hoptimator.util.RemoteTable;
6+
7+
/** Provides n:m mapping between pipeline elements and their pipelines. */
8+
public class K8sPipelineElementMapTable
9+
extends RemoteTable<K8sPipelineElementMapEntry, K8sPipelineElementMapTable.Row> {
10+
11+
// CHECKSTYLE:OFF
12+
public static class Row {
13+
public String ELEMENT_NAME;
14+
public String PIPELINE_NAME;
15+
16+
public Row(String elementName, String pipelineName) {
17+
this.ELEMENT_NAME = elementName;
18+
this.PIPELINE_NAME = pipelineName;
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return String.join("\t", ELEMENT_NAME, PIPELINE_NAME);
24+
}
25+
}
26+
// CHECKSTYLE:ON
27+
28+
public K8sPipelineElementMapTable(K8sPipelineElementMapApi pipelineElementMapApi) {
29+
super(pipelineElementMapApi, Row.class);
30+
}
31+
32+
@Override
33+
public Row toRow(K8sPipelineElementMapEntry k8sDynamicPipelineElement) {
34+
return new Row(k8sDynamicPipelineElement.elementName(), k8sDynamicPipelineElement.pipelineName());
35+
}
36+
37+
@Override
38+
public Schema.TableType getJdbcTableType() {
39+
return Schema.TableType.SYSTEM_TABLE;
40+
}
41+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.linkedin.hoptimator.k8s;
2+
3+
import org.apache.calcite.schema.Schema;
4+
5+
import com.linkedin.hoptimator.k8s.status.K8sPipelineElementStatus;
6+
import com.linkedin.hoptimator.util.RemoteTable;
7+
8+
public class K8sPipelineElementTable extends RemoteTable<K8sPipelineElement, K8sPipelineElementTable.Row> {
9+
10+
// CHECKSTYLE:OFF
11+
public static class Row {
12+
public String NAME;
13+
public boolean READY;
14+
public boolean FAILED;
15+
public String MESSAGE;
16+
17+
public Row(String name, boolean ready, boolean failed, String message) {
18+
this.NAME = name;
19+
this.READY = ready;
20+
this.FAILED = failed;
21+
this.MESSAGE = message;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return String.join("\t", NAME, String.valueOf(READY), String.valueOf(FAILED), MESSAGE);
27+
}
28+
}
29+
// CHECKSTYLE:ON
30+
31+
public K8sPipelineElementTable(K8sPipelineElementApi pipelineElementApi) {
32+
super(pipelineElementApi, Row.class);
33+
}
34+
35+
@Override
36+
public Row toRow(K8sPipelineElement k8sPipelineElement) {
37+
K8sPipelineElementStatus status = k8sPipelineElement.status();
38+
return new Row(status.getName(), status.isReady(), status.isFailed(), status.getMessage());
39+
}
40+
41+
@Override
42+
public Schema.TableType getJdbcTableType() {
43+
return Schema.TableType.SYSTEM_TABLE;
44+
}
45+
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineTable.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,27 @@
44

55
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
66
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
7-
7+
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineStatus;
88

99
public class K8sPipelineTable extends K8sTable<V1alpha1Pipeline, V1alpha1PipelineList, K8sPipelineTable.Row> {
1010

1111
// CHECKSTYLE:OFF
1212
public static class Row {
1313
public String NAME;
14-
public String STATUS;
14+
public boolean READY;
15+
public boolean FAILED;
16+
public String MESSAGE;
1517

16-
public Row(String name, String status) {
18+
public Row(String name, boolean ready, boolean failed, String message) {
1719
this.NAME = name;
18-
this.STATUS = status;
20+
this.READY = ready;
21+
this.FAILED = failed;
22+
this.MESSAGE = message;
1923
}
2024

2125
@Override
2226
public String toString() {
23-
return String.join("\t", NAME, STATUS);
27+
return String.join("\t", NAME, String.valueOf(READY), String.valueOf(FAILED), MESSAGE);
2428
}
2529
}
2630
// CHECKSTYLE:ON
@@ -31,7 +35,8 @@ public K8sPipelineTable(K8sContext context) {
3135

3236
@Override
3337
public Row toRow(V1alpha1Pipeline obj) {
34-
return new Row(obj.getMetadata().getName(), obj.getStatus().getMessage());
38+
V1alpha1PipelineStatus status = obj.getStatus();
39+
return new Row(obj.getMetadata().getName(), status.getReady(), status.getFailed(), status.getMessage());
3540
}
3641

3742
@Override

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
import com.linkedin.hoptimator.util.RemoteTable;
77

88

9-
public abstract class K8sTable<T extends KubernetesObject, U extends KubernetesListObject, V>
10-
extends RemoteTable<T, V> {
9+
public abstract class K8sTable<ObjectType extends KubernetesObject, ObjectListType extends KubernetesListObject, RowType>
10+
extends RemoteTable<ObjectType, RowType> {
1111

1212
private final K8sContext context;
1313
private final K8sApiEndpoint endpoint;
1414

15-
public K8sTable(K8sContext context, K8sApiEndpoint<T, U> endpoint, Class<V> v) {
16-
super(new K8sApi<T, U>(context, endpoint), v);
15+
public K8sTable(K8sContext context, K8sApiEndpoint<ObjectType, ObjectListType> endpoint, Class<RowType> v) {
16+
super(new K8sApi<ObjectType, ObjectListType>(context, endpoint), v);
1717
this.context = context;
1818
this.endpoint = endpoint;
1919
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/status/K8sPipelineElementStatusEstimator.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
package com.linkedin.hoptimator.k8s.status;
22

3-
import com.google.gson.JsonObject;
4-
import com.linkedin.hoptimator.k8s.K8sContext;
5-
import com.linkedin.hoptimator.k8s.K8sUtils;
6-
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
7-
import io.kubernetes.client.util.generic.KubernetesApiResponse;
8-
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;
9-
import io.kubernetes.client.util.generic.dynamic.Dynamics;
10-
113
import java.util.Arrays;
124
import java.util.List;
135
import java.util.stream.Collectors;
146

157
import org.slf4j.Logger;
168
import org.slf4j.LoggerFactory;
179

10+
import com.google.gson.JsonObject;
11+
import io.kubernetes.client.util.generic.KubernetesApiResponse;
12+
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;
13+
import io.kubernetes.client.util.generic.dynamic.Dynamics;
14+
15+
import com.linkedin.hoptimator.k8s.K8sContext;
16+
import com.linkedin.hoptimator.k8s.K8sUtils;
17+
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
1818

1919
/**
2020
* Estimates or guesses the status of an element of a {@link com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline} by inspecting its internal state.
@@ -48,28 +48,29 @@ private K8sPipelineElementStatus estimateElementStatus(String elementYaml, Strin
4848
String name = obj.getMetadata().getName();
4949
String namespace = obj.getMetadata().getNamespace() == null ? pipelineNamespace : obj.getMetadata().getNamespace();
5050
String kind = obj.getKind();
51+
String nameWithKind = String.format("%s/%s", kind, name);
5152
try {
5253
KubernetesApiResponse<DynamicKubernetesObject> existing =
5354
context.dynamic(obj.getApiVersion(), K8sUtils.guessPlural(obj)).get(namespace, name);
5455
String failureMessage =
55-
String.format("Failed to fetch %s/%s in namespace %s: %s.", kind, name, namespace, existing.toString());
56+
String.format("Failed to fetch %s in namespace %s: %s.", nameWithKind, namespace, existing.toString());
5657
existing.onFailure((code, status) -> log.warn(failureMessage));
5758
if (!existing.isSuccess()) {
58-
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(name, failureMessage);
59+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(nameWithKind, failureMessage);
5960
}
60-
K8sPipelineElementStatus elementStatus = estimateDynamicObjectStatus(name, existing.getObject());
61+
K8sPipelineElementStatus elementStatus = estimateDynamicObjectStatus(nameWithKind, existing.getObject());
6162
if (elementStatus.isReady()) {
62-
log.info("{}/{} is ready.", kind, name);
63+
log.info("{} is ready.", nameWithKind);
6364
} else {
64-
log.info("{}/{} is NOT ready.", kind, name);
65+
log.info("{} is NOT ready.", nameWithKind);
6566
}
6667
return elementStatus;
6768
} catch (Exception e) {
6869
String failureMessage =
69-
String.format("Encountered exception while checking status of %s/%s in namespace %s: %s", kind, name,
70+
String.format("Encountered exception while checking status of %s in namespace %s: %s", nameWithKind,
7071
namespace, e);
7172
log.error(failureMessage);
72-
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(name, failureMessage);
73+
return defaultUnreadyStatusOnK8sObjectRetrievalFailure(nameWithKind, failureMessage);
7374
}
7475
}
7576

0 commit comments

Comments
 (0)