Skip to content

Commit 1a0ea82

Browse files
authored
Change ConnectionUpdate to PATCH behavior (#16157)
* remove operationIds from WebBackendConnectionUpdate, just use operations * refactor connection updates to patch-style update, where null fields remain unchanged * better comment and arg name * format * make sure we are still 'dual-writing' to the old schedule column, even when the patch doesn't specify anything for it * update acceptance test to update with new schedule syntax * add catalog sorting to preserve stream order during patch, and more tests * format * add description, throw runtime exception for impossible branches, move streamReset to private helper * PR suggestions * add nested test classes and write a test for the catalog sorting method * format * add comment clarifying that the catalog sort is for UX, and isn't critical * format * format * update acceptance tests to send proper catalog patches instead of whole new catalog * format * format * simplify catalog patching - now, if a catalog is present on the request, replace the entire catalog with it. Otherwise, if catalog on the request is null, leave the catalog unchanged * format * format * Revert "update acceptance tests to send proper catalog patches instead of whole new catalog" This reverts commit 71922648b4e070f46ff6c468813b7ab8dd9d6651. * adjust description
1 parent ccd453f commit 1a0ea82

File tree

13 files changed

+767
-394
lines changed

13 files changed

+767
-394
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,11 @@ paths:
12941294
tags:
12951295
- connection
12961296
summary: Update a connection
1297+
description: |
1298+
Apply a patch-style update to a connection. Only fields present on the update request body will be updated.
1299+
Note that if a catalog is present in the request body, the connection's entire catalog will be replaced
1300+
with the catalog from the request. This means that to modify a single stream, the entire new catalog
1301+
containing the updated stream needs to be sent.
12971302
operationId: updateConnection
12981303
requestBody:
12991304
content:
@@ -1985,6 +1990,14 @@ paths:
19851990
tags:
19861991
- web_backend
19871992
summary: Update a connection
1993+
description: |
1994+
Apply a patch-style update to a connection. Only fields present on the update request body will be updated.
1995+
Any operations that lack an ID will be created. Then, the newly created operationId will be applied to the
1996+
connection along with the rest of the operationIds in the request body.
1997+
Apply a patch-style update to a connection. Only fields present on the update request body will be updated.
1998+
Note that if a catalog is present in the request body, the connection's entire catalog will be replaced
1999+
with the catalog from the request. This means that to modify a single stream, the entire new catalog
2000+
containing the updated stream needs to be sent.
19882001
operationId: webBackendUpdateConnection
19892002
requestBody:
19902003
content:
@@ -3248,10 +3261,9 @@ components:
32483261
$ref: "#/components/schemas/ConnectionState"
32493262
ConnectionUpdate:
32503263
type: object
3264+
description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged
32513265
required:
32523266
- connectionId
3253-
- syncCatalog
3254-
- status
32553267
properties:
32563268
connectionId:
32573269
$ref: "#/components/schemas/ConnectionId"
@@ -3289,10 +3301,9 @@ components:
32893301
format: uuid
32903302
WebBackendConnectionUpdate:
32913303
type: object
3304+
description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged
32923305
required:
32933306
- connectionId
3294-
- syncCatalog
3295-
- status
32963307
properties:
32973308
name:
32983309
type: string
@@ -3309,10 +3320,6 @@ components:
33093320
prefix:
33103321
type: string
33113322
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
3312-
operationIds:
3313-
type: array
3314-
items:
3315-
$ref: "#/components/schemas/OperationId"
33163323
syncCatalog:
33173324
$ref: "#/components/schemas/AirbyteCatalog"
33183325
schedule:

airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,15 @@
1212
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
1313
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
1414
import io.airbyte.api.model.generated.ConnectionStatus;
15-
import io.airbyte.api.model.generated.ConnectionUpdate;
1615
import io.airbyte.api.model.generated.JobType;
1716
import io.airbyte.api.model.generated.JobTypeResourceLimit;
1817
import io.airbyte.api.model.generated.ResourceRequirements;
1918
import io.airbyte.commons.enums.Enums;
2019
import io.airbyte.config.BasicSchedule;
21-
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
2220
import io.airbyte.config.Schedule;
2321
import io.airbyte.config.StandardSync;
2422
import io.airbyte.config.StandardSync.ScheduleType;
2523
import io.airbyte.server.handlers.helpers.CatalogConverter;
26-
import io.airbyte.server.handlers.helpers.ConnectionScheduleHelper;
27-
import io.airbyte.validation.json.JsonValidationException;
2824
import java.util.stream.Collectors;
2925

3026
public class ApiPojoConverters {
@@ -85,41 +81,6 @@ public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.co
8581
.memoryLimit(resourceReqs.getMemoryLimit());
8682
}
8783

88-
public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) throws JsonValidationException {
89-
90-
final StandardSync newConnection = new StandardSync()
91-
.withNamespaceDefinition(Enums.convertTo(update.getNamespaceDefinition(), NamespaceDefinitionType.class))
92-
.withNamespaceFormat(update.getNamespaceFormat())
93-
.withPrefix(update.getPrefix())
94-
.withOperationIds(update.getOperationIds())
95-
.withCatalog(CatalogConverter.toProtocol(update.getSyncCatalog()))
96-
.withStatus(toPersistenceStatus(update.getStatus()))
97-
.withSourceCatalogId(update.getSourceCatalogId());
98-
99-
if (update.getName() != null) {
100-
newConnection.withName(update.getName());
101-
}
102-
103-
// update Resource Requirements
104-
if (update.getResourceRequirements() != null) {
105-
newConnection.withResourceRequirements(resourceRequirementsToInternal(update.getResourceRequirements()));
106-
}
107-
108-
// update sync schedule
109-
if (update.getScheduleType() != null) {
110-
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(newConnection, update.getScheduleType(), update.getScheduleData());
111-
} else if (update.getSchedule() != null) {
112-
final Schedule newSchedule = new Schedule()
113-
.withTimeUnit(toPersistenceTimeUnit(update.getSchedule().getTimeUnit()))
114-
.withUnits(update.getSchedule().getUnits());
115-
newConnection.withManual(false).withSchedule(newSchedule);
116-
} else {
117-
newConnection.withManual(true).withSchedule(null);
118-
}
119-
120-
return newConnection;
121-
}
122-
12384
public static ConnectionRead internalToConnectionRead(final StandardSync standardSync) {
12485
final ConnectionRead connectionRead = new ConnectionRead()
12586
.connectionId(standardSync.getConnectionId())

airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java

Lines changed: 114 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.server.handlers;
66

77
import com.google.common.annotations.VisibleForTesting;
8+
import com.google.common.base.Preconditions;
89
import com.google.common.collect.ImmutableMap;
910
import com.google.common.collect.ImmutableMap.Builder;
1011
import com.google.common.collect.Lists;
@@ -116,7 +117,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
116117
ConnectionHelper.validateWorkspace(workspaceHelper,
117118
connectionCreate.getSourceId(),
118119
connectionCreate.getDestinationId(),
119-
new HashSet<>(operationIds));
120+
operationIds);
120121

