48
48
import io .kubernetes .client .openapi .ApiException ;
49
49
import io .kubernetes .client .openapi .Configuration ;
50
50
import io .kubernetes .client .openapi .apis .AppsV1Api ;
51
+ import io .kubernetes .client .openapi .apis .CoreV1Api ;
51
52
import io .kubernetes .client .openapi .models .V1Container ;
52
53
import io .kubernetes .client .openapi .models .V1ContainerPort ;
53
54
import io .kubernetes .client .openapi .models .V1EnvVar ;
58
59
import io .kubernetes .client .openapi .models .V1PodSpec ;
59
60
import io .kubernetes .client .openapi .models .V1PodTemplateSpec ;
60
61
import io .kubernetes .client .openapi .models .V1ResourceRequirements ;
62
+ import io .kubernetes .client .openapi .models .V1Service ;
63
+ import io .kubernetes .client .openapi .models .V1ServiceSpec ;
61
64
import io .kubernetes .client .openapi .models .V1StatefulSet ;
62
65
import io .kubernetes .client .openapi .models .V1StatefulSetSpec ;
63
66
import io .kubernetes .client .openapi .models .V1Toleration ;
66
69
67
70
import okhttp3 .Response ;
68
71
69
- public class AppsV1Controller extends KubernetesController {
72
+ public class V1Controller extends KubernetesController {
70
73
71
74
private static final Logger LOG =
72
- Logger .getLogger (AppsV1Controller .class .getName ());
75
+ Logger .getLogger (V1Controller .class .getName ());
73
76
74
77
private static final String ENV_SHARD_ID = "SHARD_ID" ;
75
78
76
79
private final AppsV1Api appsClient ;
80
+ private final CoreV1Api coreClient ;
77
81
78
- AppsV1Controller (Config configuration , Config runtimeConfiguration ) {
82
+ V1Controller (Config configuration , Config runtimeConfiguration ) {
79
83
super (configuration , runtimeConfiguration );
80
84
try {
81
85
final ApiClient apiClient = io .kubernetes .client .util .Config .defaultClient ();
82
86
Configuration .setDefaultApiClient (apiClient );
83
87
appsClient = new AppsV1Api (apiClient );
88
+ coreClient = new CoreV1Api (apiClient );
84
89
} catch (IOException e ) {
85
90
LOG .log (Level .SEVERE , "Failed to setup Kubernetes client" + e );
86
91
throw new RuntimeException (e );
@@ -96,6 +101,16 @@ boolean submit(PackingPlan packingPlan) {
96
101
97
102
final Resource containerResource = getContainerResource (packingPlan );
98
103
104
+ final V1Service topologyService = createTopologyyService ();
105
+ try {
106
+ final V1Service response =
107
+ coreClient .createNamespacedService (getNamespace (), topologyService , null ,
108
+ null , null );
109
+ } catch (ApiException e ) {
110
+ KubernetesUtils .logExceptionWithDetails (LOG , "Error creating topology service" , e );
111
+ throw new TopologySubmissionException (e .getMessage ());
112
+ }
113
+
99
114
// find the max number of instances in a container so we can open
100
115
// enough ports if remote debugging is enabled.
101
116
int numberOfInstances = 0 ;
@@ -118,11 +133,9 @@ boolean submit(PackingPlan packingPlan) {
118
133
119
134
@ Override
120
135
boolean killTopology () {
121
- return
122
- isStatefulSet ()
123
- ? deleteStatefulSet ()
124
- :
125
- new KubernetesCompat ().killTopology (getKubernetesUri (), getTopologyName (), getNamespace ());
136
+ deleteStatefulSet ();
137
+ deleteService ();
138
+ return true ;
126
139
}
127
140
128
141
@ Override
@@ -199,6 +212,31 @@ V1StatefulSet getStatefulSet() throws ApiException {
199
212
null , null , null );
200
213
}
201
214
215
+ boolean deleteService () {
216
+ try {
217
+ final Response response = coreClient .deleteNamespacedServiceCall (getTopologyName (),
218
+ getNamespace (), null , null , 0 , null ,
219
+ KubernetesConstants .DELETE_OPTIONS_PROPAGATION_POLICY , null , null ).execute ();
220
+
221
+ if (response .isSuccessful ()) {
222
+ LOG .log (Level .INFO , "Headless Service for the Job [" + getTopologyName ()
223
+ + "] in namespace [" + getNamespace () + "] is deleted." );
224
+ return true ;
225
+ } else {
226
+ LOG .log (Level .SEVERE , "Error when deleting the Service of the job ["
227
+ + getTopologyName () + "] in namespace [" + getNamespace () + "]" );
228
+ LOG .log (Level .SEVERE , "Error killing topoogy message:" + response .message ());
229
+ KubernetesUtils .logResponseBodyIfPresent (LOG , response );
230
+
231
+ throw new TopologyRuntimeManagementException (
232
+ KubernetesUtils .errorMessageFromResponse (response ));
233
+ }
234
+ } catch (IOException | ApiException e ) {
235
+ KubernetesUtils .logExceptionWithDetails (LOG , "Error deleting topology service" , e );
236
+ return false ;
237
+ }
238
+ }
239
+
202
240
boolean deleteStatefulSet () {
203
241
try {
204
242
final Response response = appsClient .deleteNamespacedStatefulSetCall (getTopologyName (),
@@ -211,7 +249,7 @@ boolean deleteStatefulSet() {
211
249
return true ;
212
250
} else {
213
251
LOG .log (Level .SEVERE , "Error when deleting the StatefulSet of the job ["
214
- + getTopologyName () + "]: in namespace [" + getNamespace () + "]" );
252
+ + getTopologyName () + "] in namespace [" + getNamespace () + "]" );
215
253
LOG .log (Level .SEVERE , "Error killing topology message: " + response .message ());
216
254
KubernetesUtils .logResponseBodyIfPresent (LOG , response );
217
255
@@ -224,18 +262,6 @@ boolean deleteStatefulSet() {
224
262
}
225
263
}
226
264
227
- boolean isStatefulSet () {
228
- try {
229
- final V1StatefulSet response =
230
- appsClient .readNamespacedStatefulSet (getTopologyName (), getNamespace (),
231
- null , null , null );
232
- return response .getKind ().equals ("StatefulSet" );
233
- } catch (ApiException e ) {
234
- LOG .warning ("isStatefulSet check " + e .getMessage ());
235
- }
236
- return false ;
237
- }
238
-
239
265
protected List <String > getExecutorCommand (String containerId ) {
240
266
final Map <ExecutorPort , String > ports =
241
267
KubernetesConstants .EXECUTOR_PORTS .entrySet ()
@@ -262,6 +288,26 @@ private static String setShardIdEnvironmentVariableCommand() {
262
288
return String .format ("%s=${POD_NAME##*-} && echo shardId=${%s}" , ENV_SHARD_ID , ENV_SHARD_ID );
263
289
}
264
290
291
+ private V1Service createTopologyyService () {
292
+ final String topologyName = getTopologyName ();
293
+ final Config runtimeConfiguration = getRuntimeConfiguration ();
294
+
295
+ final V1Service service = new V1Service ();
296
+
297
+ // setup service metadata
298
+ final V1ObjectMeta objectMeta = new V1ObjectMeta ();
299
+ objectMeta .name (topologyName );
300
+ service .setMetadata (objectMeta );
301
+
302
+ // create the headless service
303
+ final V1ServiceSpec serviceSpec = new V1ServiceSpec ();
304
+ serviceSpec .clusterIP ("None" );
305
+ serviceSpec .setSelector (getMatchLabels (topologyName ));
306
+
307
+ service .setSpec (serviceSpec );
308
+
309
+ return service ;
310
+ }
265
311
266
312
private V1StatefulSet createStatefulSet (Resource containerResource , int numberOfInstances ) {
267
313
final String topologyName = getTopologyName ();
0 commit comments