Skip to content

Commit 111d90f

Browse files
authored
🐞 Fix db config persistence initialization (#6220)
Resolves #5954.
1 parent 51fadf2 commit 111d90f

File tree

5 files changed

+216
-53
lines changed

5 files changed

+216
-53
lines changed

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigSeedProvider.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.airbyte.config.AirbyteConfig;
3737
import io.airbyte.config.ConfigSchema;
3838
import io.airbyte.config.ConfigSchemaMigrationSupport;
39+
import io.airbyte.config.Configs;
3940
import io.airbyte.config.StandardDestinationDefinition;
4041
import io.airbyte.config.StandardSourceDefinition;
4142
import io.airbyte.db.Database;
@@ -73,13 +74,48 @@ public DatabaseConfigPersistence(Database database) {
7374
this.database = new ExceptionWrappingDatabase(database);
7475
}
7576

77+
/**
78+
* Initialize the config persistence.
79+
* <li>If the database has been initialized, update connector definition from YAML seed.</li>
80+
* <li>Otherwise, there are two possibilities.</li>
81+
* <li>The first case is a new deployment, which means there is no local config directory (because
82+
* we no longer create it in newer Airbyte versions). We can initialize the database by copying the
83+
* YAML seed.</li>
84+
* <li>The second case is a migration from an old version that relies on file system config
85+
* persistence. We need to copy the existing configs from local files, and then update connector
86+
* definitions from YAML seed.</li>
87+
*/
88+
public void initialize(Configs serverConfigs, ConfigPersistence yamlSeedPersistence) throws IOException {
89+
database.transaction(ctx -> {
90+
boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
91+
if (isInitialized) {
92+
LOGGER.info("Config persistence has been initialized; load YAML seed to update connector definitions");
93+
updateConfigsFromSeed(ctx, yamlSeedPersistence);
94+
return null;
95+
}
96+
97+
boolean hasExistingFileConfigs = FileSystemConfigPersistence.hasExistingConfigs(serverConfigs.getConfigRoot());
98+
if (hasExistingFileConfigs) {
99+
LOGGER.info("Config persistence needs initialization; load seed from existing local config directory, and update from YAML seed");
100+
ConfigPersistence fileSystemPersistence = new FileSystemConfigPersistence(serverConfigs.getConfigRoot());
101+
copyConfigsFromSeed(ctx, fileSystemPersistence);
102+
updateConfigsFromSeed(ctx, yamlSeedPersistence);
103+
return null;
104+
}
105+
106+
LOGGER.info("Config persistence needs initialization; there is no local config directory; load YAML seed");
107+
copyConfigsFromSeed(ctx, yamlSeedPersistence);
108+
return null;
109+
});
110+
}
111+
76112
/**
77113
* Load or update the configs from the seed.
78114
*/
79115
@Override
80116
public void loadData(ConfigPersistence seedConfigPersistence) throws IOException {
81117
database.transaction(ctx -> {
82-
boolean isInitialized = ctx.fetchExists(select().from(AIRBYTE_CONFIGS).where());
118+
boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
83119
if (isInitialized) {
84120
updateConfigsFromSeed(ctx, seedConfigPersistence);
85121
} else {
@@ -239,7 +275,7 @@ int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy
239275

240276
@VisibleForTesting
241277
void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
242-
LOGGER.info("Loading data to config database...");
278+
LOGGER.info("Loading seed data to config database...");
243279

244280
Map<String, Stream<JsonNode>> seedConfigs;
245281
try {
@@ -293,7 +329,7 @@ private ConnectorCounter(int newCount, int updateCount) {
293329

294330
@VisibleForTesting
295331
void updateConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence) throws SQLException {
296-
LOGGER.info("Config database has been initialized; updating connector definitions from the seed if necessary...");
332+
LOGGER.info("Updating connector definitions from the seed if necessary...");
297333

298334
try {
299335
Set<String> connectorRepositoriesInUse = getConnectorRepositoriesInUse(ctx);
@@ -356,7 +392,7 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
356392

357393
ConnectorInfo connectorInfo = connectorRepositoryToIdVersionMap.get(repository);
358394
String latestImageTag = configJson.get("dockerImageTag").asText();
359-
if (!latestImageTag.equals(connectorInfo.dockerImageTag)) {
395+
if (hasNewVersion(connectorInfo.dockerImageTag, latestImageTag)) {
360396
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
361397
updatedCount += updateConfigRecord(ctx, timestamp, configType.name(), configJson, connectorInfo.connectorDefinitionId);
362398
} else {
@@ -366,6 +402,15 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
366402
return new ConnectorCounter(newCount, updatedCount);
367403
}
368404

405+
static boolean hasNewVersion(String currentVersion, String latestVersion) {
406+
try {
407+
return new AirbyteVersion(latestVersion).patchVersionCompareTo(new AirbyteVersion(currentVersion)) > 0;
408+
} catch (Exception e) {
409+
LOGGER.error("Failed to check version: {} vs {}", currentVersion, latestVersion);
410+
return false;
411+
}
412+
}
413+
369414
/**
370415
* @return A map about current connectors (both source and destination). It maps from connector
371416
* repository to its definition id and docker image tag. We identify a connector by its
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.config.persistence;
26+
27+
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.Mockito.inOrder;
29+
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.never;
31+
import static org.mockito.Mockito.reset;
32+
import static org.mockito.Mockito.spy;
33+
import static org.mockito.Mockito.times;
34+
import static org.mockito.Mockito.verify;
35+
import static org.mockito.Mockito.when;
36+
37+
import io.airbyte.commons.json.Jsons;
38+
import io.airbyte.config.ConfigSchema;
39+
import io.airbyte.config.Configs;
40+
import io.airbyte.config.StandardDestinationDefinition;
41+
import io.airbyte.config.StandardSourceDefinition;
42+
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
43+
import java.nio.file.Files;
44+
import java.nio.file.Path;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.UUID;
48+
import org.jooq.DSLContext;
49+
import org.junit.jupiter.api.AfterAll;
50+
import org.junit.jupiter.api.BeforeAll;
51+
import org.junit.jupiter.api.BeforeEach;
52+
import org.junit.jupiter.api.DisplayName;
53+
import org.junit.jupiter.api.Test;
54+
import org.mockito.InOrder;
55+
56+
/**
57+
* Unit test for the {@link DatabaseConfigPersistence#initialize} method.
58+
*/
59+
public class DatabaseConfigPersistenceInitializeTest extends BaseDatabaseConfigPersistenceTest {
60+
61+
// mock YAML seed connector definitions
62+
private static final List<StandardSourceDefinition> SEED_SOURCES = List.of(SOURCE_GITHUB);
63+
private static final List<StandardDestinationDefinition> SEED_DESTINATIONS = List.of(DESTINATION_SNOWFLAKE);
64+
65+
private static final ConfigPersistence SEED_PERSISTENCE = mock(ConfigPersistence.class);
66+
private static Path ROOT_PATH;
67+
68+
private final Configs configs = mock(Configs.class);
69+
70+
@BeforeAll
71+
public static void setup() throws Exception {
72+
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
73+
configPersistence = spy(new DatabaseConfigPersistence(database));
74+
75+
when(SEED_PERSISTENCE.dumpConfigs()).thenReturn(Map.of(
76+
ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), SEED_SOURCES.stream().map(Jsons::jsonNode),
77+
ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(), SEED_DESTINATIONS.stream().map(Jsons::jsonNode)));
78+
when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
79+
.thenReturn(SEED_SOURCES);
80+
when(SEED_PERSISTENCE.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
81+
.thenReturn(SEED_DESTINATIONS);
82+
}
83+
84+
@AfterAll
85+
public static void tearDown() throws Exception {
86+
database.close();
87+
}
88+
89+
@BeforeEach
90+
public void resetPersistence() throws Exception {
91+
ROOT_PATH = Files.createTempDirectory(
92+
Files.createDirectories(Path.of("/tmp/airbyte_tests")),
93+
DatabaseConfigPersistenceInitializeTest.class.getSimpleName() + UUID.randomUUID());
94+
95+
reset(configs);
96+
when(configs.getConfigRoot()).thenReturn(ROOT_PATH);
97+
98+
database.query(ctx -> ctx.truncateTable("airbyte_configs").execute());
99+
100+
reset(configPersistence);
101+
}
102+
103+
@Test
104+
@DisplayName("When database is not initialized, and there is no local config dir, copy from seed")
105+
public void testNewDeployment() throws Exception {
106+
configPersistence.initialize(configs, SEED_PERSISTENCE);
107+
108+
assertRecordCount(2);
109+
assertHasSource(SOURCE_GITHUB);
110+
assertHasDestination(DESTINATION_SNOWFLAKE);
111+
112+
verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
113+
verify(configPersistence, never()).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
114+
}
115+
116+
@Test
117+
@DisplayName("When database is not initialized, and there is local config dir, copy from local and update from seed")
118+
public void testMigrationDeployment() throws Exception {
119+
prepareLocalFilePersistence();
120+
121+
configPersistence.initialize(configs, SEED_PERSISTENCE);
122+
123+
assertRecordCount(3);
124+
assertHasSource(SOURCE_GITHUB);
125+
assertHasDestination(DESTINATION_S3);
126+
assertHasDestination(DESTINATION_SNOWFLAKE);
127+
128+
InOrder callOrder = inOrder(configPersistence);
129+
callOrder.verify(configPersistence, times(1)).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
130+
callOrder.verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
131+
}
132+
133+
@Test
134+
@DisplayName("When database has been initialized, ignore local config dir, and update from seed")
135+
public void testUpdateDeployment() throws Exception {
136+
prepareLocalFilePersistence();
137+
writeSource(configPersistence, SOURCE_GITHUB);
138+
139+
configPersistence.initialize(configs, SEED_PERSISTENCE);
140+
141+
assertRecordCount(2);
142+
assertHasSource(SOURCE_GITHUB);
143+
assertHasDestination(DESTINATION_SNOWFLAKE);
144+
145+
verify(configPersistence, never()).copyConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
146+
verify(configPersistence, times(1)).updateConfigsFromSeed(any(DSLContext.class), any(ConfigPersistence.class));
147+
}
148+
149+
private void prepareLocalFilePersistence() throws Exception {
150+
Files.createDirectories(ROOT_PATH.resolve(FileSystemConfigPersistence.CONFIG_DIR));
151+
ConfigPersistence filePersistence = new FileSystemConfigPersistence(ROOT_PATH);
152+
writeSource(filePersistence, SOURCE_GITHUB);
153+
writeDestination(filePersistence, DESTINATION_S3);
154+
}
155+
156+
}

airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
package io.airbyte.config.persistence;
2626

2727
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertFalse;
2829
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
2931
import static org.mockito.Mockito.spy;
3032

3133
import com.fasterxml.jackson.databind.JsonNode;
@@ -46,8 +48,8 @@
4648
import org.junit.jupiter.api.Test;
4749

4850
/**
49-
* The {@link DatabaseConfigPersistence#loadData} method is tested in
50-
* {@link DatabaseConfigPersistenceLoadDataTest}.
51+
* See {@link DatabaseConfigPersistenceLoadDataTest} and
52+
* {@link DatabaseConfigPersistenceInitializeTest} for testing of those methods.
5153
*/
5254
public class DatabaseConfigPersistenceTest extends BaseDatabaseConfigPersistenceTest {
5355

@@ -218,4 +220,10 @@ public void testUpdateConfigRecord() throws Exception {
218220
assertHasSource(SOURCE_GITHUB);
219221
}
220222

223+
@Test
224+
public void testHasNewVersion() {
225+
assertTrue(DatabaseConfigPersistence.hasNewVersion("0.1.99", "0.2.0"));
226+
assertFalse(DatabaseConfigPersistence.hasNewVersion("invalid_version", "0.2.0"));
227+
}
228+
221229
}

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import io.airbyte.config.StandardWorkspace;
3535
import io.airbyte.config.helpers.LogClientSingleton;
3636
import io.airbyte.config.persistence.ConfigRepository;
37-
import io.airbyte.config.persistence.ConfigSeedProvider;
3837
import io.airbyte.config.persistence.DatabaseConfigPersistence;
3938
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
4039
import io.airbyte.db.Database;
@@ -182,7 +181,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex
182181
configs.getConfigDatabaseUrl())
183182
.getAndInitialize();
184183
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
185-
configPersistence.loadData(ConfigSeedProvider.get(configs));
184+
configPersistence.initialize(configs, YamlSeedConfigPersistence.get());
186185
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());
187186

188187
LOGGER.info("Creating Scheduler persistence...");

0 commit comments

Comments
 (0)