Skip to content

Commit 8510835

Browse files
alovewjhammarstedt
authored andcommitted
Add SchemaChange to WebBackendConnectionRead object (airbytehq#17969)
* add schemaChange field to WebBackendConnectionRead and breakingChange to ConnectionRead
1 parent 4e2f1be commit 8510835

File tree

18 files changed

+344
-143
lines changed

18 files changed

+344
-143
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3397,6 +3397,7 @@ components:
33973397
- destinationId
33983398
- syncCatalog
33993399
- status
3400+
- breakingChange
34003401
properties:
34013402
connectionId:
34023403
$ref: "#/components/schemas/ConnectionId"
@@ -3437,6 +3438,14 @@ components:
34373438
format: uuid
34383439
geography:
34393440
$ref: "#/components/schemas/Geography"
3441+
breakingChange:
3442+
type: boolean
3443+
SchemaChange:
3444+
enum:
3445+
- no_change
3446+
- non_breaking
3447+
- breaking
3448+
type: string
34403449
ConnectionSearch:
34413450
type: object
34423451
properties:
@@ -4677,6 +4686,7 @@ components:
46774686
- source
46784687
- destination
46794688
- isSyncing
4689+
- schemaChange
46804690
properties:
46814691
connectionId:
46824692
$ref: "#/components/schemas/ConnectionId"
@@ -4733,6 +4743,8 @@ components:
47334743
$ref: "#/components/schemas/CatalogDiff"
47344744
geography:
47354745
$ref: "#/components/schemas/Geography"
4746+
schemaChange:
4747+
$ref: "#/components/schemas/SchemaChange"
47364748
WebBackendConnectionReadList:
47374749
type: object
47384750
required:

airbyte-config/config-models/src/main/resources/types/StandardSync.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ required:
1212
- manual
1313
- namespaceDefinition
1414
- geography
15+
- breakingChange
1516
additionalProperties: false
1617
properties:
1718
namespaceDefinition:
@@ -117,3 +118,5 @@ properties:
117118
"$ref": ResourceRequirements.yaml
118119
geography:
119120
"$ref": Geography.yaml
121+
breakingChange:
122+
type: boolean

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.airbyte.commons.lang.MoreBooleans;
3030
import io.airbyte.commons.version.AirbyteProtocolVersion;
3131
import io.airbyte.config.ActorCatalog;
32+
import io.airbyte.config.ActorCatalogFetchEvent;
3233
import io.airbyte.config.AirbyteConfig;
3334
import io.airbyte.config.ConfigSchema;
3435
import io.airbyte.config.DestinationConnection;
@@ -143,10 +144,7 @@ public Optional<StandardWorkspace> getWorkspaceBySlugOptional(final String slug,
143144
.where(WORKSPACE.SLUG.eq(slug)).andNot(WORKSPACE.TOMBSTONE)).fetch();
144145
}
145146

146-
if (result.size() == 0) {
147-
return Optional.empty();
148-
}
149-
return Optional.of(DbConverter.buildStandardWorkspace(result.get(0)));
147+
return result.stream().findFirst().map(DbConverter::buildStandardWorkspace);
150148
}
151149

152150
public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean includeTombstone)
@@ -788,10 +786,7 @@ public Optional<SourceOAuthParameter> getSourceOAuthParamByDefinitionIdOptional(
788786
ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID.eq(sourceDefinitionId)).fetch();
789787
});
790788

791-
if (result.size() == 0) {
792-
return Optional.empty();
793-
}
794-
return Optional.of(DbConverter.buildSourceOAuthParameter(result.get(0)));
789+
return result.stream().findFirst().map(DbConverter::buildSourceOAuthParameter);
795790
}
796791

797792
public void writeSourceOAuthParam(final SourceOAuthParameter sourceOAuthParameter) throws JsonValidationException, IOException {
@@ -817,10 +812,7 @@ public Optional<DestinationOAuthParameter> getDestinationOAuthParamByDefinitionI
817812
ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID.eq(destinationDefinitionId)).fetch();
818813
});
819814

820-
if (result.size() == 0) {
821-
return Optional.empty();
822-
}
823-
return Optional.of(DbConverter.buildDestinationOAuthParameter(result.get(0)));
815+
return result.stream().findFirst().map(DbConverter::buildDestinationOAuthParameter);
824816
}
825817

