Skip to content

Commit 198e580

Browse files
authored
Build failure reasons for synchronous jobs (check/spec/discover) (#14715)
* demo for surfacing synchronous job failures * add missing changes for StandardDiscoverCatalogOutput impl * extract trace message failure reason for discover job * move to using a single pojo to represent synchronous job outputs * format * handle new output type in check before sync * re-genericize DefaultSynchronousSchedulerClient.execute * fix failing tests * fix failing scheduler client tests * get spec returns failure reason from trace message * build failure reason from trace message for check job * type safety * only consider error-type trace messages * add more tests * just use nulls * this was removed but incorrectly re-added when merging master into the branch * check output version for workflow replay support * refactor trace message finding to util method * additionalProperties: true * add versioning for CheckConnectionWorkflow * update comment
1 parent 36c659d commit 198e580

35 files changed

+826
-161
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
---
2+
"$schema": http://json-schema.org/draft-07/schema#
3+
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ConnectorJobOutput.yaml
4+
title: ConnectorJobOutput
5+
description: connector command job output
6+
type: object
7+
additionalProperties: true
8+
required:
9+
- outputType
10+
properties:
11+
outputType:
12+
type: string
13+
enum:
14+
- checkConnection
15+
- discoverCatalog
16+
- spec
17+
checkConnection:
18+
"$ref": StandardCheckConnectionOutput.yaml
19+
discoverCatalog:
20+
existingJavaType: io.airbyte.protocol.models.AirbyteCatalog
21+
spec:
22+
existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
23+
failureReason:
24+
"$ref": FailureReason.yaml

airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.annotations.VisibleForTesting;
9+
import io.airbyte.config.ConnectorJobOutput;
910
import io.airbyte.config.DestinationConnection;
1011
import io.airbyte.config.JobCheckConnectionConfig;
1112
import io.airbyte.config.JobConfig.ConfigType;
@@ -22,6 +23,7 @@
2223
import io.airbyte.workers.temporal.TemporalResponse;
2324
import java.io.IOException;
2425
import java.time.Instant;
26+
import java.util.Optional;
2527
import java.util.UUID;
2628
import java.util.function.Function;
2729
import javax.annotation.Nullable;
@@ -55,6 +57,7 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
5557
ConfigType.CHECK_CONNECTION_SOURCE,
5658
source.getSourceDefinitionId(),
5759
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
60+
ConnectorJobOutput::getCheckConnection,
5861
source.getWorkspaceId());
5962
}
6063

@@ -74,11 +77,13 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
7477
ConfigType.CHECK_CONNECTION_DESTINATION,
7578
destination.getDestinationDefinitionId(),
7679
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
80+
ConnectorJobOutput::getCheckConnection,
7781
destination.getWorkspaceId());
7882
}
7983

8084
@Override
81-
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) throws IOException {
85+
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage)
86+
throws IOException {
8287
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
8388
source.getSourceDefinitionId(),
8489
source.getWorkspaceId(),
@@ -91,6 +96,7 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
9196
ConfigType.DISCOVER_SCHEMA,
9297
source.getSourceDefinitionId(),
9398
jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig),
99+
ConnectorJobOutput::getDiscoverCatalog,
94100
source.getWorkspaceId());
95101
}
96102

@@ -102,25 +108,32 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String
102108
ConfigType.GET_SPEC,
103109
null,
104110
jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig),
111+
ConnectorJobOutput::getSpec,
105112
null);
106113
}
107114

