Skip to content

Commit 8d9dd4f

Browse files
colesnodgrass authored and akashkulk committed
add initContainer to container-orchestrator pod definition (#19088)
* init attempt at initcontainer
* wait for init container to be up instead of main container
* copy files to init container
* Revert "Bmoric/extract webbackend api (#18988)" This reverts commit b05a5b2.
* Revert "Revert "Bmoric/extract webbackend api (#18988)"" This reverts commit ebef6e4.
* block on initContainer status; cleanup init script
* add log messages
* add quotes to log messages
* pr feedback, add comment to bash script
1 parent 8fc81f1 commit 8d9dd4f

File tree

2 files changed

+48
-16
lines changed

2 files changed

+48
-16
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@
3939
* application. Unlike {@link KubePodProcess} there is no heartbeat mechanism that requires the
4040
* launching pod and the launched pod to co-exist for the duration of execution for the launched
4141
* pod.
42-
*
42+
* <p>
4343
* Instead, this process creates the pod and interacts with a document store on cloud storage to
4444
* understand the state of the created pod.
45-
*
45+
* <p>
4646
* The document store is considered to be the truth when retrieving the status for an async pod
4747
* process. If the store isn't updated by the underlying pod, it will appear as failed.
4848
*/
@@ -190,10 +190,12 @@ public boolean hasExited() {
190190
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
191191
// implementation copied from Process.java since this isn't a real Process
192192
long remainingNanos = unit.toNanos(timeout);
193-
if (hasExited())
193+
if (hasExited()) {
194194
return true;
195-
if (timeout <= 0)
195+
}
196+
if (timeout <= 0) {
196197
return false;
198+
}
197199

198200
final long deadline = System.nanoTime() + remainingNanos;
199201
do {
@@ -202,8 +204,9 @@ public boolean waitFor(final long timeout, final TimeUnit unit) throws Interrupt
202204
// We poll for status every 500ms. The trade-off here is between how often
203205
// we poll our status storage (GCS) and how reactive we are to detect that a process is done.
204206
Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 500));
205-
if (hasExited())
207+
if (hasExited()) {
206208
return true;
209+
}
207210
remainingNanos = deadline - System.nanoTime();
208211
} while (remainingNanos > 0);
209212

@@ -236,7 +239,7 @@ private boolean checkStatus(final AsyncKubePodStatus status) {
236239

237240
/**
238241
* Checks terminal states first, then running, then initialized. Defaults to not started.
239-
*
242+
* <p>
240243
* The order matters here!
241244
*/
242245
public AsyncKubePodStatus getDocStoreStatus() {
@@ -298,6 +301,33 @@ public void create(final Map<String, String> allLabels,
298301
final List<ContainerPort> containerPorts = KubePodProcess.createContainerPortList(portMap);
299302
containerPorts.add(new ContainerPort(serverPort, null, null, null, null));
300303

304+
final var initContainer = new ContainerBuilder()
305+
.withName(KubePodProcess.INIT_CONTAINER_NAME)
306+
.withImage("busybox:1.35")
307+
.withVolumeMounts(volumeMounts)
308+
.withCommand(List.of(
309+
"sh",
310+
"-c",
311+
String.format("""
312+
i=0
313+
until [ $i -gt 60 ]
314+
do
315+
echo "$i - waiting for config file transfer to complete..."
316+
# check if the upload-complete file exists, if so exit without error
317+
if [ -f "%s/%s" ]; then
318+
exit 0
319+
fi
320+
i=$((i+1))
321+
sleep 1
322+
done
323+
echo "config files did not transfer in time"
324+
# no upload-complete file was created in time, exit with error
325+
exit 1
326+
""",
327+
KubePodProcess.CONFIG_DIR,
328+
KubePodProcess.SUCCESS_FILE_NAME)))
329+
.build();
330+
301331
final var mainContainer = new ContainerBuilder()
302332
.withName(KubePodProcess.MAIN_CONTAINER_NAME)
303333
.withImage(kubePodInfo.mainContainerInfo().image())
@@ -316,9 +346,11 @@ public void create(final Map<String, String> allLabels,
316346
.withLabels(allLabels)
317347
.endMetadata()
318348
.withNewSpec()
319-
.withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true)
349+
.withServiceAccount("airbyte-admin")
350+
.withAutomountServiceAccountToken(true)
320351
.withRestartPolicy("Never")
321352
.withContainers(mainContainer)
353+
.withInitContainers(initContainer)
322354
.withVolumes(volumes)
323355
.endSpec()
324356
.build();
@@ -332,9 +364,9 @@ public void create(final Map<String, String> allLabels,
332364
kubernetesClient.pods()
333365
.inNamespace(kubePodInfo.namespace())
334366
.withName(kubePodInfo.name())
335-
.waitUntilCondition(p -> {
336-
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
337-
}, 5, TimeUnit.MINUTES);
367+
.waitUntilCondition(p -> !p.getStatus().getInitContainerStatuses().isEmpty()
368+
&& p.getStatus().getInitContainerStatuses().get(0).getState().getWaiting() == null,
369+
5, TimeUnit.MINUTES);
338370

339371
final var podStatus = kubernetesClient.pods()
340372
.inNamespace(kubePodInfo.namespace())
@@ -343,7 +375,7 @@ public void create(final Map<String, String> allLabels,
343375
.getStatus();
344376

345377
final var containerState = podStatus
346-
.getContainerStatuses()
378+
.getInitContainerStatuses()
347379
.get(0)
348380
.getState();
349381

@@ -378,7 +410,7 @@ public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, fina
378410
// several issues with copying files. See https://github.com/airbytehq/airbyte/issues/8643 for
379411
// details.
380412
final String command = String.format("kubectl cp %s %s/%s:%s -c %s", tmpFile, podDefinition.getMetadata().getNamespace(),
381-
podDefinition.getMetadata().getName(), containerPath, "main");
413+
podDefinition.getMetadata().getName(), containerPath, KubePodProcess.INIT_CONTAINER_NAME);
382414
log.info(command);
383415

384416
proc = Runtime.getRuntime().exec(command);

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
* parent process starting a Kube Pod Process needs to exist within the Kube networking space. This
6868
* is so the parent process can forward data into the child's stdin and read the child's stdout and
6969
* stderr streams and copy configuration files over.
70-
*
70+
* <p>
7171
* This is made possible by:
7272
* <ul>
7373
* <li>1) An init container that creates 3 named pipes corresponding to stdin, stdout and stderr on
@@ -91,7 +91,7 @@
9191
* </ul>
9292
* The docker image used for this pod process must expose an AIRBYTE_ENTRYPOINT which contains the
9393
* entrypoint we will wrap when creating the main container in the pod.
94-
*
94+
* <p>
9595
* See the constructor for more information.
9696
*/
9797

@@ -104,7 +104,7 @@ public class KubePodProcess extends Process implements KubePod {
104104
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);
105105

106106
public static final String MAIN_CONTAINER_NAME = "main";
107-
private static final String INIT_CONTAINER_NAME = "init";
107+
public static final String INIT_CONTAINER_NAME = "init";
108108
private static final String DEFAULT_MEMORY_REQUEST = "25Mi";
109109
private static final String DEFAULT_MEMORY_LIMIT = "50Mi";
110110
private static final String DEFAULT_CPU_REQUEST = "0.1";
@@ -701,7 +701,7 @@ public KubePodInfo getInfo() {
701701

702702
/**
703703
* Close all open resources in the opposite order of resource creation.
704-
*
704+
* <p>
705705
* Null checks exist because certain local Kube clusters (e.g. Docker for Desktop) back this
706706
* implementation with OS processes and resources, which are automatically reaped by the OS.
707707
*/

0 commit comments

Comments
 (0)