Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Commit 9af45c7

Browse files
authored
Improving Kubernetes scheduler logic (#3653)
* Added support for HTTP_NOT_FOUND response code * Updated to use try-with-resources logic for Response cleanup * More cleanup. Now throwing TopologyRuntimeManagementException in more places
1 parent 0025251 commit 9af45c7

File tree

2 files changed

+55
-32
lines changed

2 files changed

+55
-32
lines changed

WORKSPACE

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ jetty_version = "9.4.6.v20170531"
5454

5555
jersey_version = "2.25.1"
5656

57-
kubernetes_client_version = "8.0.0"
57+
kubernetes_client_version = "11.0.0"
5858

5959
load("@rules_jvm_external//:defs.bzl", "maven_install")
6060
load("@rules_jvm_external//:specs.bzl", "maven")
@@ -263,6 +263,7 @@ http_archive(
263263
http_archive(
264264
name = "org_apache_zookeeper",
265265
build_file = "@//:third_party/zookeeper/BUILD",
266+
sha256 = "bafc0abe7da696a2020ba11b8ce7d06f6e28e9bf1e5504de09be25b8b589777d",
266267
strip_prefix = "apache-zookeeper-3.5.8",
267268
urls = ["https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz"],
268269
)

heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java

+53-31
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070

7171
import okhttp3.Response;
7272

73+
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
74+
7375
public class V1Controller extends KubernetesController {
7476

7577
private static final Logger LOG =
@@ -219,54 +221,74 @@ V1StatefulSet getStatefulSet() throws ApiException {
219221
null, null, null);
220222
}
221223

222-
boolean deleteService() {
223-
try {
224-
final Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
224+
void deleteService() {
225+
try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
225226
getNamespace(), null, null, 0, null,
226-
KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
227-
228-
if (response.isSuccessful()) {
229-
LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
230-
+ "] in namespace [" + getNamespace() + "] is deleted.");
231-
return true;
232-
} else {
227+
KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
228+
229+
if (!response.isSuccessful()) {
230+
if (response.code() == HTTP_NOT_FOUND) {
231+
LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
232+
+ getTopologyName());
233+
return;
234+
}
233235
LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
234-
+ getTopologyName() + "] in namespace [" + getNamespace() + "]");
235-
LOG.log(Level.SEVERE, "Error killing topoogy message:" + response.message());
236+
+ getTopologyName() + "] in namespace [" + getNamespace() + "]");
237+
LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
236238
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
237239

238240
throw new TopologyRuntimeManagementException(
239-
KubernetesUtils.errorMessageFromResponse(response));
241+
KubernetesUtils.errorMessageFromResponse(response));
240242
}
241-
} catch (IOException | ApiException e) {
242-
KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology service", e);
243-
return false;
243+
} catch (ApiException e) {
244+
if (e.getCode() == HTTP_NOT_FOUND) {
245+
LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
246+
+ getTopologyName());
247+
return;
248+
}
249+
throw new TopologyRuntimeManagementException("Error deleting topology ["
250+
+ getTopologyName() + "] Kubernetes service", e);
251+
} catch (IOException e) {
252+
throw new TopologyRuntimeManagementException("Error deleting topology ["
253+
+ getTopologyName() + "] Kubernetes service", e);
244254
}
255+
LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
256+
+ "] in namespace [" + getNamespace() + "] is deleted.");
245257
}
246258

247-
boolean deleteStatefulSet() {
248-
try {
249-
final Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
259+
void deleteStatefulSet() {
260+
try (Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
250261
getNamespace(), null, null, 0, null,
251-
KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute();
252-
253-
if (response.isSuccessful()) {
254-
LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
255-
+ "] in namespace [" + getNamespace() + "] is deleted.");
256-
return true;
257-
} else {
262+
KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
263+
264+
if (!response.isSuccessful()) {
265+
if (response.code() == HTTP_NOT_FOUND) {
266+
LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
267+
+ getTopologyName());
268+
return;
269+
}
258270
LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
259-
+ getTopologyName() + "] in namespace [" + getNamespace() + "]");
271+
+ getTopologyName() + "] in namespace [" + getNamespace() + "]");
260272
LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
261273
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
262274

263275
throw new TopologyRuntimeManagementException(
264-
KubernetesUtils.errorMessageFromResponse(response));
276+
KubernetesUtils.errorMessageFromResponse(response));
265277
}
266-
} catch (IOException | ApiException e) {
267-
KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e);
268-
return false;
278+
} catch (ApiException e) {
279+
if (e.getCode() == HTTP_NOT_FOUND) {
280+
LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
281+
+ getTopologyName());
282+
return;
283+
}
284+
throw new TopologyRuntimeManagementException("Error deleting topology ["
285+
+ getTopologyName() + "] Kubernetes StatefulSet", e);
286+
} catch (IOException e) {
287+
throw new TopologyRuntimeManagementException("Error deleting topology ["
288+
+ getTopologyName() + "] Kubernetes StatefulSet", e);
269289
}
290+
LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
291+
+ "] in namespace [" + getNamespace() + "] is deleted.");
270292
}
271293

272294
protected List<String> getExecutorCommand(String containerId) {

0 commit comments

Comments
 (0)