Skip to content

Commit b55ae6f

Browse files
pmossmanetsybaev
authored and committed
Add timeout to connector pod init container command (#10592)
* add timeout to init container command * add disk usage check into init command * fix up disk usage checking and logs from init entrypoint * run format
1 parent 4f24b09 commit b55ae6f

File tree

2 files changed

+47
-8
lines changed

2 files changed

+47
-8
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ public class KubePodProcess extends Process implements KubePod {
119119
private static final int KILLED_EXIT_CODE = 143;
120120
private static final int STDIN_REMOTE_PORT = 9001;
121121

122+
// init container should fail if no new data is copied into the init container within
123+
// INIT_RETRY_TIMEOUT_MINUTES
124+
private static final double INIT_SLEEP_PERIOD_SECONDS = 0.1;
125+
private static final Duration INIT_RETRY_TIMEOUT_MINUTES = Duration.ofMinutes(1);
126+
private static final int INIT_RETRY_MAX_ITERATIONS = (int) (INIT_RETRY_TIMEOUT_MINUTES.toSeconds() / INIT_SLEEP_PERIOD_SECONDS);
127+
122128
private final KubernetesClient fabricClient;
123129
private final Pod podDefinition;
124130
// Necessary since it is not possible to retrieve the pod's actual exit code upon termination. This
@@ -152,20 +158,23 @@ public static String getPodIP(final KubernetesClient client, final String podNam
152158

153159
private static Container getInit(final boolean usesStdin,
154160
final List<VolumeMount> mainVolumeMounts,
155-
final String busyboxImage) {
156-
var initEntrypointStr = String.format("mkfifo %s && mkfifo %s", STDOUT_PIPE_FILE, STDERR_PIPE_FILE);
157-
158-
if (usesStdin) {
159-
initEntrypointStr = String.format("mkfifo %s && ", STDIN_PIPE_FILE) + initEntrypointStr;
160-
}
161+
final String busyboxImage)
162+
throws IOException {
161163

162-
initEntrypointStr = initEntrypointStr + String.format(" && until [ -f %s ]; do sleep 0.1; done;", SUCCESS_FILE_NAME);
164+
final var initCommand = MoreResources.readResource("entrypoints/sync/init.sh")
165+
.replaceAll("USES_STDIN_VALUE", String.valueOf(usesStdin))
166+
.replaceAll("STDOUT_PIPE_FILE_VALUE", STDOUT_PIPE_FILE)
167+
.replaceAll("STDERR_PIPE_FILE_VALUE", STDERR_PIPE_FILE)
168+
.replaceAll("STDIN_PIPE_FILE_VALUE", STDIN_PIPE_FILE)
169+
.replaceAll("MAX_ITERATION_VALUE", String.valueOf(INIT_RETRY_MAX_ITERATIONS))
170+
.replaceAll("SUCCESS_FILE_NAME_VALUE", SUCCESS_FILE_NAME)
171+
.replaceAll("SLEEP_PERIOD_VALUE", String.valueOf(INIT_SLEEP_PERIOD_SECONDS));
163172

164173
return new ContainerBuilder()
165174
.withName(INIT_CONTAINER_NAME)
166175
.withImage(busyboxImage)
167176
.withWorkingDir(CONFIG_DIR)
168-
.withCommand("sh", "-c", initEntrypointStr)
177+
.withCommand("sh", "-c", initCommand)
169178
.withVolumeMounts(mainVolumeMounts)
170179
.build();
171180
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Init container entrypoint template. The *_VALUE tokens are replaced with concrete
# values before this script is run with `sh -c`.
#
# Creates the named pipes, then polls until the success marker file appears.
# The wait is abandoned once the maximum number of consecutive iterations passes
# without the disk usage of /config growing.
USES_STDIN=USES_STDIN_VALUE

# Create a named pipe, aborting the whole init step if it cannot be created so a
# broken pipe setup is never reported as success.
make_pipe() {
  mkfifo "$1" || {
    echo "Failed to create named pipe $1, exiting with code 1..."
    exit 1
  }
}

make_pipe "STDOUT_PIPE_FILE_VALUE"
make_pipe "STDERR_PIPE_FILE_VALUE"

if [ "$USES_STDIN" = true ]; then
  make_pipe "STDIN_PIPE_FILE_VALUE"
fi

ITERATION=0
MAX_ITERATION=MAX_ITERATION_VALUE
DISK_USAGE=$(du -s /config | awk '{print $1;}')

# Two separate tests joined with || instead of the obsolescent `-o` operator.
until [ -f "SUCCESS_FILE_NAME_VALUE" ] || [ "$ITERATION" -ge "$MAX_ITERATION" ]; do
  ITERATION=$((ITERATION+1))
  LAST_DISK_USAGE=$DISK_USAGE
  DISK_USAGE=$(du -s /config | awk '{print $1;}')
  # Growth in disk usage means data is still arriving, so restart the countdown.
  # Default to 0 so an empty reading from du does not break the numeric comparison.
  if [ "${DISK_USAGE:-0}" -gt "${LAST_DISK_USAGE:-0}" ]; then
    ITERATION=0
  fi
  sleep SLEEP_PERIOD_VALUE
done

if [ -f "SUCCESS_FILE_NAME_VALUE" ]; then
  echo "All files copied successfully, exiting with code 0..."
  exit 0
else
  echo "Timeout while attempting to copy to init container, exiting with code 1..."
  exit 1
fi

0 commit comments

Comments
 (0)