Skip to content

Commit dda50e1

Browse files
Update the sourceCatalogId field when the schema is updated (#12505)
* Set SourceCatalogId during connectionUpdate operation * Return catalogId when get a connection * Fix db operation of standardSync.sourceCatalogId - value is not set correctly during update operation - value is not read * UI modification to set the sourceCatalogId * remove sourceCatalogId from diff computation Co-authored-by: alafanechere <[email protected]>
1 parent 43470a2 commit dda50e1

File tree

13 files changed

+84
-35
lines changed

13 files changed

+84
-35
lines changed

airbyte-api/src/main/openapi/config.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -3218,6 +3218,9 @@ components:
32183218
$ref: "#/components/schemas/ConnectionStatus"
32193219
resourceRequirements:
32203220
$ref: "#/components/schemas/ResourceRequirements"
3221+
sourceCatalogId:
3222+
type: string
3223+
format: uuid
32213224
WebBackendConnectionUpdate:
32223225
type: object
32233226
required:
@@ -3258,6 +3261,9 @@ components:
32583261
type: array
32593262
items:
32603263
$ref: "#/components/schemas/WebBackendOperationCreateOrUpdate"
3264+
sourceCatalogId:
3265+
type: string
3266+
format: uuid
32613267
ConnectionRead:
32623268
type: object
32633269
required:
@@ -3298,6 +3304,9 @@ components:
32983304
$ref: "#/components/schemas/ConnectionStatus"
32993305
resourceRequirements:
33003306
$ref: "#/components/schemas/ResourceRequirements"
3307+
sourceCatalogId:
3308+
type: string
3309+
format: uuid
33013310
ConnectionSearch:
33023311
type: object
33033312
properties:
@@ -4363,6 +4372,9 @@ components:
43634372
type: boolean
43644373
resourceRequirements:
43654374
$ref: "#/components/schemas/ResourceRequirements"
4375+
catalogId:
4376+
type: string
4377+
format: uuid
43664378
WebBackendConnectionReadList:
43674379
type: object
43684380
required:

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

+1
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,7 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
10931093
.set(CONNECTION.MANUAL, standardSync.getManual())
10941094
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
10951095
.set(CONNECTION.UPDATED_AT, timestamp)
1096+
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
10961097
.where(CONNECTION.ID.eq(standardSync.getConnectionId()))
10971098
.execute();
10981099

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
5656
.withSchedule(Jsons.deserialize(record.get(CONNECTION.SCHEDULE).data(), Schedule.class))
5757
.withManual(record.get(CONNECTION.MANUAL))
5858
.withOperationIds(connectionOperationId)
59-
.withResourceRequirements(Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class));
59+
.withResourceRequirements(Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class))
60+
.withSourceCatalogId(record.get(CONNECTION.SOURCE_CATALOG_ID));
6061
}
6162

6263
public static StandardWorkspace buildStandardWorkspace(final Record record) {

airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public static io.airbyte.config.StandardSync connectionUpdateToInternal(final Co
8585
.withPrefix(update.getPrefix())
8686
.withOperationIds(update.getOperationIds())
8787
.withCatalog(CatalogConverter.toProtocol(update.getSyncCatalog()))
88-
.withStatus(toPersistenceStatus(update.getStatus()));
88+
.withStatus(toPersistenceStatus(update.getStatus()))
89+
.withSourceCatalogId(update.getSourceCatalogId());
8990

9091
if (update.getName() != null) {
9192
newConnection.withName(update.getName());

airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
120120
final Predicate<JobRead> hasRunningJob = (JobRead job) -> !TERMINAL_STATUSES.contains(job.getStatus());
121121
WebBackendConnectionRead.setIsSyncing(syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).anyMatch(hasRunningJob));
122122
setLatestSyncJobProperties(WebBackendConnectionRead, syncJobReadList);
123+
WebBackendConnectionRead.setCatalogId(connectionRead.getSourceCatalogId());
123124
return WebBackendConnectionRead;
124125
}
125126

@@ -206,6 +207,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
206207
final AirbyteCatalog discovered = discoverSchema.getCatalog();
207208
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered);
208209

210+
connection.setSourceCatalogId(discoverSchema.getCatalogId());
209211
connection.setSyncCatalog(combined);
210212
}
211213