108115
@VisibleForTesting
109-
<T> SynchronousResponse<T> execute(final ConfigType configType,
110-
@Nullable final UUID connectorDefinitionId,
111-
final Function<UUID, TemporalResponse<T>> executor,
112-
final UUID workspaceId) {
116+
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
117+
@Nullable final UUID connectorDefinitionId,
118+
final Function<UUID, TemporalResponse<U>> executor,
119+
final Function<U, T> outputMapper,
120+
final UUID workspaceId) {
113121
final long createdAt = Instant.now().toEpochMilli();
114122
final UUID jobId = UUID.randomUUID();
115123
try {
116124
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
117-
final TemporalResponse<T> operationOutput = executor.apply(jobId);
118-
final JobState outputState = operationOutput.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;
119-
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, operationOutput.getOutput().orElse(null));
120-
final long endedAt = Instant.now().toEpochMilli();
125+
final TemporalResponse<U> temporalResponse = executor.apply(jobId);
126+
final Optional<U> jobOutput = temporalResponse.getOutput();
127+
final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
128+
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;
121129

130+
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput);
131+
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above
132+
133+
final long endedAt = Instant.now().toEpochMilli();
122134
return SynchronousResponse.fromTemporalResponse(
123-
operationOutput,
135+
temporalResponse,
136+
mappedOutput,
124137
jobId,
125138
configType,
126139
connectorDefinitionId,

airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousResponse.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ public static <T> SynchronousResponse<T> success(final T output, final Synchrono
2222
return new SynchronousResponse<>(output, metadata);
2323
}
2424

25-
public static <T> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<T> temporalResponse,
26-
final UUID id,
27-
final ConfigType configType,
28-
final UUID configId,
29-
final long createdAt,
30-
final long endedAt) {
25+
public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse,
26+
final T output,
27+
final UUID id,
28+
final ConfigType configType,
29+
final UUID configId,
30+
final long createdAt,
31+
final long endedAt) {
3132

3233
final SynchronousJobMetadata metadata = SynchronousJobMetadata.fromJobMetadata(
3334
temporalResponse.getMetadata(),
@@ -36,7 +37,7 @@ public static <T> SynchronousResponse<T> fromTemporalResponse(final TemporalResp
3637
configId,
3738
createdAt,
3839
endedAt);
39-
return new SynchronousResponse<>(temporalResponse.getOutput().orElse(null), metadata);
40+
return new SynchronousResponse<>(output, metadata);
4041
}
4142

4243
public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) {

airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java

+35-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.databind.JsonNode;
2020
import com.google.common.collect.ImmutableMap;
2121
import io.airbyte.commons.json.Jsons;
22+
import io.airbyte.config.ConnectorJobOutput;
2223
import io.airbyte.config.DestinationConnection;
2324
import io.airbyte.config.JobCheckConnectionConfig;
2425
import io.airbyte.config.JobConfig.ConfigType;
@@ -97,10 +98,11 @@ class ExecuteSynchronousJob {
9798
void testExecuteJobSuccess() {
9899
final UUID sourceDefinitionId = UUID.randomUUID();
99100
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
101+
final Function<String, String> mapperFunction = output -> output;
100102
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));
101103

102104
final SynchronousResponse<String> response = schedulerClient
103-
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID);
105+
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
104106

105107
assertNotNull(response);
106108
assertEquals("hello", response.getOutput());
@@ -114,15 +116,36 @@ void testExecuteJobSuccess() {
114116
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED));
115117
}
116118

119+
@SuppressWarnings("unchecked")
120+
@Test
121+
void testExecuteMappedOutput() {
122+
final UUID sourceDefinitionId = UUID.randomUUID();
123+
final Function<UUID, TemporalResponse<Integer>> function = mock(Function.class);
124+
final Function<Integer, String> mapperFunction = Object::toString;
125+
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true)));
126+
127+
final SynchronousResponse<String> response = schedulerClient
128+
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
129+
130+
assertNotNull(response);
131+
assertEquals("42", response.getOutput());
132+
assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType());
133+
assertTrue(response.getMetadata().getConfigId().isPresent());
134+
assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get());
135+
assertTrue(response.getMetadata().isSucceeded());
136+
assertEquals(LOG_PATH, response.getMetadata().getLogPath());
137+
}
138+
117139
@SuppressWarnings("unchecked")
118140
@Test
119141
void testExecuteJobFailure() {
120142
final UUID sourceDefinitionId = UUID.randomUUID();
121143
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
144+
final Function<String, String> mapperFunction = output -> output;
122145
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false)));
123146

124147
final SynchronousResponse<String> response = schedulerClient
125-
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID);
148+
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
126149

127150
assertNotNull(response);
128151
assertNull(response.getOutput());
@@ -141,11 +164,12 @@ void testExecuteJobFailure() {
141164
void testExecuteRuntimeException() {
142165
final UUID sourceDefinitionId = UUID.randomUUID();
143166
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
167+
final Function<String, String> mapperFunction = output -> output;
144168
when(function.apply(any(UUID.class))).thenThrow(new RuntimeException());
145169

146170
assertThrows(
147171
RuntimeException.class,
148-
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, WORKSPACE_ID));
172+
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID));
149173

