Skip to content

Commit f3c94fb

Browse files
only compute diff if the schema discovery actually succeeded (#22377)
1 parent b309b29 commit f3c94fb

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
269269
isCustomConnector);
270270
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId, sourceDef);
271271

272-
if (discoverSchemaRequestBody.getConnectionId() != null) {
272+
if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) {
273273
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
274274
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
275275
}

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,40 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce
977977
assertEquals(ConnectionStatus.INACTIVE, connectionUpdateValues.get(2).getStatus());
978978
}
979979

980+
@Test
981+
void testDiscoverSchemaFromSourceIdWithConnectionUpdateNonSuccessResponse() throws IOException, JsonValidationException, ConfigNotFoundException {
982+
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
983+
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId())
984+
.connectionId(UUID.randomUUID());
985+
986+
// Mock the source definition.
987+
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
988+
.thenReturn(new StandardSourceDefinition()
989+
.withDockerRepository(SOURCE_DOCKER_REPO)
990+
.withDockerImageTag(SOURCE_DOCKER_TAG)
991+
.withProtocolVersion(SOURCE_PROTOCOL_VERSION)
992+
.withSourceDefinitionId(source.getSourceDefinitionId()));
993+
// Mock the source itself.
994+
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
995+
// Mock the Discover job results.
996+
final SynchronousResponse<UUID> discoverResponse = (SynchronousResponse<UUID>) jobResponse;
997+
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
998+
when(discoverResponse.isSuccess()).thenReturn(false);
999+
when(discoverResponse.getMetadata()).thenReturn(metadata);
1000+
when(metadata.isSucceeded()).thenReturn(false);
1001+
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
1002+
false))
1003+
.thenReturn(discoverResponse);
1004+
1005+
final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
1006+
1007+
assertNull(actual.getCatalog());
1008+
assertNotNull(actual.getJobInfo());
1009+
assertFalse(actual.getJobInfo().getSucceeded());
1010+
verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
1011+
false);
1012+
}
1013+
9801014
@Test
9811015
void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException {
9821016
final SourceConnection source = new SourceConnection()

0 commit comments

Comments
 (0)