Skip to content

Commit f924359

Browse files
authored
report synchronous check/spec/discover failures to JobErrorReporter (#14818)
* report failures for synchronous check/discover, refactor common logic * allow null workspace, send spec errors * add failure origin, format * rm connector_type, fix failing tests * add tests for other job types * log instead of throw * move swallow to common spot * connector jobs use context instead of passing full config * sync jobs use context instead of passing raw config * fix failing test * fix failing scheduler client test
1 parent bbbd1ad commit f924359

File tree

14 files changed

+507
-81
lines changed

14 files changed

+507
-81
lines changed

airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.annotations.VisibleForTesting;
9+
import io.airbyte.commons.lang.Exceptions;
910
import io.airbyte.config.ConnectorJobOutput;
1011
import io.airbyte.config.DestinationConnection;
1112
import io.airbyte.config.JobCheckConnectionConfig;
@@ -16,6 +17,8 @@
1617
import io.airbyte.config.StandardCheckConnectionOutput;
1718
import io.airbyte.protocol.models.AirbyteCatalog;
1819
import io.airbyte.protocol.models.ConnectorSpecification;
20+
import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext;
21+
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
1922
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
2023
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
2124
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
@@ -26,19 +29,27 @@
2629
import java.util.Optional;
2730
import java.util.UUID;
2831
import java.util.function.Function;
32+
import java.util.function.Supplier;
2933
import javax.annotation.Nullable;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3036

3137
public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerClient {
3238

39+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class);
40+
3341
private final TemporalClient temporalClient;
3442
private final JobTracker jobTracker;
43+
private final JobErrorReporter jobErrorReporter;
3544
private final OAuthConfigSupplier oAuthConfigSupplier;
3645

3746
public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient,
3847
final JobTracker jobTracker,
48+
final JobErrorReporter jobErrorReporter,
3949
final OAuthConfigSupplier oAuthConfigSupplier) {
4050
this.temporalClient = temporalClient;
4151
this.jobTracker = jobTracker;
52+
this.jobErrorReporter = jobErrorReporter;
4253
this.oAuthConfigSupplier = oAuthConfigSupplier;
4354
}
4455

@@ -53,10 +64,14 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
5364
.withConnectionConfiguration(sourceConfiguration)
5465
.withDockerImage(dockerImage);
5566

67+
final UUID jobId = UUID.randomUUID();
68+
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
69+
5670
return execute(
5771
ConfigType.CHECK_CONNECTION_SOURCE,
72+
jobReportingContext,
5873
source.getSourceDefinitionId(),
59-
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
74+
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
6075
ConnectorJobOutput::getCheckConnection,
6176
source.getWorkspaceId());
6277
}
@@ -73,10 +88,14 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
7388
.withConnectionConfiguration(destinationConfiguration)
7489
.withDockerImage(dockerImage);
7590

91+
final UUID jobId = UUID.randomUUID();
92+
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
93+
7694
return execute(
7795
ConfigType.CHECK_CONNECTION_DESTINATION,
96+
jobReportingContext,
7897
destination.getDestinationDefinitionId(),
79-
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
98+
() -> temporalClient.submitCheckConnection(jobId, 0, jobCheckConnectionConfig),
8099
ConnectorJobOutput::getCheckConnection,
81100
destination.getWorkspaceId());
82101
}
@@ -92,10 +111,14 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
92111
.withConnectionConfiguration(sourceConfiguration)
93112
.withDockerImage(dockerImage);
94113

114+
final UUID jobId = UUID.randomUUID();
115+
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
116+
95117
return execute(
96118
ConfigType.DISCOVER_SCHEMA,
119+
jobReportingContext,
97120
source.getSourceDefinitionId(),
98-
jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig),
121+
() -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig),
99122
ConnectorJobOutput::getDiscoverCatalog,
100123
source.getWorkspaceId());
101124
}
@@ -104,31 +127,39 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
104127
public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String dockerImage) throws IOException {
105128
final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(dockerImage);
106129

130+
final UUID jobId = UUID.randomUUID();
131+
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
132+
107133
return execute(
108134
ConfigType.GET_SPEC,
135+
jobReportingContext,
109136
null,
110-
jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig),
137+
() -> temporalClient.submitGetSpec(jobId, 0, jobSpecConfig),
111138
ConnectorJobOutput::getSpec,
112139
null);
113140
}
114141

