Skip to content

Commit 44e8f6f

Browse files
authored
Auto-upgrade connectors when they are in use only a patch version update (#10515)
* auto-upgrade connectors there are in use with patch version only * update check version docstring * remove try/catch from hasNewPatchVersion * refactor write std defs function * run format * add unit test and change exception * update airbyte version function name to be more clear * correct unit test in migration tests * run format
1 parent ba4e86f commit 44e8f6f

File tree

5 files changed

+87
-30
lines changed

5 files changed

+87
-30
lines changed

airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteVersion.java

+18
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,24 @@ public int patchVersionCompareTo(final AirbyteVersion another) {
114114
return compareVersion(patch, another.patch);
115115
}
116116

117+
/**
118+
* Compares two Airbyte Version to check if only the patch version was updated.
119+
*/
120+
public boolean checkOnlyPatchVersionIsUpdatedComparedTo(final AirbyteVersion another) {
121+
if (version.equals(DEV_VERSION) || another.version.equals(DEV_VERSION)) {
122+
return false;
123+
}
124+
final int majorDiff = compareVersion(major, another.major);
125+
if (majorDiff > 0) {
126+
return false;
127+
}
128+
final int minorDiff = compareVersion(minor, another.minor);
129+
if (minorDiff > 0) {
130+
return false;
131+
}
132+
return compareVersion(patch, another.patch) > 0;
133+
}
134+
117135
public boolean isDev() {
118136
return version.equals(DEV_VERSION);
119137
}

airbyte-commons/src/test/java/io/airbyte/commons/version/AirbyteVersionTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,12 @@ public void testCheckVersion() {
101101
assertThrows(IllegalStateException.class, () -> AirbyteVersion.assertIsCompatible(new AirbyteVersion("1.2.3"), new AirbyteVersion("3.2.1")));
102102
}
103103

104+
@Test
105+
public void testCheckOnlyPatchVersion() {
106+
assertFalse(new AirbyteVersion("6.7.8").checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion("6.7.8")));
107+
assertFalse(new AirbyteVersion("6.9.8").checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion("6.8.9")));
108+
assertFalse(new AirbyteVersion("7.7.8").checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion("6.7.11")));
109+
assertTrue(new AirbyteVersion("6.7.9").checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion("6.7.8")));
110+
}
111+
104112
}

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

+28-29
Original file line numberDiff line numberDiff line change
@@ -1829,13 +1829,7 @@ <T> ConnectorCounter updateConnectorDefinitions(final DSLContext ctx,
18291829
// Add new connector
18301830
if (!connectorRepositoryToIdVersionMap.containsKey(repository)) {
18311831
LOGGER.info("Adding new connector {}: {}", repository, latestDefinition);
1832-
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
1833-
writeStandardSourceDefinition(Collections.singletonList(Jsons.object(latestDefinition, StandardSourceDefinition.class)), ctx);
1834-
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
1835-
writeStandardDestinationDefinition(Collections.singletonList(Jsons.object(latestDefinition, StandardDestinationDefinition.class)), ctx);
1836-
} else {
1837-
throw new RuntimeException("Unknown config type " + configType);
1838-
}
1832+
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
18391833
newCount++;
18401834
continue;
18411835
}
@@ -1853,19 +1847,19 @@ <T> ConnectorCounter updateConnectorDefinitions(final DSLContext ctx,
18531847

18541848
// Process connector in use
18551849
if (connectorRepositoriesInUse.contains(repository)) {
1856-
if (newFields.size() == 0) {
1850+
final String latestImageTag = latestDefinition.get("dockerImageTag").asText();
1851+
if (hasNewPatchVersion(connectorInfo.dockerImageTag, latestImageTag)) {
1852+
// Update connector to the latest patch version
1853+
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
1854+
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
1855+
updatedCount++;
1856+
} else if (newFields.size() == 0) {
18571857
LOGGER.info("Connector {} is in use and has all fields; skip updating", repository);
18581858
} else {
18591859
// Add new fields to the connector definition
18601860
final JsonNode definitionToUpdate = getDefinitionWithNewFields(currentDefinition, latestDefinition, newFields);
18611861
LOGGER.info("Connector {} has new fields: {}", repository, String.join(", ", newFields));
1862-
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
1863-
writeStandardSourceDefinition(Collections.singletonList(Jsons.object(definitionToUpdate, StandardSourceDefinition.class)), ctx);
1864-
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
1865-
writeStandardDestinationDefinition(Collections.singletonList(Jsons.object(definitionToUpdate, StandardDestinationDefinition.class)), ctx);
1866-
} else {
1867-
throw new RuntimeException("Unknown config type " + configType);
1868-
}
1862+
writeOrUpdateStandardDefinition(ctx, configType, definitionToUpdate);
18691863
updatedCount++;
18701864
}
18711865
continue;
@@ -1876,25 +1870,13 @@ <T> ConnectorCounter updateConnectorDefinitions(final DSLContext ctx,
18761870
if (hasNewVersion(connectorInfo.dockerImageTag, latestImageTag)) {
18771871
// Update connector to the latest version
18781872
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
1879-
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
1880-
writeStandardSourceDefinition(Collections.singletonList(Jsons.object(latestDefinition, StandardSourceDefinition.class)), ctx);
1881-
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
1882-
writeStandardDestinationDefinition(Collections.singletonList(Jsons.object(latestDefinition, StandardDestinationDefinition.class)), ctx);
1883-
} else {
1884-
throw new RuntimeException("Unknown config type " + configType);
1885-
}
1873+
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
18861874
updatedCount++;
18871875
} else if (newFields.size() > 0) {
18881876
// Add new fields to the connector definition
18891877
final JsonNode definitionToUpdate = getDefinitionWithNewFields(currentDefinition, latestDefinition, newFields);
18901878
LOGGER.info("Connector {} has new fields: {}", repository, String.join(", ", newFields));
1891-
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
1892-
writeStandardSourceDefinition(Collections.singletonList(Jsons.object(definitionToUpdate, StandardSourceDefinition.class)), ctx);
1893-
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
1894-
writeStandardDestinationDefinition(Collections.singletonList(Jsons.object(definitionToUpdate, StandardDestinationDefinition.class)), ctx);
1895-
} else {
1896-
throw new RuntimeException("Unknown config type " + configType);
1897-
}
1879+
writeOrUpdateStandardDefinition(ctx, configType, definitionToUpdate);
18981880
updatedCount++;
18991881
} else {
19001882
LOGGER.info("Connector {} does not need update: {}", repository, connectorInfo.dockerImageTag);
@@ -1904,6 +1886,18 @@ <T> ConnectorCounter updateConnectorDefinitions(final DSLContext ctx,
19041886
return new ConnectorCounter(newCount, updatedCount);
19051887
}
19061888

1889+
private void writeOrUpdateStandardDefinition(final DSLContext ctx,
1890+
final AirbyteConfig configType,
1891+
final JsonNode definition) {
1892+
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
1893+
writeStandardSourceDefinition(Collections.singletonList(Jsons.object(definition, StandardSourceDefinition.class)), ctx);
1894+
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
1895+
writeStandardDestinationDefinition(Collections.singletonList(Jsons.object(definition, StandardDestinationDefinition.class)), ctx);
1896+
} else {
1897+
throw new IllegalArgumentException("Unknown config type " + configType);
1898+
}
1899+
}
1900+
19071901
@VisibleForTesting
19081902
static Set<String> getNewFields(final JsonNode currentDefinition, final JsonNode latestDefinition) {
19091903
final Set<String> currentFields = MoreIterators.toSet(currentDefinition.fieldNames());
@@ -1931,6 +1925,11 @@ static boolean hasNewVersion(final String currentVersion, final String latestVer
19311925
}
19321926
}
19331927

1928+
@VisibleForTesting
1929+
static boolean hasNewPatchVersion(final String currentVersion, final String latestVersion) {
1930+
return new AirbyteVersion(latestVersion).checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion(currentVersion));
1931+
}
1932+
19341933
static class ConnectorInfo {
19351934

19361935
final String definitionId;

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,32 @@ public void testOldConnectorInUseWithMissingFields() throws Exception {
9393
Collections.singletonList(currentSourceWithNewFields));
9494
}
9595