150174
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
151175
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
@@ -164,8 +188,9 @@ void testCreateSourceCheckConnectionJob() throws IOException {
164188
.withDockerImage(DOCKER_IMAGE);
165189

166190
final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
191+
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
167192
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
168-
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
193+
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
169194
final SynchronousResponse<StandardCheckConnectionOutput> response =
170195
schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, DOCKER_IMAGE);
171196
assertEquals(mockOutput, response.getOutput());
@@ -178,8 +203,9 @@ void testCreateDestinationCheckConnectionJob() throws IOException {
178203
.withDockerImage(DOCKER_IMAGE);
179204

180205
final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
206+
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
181207
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
182-
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
208+
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
183209
final SynchronousResponse<StandardCheckConnectionOutput> response =
184210
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE);
185211
assertEquals(mockOutput, response.getOutput());
@@ -192,8 +218,9 @@ void testCreateDiscoverSchemaJob() throws IOException {
192218
.withDockerImage(DOCKER_IMAGE);
193219

194220
final AirbyteCatalog mockOutput = mock(AirbyteCatalog.class);
221+
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalog(mockOutput);
195222
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(jobDiscoverCatalogConfig)))
196-
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
223+
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
197224
final SynchronousResponse<AirbyteCatalog> response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE);
198225
assertEquals(mockOutput, response.getOutput());
199226
}
@@ -203,8 +230,9 @@ void testCreateGetSpecJob() throws IOException {
203230
final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(DOCKER_IMAGE);
204231

205232
final ConnectorSpecification mockOutput = mock(ConnectorSpecification.class);
233+
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withSpec(mockOutput);
206234
when(temporalClient.submitGetSpec(any(UUID.class), eq(0), eq(jobSpecConfig)))
207-
.thenReturn(new TemporalResponse<>(mockOutput, createMetadata(true)));
235+
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
208236
final SynchronousResponse<ConnectorSpecification> response = schedulerClient.createGetSpecJob(DOCKER_IMAGE);
209237
assertEquals(mockOutput, response.getOutput());
210238
}

airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
221221
final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
222222
final SourceDiscoverSchemaRead returnValue = discoverJobToOutput(response);
223223
if (response.isSuccess()) {
224-
final UUID catalogId = configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash);
224+
final UUID catalogId =
225+
configRepository.writeActorCatalogFetchEvent(response.getOutput(), source.getSourceId(), connectorVersion, configHash);
225226
returnValue.catalogId(catalogId);
226227
}
227228
return returnValue;

airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java

+29
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,27 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.config.Configs.WorkerEnvironment;
9+
import io.airbyte.config.ConnectorJobOutput;
10+
import io.airbyte.config.ConnectorJobOutput.OutputType;
11+
import io.airbyte.config.FailureReason;
912
import io.airbyte.config.StandardSyncInput;
1013
import io.airbyte.config.WorkerDestinationConfig;
1114
import io.airbyte.config.WorkerSourceConfig;
1215
import io.airbyte.config.helpers.LogClientSingleton;
16+
import io.airbyte.protocol.models.AirbyteMessage;
17+
import io.airbyte.protocol.models.AirbyteMessage.Type;
18+
import io.airbyte.protocol.models.AirbyteTraceMessage;
1319
import io.airbyte.scheduler.models.JobRunConfig;
20+
import io.airbyte.workers.exception.WorkerException;
21+
import io.airbyte.workers.helper.FailureHelper;
1422
import java.nio.file.Path;
1523
import java.time.Duration;
1624
import java.time.temporal.ChronoUnit;
25+
import java.util.ArrayList;
26+
import java.util.List;
1727
import java.util.Map;
1828
import java.util.Objects;
29+
import java.util.Optional;
1930
import java.util.concurrent.TimeUnit;
2031
import java.util.stream.Collectors;
2132
import javax.annotation.Nullable;
@@ -100,6 +111,24 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa
100111
.withState(sync.getState());
101112
}
102113

114+
public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
115+
final Map<Type, List<AirbyteMessage>> messagesByType,
116+
final String defaultErrorMessage)
117+
throws WorkerException {
118+
final Optional<AirbyteTraceMessage> traceMessage =
119+
messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
120+
.map(AirbyteMessage::getTrace)
121+
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
122+
.findFirst();
123+
124+
if (traceMessage.isPresent()) {
125+
final FailureReason failureReason = FailureHelper.genericFailure(traceMessage.get(), null, null);
126+
return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason);
127+
}
128+
129+
throw new WorkerException(defaultErrorMessage);
130+
}
131+
103132
public static Map<String, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
104133
return syncInput.getCatalog().getStreams().stream().collect(
105134
Collectors.toMap(

airbyte-workers/src/main/java/io/airbyte/workers/general/CheckConnectionWorker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
package io.airbyte.workers.general;
66

7+
import io.airbyte.config.ConnectorJobOutput;
78
import io.airbyte.config.StandardCheckConnectionInput;
8-
import io.airbyte.config.StandardCheckConnectionOutput;
99
import io.airbyte.workers.Worker;
1010

11-
public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {}
11+
public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, ConnectorJobOutput> {}

0 commit comments

Comments
 (0)