Skip to content

Commit e7d7c77

Browse files
authored
add keepalive for TCP socket in KubePodProcess (#10528)
* add keepalive * afsdlk
1 parent 9c9dfb2 commit e7d7c77

File tree

3 files changed

+10
-2
lines changed

3 files changed

+10
-2
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,13 @@ private void setupStdOutAndStdErrListeners() {
520520
try {
521521
LOGGER.info("Creating stdout socket server...");
522522
final var socket = stdoutServerSocket.accept(); // blocks until connected
523+
// cat /proc/sys/net/ipv4/tcp_keepalive_time
524+
// 300
525+
// cat /proc/sys/net/ipv4/tcp_keepalive_probes
526+
// 5
527+
// cat /proc/sys/net/ipv4/tcp_keepalive_intvl
528+
// 60
529+
socket.setKeepAlive(true);
523530
LOGGER.info("Setting stdout...");
524531
this.stdout = socket.getInputStream();
525532
} catch (final IOException e) {
@@ -531,6 +538,7 @@ private void setupStdOutAndStdErrListeners() {
531538
try {
532539
LOGGER.info("Creating stderr socket server...");
533540
final var socket = stderrServerSocket.accept(); // blocks until connected
541+
socket.setKeepAlive(true);
534542
LOGGER.info("Setting stderr...");
535543
this.stderr = socket.getInputStream();
536544
} catch (final IOException e) {

airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public boolean isFinished() {
138138
Preconditions.checkState(destinationProcess != null);
139139
// As this check is done on every message read, it is important for this operation to be efficient.
140140
// Short circuit early to avoid checking the underlying process.
141-
final var isEmpty = !messageIterator.hasNext();
141+
final var isEmpty = !messageIterator.hasNext(); // hasNext is blocking.
142142
if (!isEmpty) {
143143
return false;
144144
}

airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public boolean isFinished() {
9090
Preconditions.checkState(sourceProcess != null);
9191
// As this check is done on every message read, it is important for this operation to be efficient.
9292
// Short circuit early to avoid checking the underlying process.
93-
final var isEmpty = !messageIterator.hasNext();
93+
final var isEmpty = !messageIterator.hasNext(); // hasNext is blocking.
9494
if (!isEmpty) {
9595
return false;
9696
}

0 commit comments

Comments
 (0)