96+
@Test
97+
@DisplayName("When a old connector is in use and there is a new patch version, update its version")
98+
public void testOldConnectorInUseWithMinorVersion() throws Exception {
99+
final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.1.0");
100+
final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.1.9");
101+
102+
assertUpdateConnectorDefinition(
103+
Collections.singletonList(currentSource),
104+
Collections.singletonList(currentSource),
105+
Collections.singletonList(latestSource),
106+
Collections.singletonList(latestSource));
107+
}
108+
109+
@Test
110+
@DisplayName("When a old connector is in use and there is a new minor version, do not update its version")
111+
public void testOldConnectorInUseWithPathVersion() throws Exception {
112+
final StandardSourceDefinition currentSource = getSource().withDockerImageTag("0.1.0");
113+
final StandardSourceDefinition latestSource = getSource().withDockerImageTag("0.2.0");
114+
115+
assertUpdateConnectorDefinition(
116+
Collections.singletonList(currentSource),
117+
Collections.singletonList(currentSource),
118+
Collections.singletonList(latestSource),
119+
Collections.singletonList(currentSource));
120+
}
121+
96122
@Test
97123
@DisplayName("When an unused connector has a new version, update it")
98124
public void testUnusedConnectorWithOldVersion() throws Exception {

airbyte-tests/src/automaticMigrationAcceptanceTest/java/io/airbyte/test/automaticMigrationAcceptance/MigrationAcceptanceTest.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.airbyte.commons.concurrency.WaitingUtils;
3232
import io.airbyte.commons.resources.MoreResources;
3333
import io.airbyte.commons.util.MoreProperties;
34+
import io.airbyte.commons.version.AirbyteVersion;
3435
import io.airbyte.test.airbyte_test_container.AirbyteTestContainer;
3536
import java.io.File;
3637
import java.net.URISyntaxException;
@@ -238,7 +239,12 @@ private static void assertDestinationDefinitionInformation(final ApiClient apiCl
238239
foundPostgresDestinationDefinition = true;
239240
}
240241
case "8be1cf83-fde1-477f-a4ad-318d23c9f3c6" -> {
241-
assertEquals("0.2.0", destinationDefinitionRead.getDockerImageTag());
242+
final String tag = destinationDefinitionRead.getDockerImageTag();
243+
final AirbyteVersion currentVersion = new AirbyteVersion(tag);
244+
final AirbyteVersion previousVersion = new AirbyteVersion("0.2.0");
245+
final AirbyteVersion finalVersion =
246+
(currentVersion.checkOnlyPatchVersionIsUpdatedComparedTo(previousVersion) ? currentVersion : previousVersion);
247+
assertEquals(finalVersion.toString(), currentVersion.toString());
242248
assertTrue(destinationDefinitionRead.getName().contains("Local CSV"));
243249
foundLocalCSVDestinationDefinition = true;
244250
}

0 commit comments

Comments
 (0)