Skip to content

🐛 Postgres Source Strict Encrypt: Allow connections with sslmodes 'allow' and 'prefer' if SSH tunnel established #19551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.25
dockerImageTag: 1.0.26
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10956,7 +10956,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.25"
- dockerImage: "airbyte/source-postgres:1.0.26"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.25
LABEL io.airbyte.version=1.0.26
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.25
LABEL io.airbyte.version=1.0.26
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
"sslmode", "require");
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand All @@ -109,11 +106,7 @@ public static Source sshWrappedSource() {

@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
if (JdbcUtils.useSsl(config)) {
return SSL_JDBC_PARAMETERS;
} else {
return Collections.emptyMap();
}
return Collections.emptyMap();
}

@Override
Expand Down Expand Up @@ -174,7 +167,7 @@ public String toJDBCQueryParams(final Map<String, String> sslParams) {
.map((entry) -> {
try {
final String result = switch (entry.getKey()) {
case SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()))
case AbstractJdbcSource.SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam(SslMode.valueOf(entry.getValue()))
+ JdbcUtils.AMPERSAND + PARAM_SSL + EQUALS + (entry.getValue() == DISABLE ? PARAM_SSL_FALSE : PARAM_SSL_TRUE);
case CA_CERTIFICATE_PATH -> SSL_ROOT_CERT + EQUALS + entry.getValue();
case CLIENT_KEY_STORE_URL -> SSL_KEY + EQUALS + Path.of(new URI(entry.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,112 @@

public class PostgresSourceStrictEncryptTest {

private final PostgresSourceStrictEncrypt source = new PostgresSourceStrictEncrypt();
private final PostgreSQLContainer<?> postgreSQLContainerNoSSL = new PostgreSQLContainer<>("postgres:13-alpine");
private final PostgreSQLContainer<?> postgreSQLContainerWithSSL =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key");
private static final List<String> NON_STRICT_SSL_MODES = List.of("disable", "allow", "prefer");
private static final String SSL_MODE_REQUIRE = "require";

private static final SshBastionContainer bastion = new SshBastionContainer();
private static final Network network = Network.newNetwork();

@Test
void testCheckWithSSlModeDisable() throws Exception {
void testSSlModesDisableAllowPreferWithTunnelIfServerDoesNotSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerNoSSL.withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();

for (String sslmode : NON_STRICT_SSL_MODES) {
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}

} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlModesDisableAllowPreferWithTunnelIfServerSupportSSL() throws Exception {
try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();
for (String sslmode : NON_STRICT_SSL_MODES) {

final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}
} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlModesDisableAllowPreferWithFailedTunnelIfServerSupportSSL() throws Exception {
try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL) {

try (PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine").withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();
for (String sslmode : NON_STRICT_SSL_MODES) {

// stop to enforce ssl for ssl_mode disable
final ImmutableMap.Builder<Object, Object> builderWithSSLModeDisable = getDatabaseConfigBuilderWithSSLMode(db, "disable");
final JsonNode configWithSSLModeDisable = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModeDisable);
final AirbyteConnectionStatus connectionStatusForDisabledMode = new PostgresSourceStrictEncrypt().check(configWithSSLModeDisable);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForDisabledMode.getStatus());
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, sslmode);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertTrue(connectionStatus.getMessage().contains("Connection is not available"));

}
} finally {
bastion.stopAndClose();
}
}

@Test
void testCheckWithSSlModePrefer() throws Exception {
void testSSlRequiredWithTunnelIfServerDoesNotSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine").withNetwork(network)) {
try (PostgreSQLContainer<?> db = postgreSQLContainerNoSSL.withNetwork(network)) {
bastion.initAndStartBastion(network);
db.start();
// continue to enforce ssl because ssl mode is prefer
final ImmutableMap.Builder<Object, Object> builderWithSSLModePrefer = getDatabaseConfigBuilderWithSSLMode(db, "prefer");
final JsonNode configWithSSLModePrefer = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModePrefer);
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(configWithSSLModePrefer);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatusForPreferredMode.getStatus());
assertEquals("State code: 08004; Message: The server does not support SSL.", connectionStatusForPreferredMode.getMessage());
final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, SSL_MODE_REQUIRE);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertEquals("State code: 08004; Message: The server does not support SSL.", connectionStatus.getMessage());

} finally {
bastion.stopAndClose();
}
}

@Test
void testSSlRequiredNoTunnelIfServerSupportSSL() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL) {
db.start();

final ImmutableMap<Object, Object> configBuilderWithSSLMode = getDatabaseConfigBuilderWithSSLMode(db, SSL_MODE_REQUIRE).build();
final JsonNode config = Jsons.jsonNode(configBuilderWithSSLMode);
addNoTunnel((ObjectNode) config);
final AirbyteConnectionStatus connectionStatus = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
}
}

@Test
void testStrictSSLSecuredWithTunnel() throws Exception {

try (PostgreSQLContainer<?> db = postgreSQLContainerWithSSL.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();

final AirbyteConnectionStatus connectionStatus = checkWithTunnel(db, SSL_MODE_REQUIRE);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatus.getStatus());
} finally {
bastion.stopAndClose();
}
}

