diff --git a/airbyte-config/db-persistence/build.gradle b/airbyte-config/db-persistence/build.gradle new file mode 100644 index 0000000000000..9686f2dcc41ef --- /dev/null +++ b/airbyte-config/db-persistence/build.gradle @@ -0,0 +1,11 @@ +dependencies { + implementation 'commons-io:commons-io:2.7' + + implementation project(':airbyte-db') + implementation project(':airbyte-commons') + implementation project(':airbyte-config:models') + implementation project(':airbyte-config:persistence') + implementation project(":airbyte-json-validation") + + testImplementation "org.testcontainers:postgresql:1.15.1" +} diff --git a/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/DatabaseConfigPersistence.java b/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/DatabaseConfigPersistence.java new file mode 100644 index 0000000000000..67b6c0c5ef941 --- /dev/null +++ b/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/DatabaseConfigPersistence.java @@ -0,0 +1,118 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.dbPersistence; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class DatabaseConfigPersistence implements ConfigPersistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseConfigPersistence.class); + + protected final JdbcDatabase database; + private final JsonSchemaValidator jsonSchemaValidator; + + protected DatabaseConfigPersistence(JdbcDatabase db, JsonSchemaValidator validator) { + database = db; + jsonSchemaValidator = validator; + } + + // Create a table named CONFIG with three columns: CONFIG_ID (PK UUID/string), CONFIG_TYPE (string), + // CONFIG_DATA (JSON/string) + public abstract void Setup() throws SQLException; + + @Override + public T getConfig(ConfigSchema configType, UUID configId, Class clazz) + throws ConfigNotFoundException, JsonValidationException, IOException { + try { + Optional data = database.querySingle("SELECT CONFIG_DATA FROM CONFIG WHERE CONFIG_TYPE = ? AND CONFIG_ID = ?", + r -> r.getString("CONFIG_DATA"), configType.toString(), configId); + if (!data.isPresent()) { + throw new ConfigNotFoundException(configType, configId); + } + + final T config = Jsons.deserialize(data.get(), clazz); + validateJson(config, configType); + return config; + } catch (SQLException e) { + throw new IOException(String.format("Failed to get config type %s item %s. Reason: %s", configType, configId, e.getMessage()), e); + } + } + + @Override + public List listConfigs(ConfigSchema configType, Class clazz) throws JsonValidationException, IOException { + try { + List results = database.query(c -> { + var stmt = c.prepareStatement("SELECT CONFIG_DATA FROM CONFIG WHERE CONFIG_TYPE = ?"); + stmt.setString(1, configType.toString()); + return stmt; + }, r -> r.getString("CONFIG_DATA")) + .map(s -> (T) Jsons.deserialize(s, clazz)) + .collect(Collectors.toList()); + return results; + } catch (SQLException e) { + throw new IOException(String.format("Failed to get config type %s listing. Reason: %s", configType, e.getMessage()), e); + } + } + + @Override + public void writeConfig(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException { + // validate config with schema + validateJson(Jsons.jsonNode(config), configType); + final String data = Jsons.serialize(config); + try { + database.execute(c -> writeConfigQuery(c, configType, configId, data).execute()); + } catch (SQLException e) { + throw new IOException(String.format("Failed to write config type %s item %s. Reason: %s", configType, configId, e.getMessage()), e); + } + } + + // Made abstract because what we want for this is an upsert operation, which different databases + // handle with different syntax + // Overrides need to return a prepared statement with all 3 data elements added + protected abstract PreparedStatement writeConfigQuery(Connection conn, ConfigSchema configType, UUID configId, String data) throws SQLException; + + private void validateJson(T config, ConfigSchema configType) throws JsonValidationException { + JsonNode schema = JsonSchemaValidator.getSchema(configType.getFile()); + jsonSchemaValidator.ensure(schema, Jsons.jsonNode(config)); + } + +} diff --git a/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/PostgresConfigPersistence.java b/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/PostgresConfigPersistence.java new file mode 100644 index 0000000000000..114d5838ffc91 --- /dev/null +++ b/airbyte-config/db-persistence/src/main/java/io.airbyte.config.dbPersistence/PostgresConfigPersistence.java @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.dbPersistence; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.ConfigSchema; +import io.airbyte.db.Databases; +import io.airbyte.validation.json.JsonSchemaValidator; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.UUID; + +public class PostgresConfigPersistence extends DatabaseConfigPersistence { + + public PostgresConfigPersistence(String username, String password, String connectionString) { + this(username, password, connectionString, new JsonSchemaValidator()); + } + + public PostgresConfigPersistence(JsonNode config) { + this(config, new JsonSchemaValidator()); + } + + public PostgresConfigPersistence(JsonNode config, JsonSchemaValidator validator) { + this(config.get("username").asText(), config.get("password").asText(), Databases.getPostgresJdbcUrl(config), validator); + } + + public PostgresConfigPersistence(String username, String password, String connectionString, JsonSchemaValidator validator) { + super(Databases.createJdbcDatabase(username, password, connectionString, Databases.POSTGRES_DRIVER), validator); + } + + @Override + // Create a table named CONFIG with three columns: CONFIG_ID (PK UUID/string), CONFIG_TYPE (string), + // CONFIG_DATA (JSON/string) + public void Setup() throws SQLException { + database.execute("CREATE TABLE CONFIG (CONFIG_ID UUID PRIMARY KEY, CONFIG_TYPE VARCHAR(32) NOT NULL, CONFIG_DATA JSONB NOT NULL)"); + } + + @Override + protected PreparedStatement writeConfigQuery(Connection conn, ConfigSchema configType, UUID configId, String data) throws SQLException { + var result = conn.prepareStatement( + "INSERT INTO CONFIG (CONFIG_ID, CONFIG_TYPE, CONFIG_DATA) VALUES (?, ?, CAST(? as jsonb)) ON CONFLICT (CONFIG_ID) DO UPDATE SET CONFIG_DATA = EXCLUDED.CONFIG_DATA"); + result.setObject(1, configId); + result.setString(2, configType.toString()); + result.setString(3, data); + return result; + } + +} diff --git a/airbyte-config/db-persistence/src/test/java/io/airbyte/config/dbPersistence/PostgresConfigPersistenceTest.java b/airbyte-config/db-persistence/src/test/java/io/airbyte/config/dbPersistence/PostgresConfigPersistenceTest.java new file mode 100644 index 0000000000000..7eb7abb741740 --- /dev/null +++ b/airbyte-config/db-persistence/src/test/java/io/airbyte/config/dbPersistence/PostgresConfigPersistenceTest.java @@ -0,0 +1,140 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.config.dbPersistence; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.Sets; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class PostgresConfigPersistenceTest { + + public static final UUID UUID_1 = new UUID(0, 1); + public static final StandardSourceDefinition SOURCE_1 = new StandardSourceDefinition(); + + static { + SOURCE_1.withSourceDefinitionId(UUID_1) + .withName("apache storm"); + } + + public static final UUID UUID_2 = new UUID(0, 2); + public static final StandardSourceDefinition SOURCE_2 = new StandardSourceDefinition(); + + static { + SOURCE_2.withSourceDefinitionId(UUID_2) + .withName("apache storm"); + } + + private JsonSchemaValidator schemaValidator; + + private PostgresConfigPersistence configPersistence; + + private static final String IMAGE_NAME = "postgres:13-alpine"; + private PostgreSQLContainer db; + + @BeforeEach + void setUp() throws SQLException { + schemaValidator = mock(JsonSchemaValidator.class); + db = new PostgreSQLContainer<>(IMAGE_NAME); + db.start(); + configPersistence = new PostgresConfigPersistence(db.getUsername(), db.getPassword(), db.getJdbcUrl(), schemaValidator); + configPersistence.Setup(); + } + + @AfterEach + void tearDown() { + db.stop(); + db.close(); + } + + @Test + void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNotFoundException { + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1); + + assertEquals( + SOURCE_1, + configPersistence.getConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + UUID_1, + StandardSourceDefinition.class)); + } + + @Test + void testListConfigs() throws JsonValidationException, IOException { + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1); + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_2, SOURCE_2); + + assertEquals( + Sets.newHashSet(SOURCE_1, SOURCE_2), + Sets.newHashSet(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))); + } + + @Test + void writeConfigWithJsonSchemaRef() throws JsonValidationException, IOException, ConfigNotFoundException { + final StandardSync standardSync = new StandardSync() + .withName("sync") + .withPrefix("sync") + .withConnectionId(UUID_1) + .withSourceId(UUID.randomUUID()) + .withDestinationId(UUID.randomUUID()) + .withOperationIds(List.of(UUID.randomUUID())); + + configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID_1, standardSync); + + assertEquals( + standardSync, + configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID_1, StandardSync.class)); + } + + @Test + void writeConfigInvalidConfig() throws JsonValidationException { + StandardSourceDefinition standardSourceDefinition = SOURCE_1.withName(null); + + doThrow(new JsonValidationException("error")).when(schemaValidator).ensure(any(), any()); + + assertThrows(JsonValidationException.class, () -> configPersistence.writeConfig( + ConfigSchema.STANDARD_SOURCE_DEFINITION, + UUID_1, + standardSourceDefinition)); + } + +} diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigNotFoundException.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigNotFoundException.java index a77e5176daba0..b37dad5497f2b 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigNotFoundException.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigNotFoundException.java @@ -25,16 +25,17 @@ package io.airbyte.config.persistence; import io.airbyte.config.ConfigSchema; +import java.util.UUID; public class ConfigNotFoundException extends Exception { private ConfigSchema type; private final String configId; - public ConfigNotFoundException(ConfigSchema type, String configId) { - super(String.format("config type: %s id: %s", type, configId)); + public ConfigNotFoundException(ConfigSchema type, UUID configId) { + super(String.format("config type: %s id: %s", type, configId.toString())); this.type = type; - this.configId = configId; + this.configId = configId.toString(); } public ConfigSchema getType() { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java index 8ad20e5760042..dc82f0348b95f 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence.java @@ -28,13 +28,14 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.List; +import java.util.UUID; public interface ConfigPersistence { - T getConfig(ConfigSchema configType, String configId, Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException; + T getConfig(ConfigSchema configType, UUID configId, Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException; List listConfigs(ConfigSchema configType, Class clazz) throws JsonValidationException, IOException; - void writeConfig(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException; + void writeConfig(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException; } diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index f50cf5636327e..a6ca9250feeca 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -49,13 +49,13 @@ public ConfigRepository(ConfigPersistence persistence) { public StandardWorkspace getStandardWorkspace(final UUID workspaceId, boolean includeTombstone) throws JsonValidationException, IOException, ConfigNotFoundException { - StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString(), StandardWorkspace.class); + StandardWorkspace workspace = persistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, workspaceId, StandardWorkspace.class); if (!MoreBooleans.isTruthy(workspace.getTombstone()) || includeTombstone) { return workspace; } - throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId.toString()); + throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId); } public StandardWorkspace getWorkspaceBySlug(String slug, boolean includeTombstone) @@ -66,7 +66,7 @@ public StandardWorkspace getWorkspaceBySlug(String slug, boolean includeTombston } } - throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, slug); + throw new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, UUID.fromString(slug)); } public List listStandardWorkspaces(boolean includeTombstone) @@ -84,12 +84,12 @@ public List listStandardWorkspaces(boolean includeTombstone) } public void writeStandardWorkspace(final StandardWorkspace workspace) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId().toString(), workspace); + persistence.writeConfig(ConfigSchema.STANDARD_WORKSPACE, workspace.getWorkspaceId(), workspace); } public StandardSourceDefinition getStandardSourceDefinition(final UUID sourceDefinitionId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionId.toString(), StandardSourceDefinition.class); + return persistence.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefinitionId, StandardSourceDefinition.class); } public StandardSourceDefinition getSourceDefinitionFromSource(UUID sourceId) { @@ -115,12 +115,12 @@ public List listStandardSources() throws JsonValidatio } public void writeStandardSource(final StandardSourceDefinition source) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(), source); + persistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId(), source); } public StandardDestinationDefinition getStandardDestinationDefinition(final UUID destinationDefinitionId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationDefinitionId.toString(), + return persistence.getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationDefinitionId, StandardDestinationDefinition.class); } @@ -150,16 +150,16 @@ public void writeStandardDestinationDefinition(final StandardDestinationDefiniti throws JsonValidationException, IOException { persistence.writeConfig( ConfigSchema.STANDARD_DESTINATION_DEFINITION, - destinationDefinition.getDestinationDefinitionId().toString(), + destinationDefinition.getDestinationDefinitionId(), destinationDefinition); } public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class); + return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId, SourceConnection.class); } public void writeSourceConnection(final SourceConnection source) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); + persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId(), source); } public List listSourceConnection() throws JsonValidationException, IOException { @@ -168,11 +168,11 @@ public List listSourceConnection() throws JsonValidationExcept public DestinationConnection getDestinationConnection(final UUID destinationId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class); + return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId, DestinationConnection.class); } public void writeDestinationConnection(DestinationConnection destinationConnection) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), destinationConnection); + persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId(), destinationConnection); } public List listDestinationConnection() throws JsonValidationException, IOException { @@ -180,11 +180,11 @@ public List listDestinationConnection() throws JsonValida } public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class); + return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId, StandardSync.class); } public void writeStandardSync(final StandardSync standardSync) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.STANDARD_SYNC, standardSync.getConnectionId().toString(), standardSync); + persistence.writeConfig(ConfigSchema.STANDARD_SYNC, standardSync.getConnectionId(), standardSync); } public List listStandardSyncs() throws ConfigNotFoundException, IOException, JsonValidationException { @@ -192,11 +192,11 @@ public List listStandardSyncs() throws ConfigNotFoundException, IO } public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException { - return persistence.getConfig(ConfigSchema.STANDARD_SYNC_OPERATION, operationId.toString(), StandardSyncOperation.class); + return persistence.getConfig(ConfigSchema.STANDARD_SYNC_OPERATION, operationId, StandardSyncOperation.class); } public void writeStandardSyncOperation(final StandardSyncOperation standardSyncOperation) throws JsonValidationException, IOException { - persistence.writeConfig(ConfigSchema.STANDARD_SYNC_OPERATION, standardSyncOperation.getOperationId().toString(), standardSyncOperation); + persistence.writeConfig(ConfigSchema.STANDARD_SYNC_OPERATION, standardSyncOperation.getOperationId(), standardSyncOperation); } public List listStandardSyncOperations() throws IOException, JsonValidationException { diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DefaultConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DefaultConfigPersistence.java index 6841097e73322..984cdee958d2f 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DefaultConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DefaultConfigPersistence.java @@ -35,6 +35,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,7 +59,7 @@ public DefaultConfigPersistence(final Path storageRoot, final JsonSchemaValidato } @Override - public T getConfig(final ConfigSchema configType, final String configId, final Class clazz) + public T getConfig(final ConfigSchema configType, final UUID configId, final Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException { synchronized (lock) { return getConfigInternal(configType, configId, clazz); @@ -73,13 +74,13 @@ public List listConfigs(ConfigSchema configType, Class clazz) throws J } @Override - public void writeConfig(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException { + public void writeConfig(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException { synchronized (lock) { writeConfigInternal(configType, configId, config); } } - private T getConfigInternal(ConfigSchema configType, String configId, Class clazz) + private T getConfigInternal(ConfigSchema configType, UUID configId, Class clazz) throws ConfigNotFoundException, JsonValidationException, IOException { // validate file with schema final Path configPath = buildConfigPath(configType, configId); @@ -108,7 +109,7 @@ private List listConfigsInternal(ConfigSchema configType, Class clazz) final List configs = Lists.newArrayList(); for (String id : ids) { try { - configs.add(getConfig(configType, id, clazz)); + configs.add(getConfig(configType, UUID.fromString(id), clazz)); } catch (ConfigNotFoundException e) { // should not happen since we just read the ids from disk. throw new IOException(e); @@ -119,7 +120,7 @@ private List listConfigsInternal(ConfigSchema configType, Class clazz) } } - private void writeConfigInternal(ConfigSchema configType, String configId, T config) throws JsonValidationException, IOException { + private void writeConfigInternal(ConfigSchema configType, UUID configId, T config) throws JsonValidationException, IOException { // validate config with schema validateJson(Jsons.jsonNode(config), configType); @@ -129,8 +130,8 @@ private void writeConfigInternal(ConfigSchema configType, String configId, T Files.writeString(configPath, Jsons.serialize(config)); } - private Path buildConfigPath(ConfigSchema type, String configId) { - return buildTypePath(type).resolve(String.format("%s.json", configId)); + private Path buildConfigPath(ConfigSchema type, UUID configId) { + return buildTypePath(type).resolve(String.format("%s.json", configId.toString())); } private Path buildTypePath(ConfigSchema type) { diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index fb2be98d07bb9..3f28a59dd94bf 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -62,7 +62,7 @@ void testWorkspaceWithTrueTombstone() throws ConfigNotFoundException, IOExceptio } void assertReturnsWorkspace(StandardWorkspace workspace) throws ConfigNotFoundException, IOException, JsonValidationException { - when(configPersistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, PersistenceConstants.DEFAULT_WORKSPACE_ID.toString(), StandardWorkspace.class)) + when(configPersistence.getConfig(ConfigSchema.STANDARD_WORKSPACE, PersistenceConstants.DEFAULT_WORKSPACE_ID, StandardWorkspace.class)) .thenReturn(workspace); assertEquals(workspace, configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID, true)); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DefaultConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DefaultConfigPersistenceTest.java index b56a5899dcb82..118d06349b4db 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DefaultConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DefaultConfigPersistenceTest.java @@ -77,20 +77,20 @@ void setUp() throws IOException { @Test void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNotFoundException { - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1); + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1); assertEquals( SOURCE_1, configPersistence.getConfig( ConfigSchema.STANDARD_SOURCE_DEFINITION, - UUID_1.toString(), + UUID_1, StandardSourceDefinition.class)); } @Test void testListConfigs() throws JsonValidationException, IOException { - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1); - configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_2.toString(), SOURCE_2); + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1, SOURCE_1); + configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_2, SOURCE_2); assertEquals( Sets.newHashSet(SOURCE_1, SOURCE_2), @@ -107,11 +107,11 @@ void writeConfigWithJsonSchemaRef() throws JsonValidationException, IOException, .withDestinationId(UUID.randomUUID()) .withOperationIds(List.of(UUID.randomUUID())); - configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID_1.toString(), standardSync); + configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID_1, standardSync); assertEquals( standardSync, - configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID_1.toString(), StandardSync.class)); + configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, UUID_1, StandardSync.class)); } @Test @@ -122,7 +122,7 @@ void writeConfigInvalidConfig() throws JsonValidationException { assertThrows(JsonValidationException.class, () -> configPersistence.writeConfig( ConfigSchema.STANDARD_SOURCE_DEFINITION, - UUID_1.toString(), + UUID_1, standardSourceDefinition)); } diff --git a/airbyte-db/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/src/main/java/io/airbyte/db/Databases.java index b38f2c47f2879..ffa052925642e 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/src/main/java/io/airbyte/db/Databases.java @@ -24,26 +24,33 @@ package io.airbyte.db; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import org.apache.commons.dbcp2.BasicDataSource; import org.jooq.SQLDialect; public class Databases { + public static final String POSTGRES_DRIVER = "org.postgresql.Driver"; + public static final String REDSHIFT_DRIVER = "com.amazon.redshift.jdbc.Driver"; + public static final String MSSQL_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + public static Database createPostgresDatabase(String username, String password, String jdbcConnectionString) { - return createDatabase(username, password, jdbcConnectionString, "org.postgresql.Driver", SQLDialect.POSTGRES); + return createDatabase(username, password, jdbcConnectionString, POSTGRES_DRIVER, SQLDialect.POSTGRES); } public static JdbcDatabase createRedshiftDatabase(String username, String password, String jdbcConnectionString) { - return createJdbcDatabase(username, password, jdbcConnectionString, "com.amazon.redshift.jdbc.Driver"); + return createJdbcDatabase(username, password, jdbcConnectionString, REDSHIFT_DRIVER); } public static Database createSqlServerDatabase(String username, String password, String jdbcConnectionString) { - return createDatabase(username, password, jdbcConnectionString, "com.microsoft.sqlserver.jdbc.SQLServerDriver", SQLDialect.DEFAULT); + return createDatabase(username, password, jdbcConnectionString, MSSQL_DRIVER, SQLDialect.DEFAULT); } public static Database createDatabase(final String username, @@ -109,4 +116,23 @@ private static BasicDataSource createBasicDataSource(final String username, return connectionPool; } + public static String getPostgresJdbcUrl(JsonNode config) { + List additionalParameters = new ArrayList<>(); + + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText())); + + if (config.has("ssl") && config.get("ssl").asBoolean()) { + additionalParameters.add("ssl=true"); + additionalParameters.add("sslmode=require"); + } + + if (!additionalParameters.isEmpty()) { + additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); + } + return jdbcUrl.toString(); + } + } diff --git a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java index 3f40bd384c23c..b15c8c94c628b 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java +++ b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java @@ -31,6 +31,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; /** @@ -50,6 +51,13 @@ default void execute(String sql) throws SQLException { execute(connection -> connection.createStatement().execute(sql)); } + default void executeParameterized(String sql, String... params) throws SQLException { + execute(connection -> { + final PreparedStatement stmt = connection.prepareStatement(sql, params); + stmt.execute(); + }); + } + default void executeWithinTransaction(List queries) throws SQLException { execute(connection -> { connection.setAutoCommit(false); @@ -116,4 +124,30 @@ Stream query(CheckedFunction CheckedFunction recordTransform) throws SQLException; + default Optional querySingle(String sql, CheckedFunction recordTransform, String... params) + throws SQLException { + return query(c -> { + var stmt = c.prepareStatement(sql); + int i = 1; + for (String param : params) { + stmt.setString(i, param); + ++i; + } + return stmt; + }, recordTransform).findFirst(); + } + + default Optional querySingle(String sql, CheckedFunction recordTransform, Object... params) + throws SQLException { + return query(c -> { + var stmt = c.prepareStatement(sql); + int i = 1; + for (Object param : params) { + stmt.setObject(i, param); + ++i; + } + return stmt; + }, recordTransform).findFirst(); + } + } diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 296d976c1ff6f..1d9f865c98b21 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -27,12 +27,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Databases; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,25 +50,9 @@ public PostgresDestination() { public JsonNode toJdbcConfig(JsonNode config) { final String schema = Optional.ofNullable(config.get("schema")).map(JsonNode::asText).orElse("public"); - List additionalParameters = new ArrayList<>(); - - final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?", - config.get("host").asText(), - config.get("port").asText(), - config.get("database").asText())); - - if (config.has("ssl") && config.get("ssl").asBoolean()) { - additionalParameters.add("ssl=true"); - additionalParameters.add("sslmode=require"); - } - - if (!additionalParameters.isEmpty()) { - additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); - } - final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) - .put("jdbc_url", jdbcUrl.toString()) + .put("jdbc_url", Databases.getPostgresJdbcUrl(config)) .put("schema", schema); if (config.has("password")) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 3af7889e84aaa..94a1740b98b39 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -172,7 +172,7 @@ public DestinationRead getDestination(DestinationIdRequestBody destinationIdRequ final DestinationConnection dci = configRepository.getDestinationConnection(destinationId); if (dci.getTombstone()) { - throw new ConfigNotFoundException(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString()); + throw new ConfigNotFoundException(ConfigSchema.DESTINATION_CONNECTION, destinationId); } return buildDestinationRead(destinationIdRequestBody.getDestinationId()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java index ddf3135137af9..c74cd41831636 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OperationsHandler.java @@ -158,7 +158,7 @@ public void deleteOperation(OperationIdRequestBody operationIdRequestBody) standardSyncOperation.withTombstone(true); configRepository.writeStandardSyncOperation(standardSyncOperation); } else { - throw new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId.toString()); + throw new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId); } } @@ -168,7 +168,7 @@ private OperationRead buildOperationRead(UUID operationId) if (standardSyncOperation != null) { return buildOperationRead(standardSyncOperation); } else { - throw new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId.toString()); + throw new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index d2a4be90f74ec..52aa25cec4a35 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -132,7 +132,7 @@ public SourceRead getSource(SourceIdRequestBody sourceIdRequestBody) throws Json final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId); if (sourceConnection.getTombstone()) { - throw new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceId.toString()); + throw new ConfigNotFoundException(ConfigSchema.SOURCE_CONNECTION, sourceId); } return buildSourceRead(sourceId); diff --git a/settings.gradle b/settings.gradle index f781d0d65417c..5715bb253df7c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,7 @@ include ':airbyte-commons' include ':airbyte-config:models' include ':airbyte-config:init' include ':airbyte-config:persistence' +include ':airbyte-config:db-persistence' include ':airbyte-db' include ':airbyte-e2e-testing' include ':airbyte-integrations:bases:airbyte-protocol'