Skip to content

Commit 5edf3fe

Browse files
🚨 Validate source JDBC url parameters 🚨 (#14586)
* Moved toDatabaseConfigStatic subfunction into the main toDatabaseConfig function * Moved Postgres JDBC_URL_PARAMS_KEY into JdbcUtils to reduce redundancy * Migrated useSsl method into JdbcUtils to remove duplication * Created constants for config parameters to match PostgresDestination * Add validation check for overwritten connection properties * Added test coverage for useSsl and clarifying javadoc comments * Updated useSsl tests to match linter and javadoc comments for useSsl * Fixed isCDC method with proper PostgresUtil function and corresponding imports * Bumping Dockerfile and adding changelog description * auto-bump connector version Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 43eb5d0 commit 5edf3fe

File tree

13 files changed

+267
-110
lines changed

13 files changed

+267
-110
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@
754754
- name: Postgres
755755
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
756756
dockerRepository: airbyte/source-postgres
757-
dockerImageTag: 0.4.32
757+
dockerImageTag: 0.4.33
758758
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
759759
icon: postgresql.svg
760760
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6960,7 +6960,7 @@
69606960
supportsNormalization: false
69616961
supportsDBT: false
69626962
supported_destination_sync_modes: []
6963-
- dockerImage: "airbyte/source-postgres:0.4.32"
6963+
- dockerImage: "airbyte/source-postgres:0.4.33"
69646964
spec:
69656965
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
69666966
connectionSpecification:

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
public class JdbcUtils {
1414

15+
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
16+
public static final String SSL_KEY = "ssl";
17+
1518
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();
1619

1720
private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);
@@ -62,4 +65,15 @@ public static Map<String, String> parseJdbcParameters(final String jdbcPropertie
6265
return parameters;
6366
}
6467

68+
/**
69+
* Checks that SSL_KEY has not been set or that an SSL_KEY is set and value can be mapped to true
70+
* (e.g. non-zero integers, string true, etc)
71+
*
72+
* @param config A configuration used to check Jdbc connection
73+
* @return true: if ssl has not been set or it has been set with true, false: in all other cases
74+
*/
75+
public static boolean useSsl(final JsonNode config) {
76+
return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean();
77+
}
78+
6579
}

airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package io.airbyte.db.jdbc;
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.junit.jupiter.api.Assertions.assertFalse;
9+
import static org.junit.jupiter.api.Assertions.assertTrue;
810

911
import com.fasterxml.jackson.databind.JsonNode;
1012
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,6 +43,8 @@
4143