private ImmutableMap.Builder<Object, Object> getDatabaseConfigBuilderWithSSLMode(PostgreSQLContainer<?> db, String sslMode) {
return ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, Objects.requireNonNull(db.getContainerInfo()
Expand Down Expand Up @@ -94,53 +161,26 @@ private JsonNode getMockedSSLConfig(String sslMode) {

@Test
void testSslModesUnsecuredNoTunnel() throws Exception {
for (String sslMode : List.of("disable", "allow", "prefer")) {
for (String sslMode : NON_STRICT_SSL_MODES) {
final JsonNode config = getMockedSSLConfig(sslMode);
((ObjectNode) config).putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
addNoTunnel((ObjectNode) config);

final AirbyteConnectionStatus actual = new PostgresSourceStrictEncrypt().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("Unsecured connection not allowed"));
final AirbyteConnectionStatus connectionStatus = source.check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, connectionStatus.getStatus());
assertTrue(connectionStatus.getMessage().contains("Unsecured connection not allowed"));
}
}

@Test
void testSslModeRequiredNoTunnel() throws Exception {

try (PostgreSQLContainer<?> db =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key")) {
db.start();

final ImmutableMap<Object, Object> configBuilderWithSslModeRequire = getDatabaseConfigBuilderWithSSLMode(db, "require").build();
final JsonNode config = Jsons.jsonNode(configBuilderWithSslModeRequire);
((ObjectNode) config).putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(config);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForPreferredMode.getStatus());
}
private AirbyteConnectionStatus checkWithTunnel(PostgreSQLContainer<?> db, String sslmode) throws Exception {
final ImmutableMap.Builder<Object, Object> configBuilderWithSSLMode = getDatabaseConfigBuilderWithSSLMode(db, sslmode);
final JsonNode configWithSSLModeDisable = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, configBuilderWithSSLMode);
return source.check(configWithSSLModeDisable);
}

@Test
void testStrictSSLSecuredWithTunnel() throws Exception {
try (PostgreSQLContainer<?> db =
new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
.withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key")
.withNetwork(network)) {

bastion.initAndStartBastion(network);
db.start();

final ImmutableMap.Builder<Object, Object> builderWithSSLModePrefer = getDatabaseConfigBuilderWithSSLMode(db, "require");
final JsonNode configWithSslAndSsh = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, builderWithSSLModePrefer);
final AirbyteConnectionStatus connectionStatusForPreferredMode = new PostgresSourceStrictEncrypt().check(configWithSslAndSsh);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, connectionStatusForPreferredMode.getStatus());
} finally {
bastion.stopAndClose();
}
private static void addNoTunnel(ObjectNode config) {
config.putIfAbsent("tunnel_method", Jsons.jsonNode(ImmutableMap.builder()
.put("tunnel_method", "NO_TUNNEL")
.build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,6 @@ private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbN
.build());
}

private JsonNode getConfigWithSsl(final PostgreSQLContainer<?> psqlDb, final String dbName) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", psqlDb.getHost())
.put("port", psqlDb.getFirstMappedPort())
.put("database", dbName)
.put("schemas", List.of(SCHEMA_NAME))
.put("username", psqlDb.getUsername())
.put("password", psqlDb.getPassword())
.put("ssl", true)
.build());
}

private JsonNode getConfig(final PostgreSQLContainer<?> psqlDb, final String dbName, final String user, final String password) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, psqlDb.getHost())
Expand Down Expand Up @@ -481,22 +469,6 @@ void testIsCdc() {
assertTrue(PostgresUtils.isCdc(config));
}

@Test
void testGetDefaultConnectionPropertiesWithoutSsl() {
final JsonNode config = getConfig(PSQL_DB, dbName);
final Map<String, String> defaultConnectionProperties = new PostgresSource().getDefaultConnectionProperties(config);
assertEquals(defaultConnectionProperties, Collections.emptyMap());
};

@Test
void testGetDefaultConnectionPropertiesWithSsl() {
final JsonNode config = getConfigWithSsl(PSQL_DB, dbName);
final Map<String, String> defaultConnectionProperties = new PostgresSource().getDefaultConnectionProperties(config);
assertEquals(defaultConnectionProperties, ImmutableMap.of(
"ssl", "true",
"sslmode", "require"));
};

@Test
void testGetUsername() {
final String username = "airbyte-user";
Expand Down Expand Up @@ -568,7 +540,8 @@ private JsonNode buildConfigEscapingNeeded() {
JdbcUtils.HOST_KEY, "localhost",
JdbcUtils.PORT_KEY, 1111,
JdbcUtils.USERNAME_KEY, "user",
JdbcUtils.DATABASE_KEY, "db/foo"));
JdbcUtils.DATABASE_KEY, "db/foo",
JdbcUtils.SSL_KEY, "false"));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.26 | 2022-11-18 | [19551](https://github.com/airbytehq/airbyte/pull/19551) | Fixes bug with ssl modes |
| 1.0.25 | 2022-11-16 | [19004](https://github.com/airbytehq/airbyte/pull/19004) | Use Debezium heartbeats to improve CDC replication of large databases. |
| 1.0.24 | 2022-11-07 | [19291](https://github.com/airbytehq/airbyte/pull/19291) | Default timeout is reduced from 1 min to 10sec |
| 1.0.23 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
Expand Down