Skip to content

Commit 2e2219f

Browse files
malikdiarrasuhomud
authored andcommitted
Display union of schema and replicated schema in connection settings page (#11611)
* Fix selected flag in merged catalog. * Merge cached schema and sync schema by default
1 parent 8f76229 commit 2e2219f

File tree

4 files changed

+46
-10
lines changed

4 files changed

+46
-10
lines changed

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,16 @@ private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, fi
794794
return result;
795795
}
796796

797+
public ActorCatalog getActorCatalogById(final UUID actorCatalogId)
798+
throws IOException, ConfigNotFoundException {
799+
final Result<Record> result = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
800+
.from(ACTOR_CATALOG).where(ACTOR_CATALOG.ID.eq(actorCatalogId))).fetch();
801+
if (result.size() > 0) {
802+
return DbConverter.buildActorCatalog(result.get(0));
803+
}
804+
throw new ConfigNotFoundException(ConfigSchema.ACTOR_CATALOG, actorCatalogId);
805+
}
806+
797807
/**
798808
* Store an Airbyte catalog in DB if it is not present already
799809
*

airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.google.common.collect.ImmutableMap.Builder;
1010
import com.google.common.collect.Lists;
1111
import io.airbyte.analytics.TrackingClient;
12+
import io.airbyte.api.model.AirbyteCatalog;
1213
import io.airbyte.api.model.ConnectionCreate;
1314
import io.airbyte.api.model.ConnectionRead;
1415
import io.airbyte.api.model.ConnectionReadList;
@@ -22,6 +23,8 @@
2223
import io.airbyte.api.model.WorkspaceIdRequestBody;
2324
import io.airbyte.commons.enums.Enums;
2425
import io.airbyte.commons.features.FeatureFlags;
26+
import io.airbyte.commons.json.Jsons;
27+
import io.airbyte.config.ActorCatalog;
2528
import io.airbyte.config.DestinationConnection;
2629
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
2730
import io.airbyte.config.Schedule;
@@ -47,6 +50,7 @@
4750
import java.util.Collections;
4851
import java.util.HashSet;
4952
import java.util.List;
53+
import java.util.Optional;
5054
import java.util.UUID;
5155
import java.util.concurrent.TimeUnit;
5256
import java.util.function.Supplier;
@@ -269,6 +273,17 @@ public ConnectionRead getConnection(final UUID connectionId)
269273
return buildConnectionRead(connectionId);
270274
}
271275

276+
public Optional<AirbyteCatalog> getConnectionAirbyteCatalog(final UUID connectionId)
277+
throws JsonValidationException, ConfigNotFoundException, IOException {
278+
final StandardSync connection = configRepository.getStandardSync(connectionId);
279+
if (connection.getSourceCatalogId() == null) {
280+
return Optional.empty();
281+
}
282+
final ActorCatalog catalog = configRepository.getActorCatalogById(connection.getSourceCatalogId());
283+
return Optional.of(CatalogConverter.toApi(Jsons.object(catalog.getCatalog(),
284+
io.airbyte.protocol.models.AirbyteCatalog.class)));
285+
}
286+
272287
public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch)
273288
throws JsonValidationException, IOException, ConfigNotFoundException {
274289
final List<ConnectionRead> reads = Lists.newArrayList();

airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.Collections;
5656
import java.util.List;
5757
import java.util.Map;
58+
import java.util.Optional;
5859
import java.util.Set;
5960
import java.util.UUID;
6061
import java.util.function.Predicate;
@@ -198,19 +199,24 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
198199

199200
final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId());
200201

202+
final Optional<AirbyteCatalog> discovered;
201203
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
202204
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq =
203205
new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true);
204206
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
205207

206-
final AirbyteCatalog original = connection.getSyncCatalog();
207-
final AirbyteCatalog discovered = discoverSchema.getCatalog();
208-
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered);
209-
208+
discovered = Optional.of(discoverSchema.getCatalog());
210209
connection.setSourceCatalogId(discoverSchema.getCatalogId());
210+
} else {
211+
discovered = connectionsHandler.getConnectionAirbyteCatalog(webBackendConnectionRequestBody.getConnectionId());
212+
}
213+
final AirbyteCatalog original = connection.getSyncCatalog();
214+
if (discovered.isPresent()) {
215+
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered.get());
211216
connection.setSyncCatalog(combined);
217+
} else {
218+
connection.setSyncCatalog(original);
212219
}
213-
214220
return buildWebBackendConnectionRead(connection);
215221
}
216222

@@ -252,9 +258,10 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o
252258
}
253259

254260
outputStreamConfig.setAliasName(originalStreamConfig.getAliasName());
255-
outputStreamConfig.setSelected(originalStreamConfig.getSelected());
261+
outputStreamConfig.setSelected(true);
256262
} else {
257263
outputStreamConfig = s.getConfig();
264+
outputStreamConfig.setSelected(false);
258265
}
259266
final AirbyteStreamAndConfiguration outputStream = new AirbyteStreamAndConfiguration()
260267
.stream(Jsons.clone(stream))

airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,8 @@ public void testUpdateSchemaWithDiscoveryFromEmpty() {
680680
.cursorField(Collections.emptyList())
681681
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
682682
.primaryKey(Collections.emptyList())
683-
.aliasName("stream1");
683+
.aliasName("stream1")
684+
.setSelected(false);
684685

685686
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
686687

@@ -729,7 +730,8 @@ public void testUpdateSchemaWithDiscoveryResetStream() {
729730
.cursorField(Collections.emptyList())
730731
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
731732
.primaryKey(Collections.emptyList())
732-
.aliasName("stream1");
733+
.aliasName("stream1")
734+
.setSelected(false);
733735

734736
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
735737

@@ -791,7 +793,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
791793
.cursorField(List.of("field1"))
792794
.destinationSyncMode(DestinationSyncMode.APPEND)
793795
.primaryKey(Collections.emptyList())
794-
.aliasName("renamed_stream");
796+
.aliasName("renamed_stream")
797+
.setSelected(true);
795798
final AirbyteStreamAndConfiguration expectedNewStream = ConnectionHelpers.generateBasicApiCatalog().getStreams().get(0);
796799
expectedNewStream.getStream()
797800
.name("stream2")
@@ -803,7 +806,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
803806
.cursorField(Collections.emptyList())
804807
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
805808
.primaryKey(Collections.emptyList())
806-
.aliasName("stream2");
809+
.aliasName("stream2")
810+
.setSelected(false);
807811
expected.getStreams().add(expectedNewStream);
808812

809813
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);

0 commit comments

Comments
 (0)