From a2b2b46817a911d550974f9637f67f3e13428386 Mon Sep 17 00:00:00 2001 From: Sherif Nada Date: Wed, 7 Jul 2021 17:32:28 -0700 Subject: [PATCH 1/5] fix silent failures in sources --- .../integrations/base/IntegrationRunner.java | 15 +++- .../workers/DefaultReplicationWorker.java | 5 +- .../airbyte/DefaultAirbyteDestination.java | 6 +- .../airbyte/DefaultAirbyteSource.java | 5 +- .../workers/DefaultReplicationWorkerTest.java | 78 +++++++++---------- .../DefaultAirbyteDestinationTest.java | 10 +++ .../airbyte/DefaultAirbyteSourceTest.java | 10 +++ 7 files changed, 76 insertions(+), 53 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index e59d39bcc4c62..67e8422e53d2d 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -64,7 +64,10 @@ public IntegrationRunner(Source source) { } @VisibleForTesting - IntegrationRunner(IntegrationCliParser cliParser, Consumer outputRecordCollector, Destination destination, Source source) { + IntegrationRunner(IntegrationCliParser cliParser, + Consumer outputRecordCollector, + Destination destination, + Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; @@ -97,10 +100,14 @@ public void run(String[] args) throws Exception { // todo (cgardens) - it is incongruous that that read and write return airbyte message (the // envelope) while the other commands return what goes inside it. case READ -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); - final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), + ConfiguredAirbyteCatalog.class); + final Optional stateOptional = + parsed.getStatePath().map(IntegrationRunner::parseConfig); + final AutoCloseableIterator messageIterator = source.read(config, catalog, + stateOptional.orElse(null)); try (messageIterator) { messageIterator.forEachRemaining(outputRecordCollector::accept); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java index 03fe3aa81a6bb..6a30b0d5bf32f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java @@ -151,9 +151,12 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W } final ReplicationStatus outputStatus; + // First check if the process was cancelled. Cancellation takes precedence over failures. if (cancelled.get()) { outputStatus = ReplicationStatus.CANCELLED; - } else if (hasFailed.get()) { + } + // if the process was not cancelled but still failed, then it's an actual failure + else if (hasFailed.get()) { outputStatus = ReplicationStatus.FAILED; } else { outputStatus = ReplicationStatus.COMPLETED; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index 6b3631cfdda64..59423b292b5da 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -110,7 +110,7 @@ public void notifyEndOfStream() throws IOException { } @Override - public void close() throws IOException { + public void close() throws Exception { if (destinationProcess == null) { return; } @@ -122,9 +122,7 @@ public void close() throws IOException { LOGGER.debug("Closing destination process"); WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS); if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) { - LOGGER.warn( - "Destination process might not have shut down correctly. destination process alive: {}, destination process exit value: {}. This warning is normal if the job was cancelled.", - destinationProcess.isAlive(), destinationProcess.exitValue()); + throw new WorkerException("Destination process terminated with exit code " + destinationProcess.exitValue()); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index 4998b6bb56c34..3957ef606eaab 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -33,6 +33,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.WorkerException; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.IntegrationLauncher; import java.nio.file.Path; @@ -129,9 +130,7 @@ public void close() throws Exception { FORCED_SHUTDOWN_DURATION); if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) { - LOGGER.warn( - "Source process might not have shut down correctly. source process alive: {}, source process exit value: {}. This warning is normal if the job was cancelled.", - sourceProcess.isAlive(), sourceProcess.exitValue()); + throw new WorkerException("Source process exited with exit code: " + sourceProcess.exitValue()); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java index 1ecb849ea29d5..d84f2f4cd19b7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java @@ -217,7 +217,43 @@ void testCancellation() throws InterruptedException { @Test void testPopulatesOutputOnSuccess() throws WorkerException { - testPopulatesOutput(); + final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L)); + when(sourceMessageTracker.getRecordCount()).thenReturn(12L); + when(sourceMessageTracker.getBytesCount()).thenReturn(100L); + when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); + + final ReplicationWorker worker = new DefaultReplicationWorker( + JOB_ID, + JOB_ATTEMPT, + source, + mapper, + destination, + sourceMessageTracker, + destinationMessageTracker); + + final ReplicationOutput actual = worker.run(syncInput, jobRoot); + final ReplicationOutput replicationOutput = new ReplicationOutput() + .withReplicationAttemptSummary(new ReplicationAttemptSummary() + .withRecordsSynced(12L) + .withBytesSynced(100L) + .withStatus(ReplicationStatus.COMPLETED)) + .withOutputCatalog(syncInput.getCatalog()) + .withState(new State().withState(expectedState)); + + // good enough to verify that times are present. + assertNotNull(actual.getReplicationAttemptSummary().getStartTime()); + assertNotNull(actual.getReplicationAttemptSummary().getEndTime()); + + // verify output object matches declared json schema spec. + final Set validate = new JsonSchemaValidator() + .validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual)); + assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ",")); + + // remove times so we can do the rest of the object <> object comparison. + actual.getReplicationAttemptSummary().withStartTime(null); + actual.getReplicationAttemptSummary().withEndTime(null); + + assertEquals(replicationOutput, actual); } @Test @@ -295,44 +331,4 @@ void testDoesNotPopulateOnIrrecoverableFailure() { assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot)); } - private void testPopulatesOutput() throws WorkerException { - final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L)); - when(sourceMessageTracker.getRecordCount()).thenReturn(12L); - when(sourceMessageTracker.getBytesCount()).thenReturn(100L); - when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); - - final ReplicationWorker worker = new DefaultReplicationWorker( - JOB_ID, - JOB_ATTEMPT, - source, - mapper, - destination, - sourceMessageTracker, - destinationMessageTracker); - - final ReplicationOutput actual = worker.run(syncInput, jobRoot); - final ReplicationOutput replicationOutput = new ReplicationOutput() - .withReplicationAttemptSummary(new ReplicationAttemptSummary() - .withRecordsSynced(12L) - .withBytesSynced(100L) - .withStatus(ReplicationStatus.COMPLETED)) - .withOutputCatalog(syncInput.getCatalog()) - .withState(new State().withState(expectedState)); - - // good enough to verify that times are present. - assertNotNull(actual.getReplicationAttemptSummary().getStartTime()); - assertNotNull(actual.getReplicationAttemptSummary().getEndTime()); - - // verify output object matches declared json schema spec. - final Set validate = new JsonSchemaValidator() - .validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual)); - assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ",")); - - // remove times so we can do the rest of the object <> object comparison. - actual.getReplicationAttemptSummary().withStartTime(null); - actual.getReplicationAttemptSummary().withEndTime(null); - - assertEquals(replicationOutput, actual); - } - } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java index 65e8bf1341ab6..72c793b9bef39 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java @@ -154,4 +154,14 @@ public void testCloseNotifiesLifecycle() throws Exception { verify(outputStream).close(); } + @Test + public void testNonzeroExitCodeThrowsException() throws Exception { + final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher); + destination.start(DESTINATION_CONFIG, jobRoot); + + when(process.isAlive()).thenReturn(false); + when(process.exitValue()).thenReturn(1); + Assertions.assertThrows(WorkerException.class, destination::close); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index f69dd70a6ea50..f7890e77b2a98 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -147,4 +147,14 @@ public void testSuccessfulLifecycle() throws Exception { verify(process).exitValue(); } + @Test + public void testNonzeroExitCodeThrows() throws Exception { + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + tap.start(SOURCE_CONFIG, jobRoot); + + when(process.exitValue()).thenReturn(1); + + Assertions.assertThrows(WorkerException.class, tap::close); + } + } From 3c0207812abd41fa5f8d063053277ff93e213a0d Mon Sep 17 00:00:00 2001 From: Sherif Nada Date: Wed, 7 Jul 2021 17:36:11 -0700 Subject: [PATCH 2/5] save --- .../integrations/base/IntegrationRunner.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 67e8422e53d2d..f297f63856727 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -33,10 +33,12 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; + import java.nio.file.Path; import java.util.Optional; import java.util.Scanner; import java.util.function.Consumer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,11 +65,10 @@ public IntegrationRunner(Source source) { this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, null, source); } - @VisibleForTesting - IntegrationRunner(IntegrationCliParser cliParser, - Consumer outputRecordCollector, - Destination destination, - Source source) { + @VisibleForTesting IntegrationRunner(IntegrationCliParser cliParser, + Consumer outputRecordCollector, + Destination destination, + Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; @@ -102,12 +103,9 @@ public void run(String[] args) throws Exception { case READ -> { final JsonNode config = parseConfig(parsed.getConfigPath()); - final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), - ConfiguredAirbyteCatalog.class); - final Optional stateOptional = - parsed.getStatePath().map(IntegrationRunner::parseConfig); - final AutoCloseableIterator messageIterator = source.read(config, catalog, - stateOptional.orElse(null)); + final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); + final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); + final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); try (messageIterator) { messageIterator.forEachRemaining(outputRecordCollector::accept); } From 685b49be8d081ad6c0248e0d0bbe90658dee25e9 Mon Sep 17 00:00:00 2001 From: Sherif Nada Date: Wed, 7 Jul 2021 17:44:08 -0700 Subject: [PATCH 3/5] fmt --- .../airbyte/integrations/base/IntegrationRunner.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index f297f63856727..4e9ad5ff9ec51 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -33,12 +33,10 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; - import java.nio.file.Path; import java.util.Optional; import java.util.Scanner; import java.util.function.Consumer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,10 +63,11 @@ public IntegrationRunner(Source source) { this(new IntegrationCliParser(), Destination::defaultOutputRecordCollector, null, source); } - @VisibleForTesting IntegrationRunner(IntegrationCliParser cliParser, - Consumer outputRecordCollector, - Destination destination, - Source source) { + @VisibleForTesting + IntegrationRunner(IntegrationCliParser cliParser, + Consumer outputRecordCollector, + Destination destination, + Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; From e394e998dfa2362616202123f2979567218d1743 Mon Sep 17 00:00:00 2001 From: Sherif Nada Date: Wed, 7 Jul 2021 22:37:10 -0700 Subject: [PATCH 4/5] reword exception --- .../protocols/airbyte/DefaultAirbyteDestination.java | 3 ++- .../protocols/airbyte/DefaultAirbyteSource.java | 12 +++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index 59423b292b5da..b3498d81736b6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -122,7 +122,8 @@ public void close() throws Exception { LOGGER.debug("Closing destination process"); WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS); if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) { - throw new WorkerException("Destination process terminated with exit code " + destinationProcess.exitValue()); + String message = destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue(); + throw new WorkerException(message + ". This warning is normal if the job was cancelled."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index 3957ef606eaab..80a76ec5b37ec 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -36,11 +36,13 @@ import io.airbyte.workers.WorkerException; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.IntegrationLauncher; + import java.nio.file.Path; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Iterator; import java.util.Optional; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +68,9 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); } - @VisibleForTesting - DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, - final AirbyteStreamFactory streamFactory, - final HeartbeatMonitor heartbeatMonitor) { + @VisibleForTesting DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, + final AirbyteStreamFactory streamFactory, + final HeartbeatMonitor heartbeatMonitor) { this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; this.heartbeatMonitor = heartbeatMonitor; @@ -130,7 +131,8 @@ public void close() throws Exception { FORCED_SHUTDOWN_DURATION); if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) { - throw new WorkerException("Source process exited with exit code: " + sourceProcess.exitValue()); + String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue(); + throw new WorkerException(message + ". This warning is normal if the job was cancelled."); } } From b02e0587a8c8c315b1cc31f509cf9ae0bbd9a3d8 Mon Sep 17 00:00:00 2001 From: Sherif Nada Date: Wed, 7 Jul 2021 22:39:04 -0700 Subject: [PATCH 5/5] fmt --- .../protocols/airbyte/DefaultAirbyteDestination.java | 3 ++- .../workers/protocols/airbyte/DefaultAirbyteSource.java | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index b3498d81736b6..195f8ec3bf268 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -122,7 +122,8 @@ public void close() throws Exception { LOGGER.debug("Closing destination process"); WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS); if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) { - String message = destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue(); + String message = + destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue(); throw new WorkerException(message + ". This warning is normal if the job was cancelled."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index 80a76ec5b37ec..cc6a0fb919e78 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -36,13 +36,11 @@ import io.airbyte.workers.WorkerException; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.IntegrationLauncher; - import java.nio.file.Path; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Iterator; import java.util.Optional; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +66,10 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); } - @VisibleForTesting DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, - final AirbyteStreamFactory streamFactory, - final HeartbeatMonitor heartbeatMonitor) { + @VisibleForTesting + DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, + final AirbyteStreamFactory streamFactory, + final HeartbeatMonitor heartbeatMonitor) { this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; this.heartbeatMonitor = heartbeatMonitor;