121122
final UUID connectionId = uuidGenerator.get();
122123

@@ -229,26 +230,123 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
229230
return metadata;
230231
}
231232

232-
public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate)
233+
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch)
233234
throws ConfigNotFoundException, IOException, JsonValidationException {
234-
// retrieve and update sync
235-
final StandardSync persistedSync = configRepository.getStandardSync(connectionUpdate.getConnectionId());
236235

237-
final StandardSync newConnection = ConnectionHelper.updateConnectionObject(
238-
workspaceHelper,
239-
persistedSync,
240-
ApiPojoConverters.connectionUpdateToInternal(connectionUpdate));
241-
ConnectionHelper.validateWorkspace(
242-
workspaceHelper,
243-
persistedSync.getSourceId(),
244-
persistedSync.getDestinationId(),
245-
new HashSet<>(connectionUpdate.getOperationIds()));
236+
final UUID connectionId = connectionPatch.getConnectionId();
237+
238+
LOGGER.debug("Starting updateConnection for connectionId {}...", connectionId);
239+
LOGGER.debug("incoming connectionPatch: {}", connectionPatch);
240+
241+
final StandardSync sync = configRepository.getStandardSync(connectionId);
242+
LOGGER.debug("initial StandardSync: {}", sync);
243+
244+
validateConnectionPatch(workspaceHelper, sync, connectionPatch);
245+
246+
final ConnectionRead initialConnectionRead = ApiPojoConverters.internalToConnectionRead(sync);
247+
LOGGER.debug("initial ConnectionRead: {}", initialConnectionRead);
248+
249+
applyPatchToStandardSync(sync, connectionPatch);
250+
251+
LOGGER.debug("patched StandardSync before persisting: {}", sync);
252+
configRepository.writeStandardSync(sync);
253+
254+
eventRunner.update(connectionId);
255+
256+
final ConnectionRead updatedRead = buildConnectionRead(connectionId);
257+
LOGGER.debug("final connectionRead: {}", updatedRead);
258+
259+
return updatedRead;
260+
}
261+
262+
/**
263+
* Modifies the given StandardSync by applying changes from a partially-filled ConnectionUpdate
264+
* patch. Any fields that are null in the patch will be left unchanged.
265+
*/
266+
private static void applyPatchToStandardSync(final StandardSync sync, final ConnectionUpdate patch) throws JsonValidationException {
267+
// update the sync's schedule using the patch's scheduleType and scheduleData. validations occur in
268+
// the helper to ensure both fields
269+
// make sense together.
270+
if (patch.getScheduleType() != null) {
271+
ConnectionScheduleHelper.populateSyncFromScheduleTypeAndData(sync, patch.getScheduleType(), patch.getScheduleData());
272+
}
273+
274+
// the rest of the fields are straightforward to patch. If present in the patch, set the field to
275+
// the value
276+
// in the patch. Otherwise, leave the field unchanged.
277+
278+
if (patch.getSyncCatalog() != null) {
279+
sync.setCatalog(CatalogConverter.toProtocol(patch.getSyncCatalog()));
280+
}
246281

247-
configRepository.writeStandardSync(newConnection);
282+
if (patch.getName() != null) {
283+
sync.setName(patch.getName());
284+
}
285+
286+
if (patch.getNamespaceDefinition() != null) {
287+
sync.setNamespaceDefinition(Enums.convertTo(patch.getNamespaceDefinition(), NamespaceDefinitionType.class));
288+
}
289+
290+
if (patch.getNamespaceFormat() != null) {
291+
sync.setNamespaceFormat(patch.getNamespaceFormat());
292+
}
293+
294+
if (patch.getPrefix() != null) {
295+
sync.setPrefix(patch.getPrefix());
296+
}
297+
298+
if (patch.getOperationIds() != null) {
299+
sync.setOperationIds(patch.getOperationIds());
300+
}
301+
302+
if (patch.getStatus() != null) {
303+
sync.setStatus(ApiPojoConverters.toPersistenceStatus(patch.getStatus()));
304+
}
305+
306+
if (patch.getSourceCatalogId() != null) {
307+
sync.setSourceCatalogId(patch.getSourceCatalogId());
308+
}
309+
310+
if (patch.getResourceRequirements() != null) {
311+
sync.setResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(patch.getResourceRequirements()));
312+
}
313+
}
248314