826818
public void writeDestinationOAuthParam(final DestinationOAuthParameter destinationOAuthParameter) throws JsonValidationException, IOException {
@@ -917,6 +909,7 @@ public ActorCatalog getActorCatalogById(final UUID actorCatalogId)
917909
throws IOException, ConfigNotFoundException {
918910
final Result<Record> result = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
919911
.from(ACTOR_CATALOG).where(ACTOR_CATALOG.ID.eq(actorCatalogId))).fetch();
912+
920913
if (result.size() > 0) {
921914
return DbConverter.buildActorCatalog(result.get(0));
922915
}
@@ -934,8 +927,8 @@ public ActorCatalog getActorCatalogById(final UUID actorCatalogId)
934927
* @return the db identifier for the cached catalog.
935928
*/
936929
private UUID getOrInsertActorCatalog(final AirbyteCatalog airbyteCatalog,
937-
final DSLContext context) {
938-
final OffsetDateTime timestamp = OffsetDateTime.now();
930+
final DSLContext context,
931+
final OffsetDateTime timestamp) {
939932
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
940933
final String catalogHash = hashFunction.hashBytes(Jsons.serialize(airbyteCatalog).getBytes(
941934
Charsets.UTF_8)).toString();
@@ -969,11 +962,17 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,
969962
.and(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH.eq(configHash))
970963
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1)).fetch();
971964

972-
if (records.size() >= 1) {
973-
return Optional.of(DbConverter.buildActorCatalog(records.get(0)));
974-
}
975-
return Optional.empty();
965+
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
966+
}
967+
968+
public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException {
969+
970+
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk())
971+
.from(ACTOR_CATALOG_FETCH_EVENT)
972+
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
973+
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());
976974

975+
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent);
977976
}
978977

