@@ -8,7 +8,7 @@ import io.airbyte.featureflag.UseCustomK8sInitCheck
8
8
import io.airbyte.metrics.lib.MetricAttribute
9
9
import io.airbyte.metrics.lib.MetricClient
10
10
import io.airbyte.metrics.lib.OssMetricsRegistry
11
- import io.airbyte.workers.process.KubePodResourceHelper
11
+ import io.airbyte.workers.pod.ContainerConstants
12
12
import io.airbyte.workload.launcher.pods.KubePodLauncher.Constants.FABRIC8_COMPLETED_REASON_VALUE
13
13
import io.airbyte.workload.launcher.pods.KubePodLauncher.Constants.KUBECTL_COMPLETED_VALUE
14
14
import io.airbyte.workload.launcher.pods.KubePodLauncher.Constants.KUBECTL_PHASE_FIELD_NAME
@@ -199,7 +199,7 @@ class KubePodLauncher(
199
199
.waitUntilCondition(
200
200
{ p: Pod ? ->
201
201
Objects .nonNull(p) &&
202
- (Readiness .getInstance().isReady(p) || KubePodResourceHelper . isTerminal(p))
202
+ (Readiness .getInstance().isReady(p) || isTerminal(p))
203
203
},
204
204
waitDuration.toMinutes(),
205
205
TimeUnit .MINUTES ,
@@ -220,7 +220,7 @@ class KubePodLauncher(
220
220
.waitUntilCondition(
221
221
{ p: Pod ? ->
222
222
Objects .nonNull(p) &&
223
- (Readiness .getInstance().isReady(p) || KubePodResourceHelper . isTerminal(p))
223
+ (Readiness .getInstance().isReady(p) || isTerminal(p))
224
224
},
225
225
waitDuration.toMinutes(),
226
226
TimeUnit .MINUTES ,
@@ -240,7 +240,7 @@ class KubePodLauncher(
240
240
.list()
241
241
.items
242
242
.stream()
243
- .filter { kubePod: Pod -> ! KubePodResourceHelper . isTerminal(kubePod) && ! PodStatusUtil .isInitializing(kubePod) }
243
+ .filter { kubePod: Pod -> ! isTerminal(kubePod) && ! PodStatusUtil .isInitializing(kubePod) }
244
244
.findAny()
245
245
.isPresent
246
246
},
@@ -292,6 +292,34 @@ class KubePodLauncher(
292
292
)
293
293
}
294
294
295
+ /* *
296
+ * Checks that the pod's main container(s) are in a terminal state.
297
+ */
298
+ private fun isTerminal (pod : Pod ? ): Boolean {
299
+ // if pod is null or there is no status default to false.
300
+ if (pod?.status == null ) {
301
+ return false
302
+ }
303
+
304
+ // Get statuses for all "non-init" containers.
305
+ val mainContainerStatuses =
306
+ pod.status
307
+ .containerStatuses
308
+ .stream()
309
+ .filter { containerStatus -> (ContainerConstants .INIT_CONTAINER_NAME ) != containerStatus.name }
310
+ .toList()
311
+
312
+ // There should be at least 1 container with a status.
313
+ if (mainContainerStatuses.size < 1 ) {
314
+ logger.warn { " Unexpectedly no non-init container statuses found for pod: ${pod.fullResourceName} " }
315
+ return false
316
+ }
317
+
318
+ return mainContainerStatuses.all {
319
+ it.state?.terminated != null
320
+ }
321
+ }
322
+
295
323
private fun listActivePods (labels : Map <String , String >): FilterWatchListDeletable <Pod , PodList , PodResource > {
296
324
return kubernetesClient.pods()
297
325
.inNamespace(namespace)
0 commit comments