249-
eventRunner.update(connectionUpdate.getConnectionId());
315+
private void validateConnectionPatch(final WorkspaceHelper workspaceHelper, final StandardSync persistedSync, final ConnectionUpdate patch) {
316+
// sanity check that we're updating the right connection
317+
Preconditions.checkArgument(persistedSync.getConnectionId().equals(patch.getConnectionId()));
250318

251-
return buildConnectionRead(connectionUpdate.getConnectionId());
319+
// make sure all operationIds belong to the same workspace as the connection
320+
ConnectionHelper.validateWorkspace(
321+
workspaceHelper, persistedSync.getSourceId(), persistedSync.getDestinationId(), patch.getOperationIds());
322+
323+
// make sure the incoming schedule update is sensible. Note that schedule details are further
324+
// validated in ConnectionScheduleHelper, this just
325+
// sanity checks that fields are populated when they should be.
326+
Preconditions.checkArgument(
327+
patch.getSchedule() == null,
328+
"ConnectionUpdate should only make changes to the schedule by setting scheduleType and scheduleData. 'schedule' is no longer supported.");
329+
330+
if (patch.getScheduleType() == null) {
331+
Preconditions.checkArgument(
332+
patch.getScheduleData() == null,
333+
"ConnectionUpdate should not include any scheduleData without also specifying a valid scheduleType.");
334+
} else {
335+
switch (patch.getScheduleType()) {
336+
case MANUAL -> Preconditions.checkArgument(
337+
patch.getScheduleData() == null,
338+
"ConnectionUpdate should not include any scheduleData when setting the Connection scheduleType to MANUAL.");
339+
case BASIC -> Preconditions.checkArgument(
340+
patch.getScheduleData() != null,
341+
"ConnectionUpdate should include scheduleData when setting the Connection scheduleType to BASIC.");
342+
case CRON -> Preconditions.checkArgument(
343+
patch.getScheduleData() != null,
344+
"ConnectionUpdate should include scheduleData when setting the Connection scheduleType to CRON.");
345+
346+
// shouldn't be possible to reach this case
347+
default -> throw new RuntimeException("Unrecognized scheduleType!");
348+
}
349+
}
252350
}
253351

254352
public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)

0 commit comments

Comments
 (0)