115142
@VisibleForTesting
116143
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
144+
final ConnectorJobReportingContext jobContext,
117145
@Nullable final UUID connectorDefinitionId,
118-
final Function<UUID, TemporalResponse<U>> executor,
146+
final Supplier<TemporalResponse<U>> executor,
119147
final Function<U, T> outputMapper,
120148
final UUID workspaceId) {
121149
final long createdAt = Instant.now().toEpochMilli();
122-
final UUID jobId = UUID.randomUUID();
150+
final UUID jobId = jobContext.jobId();
123151
try {
124152
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
125-
final TemporalResponse<U> temporalResponse = executor.apply(jobId);
153+
final TemporalResponse<U> temporalResponse = executor.get();
126154
final Optional<U> jobOutput = temporalResponse.getOutput();
127155
final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
128156
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;
129157

130158
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput);
131-
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above
159+
160+
if (outputState == JobState.FAILED && jobOutput.isPresent()) {
161+
reportError(configType, jobContext, jobOutput.get(), connectorDefinitionId, workspaceId);
162+
}
132163

133164
final long endedAt = Instant.now().toEpochMilli();
134165
return SynchronousResponse.fromTemporalResponse(
@@ -177,4 +208,34 @@ private <T> void track(final UUID jobId,
177208

178209
}
179210

211+
private <S, T> void reportError(final ConfigType configType,
212+
final ConnectorJobReportingContext jobContext,
213+
final T jobOutput,
214+
final UUID connectorDefinitionId,
215+
final UUID workspaceId) {
216+
Exceptions.swallow(() -> {
217+
switch (configType) {
218+
case CHECK_CONNECTION_SOURCE -> jobErrorReporter.reportSourceCheckJobFailure(
219+
connectorDefinitionId,
220+
workspaceId,
221+
((ConnectorJobOutput) jobOutput).getFailureReason(),
222+
jobContext);
223+
case CHECK_CONNECTION_DESTINATION -> jobErrorReporter.reportDestinationCheckJobFailure(
224+
connectorDefinitionId,
225+
workspaceId,
226+
((ConnectorJobOutput) jobOutput).getFailureReason(),
227+
jobContext);
228+
case DISCOVER_SCHEMA -> jobErrorReporter.reportDiscoverJobFailure(
229+
connectorDefinitionId,
230+
workspaceId,
231+
((ConnectorJobOutput) jobOutput).getFailureReason(),
232+
jobContext);
233+
case GET_SPEC -> jobErrorReporter.reportSpecJobFailure(
234+
((ConnectorJobOutput) jobOutput).getFailureReason(),
235+
jobContext);
236+
default -> LOGGER.error("Tried to report job failure for type {}, but this job type is not supported", configType);
237+
}
238+
});
239+
}
240+
180241
}

airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.mockito.ArgumentMatchers.eq;
1515
import static org.mockito.Mockito.mock;
1616
import static org.mockito.Mockito.verify;
17+
import static org.mockito.Mockito.verifyNoInteractions;
1718
import static org.mockito.Mockito.when;
1819

1920
import com.fasterxml.jackson.databind.JsonNode;
@@ -29,6 +30,8 @@
2930
import io.airbyte.config.StandardCheckConnectionOutput;
3031
import io.airbyte.protocol.models.AirbyteCatalog;
3132
import io.airbyte.protocol.models.ConnectorSpecification;
33+
import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext;
34+
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
3235
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
3336
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
3437
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
@@ -39,6 +42,7 @@
3942
import java.nio.file.Path;
4043
import java.util.UUID;
4144
import java.util.function.Function;
45+
import java.util.function.Supplier;
4246
import org.junit.jupiter.api.BeforeEach;
4347
import org.junit.jupiter.api.DisplayName;
4448
import org.junit.jupiter.api.Nested;
@@ -69,15 +73,17 @@ class DefaultSynchronousSchedulerClientTest {
6973

7074
private TemporalClient temporalClient;
7175
private JobTracker jobTracker;
76+
private JobErrorReporter jobErrorReporter;
7277
private OAuthConfigSupplier oAuthConfigSupplier;
7378
private DefaultSynchronousSchedulerClient schedulerClient;
7479

7580
@BeforeEach
7681
void setup() throws IOException {
7782
temporalClient = mock(TemporalClient.class);
7883
jobTracker = mock(JobTracker.class);
84+
jobErrorReporter = mock(JobErrorReporter.class);
7985
oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
80-
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
86+
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier);
8187

8288
when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
8389
when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
@@ -97,12 +103,13 @@ class ExecuteSynchronousJob {
97103
@Test
98104
void testExecuteJobSuccess() {
99105
final UUID sourceDefinitionId = UUID.randomUUID();
100-
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
106+
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
101107
final Function<String, String> mapperFunction = output -> output;
102-
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));
108+
when(function.get()).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));
103109