4244
public class TestJdbcUtils {
4345

46+
private String dbName;
47+
4448
private static final List<JsonNode> RECORDS_AS_JSON = Lists.newArrayList(
4549
Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")),
4650
Jsons.jsonNode(ImmutableMap.of("id", 2, "name", "crusher")),
@@ -60,7 +64,7 @@ static void init() {
6064

6165
@BeforeEach
6266
void setup() throws Exception {
63-
final String dbName = Strings.addRandomSuffix("db", "_", 10);
67+
dbName = Strings.addRandomSuffix("db", "_", 10);
6468

6569
final JsonNode config = getConfig(PSQL_DB, dbName);
6670

@@ -95,6 +99,18 @@ private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbN
9599
.build());
96100
}
97101

102+
// Takes in a generic sslValue because useSsl maps sslValue to a boolean
103+
private <T> JsonNode getConfigWithSsl(final PostgreSQLContainer<?> psqlDb, final String dbName, final T sslValue) {
104+
return Jsons.jsonNode(ImmutableMap.builder()
105+
.put("host", psqlDb.getHost())
106+
.put("port", psqlDb.getFirstMappedPort())
107+
.put("database", dbName)
108+
.put("username", psqlDb.getUsername())
109+
.put("password", psqlDb.getPassword())
110+
.put("ssl", sslValue)
111+
.build());
112+
}
113+
98114
@Test
99115
void testRowToJson() throws SQLException {
100116
try (final Connection connection = dataSource.getConnection()) {
@@ -157,6 +173,41 @@ void testSetStatementField() throws SQLException {
157173
}
158174
}
159175

176+
@Test
177+
void testUseSslWithSslNotSet() {
178+
final JsonNode config = getConfig(PSQL_DB, dbName);
179+
final boolean sslSet = JdbcUtils.useSsl(config);
180+
assertTrue(sslSet);
181+
}
182+
183+
@Test
184+
void testUseSslWithSslSetAndValueStringFalse() {
185+
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName, "false");
186+
final boolean sslSet = JdbcUtils.useSsl(config);
187+
assertFalse(sslSet);
188+
}
189+
190+
@Test
191+
void testUseSslWithSslSetAndValueIntegerFalse() {
192+
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName, 0);
193+
final boolean sslSet = JdbcUtils.useSsl(config);
194+
assertFalse(sslSet);
195+
}
196+
197+
@Test
198+
void testUseSslWithSslSetAndValueStringTrue() {
199+
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName, "true");
200+
final boolean sslSet = JdbcUtils.useSsl(config);
201+
assertTrue(sslSet);
202+
}
203+
204+
@Test
205+
void testUssSslWithSslSetAndValueIntegerTrue() {
206+
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName, 3);
207+
final boolean sslSet = JdbcUtils.useSsl(config);
208+
assertTrue(sslSet);
209+
}
210+
160211
private static void createTableWithAllTypes(final Connection connection) throws SQLException {
161212
// jdbctype not included because they are not directly supported in postgres: TINYINT, LONGVARCHAR,
162213
// VARBINAR, LONGVARBINARY

airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.airbyte.db.factory.DataSourceFactory;
1111
import io.airbyte.db.factory.DatabaseDriver;
1212
import io.airbyte.db.jdbc.JdbcDatabase;
13+
import io.airbyte.db.jdbc.JdbcUtils;
1314
import io.airbyte.integrations.base.Destination;
1415
import io.airbyte.integrations.base.IntegrationRunner;
1516
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -65,9 +66,6 @@ public JsonNode toJdbcConfig(final JsonNode config) {
6566
return Jsons.jsonNode(configBuilder.build());
6667
}
6768

68-
private boolean useSsl(final JsonNode config) {
69-
return !config.has("ssl") || config.get("ssl").asBoolean();
70-
}
7169

7270
@Override
7371
public AirbyteConnectionStatus check(final JsonNode config) {
@@ -94,7 +92,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
9492

9593
@Override
9694
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
97-
if (useSsl(config)) {
95+
if (JdbcUtils.useSsl(config)) {
9896
return SSL_JDBC_PARAMETERS;
9997
} else {
10098
// No need for any parameters if the connection doesn't use SSL except socket_timeout

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ public abstract class AbstractJdbcDestination extends BaseConnector implements D
3131

3232
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);
3333

34-
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
35-
3634
private final String driverClass;
3735
private final NamingConventionTransformer namingResolver;
3836
private final SqlOperations sqlOperations;
@@ -105,7 +103,7 @@ protected JdbcDatabase getDatabase(final DataSource dataSource) {
105103
}
106104

107105
protected Map<String, String> getConnectionProperties(final JsonNode config) {
108-
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JDBC_URL_PARAMS_KEY);
106+
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY);
109107
final Map<String, String> defaultProperties = getDefaultConnectionProperties(config);
110108
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
111109
return MoreMaps.merge(customProperties, defaultProperties);

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.airbyte.db.factory.DataSourceFactory;
1212
import io.airbyte.db.factory.DatabaseDriver;
1313
import io.airbyte.db.jdbc.JdbcDatabase;
14+
import io.airbyte.db.jdbc.JdbcUtils;
1415
import io.airbyte.integrations.base.Destination;
1516
import io.airbyte.integrations.base.IntegrationRunner;
1617
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -99,16 +100,13 @@ public MySQLDestination() {
99100

100101
@Override
101102
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
102-
if (useSSL(config)) {
103+
if (JdbcUtils.useSsl(config)) {
103104
return DEFAULT_SSL_JDBC_PARAMETERS;
104105
} else {
105106
return DEFAULT_JDBC_PARAMETERS;
106107
}
107108
}
108109

109-
private boolean useSSL(final JsonNode config) {
110-
return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean();
111-
}
112110

113111
@Override
114112
public JsonNode toJdbcConfig(final JsonNode config) {

airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.google.common.collect.ImmutableMap;
99
import io.airbyte.commons.json.Jsons;
1010
import io.airbyte.db.factory.DatabaseDriver;
11+
import io.airbyte.db.jdbc.JdbcUtils;
1112
import io.airbyte.integrations.base.Destination;
1213
import io.airbyte.integrations.base.IntegrationRunner;
1314
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
@@ -28,9 +29,7 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
2829
public static final List<String> PORT_KEY = List.of("port");
2930
public static final String DATABASE_KEY = "database";
3031
public static final String JDBC_URL_KEY = "jdbc_url";
31-
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
3232
public static final String PASSWORD_KEY = "password";
33-
public static final String SSL_KEY = "ssl";
3433
public static final String USERNAME_KEY = "username";
3534
public static final String SCHEMA_KEY = "schema";
3635

@@ -48,7 +47,7 @@ public PostgresDestination() {
4847

4948
@Override
5049
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
51-
if (useSsl(config)) {
50+
if (JdbcUtils.useSsl(config)) {
5251
return SSL_JDBC_PARAMETERS;
5352
} else {
5453
// No need for any parameters if the connection doesn't use SSL
@@ -74,16 +73,13 @@ public JsonNode toJdbcConfig(final JsonNode config) {
7473
configBuilder.put(PASSWORD_KEY, config.get(PASSWORD_KEY).asText());
7574
}
7675

77-
if (config.has(JDBC_URL_PARAMS_KEY)) {
78-
configBuilder.put(JDBC_URL_PARAMS_KEY, config.get(JDBC_URL_PARAMS_KEY).asText());
76+
if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
77+
configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
7978
}
8079

8180
return Jsons.jsonNode(configBuilder.build());
8281
}
8382

84-
private boolean useSsl(final JsonNode config) {
85-
return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean();
86-
}
8783

8884
public static void main(final String[] args) throws Exception {
8985
final Destination destination = PostgresDestination.sshWrappedDestination();

airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.collect.ImmutableMap;
2424
import io.airbyte.commons.functional.CheckedConsumer;
2525
import io.airbyte.commons.json.Jsons;
26+
import io.airbyte.commons.map.MoreMaps;
2627
import io.airbyte.commons.util.AutoCloseableIterator;
2728
import io.airbyte.commons.util.AutoCloseableIterators;
2829
import io.airbyte.db.JdbcCompatibleSourceOperations;
@@ -49,6 +50,7 @@
4950
import java.util.HashSet;
5051
import java.util.List;
5152
import java.util.Map;
53+
import java.util.Objects;
5254
import java.util.Set;
5355
import java.util.function.Predicate;
5456
import java.util.function.Supplier;
@@ -297,7 +299,7 @@ protected DataSource createDataSource(final JsonNode config) {
297299
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
298300
driverClass,
299301
jdbcConfig.get("jdbc_url").asText(),
300-
JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties", getJdbcParameterDelimiter()));
302+
getConnectionProperties(config));
301303
// Record the data source so that it can be closed.
302304
dataSources.add(dataSource);
303305
return dataSource;
@@ -316,6 +318,48 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
316318
return database;
317319
}
318320

321+
/**
322+
* Retrieves connection_properties from config and also validates if custom
323+
* jdbc_url parameters overlap with the default properties
324+
*
325+
* @param config A configuration used to check Jdbc connection
326+
* @return A mapping of connection properties
327+
*/
328+
protected Map<String, String> getConnectionProperties(final JsonNode config) {
329+
final Map<String, String> customProperties = JdbcUtils.parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY);
330+
final Map<String, String> defaultProperties = getDefaultConnectionProperties(config);
331+
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
332+
return MoreMaps.merge(customProperties, defaultProperties);
333+
}
334+
335+
/**
336+
* Validates for duplication parameters
337+
*
338+
* @param customParameters custom connection properties map as specified by each Jdbc source
339+
* @param defaultParameters connection properties map as specified by each Jdbc source
340+
* @throws IllegalArgumentException
341+
*/
342+
private void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
343+
final Map<String, String> defaultParameters) {
344+
for (final String key : defaultParameters.keySet()) {
345+
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
346+
throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key);
347+
}
348+
}
349+
}
350+
351+
/**
352+
* Retrieves default connection_properties from config
353+
*
354+
* TODO: make this method abstract and add parity features to
355+
* destination connectors
356+
* @param config A configuration used to check Jdbc connection
357+
* @return A mapping of the default connection properties
358+
*/
359+
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
360+
return JdbcUtils.parseJdbcParameters(config, "connection_properties", getJdbcParameterDelimiter());
361+
};
362+
319363
protected String getJdbcParameterDelimiter() {
320364
return "&";
321365
}

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.32
19+
LABEL io.airbyte.version=0.4.33
2020
LABEL io.airbyte.name=airbyte/source-postgres

0 commit comments

Comments
 (0)