Skip to content

Commit aca70d0

Browse files
authored
🐛 platform: Fix silent failures in sources (#4617)
1 parent 9b20c1a commit aca70d0

File tree

7 files changed

+73
-50
lines changed

7 files changed

+73
-50
lines changed

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ public IntegrationRunner(Source source) {
6464
}
6565

6666
@VisibleForTesting
67-
IntegrationRunner(IntegrationCliParser cliParser, Consumer<AirbyteMessage> outputRecordCollector, Destination destination, Source source) {
67+
IntegrationRunner(IntegrationCliParser cliParser,
68+
Consumer<AirbyteMessage> outputRecordCollector,
69+
Destination destination,
70+
Source source) {
6871
Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source");
6972
this.cliParser = cliParser;
7073
this.outputRecordCollector = outputRecordCollector;
@@ -97,6 +100,7 @@ public void run(String[] args) throws Exception {
97100
// todo (cgardens) - it is incongruous that read and write return airbyte message (the
98101
// envelope) while the other commands return what goes inside it.
99102
case READ -> {
103+
100104
final JsonNode config = parseConfig(parsed.getConfigPath());
101105
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
102106
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);

airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,12 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W
151151
}
152152

153153
final ReplicationStatus outputStatus;
154+
// First check if the process was cancelled. Cancellation takes precedence over failures.
154155
if (cancelled.get()) {
155156
outputStatus = ReplicationStatus.CANCELLED;
156-
} else if (hasFailed.get()) {
157+
}
158+
// if the process was not cancelled but still failed, then it's an actual failure
159+
else if (hasFailed.get()) {
157160
outputStatus = ReplicationStatus.FAILED;
158161
} else {
159162
outputStatus = ReplicationStatus.COMPLETED;

airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void notifyEndOfStream() throws IOException {
110110
}
111111

112112
@Override
113-
public void close() throws IOException {
113+
public void close() throws Exception {
114114
if (destinationProcess == null) {
115115
return;
116116
}
@@ -122,9 +122,9 @@ public void close() throws IOException {
122122
LOGGER.debug("Closing destination process");
123123
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
124124
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
125-
LOGGER.warn(
126-
"Destination process might not have shut down correctly. destination process alive: {}, destination process exit value: {}. This warning is normal if the job was cancelled.",
127-
destinationProcess.isAlive(), destinationProcess.exitValue());
125+
String message =
126+
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue();
127+
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
128128
}
129129
}
130130

airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.airbyte.protocol.models.AirbyteMessage;
3434
import io.airbyte.protocol.models.AirbyteMessage.Type;
3535
import io.airbyte.workers.WorkerConstants;
36+
import io.airbyte.workers.WorkerException;
3637
import io.airbyte.workers.WorkerUtils;
3738
import io.airbyte.workers.process.IntegrationLauncher;
3839
import java.nio.file.Path;
@@ -129,9 +130,8 @@ public void close() throws Exception {
129130
FORCED_SHUTDOWN_DURATION);
130131

131132
if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
132-
LOGGER.warn(
133-
"Source process might not have shut down correctly. source process alive: {}, source process exit value: {}. This warning is normal if the job was cancelled.",
134-
sourceProcess.isAlive(), sourceProcess.exitValue());
133+
String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();
134+
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
135135
}
136136
}
137137

airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,43 @@ void testCancellation() throws InterruptedException {
217217

218218
@Test
219219
void testPopulatesOutputOnSuccess() throws WorkerException {
220-
testPopulatesOutput();
220+
final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L));
221+
when(sourceMessageTracker.getRecordCount()).thenReturn(12L);
222+
when(sourceMessageTracker.getBytesCount()).thenReturn(100L);
223+
when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));
224+
225+
final ReplicationWorker worker = new DefaultReplicationWorker(
226+
JOB_ID,
227+
JOB_ATTEMPT,
228+
source,
229+
mapper,
230+
destination,
231+
sourceMessageTracker,
232+
destinationMessageTracker);
233+
234+
final ReplicationOutput actual = worker.run(syncInput, jobRoot);
235+
final ReplicationOutput replicationOutput = new ReplicationOutput()
236+
.withReplicationAttemptSummary(new ReplicationAttemptSummary()
237+
.withRecordsSynced(12L)
238+
.withBytesSynced(100L)
239+
.withStatus(ReplicationStatus.COMPLETED))
240+
.withOutputCatalog(syncInput.getCatalog())
241+
.withState(new State().withState(expectedState));
242+
243+
// good enough to verify that times are present.
244+
assertNotNull(actual.getReplicationAttemptSummary().getStartTime());
245+
assertNotNull(actual.getReplicationAttemptSummary().getEndTime());
246+
247+
// verify output object matches declared json schema spec.
248+
final Set<String> validate = new JsonSchemaValidator()
249+
.validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual));
250+
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));
251+
252+
// remove times so we can do the rest of the object <> object comparison.
253+
actual.getReplicationAttemptSummary().withStartTime(null);
254+
actual.getReplicationAttemptSummary().withEndTime(null);
255+
256+
assertEquals(replicationOutput, actual);
221257
}
222258