@@ -399,6 +401,7 @@ protected static ConnectionUpdate toConnectionUpdate(final WebBackendConnectionU
399401
connectionUpdate.schedule(webBackendConnectionUpdate.getSchedule());
400402
connectionUpdate.status(webBackendConnectionUpdate.getStatus());
401403
connectionUpdate.resourceRequirements(webBackendConnectionUpdate.getResourceRequirements());
404+
connectionUpdate.sourceCatalogId(webBackendConnectionUpdate.getSourceCatalogId());
402405

403406
return connectionUpdate;
404407
}

airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
321321
catalog.getStreams().get(0).getStream().setName("azkaban_users");
322322
catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users");
323323

324+
final UUID newSourceCatalogId = UUID.randomUUID();
325+
324326
final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
325327
.namespaceDefinition(Enums.convertTo(standardSync.getNamespaceDefinition(), NamespaceDefinitionType.class))
326328
.namespaceFormat(standardSync.getNamespaceFormat())
@@ -335,7 +337,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
335337
.cpuLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuLimit())
336338
.cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest())
337339
.memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit())
338-
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()));
340+
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()))
341+
.sourceCatalogId(newSourceCatalogId);
339342

340343
final ConfiguredAirbyteCatalog configuredCatalog = ConnectionHelpers.generateBasicConfiguredAirbyteCatalog();
341344
configuredCatalog.getStreams().get(0).getStream().withName("azkaban_users");
@@ -352,7 +355,8 @@ void testUpdateConnection(final boolean useNewScheduler) throws JsonValidationEx
352355
.withStatus(StandardSync.Status.INACTIVE)
353356
.withCatalog(configuredCatalog)
354357
.withManual(true)
355-
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS);
358+
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
359+
.withSourceCatalogId(newSourceCatalogId);
356360

357361
when(configRepository.getStandardSync(standardSync.getConnectionId()))
358362
.thenReturn(standardSync)

airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ public void testForConnectionCreateCompleteness() {
482482
public void testForConnectionUpdateCompleteness() {
483483
final Set<String> handledMethods =
484484
Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", "operationIds",
485-
"resourceRequirements", "name");
485+
"resourceRequirements", "name", "sourceCatalogId");
486486

487487
final Set<String> methods = Arrays.stream(ConnectionUpdate.class.getMethods())
488488
.filter(method -> method.getReturnType() == ConnectionUpdate.class)
@@ -506,7 +506,8 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
506506
.connectionId(expected.getConnectionId())
507507
.schedule(expected.getSchedule())
508508
.status(expected.getStatus())
509-
.syncCatalog(expected.getSyncCatalog());
509+
.syncCatalog(expected.getSyncCatalog())
510+
.sourceCatalogId(expected.getCatalogId());
510511

511512
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
512513
new ConnectionRead().connectionId(expected.getConnectionId()));

airbyte-webapp/src/core/domain/connection/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,5 @@ export interface Connection {
5656
source: Source;
5757
destination: Destination;
5858
operations: Operation[];
59+
catalogId: string;
5960
}

airbyte-webapp/src/hooks/services/useConnectionHook.tsx

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type UpdateConnection = {
5757
schedule?: ScheduleProperties | null;
5858
operations?: Operation[];
5959
withRefreshedCatalog?: boolean;
60+
sourceCatalogId?: string;
6061
};
6162

6263
export type ListConnection = { connections: Connection[] };

airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/ReplicationView.tsx

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export const ReplicationView: React.FC<ReplicationViewProps> = ({ onAfterSaveSch
8282
connectionId,
8383
status: initialConnection.status || "",
8484
withRefreshedCatalog: activeUpdatingSchemaMode,
85+
sourceCatalogId: connection?.catalogId,
8586
});
8687

8788
setSaved(true);

airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public static StandardSync updateConnectionObject(final WorkspaceHelper workspac
6868
.withPrefix(update.getPrefix())
6969
.withOperationIds(update.getOperationIds())
7070
.withCatalog(update.getCatalog())
71-
.withStatus(update.getStatus());
71+
.withStatus(update.getStatus())
72+
.withSourceCatalogId(update.getSourceCatalogId());
7273

7374
// update name
7475
if (update.getName() != null) {

0 commit comments

Comments
 (0)