Skip to content

Commit 3b85a74

Browse files
Dual write old and new schedule schemas (#15039)
* dual write old and new schedule schemas * validate that the old and new schedule types match
1 parent e7b9e00 commit 3b85a74

File tree

7 files changed

+87
-0
lines changed

7 files changed

+87
-0
lines changed

airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/ScheduleHelpers.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import io.airbyte.config.BasicSchedule;
88
import io.airbyte.config.Schedule;
9+
import io.airbyte.config.StandardSync;
10+
import io.airbyte.config.StandardSync.ScheduleType;
911
import java.util.concurrent.TimeUnit;
1012

1113
@SuppressWarnings("PMD.AvoidThrowingRawExceptionTypes")
@@ -53,4 +55,12 @@ public static Long getIntervalInSecond(final BasicSchedule schedule) {
5355
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
5456
}
5557

58+
public static boolean isScheduleTypeMismatch(final StandardSync standardSync) {
59+
if (standardSync.getScheduleType() == null) {
60+
return false;
61+
}
62+
return (standardSync.getManual() && standardSync.getScheduleType() != ScheduleType.MANUAL) || (!standardSync.getManual()
63+
&& standardSync.getScheduleType() == ScheduleType.MANUAL);
64+
}
65+
5666
}

airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.airbyte.config.StandardWorkspace;
4848
import io.airbyte.config.State;
4949
import io.airbyte.config.WorkspaceServiceAccount;
50+
import io.airbyte.config.helpers.ScheduleHelpers;
5051
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
5152
import io.airbyte.db.Database;
5253
import io.airbyte.db.ExceptionWrappingDatabase;
@@ -617,6 +618,9 @@ private List<ConfigWithMetadata<StandardSync>> listStandardSyncWithMetadata(fina
617618
final List<ConfigWithMetadata<StandardSync>> standardSyncs = new ArrayList<>();
618619
for (final Record record : result) {
619620
final StandardSync standardSync = DbConverter.buildStandardSync(record, connectionOperationIds(record.get(CONNECTION.ID)));
621+
if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) {
622+
throw new RuntimeException("unexpected schedule type mismatch");
623+
}
620624
standardSyncs.add(new ConfigWithMetadata<>(
621625
record.get(CONNECTION.ID).toString(),
622626
ConfigSchema.STANDARD_SYNC.name(),
@@ -1080,6 +1084,10 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
10801084
.from(CONNECTION)
10811085
.where(CONNECTION.ID.eq(standardSync.getConnectionId())));
10821086

1087+
if (ScheduleHelpers.isScheduleTypeMismatch(standardSync)) {
1088+
throw new RuntimeException("unexpected schedule type mismatch");
1089+
}
1090+
10831091
if (isExistingConfig) {
10841092
ctx.update(CONNECTION)
10851093
.set(CONNECTION.ID, standardSync.getConnectionId())
@@ -1096,6 +1104,11 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
10961104
io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow())
10971105
.set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule())))
10981106
.set(CONNECTION.MANUAL, standardSync.getManual())
1107+
.set(CONNECTION.SCHEDULE_TYPE,
1108+
standardSync.getScheduleType() == null ? null
1109+
: Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class)
1110+
.orElseThrow())
1111+
.set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData())))
10991112
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
11001113
.set(CONNECTION.UPDATED_AT, timestamp)
11011114
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
@@ -1130,6 +1143,11 @@ private void writeStandardSync(final List<StandardSync> configs, final DSLContex
11301143
io.airbyte.db.instance.configs.jooq.generated.enums.StatusType.class).orElseThrow())
11311144
.set(CONNECTION.SCHEDULE, JSONB.valueOf(Jsons.serialize(standardSync.getSchedule())))
11321145
.set(CONNECTION.MANUAL, standardSync.getManual())
1146+
.set(CONNECTION.SCHEDULE_TYPE,
1147+
standardSync.getScheduleType() == null ? null
1148+
: Enums.toEnum(standardSync.getScheduleType().value(), io.airbyte.db.instance.configs.jooq.generated.enums.ScheduleType.class)
1149+
.orElseThrow())
1150+
.set(CONNECTION.SCHEDULE_DATA, JSONB.valueOf(Jsons.serialize(standardSync.getScheduleData())))
11331151
.set(CONNECTION.RESOURCE_REQUIREMENTS, JSONB.valueOf(Jsons.serialize(standardSync.getResourceRequirements())))
11341152
.set(CONNECTION.SOURCE_CATALOG_ID, standardSync.getSourceCatalogId())
11351153
.set(CONNECTION.CREATED_AT, timestamp)