979978
/**
@@ -1001,7 +1000,7 @@ public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog,
10011000
final OffsetDateTime timestamp = OffsetDateTime.now();
10021001
final UUID fetchEventID = UUID.randomUUID();
10031002
return database.transaction(ctx -> {
1004-
final UUID catalogId = getOrInsertActorCatalog(catalog, ctx);
1003+
final UUID catalogId = getOrInsertActorCatalog(catalog, ctx, timestamp);
10051004
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
10061005
.set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID)
10071006
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorId)

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
11041104
JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
11051105
.set(CONNECTION.UPDATED_AT, timestamp)
11061106
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
1107+
.set(CONNECTION.BREAKING_CHANGE, standardSync.getBreakingChange())
11071108
.set(CONNECTION.GEOGRAPHY, Enums.toEnum(standardSync.getGeography().value(),
11081109
io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow())
11091110
.where(CONNECTION.ID.eq(standardSync.getConnectionId()))
@@ -1148,6 +1149,7 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
11481149
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
11491150
.set(CONNECTION.GEOGRAPHY, Enums.toEnum(standardSync.getGeography().value(),
11501151
io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType.class).orElseThrow())
1152+
.set(CONNECTION.BREAKING_CHANGE, standardSync.getBreakingChange())
11511153
.set(CONNECTION.CREATED_AT, timestamp)
11521154
.set(CONNECTION.UPDATED_AT, timestamp)
11531155
.execute();

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR;
88
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG;
9+
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG_FETCH_EVENT;
910
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION;
1011
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_OAUTH_PARAMETER;
1112
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION;
@@ -15,6 +16,7 @@
1516
import io.airbyte.commons.enums.Enums;
1617
import io.airbyte.commons.json.Jsons;
1718
import io.airbyte.config.ActorCatalog;
19+
import io.airbyte.config.ActorCatalogFetchEvent;
1820
import io.airbyte.config.ActorDefinitionResourceRequirements;
1921
import io.airbyte.config.DestinationConnection;
2022
import io.airbyte.config.DestinationOAuthParameter;
@@ -74,6 +76,7 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
7476
.withResourceRequirements(
7577
Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class))
7678
.withSourceCatalogId(record.get(CONNECTION.SOURCE_CATALOG_ID))
79+
.withBreakingChange(record.get(CONNECTION.BREAKING_CHANGE))
7780
.withGeography(Enums.toEnum(record.get(CONNECTION.GEOGRAPHY, String.class), Geography.class).orElseThrow());
7881
}
7982

@@ -193,6 +196,11 @@ public static ActorCatalog buildActorCatalog(final Record record) {
193196
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
194197
}
195198

199+
public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
200+
return new ActorCatalogFetchEvent()
201+
.withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID));
202+
}
203+
196204
public static WorkspaceServiceAccount buildWorkspaceServiceAccount(final Record record) {
197205
return new WorkspaceServiceAccount()
198206
.withWorkspaceId(record.get(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID))

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.config.persistence;
66

77
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG;
8+
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG_FETCH_EVENT;
89
import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION_WORKSPACE_GRANT;
910
import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION_OPERATION;
1011
import static org.assertj.core.api.Assertions.assertThat;
@@ -16,6 +17,8 @@
1617

1718
import io.airbyte.commons.json.Jsons;
1819
import io.airbyte.config.ActorCatalog;
20+
import io.airbyte.config.ActorCatalogFetchEvent;
21+
import io.airbyte.config.ConfigSchema;
1922
import io.airbyte.config.DestinationConnection;
2023
import io.airbyte.config.DestinationOAuthParameter;
2124
import io.airbyte.config.Geography;
@@ -46,6 +49,7 @@
4649
import io.airbyte.validation.json.JsonValidationException;
4750
import java.io.IOException;
4851
import java.sql.SQLException;
52+
import java.time.OffsetDateTime;
4953
import java.util.Collections;
5054
import java.util.List;
5155
import java.util.Map;
@@ -501,4 +505,52 @@ void testGetGeographyForConnection() throws IOException {
501505
assertEquals(expected, actual);
502506
}
503507

508+
@Test
509+
void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IOException, JsonValidationException {
510+
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
511+
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
512+
}
513+
514+
OffsetDateTime now = OffsetDateTime.now();
515+
OffsetDateTime yesterday = now.minusDays(1l);
516+
517+
List<ActorCatalogFetchEvent> fetchEvents = MockData.actorCatalogFetchEventsSameSource();
518+
ActorCatalogFetchEvent fetchEvent1 = fetchEvents.get(0);
519+
ActorCatalogFetchEvent fetchEvent2 = fetchEvents.get(1);
520+
521+
database.transaction(ctx -> {
522+
insertCatalogFetchEvent(
523+
ctx,
524+
fetchEvent1.getActorId(),
525+
fetchEvent1.getActorCatalogId(),
526+
yesterday);
527+
insertCatalogFetchEvent(
528+
ctx,
529+
fetchEvent2.getActorId(),
530+
fetchEvent2.getActorCatalogId(),
531+
now);
532+
533+
return null;
534+
});
535+
536+
Optional<ActorCatalogFetchEvent> result =
537+
configRepository.getMostRecentActorCatalogFetchEventForSource(fetchEvent1.getActorId());
538+
539+
assertEquals(fetchEvent2.getActorCatalogId(), result.get().getActorCatalogId());
540+
}
541+
542+
private void insertCatalogFetchEvent(DSLContext ctx, UUID sourceId, UUID catalogId, OffsetDateTime creationDate) {
543+
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
544+
.columns(
545+
ACTOR_CATALOG_FETCH_EVENT.ID,
546+
ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID,
547+
ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID,
548+
ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH,
549+
ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION,
550+
ACTOR_CATALOG_FETCH_EVENT.CREATED_AT,
551+
ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT)
552+
.values(UUID.randomUUID(), sourceId, catalogId, "", "", creationDate, creationDate)
553+
.execute();
554+
}
555+
504556
}

airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ public static List<StandardSync> standardSyncs() {
475475
.withResourceRequirements(resourceRequirements)
476476
.withStatus(Status.ACTIVE)
477477
.withSchedule(schedule)
478-
.withGeography(Geography.AUTO);
478+
.withGeography(Geography.AUTO)
479+
.withBreakingChange(false);
479480

480481
final StandardSync standardSync2 = new StandardSync()
481482
.withOperationIds(Arrays.asList(OPERATION_ID_1, OPERATION_ID_2))
@@ -491,7 +492,8 @@ public static List<StandardSync> standardSyncs() {
491492
.withResourceRequirements(resourceRequirements)
492493
.withStatus(Status.ACTIVE)
493494
.withSchedule(schedule)
494-
.withGeography(Geography.AUTO);
495+
.withGeography(Geography.AUTO)
496+
.withBreakingChange(false);
495497

496498
final StandardSync standardSync3 = new StandardSync()
497499
.withOperationIds(Arrays.asList(OPERATION_ID_1, OPERATION_ID_2))
@@ -507,7 +509,8 @@ public static List<StandardSync> standardSyncs() {
507509
.withResourceRequirements(resourceRequirements)
508510
.withStatus(Status.ACTIVE)
509511
.withSchedule(schedule)
510-
.withGeography(Geography.AUTO);
512+
.withGeography(Geography.AUTO)
513+
.withBreakingChange(false);
511514

512515
final StandardSync standardSync4 = new StandardSync()
513516
.withOperationIds(Collections.emptyList())
@@ -523,7 +526,8 @@ public static List<StandardSync> standardSyncs() {
523526
.withResourceRequirements(resourceRequirements)
524527
.withStatus(Status.DEPRECATED)
525528
.withSchedule(schedule)
526-
.withGeography(Geography.AUTO);
529+
.withGeography(Geography.AUTO)
530+
.withBreakingChange(false);
527531

528532
final StandardSync standardSync5 = new StandardSync()
529533
.withOperationIds(Arrays.asList(OPERATION_ID_3))
@@ -539,7 +543,8 @@ public static List<StandardSync> standardSyncs() {
539543
.withResourceRequirements(resourceRequirements)
540544
.withStatus(Status.ACTIVE)
541545
.withSchedule(schedule)
542-
.withGeography(Geography.AUTO);
546+
.withGeography(Geography.AUTO)
547+
.withBreakingChange(false);
543548

544549
final StandardSync standardSync6 = new StandardSync()
545550
.withOperationIds(Arrays.asList())
@@ -555,7 +560,8 @@ public static List<StandardSync> standardSyncs() {
555560
.withResourceRequirements(resourceRequirements)
556561
.withStatus(Status.DEPRECATED)
557562
.withSchedule(schedule)
558-
.withGeography(Geography.AUTO);
563+
.withGeography(Geography.AUTO)
564+
.withBreakingChange(false);
559565

560566
return Arrays.asList(standardSync1, standardSync2, standardSync3, standardSync4, standardSync5, standardSync6);
561567
}
@@ -621,6 +627,22 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEvents() {
621627
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
622628
}
623629

630+
public static List<ActorCatalogFetchEvent> actorCatalogFetchEventsSameSource() {
631+
final ActorCatalogFetchEvent actorCatalogFetchEvent1 = new ActorCatalogFetchEvent()
632+
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_1)
633+
.withActorCatalogId(ACTOR_CATALOG_ID_1)
634+
.withActorId(SOURCE_ID_1)
635+
.withConfigHash("CONFIG_HASH")
636+
.withConnectorVersion("1.0.0");
637+
final ActorCatalogFetchEvent actorCatalogFetchEvent2 = new ActorCatalogFetchEvent()
638+
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
639+
.withActorCatalogId(ACTOR_CATALOG_ID_2)
640+
.withActorId(SOURCE_ID_1)
641+
.withConfigHash("1394")
642+
.withConnectorVersion("1.2.0");
643+
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
644+
}
645+
624646
public static List<WorkspaceServiceAccount> workspaceServiceAccounts() {
625647
final WorkspaceServiceAccount workspaceServiceAccount = new WorkspaceServiceAccount()
626648
.withWorkspaceId(WORKSPACE_ID_1)

airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
9393
.prefix(standardSync.getPrefix())
9494
.syncCatalog(CatalogConverter.toApi(standardSync.getCatalog()))
9595
.sourceCatalogId(standardSync.getSourceCatalogId())
96+
.breakingChange(standardSync.getBreakingChange())
9697
.geography(Enums.convertTo(standardSync.getGeography(), Geography.class));
9798

9899
if (standardSync.getResourceRequirements() != null) {

airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
141141
.withOperationIds(operationIds)
142142
.withStatus(ApiPojoConverters.toPersistenceStatus(connectionCreate.getStatus()))
143143
.withSourceCatalogId(connectionCreate.getSourceCatalogId())
144-
.withGeography(getGeographyFromConnectionCreateOrWorkspace(connectionCreate));
144+
.withGeography(getGeographyFromConnectionCreateOrWorkspace(connectionCreate))
145+
.withBreakingChange(false);
145146
if (connectionCreate.getResourceRequirements() != null) {
146147
standardSync.withResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(connectionCreate.getResourceRequirements()));
147148
}

0 commit comments

Comments
 (0)