223259
@Test
@@ -295,44 +331,4 @@ void testDoesNotPopulateOnIrrecoverableFailure() {
295331
assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot));
296332
}
297333

298-
private void testPopulatesOutput() throws WorkerException {
299-
final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L));
300-
when(sourceMessageTracker.getRecordCount()).thenReturn(12L);
301-
when(sourceMessageTracker.getBytesCount()).thenReturn(100L);
302-
when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));
303-
304-
final ReplicationWorker worker = new DefaultReplicationWorker(
305-
JOB_ID,
306-
JOB_ATTEMPT,
307-
source,
308-
mapper,
309-
destination,
310-
sourceMessageTracker,
311-
destinationMessageTracker);
312-
313-
final ReplicationOutput actual = worker.run(syncInput, jobRoot);
314-
final ReplicationOutput replicationOutput = new ReplicationOutput()
315-
.withReplicationAttemptSummary(new ReplicationAttemptSummary()
316-
.withRecordsSynced(12L)
317-
.withBytesSynced(100L)
318-
.withStatus(ReplicationStatus.COMPLETED))
319-
.withOutputCatalog(syncInput.getCatalog())
320-
.withState(new State().withState(expectedState));
321-
322-
// good enough to verify that times are present.
323-
assertNotNull(actual.getReplicationAttemptSummary().getStartTime());
324-
assertNotNull(actual.getReplicationAttemptSummary().getEndTime());
325-
326-
// verify output object matches declared json schema spec.
327-
final Set<String> validate = new JsonSchemaValidator()
328-
.validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual));
329-
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));
330-
331-
// remove times so we can do the rest of the object <> object comparison.
332-
actual.getReplicationAttemptSummary().withStartTime(null);
333-
actual.getReplicationAttemptSummary().withEndTime(null);
334-
335-
assertEquals(replicationOutput, actual);
336-
}
337-
338334
}

airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,14 @@ public void testCloseNotifiesLifecycle() throws Exception {
154154
verify(outputStream).close();
155155
}
156156

157+
@Test
158+
public void testNonzeroExitCodeThrowsException() throws Exception {
159+
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
160+
destination.start(DESTINATION_CONFIG, jobRoot);
161+
162+
when(process.isAlive()).thenReturn(false);
163+
when(process.exitValue()).thenReturn(1);
164+
Assertions.assertThrows(WorkerException.class, destination::close);
165+
}
166+
157167
}

airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,14 @@ public void testSuccessfulLifecycle() throws Exception {
147147
verify(process).exitValue();
148148
}
149149

150+
@Test
151+
public void testNonzeroExitCodeThrows() throws Exception {
152+
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
153+
tap.start(SOURCE_CONFIG, jobRoot);
154+
155+
when(process.exitValue()).thenReturn(1);
156+
157+
Assertions.assertThrows(WorkerException.class, tap::close);
158+
}
159+
150160
}

0 commit comments

Comments
 (0)