110+
final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
104111
final SynchronousResponse<String> response = schedulerClient
105-
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
112+
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
106113

107114
assertNotNull(response);
108115
assertEquals("hello", response.getOutput());
@@ -114,18 +121,20 @@ void testExecuteJobSuccess() {
114121

115122
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
116123
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED));
124+
verifyNoInteractions(jobErrorReporter);
117125
}
118126

119127
@SuppressWarnings("unchecked")
120128
@Test
121129
void testExecuteMappedOutput() {
122130
final UUID sourceDefinitionId = UUID.randomUUID();
123-
final Function<UUID, TemporalResponse<Integer>> function = mock(Function.class);
131+
final Supplier<TemporalResponse<Integer>> function = mock(Supplier.class);
124132
final Function<Integer, String> mapperFunction = Object::toString;
125-
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true)));
133+
when(function.get()).thenReturn(new TemporalResponse<>(42, createMetadata(true)));
126134

135+
final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
127136
final SynchronousResponse<String> response = schedulerClient
128-
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
137+
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
129138

130139
assertNotNull(response);
131140
assertEquals("42", response.getOutput());
@@ -140,12 +149,13 @@ void testExecuteMappedOutput() {
140149
@Test
141150
void testExecuteJobFailure() {
142151
final UUID sourceDefinitionId = UUID.randomUUID();
143-
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
152+
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
144153
final Function<String, String> mapperFunction = output -> output;
145-
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false)));
154+
when(function.get()).thenReturn(new TemporalResponse<>(null, createMetadata(false)));
146155

156+
final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
147157
final SynchronousResponse<String> response = schedulerClient
148-
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
158+
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
149159

150160
assertNotNull(response);
151161
assertNull(response.getOutput());
@@ -157,22 +167,26 @@ void testExecuteJobFailure() {
157167

158168
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
159169
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
170+
verifyNoInteractions(jobErrorReporter);
160171
}
161172

162173
@SuppressWarnings("unchecked")
163174
@Test
164175
void testExecuteRuntimeException() {
165176
final UUID sourceDefinitionId = UUID.randomUUID();
166-
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
177+
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
167178
final Function<String, String> mapperFunction = output -> output;
168-
when(function.apply(any(UUID.class))).thenThrow(new RuntimeException());
179+
when(function.get()).thenThrow(new RuntimeException());
169180

181+
final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
170182
assertThrows(
171183
RuntimeException.class,
172-
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID));
184+
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function,
185+
mapperFunction, WORKSPACE_ID));
173186

174187
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
175188
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
189+
verifyNoInteractions(jobErrorReporter);
176190
}
177191

178192
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.scheduler.persistence.job_error_reporter;
6+
7+
import java.util.UUID;
8+
9+
public record ConnectorJobReportingContext(UUID jobId, String dockerImage) {}

0 commit comments

Comments
 (0)