airbyte-server/src/main/java/io/airbyte/server/converters/ApiPojoConverters.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.airbyte.api.model.generated.JobTypeResourceLimit;
1414
import io.airbyte.api.model.generated.ResourceRequirements;
1515
import io.airbyte.commons.enums.Enums;
16+
import io.airbyte.config.BasicSchedule;
1617
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
1718
import io.airbyte.config.Schedule;
1819
import io.airbyte.config.StandardSync;
@@ -164,4 +165,8 @@ public static Schedule.TimeUnit toPersistenceTimeUnit(final ConnectionSchedule.T
164165
return Enums.convertTo(apiTimeUnit, Schedule.TimeUnit.class);
165166
}
166167

168+
public static BasicSchedule.TimeUnit toBasicScheduleTimeUnit(final ConnectionSchedule.TimeUnitEnum apiTimeUnit) {
169+
return Enums.convertTo(apiTimeUnit, BasicSchedule.TimeUnit.class);
170+
}
171+
167172
}

airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@
2626
import io.airbyte.commons.enums.Enums;
2727
import io.airbyte.commons.json.Jsons;
2828
import io.airbyte.config.ActorCatalog;
29+
import io.airbyte.config.BasicSchedule;
2930
import io.airbyte.config.DestinationConnection;
3031
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
3132
import io.airbyte.config.Schedule;
33+
import io.airbyte.config.ScheduleData;
3234
import io.airbyte.config.SourceConnection;
3335
import io.airbyte.config.StandardDestinationDefinition;
3436
import io.airbyte.config.StandardSourceDefinition;
3537
import io.airbyte.config.StandardSync;
38+
import io.airbyte.config.StandardSync.ScheduleType;
3639
import io.airbyte.config.helpers.ScheduleHelpers;
3740
import io.airbyte.config.persistence.ConfigNotFoundException;
3841
import io.airbyte.config.persistence.ConfigRepository;
@@ -141,11 +144,20 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
141144
final Schedule schedule = new Schedule()
142145
.withTimeUnit(ApiPojoConverters.toPersistenceTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
143146
.withUnits(connectionCreate.getSchedule().getUnits());
147+
// Populate the legacy field.
148+
// TODO(https://github.com/airbytehq/airbyte/issues/11432): remove.
144149
standardSync
145150
.withManual(false)
146151
.withSchedule(schedule);
152+
// Also write into the new field. This one will be consumed if populated.
153+
standardSync
154+
.withScheduleType(ScheduleType.BASIC_SCHEDULE);
155+
standardSync.withScheduleData(new ScheduleData().withBasicSchedule(
156+
new BasicSchedule().withTimeUnit(ApiPojoConverters.toBasicScheduleTimeUnit(connectionCreate.getSchedule().getTimeUnit()))
157+
.withUnits(connectionCreate.getSchedule().getUnits())));
147158
} else {
148159
standardSync.withManual(true);
160+
standardSync.withScheduleType(ScheduleType.MANUAL);
149161
}
150162

151163
configRepository.writeStandardSync(standardSync);

airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.airbyte.config.StandardDestinationDefinition;
4545
import io.airbyte.config.StandardSourceDefinition;
4646
import io.airbyte.config.StandardSync;
47+
import io.airbyte.config.StandardSync.ScheduleType;
4748
import io.airbyte.config.persistence.ConfigNotFoundException;
4849
import io.airbyte.config.persistence.ConfigRepository;
4950
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -116,6 +117,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
116117
.withOperationIds(List.of(operationId))
117118
.withManual(false)
118119
.withSchedule(ConnectionHelpers.generateBasicSchedule())
120+
.withScheduleType(ScheduleType.BASIC_SCHEDULE)
121+
.withScheduleData(ConnectionHelpers.generateBasicScheduleData())
119122
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
120123
.withSourceCatalogId(UUID.randomUUID());
121124
standardSyncDeleted = new StandardSync()
@@ -328,6 +331,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
328331
.withStatus(StandardSync.Status.INACTIVE)
329332
.withCatalog(configuredCatalog)
330333
.withManual(true)
334+
.withScheduleType(ScheduleType.MANUAL)
331335
.withResourceRequirements(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS)
332336
.withSourceCatalogId(newSourceCatalogId);
333337

airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
import io.airbyte.api.model.generated.ResourceRequirements;
1818
import io.airbyte.api.model.generated.SyncMode;
1919
import io.airbyte.commons.text.Names;
20+
import io.airbyte.config.BasicSchedule;
2021
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
2122
import io.airbyte.config.Schedule;
2223
import io.airbyte.config.Schedule.TimeUnit;
24+
import io.airbyte.config.ScheduleData;
2325
import io.airbyte.config.StandardSync;
2426
import io.airbyte.protocol.models.CatalogHelpers;
2527
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -41,6 +43,8 @@ public class ConnectionHelpers {
4143
private static final String FIELD_NAME = "id";
4244
private static final String BASIC_SCHEDULE_TIME_UNIT = "days";
4345
private static final long BASIC_SCHEDULE_UNITS = 1L;
46+
private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days";
47+
private static final long BASIC_SCHEDULE_DATA_UNITS = 1L;
4448

4549
public static final StreamDescriptor STREAM_DESCRIPTOR = new StreamDescriptor().withName(STREAM_NAME);
4650

@@ -100,6 +104,12 @@ public static Schedule generateBasicSchedule() {
100104
.withUnits(BASIC_SCHEDULE_UNITS);
101105
}
102106

107+
public static ScheduleData generateBasicScheduleData() {
108+
return new ScheduleData().withBasicSchedule(new BasicSchedule()
109+
.withTimeUnit(BasicSchedule.TimeUnit.fromValue((BASIC_SCHEDULE_DATA_TIME_UNITS)))
110+
.withUnits(BASIC_SCHEDULE_DATA_UNITS));
111+
}
112+
103113
public static ConnectionRead generateExpectedConnectionRead(final UUID connectionId,
104114
final UUID sourceId,
105115
final UUID destinationId,

airbyte-workers/src/main/java/io/airbyte/workers/helper/ConnectionHelper.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import com.google.common.base.Preconditions;
88
import io.airbyte.commons.enums.Enums;
99
import io.airbyte.commons.json.Jsons;
10+
import io.airbyte.config.BasicSchedule;
1011
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
1112
import io.airbyte.config.Schedule;
13+
import io.airbyte.config.ScheduleData;
1214
import io.airbyte.config.StandardSync;
15+
import io.airbyte.config.StandardSync.ScheduleType;
1316
import io.airbyte.config.persistence.ConfigNotFoundException;
1417
import io.airbyte.config.persistence.ConfigRepository;
1518
import io.airbyte.scheduler.persistence.WorkspaceHelper;
@@ -89,8 +92,15 @@ public static StandardSync updateConnectionObject(final WorkspaceHelper workspac
8992
.withTimeUnit(update.getSchedule().getTimeUnit())
9093
.withUnits(update.getSchedule().getUnits());
9194
newConnection.withManual(false).withSchedule(newSchedule);
95+
// Also write into the new field. This one will be consumed if populated.
96+
newConnection
97+
.withScheduleType(ScheduleType.BASIC_SCHEDULE);
98+
newConnection.withScheduleData(new ScheduleData().withBasicSchedule(
99+
new BasicSchedule().withTimeUnit(convertTimeUnitSchema(update.getSchedule().getTimeUnit()))
100+
.withUnits(update.getSchedule().getUnits())));
92101
} else {
93102
newConnection.withManual(true).withSchedule(null);
103+
newConnection.withScheduleType(ScheduleType.MANUAL).withScheduleData(null);
94104
}
95105

96106
return newConnection;
@@ -124,4 +134,22 @@ public static void validateWorkspace(final WorkspaceHelper workspaceHelper,
124134
}
125135
}
126136

137+
// Helper method to convert between TimeUnit enums for old and new schedule schemas.
138+
private static BasicSchedule.TimeUnit convertTimeUnitSchema(Schedule.TimeUnit timeUnit) {
139+
switch (timeUnit) {
140+
case MINUTES:
141+
return BasicSchedule.TimeUnit.MINUTES;
142+
case HOURS:
143+
return BasicSchedule.TimeUnit.HOURS;
144+
case DAYS:
145+
return BasicSchedule.TimeUnit.DAYS;
146+
case WEEKS:
147+
return BasicSchedule.TimeUnit.WEEKS;
148+
case MONTHS:
149+
return BasicSchedule.TimeUnit.MONTHS;
150+
default:
151+
throw new RuntimeException("Unhandled TimeUnitEnum: " + timeUnit);
152+
}
153+
}
154+
127155
}

0 commit comments

Comments
 (0)