4
4
5
5
package io .airbyte .integrations .source .postgres ;
6
6
7
+ import static io .airbyte .db .jdbc .JdbcUtils .AMPERSAND ;
8
+ import static io .airbyte .db .jdbc .JdbcUtils .EQUALS ;
7
9
import static io .airbyte .integrations .debezium .AirbyteDebeziumHandler .shouldUseCDC ;
8
10
import static io .airbyte .integrations .source .jdbc .JdbcSSLConnectionUtils .PARAM_CA_CERTIFICATE ;
9
11
import static io .airbyte .integrations .util .PostgresSslConnectionUtils .DISABLE ;
10
12
import static io .airbyte .integrations .util .PostgresSslConnectionUtils .PARAM_SSL_MODE ;
11
- import static io .airbyte .integrations .util .PostgresSslConnectionUtils .obtainConnectionOptions ;
12
13
import static java .util .stream .Collectors .toList ;
13
14
import static java .util .stream .Collectors .toSet ;
14
15
72
73
import java .util .Set ;
73
74
import java .util .function .Supplier ;
74
75
import java .util .stream .Collectors ;
75
- import org .postgresql .ssl .DefaultJavaSSLFactory ;
76
- import org .postgresql .ssl .LibPQFactory ;
76
+ import org .postgresql .jdbc .SslMode ;
77
77
import org .slf4j .Logger ;
78
78
import org .slf4j .LoggerFactory ;
79
79
@@ -82,7 +82,16 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
82
82
private static final Logger LOGGER = LoggerFactory .getLogger (PostgresSource .class );
83
83
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000 ;
84
84
85
+ public static final String PARAM_SSLMODE = "sslmode" ;
86
+ public static final String PARAM_SSL = "ssl" ;
87
+ public static final String PARAM_SSL_TRUE = "true" ;
88
+ public static final String PARAM_SSL_FALSE = "false" ;
89
+ public static final String SSL_ROOT_CERT = "sslrootcert" ;
90
+
85
91
static final String DRIVER_CLASS = DatabaseDriver .POSTGRESQL .getDriverClassName ();
92
+ public static final String CA_CERTIFICATE_PATH = "ca_certificate_path" ;
93
+ public static final String SSL_KEY = "sslkey" ;
94
+ public static final String SSL_PASSWORD = "sslpassword" ;
86
95
static final Map <String , String > SSL_JDBC_PARAMETERS = ImmutableMap .of (
87
96
"ssl" , "true" ,
88
97
"sslmode" , "require" );
@@ -117,38 +126,15 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
117
126
config .get (JdbcUtils .DATABASE_KEY ).asText ()));
118
127
119
128
if (config .get (JdbcUtils .JDBC_URL_PARAMS_KEY ) != null && !config .get (JdbcUtils .JDBC_URL_PARAMS_KEY ).asText ().isEmpty ()) {
120
- jdbcUrl .append (config .get (JdbcUtils .JDBC_URL_PARAMS_KEY ).asText ()).append ("&" );
129
+ jdbcUrl .append (config .get (JdbcUtils .JDBC_URL_PARAMS_KEY ).asText ()).append (AMPERSAND );
121
130
}
122
131
123
132
final Map <String , String > sslParameters = parseSSLConfig (config );
124
133
if (config .has (PARAM_SSL_MODE ) && config .get (PARAM_SSL_MODE ).has (PARAM_CA_CERTIFICATE )) {
125
- LOGGER .info ("*** saving CA cert to file" );
126
- sslParameters .put ("ca_certificate_path" , JdbcSSLConnectionUtils .fileFromCertPem (config .get (PARAM_SSL_MODE ).get (PARAM_CA_CERTIFICATE ).asText ()).toString ());
127
- LOGGER .info ("*** crt file: {}" , sslParameters .get ("ca_certificate_path" ));
134
+ sslParameters .put (CA_CERTIFICATE_PATH , JdbcSSLConnectionUtils .fileFromCertPem (config .get (PARAM_SSL_MODE ).get (PARAM_CA_CERTIFICATE ).asText ()).toString ());
135
+ LOGGER .debug ("root ssl ca crt file: {}" , sslParameters .get (CA_CERTIFICATE_PATH ));
128
136
}
129
137
130
- // System.setProperty("javax.net.ssl.trustStore", sslParameters.get(TRUST_KEY_STORE_URL));
131
- // System.setProperty("javax.net.ssl.trustStorePassword", sslParameters.get(TRUST_KEY_STORE_PASS));
132
-
133
- // // assume ssl if not explicitly mentioned.
134
- // if (!config.has(PARAM_SSL) || config.get(PARAM_SSL).asBoolean()) {
135
- // if (config.has(PARAM_SSL_MODE)) {
136
- // if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) {
137
- // additionalParameters.add("sslmode=disable");
138
- // } else {
139
- // final var parametersList = obtainConnectionOptions(config.get(PARAM_SSL_MODE))
140
- // .entrySet()
141
- // .stream()
142
- // .map(e -> e.getKey() + "=" + e.getValue())
143
- // .toList();
144
- // additionalParameters.addAll(parametersList);
145
- // }
146
- // } else {
147
- // additionalParameters.add("ssl=true");
148
- // additionalParameters.add("sslmode=require");
149
- // }
150
- // }
151
-
152
138
if (config .has (JdbcUtils .SCHEMAS_KEY ) && config .get (JdbcUtils .SCHEMAS_KEY ).isArray ()) {
153
139
schemas = new ArrayList <>();
154
140
for (final JsonNode schema : config .get (JdbcUtils .SCHEMAS_KEY )) {
@@ -163,8 +149,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
163
149
additionalParameters .forEach (x -> jdbcUrl .append (x ).append ("&" ));
164
150
165
151
jdbcUrl .append (toJDBCQueryParams (sslParameters ));
166
- // jdbcUrl.append("&sslfactory=" + DefaultJavaSSLFactory.class.getCanonicalName());
167
- LOGGER .info ("jdbc url: {}" , jdbcUrl .toString ());
152
+ LOGGER .debug ("jdbc url: {}" , jdbcUrl .toString ());
168
153
final Builder <Object , Object > configBuilder = ImmutableMap .builder ()
169
154
.put (JdbcUtils .USERNAME_KEY , config .get (JdbcUtils .USERNAME_KEY ).asText ())
170
155
.put (JdbcUtils .JDBC_URL_KEY , jdbcUrl .toString ());
@@ -178,11 +163,6 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
178
163
return Jsons .jsonNode (configBuilder .build ());
179
164
}
180
165
181
- public static final String PARAM_SSLMODE = "sslmode" ;
182
- public static final String PARAM_SSL = "ssl" ;
183
- public static final String PARAM_SSL_TRUE = "true" ;
184
- public static final String PARAM_SSL_FALSE = "false" ;
185
-
186
166
@ Override
187
167
public String toJDBCQueryParams (final Map <String , String > sslParams ) {
188
168
return Objects .isNull (sslParams ) ? ""
@@ -191,11 +171,11 @@ public String toJDBCQueryParams(final Map<String, String> sslParams) {
191
171
.map ((entry ) -> {
192
172
try {
193
173
final String result = switch (entry .getKey ()) {
194
- case SSL_MODE -> PARAM_SSLMODE + "=" + toSslJdbcParam (SslMode .valueOf (entry .getValue ()))
195
- + JdbcUtils .AMPERSAND + PARAM_SSL + "=" + (entry .getValue () == DISABLE ? PARAM_SSL_FALSE : PARAM_SSL_TRUE );
196
- case "ca_certificate_path" -> "sslrootcert" + "=" + entry .getValue ();
197
- case CLIENT_KEY_STORE_URL -> "sslkey" + "=" + Path .of (new URI (entry .getValue ()));
198
- case CLIENT_KEY_STORE_PASS -> "sslpassword" + "=" + entry .getValue ();
174
+ case SSL_MODE -> PARAM_SSLMODE + EQUALS + toSslJdbcParam (SslMode .valueOf (entry .getValue ()))
175
+ + JdbcUtils .AMPERSAND + PARAM_SSL + EQUALS + (entry .getValue () == DISABLE ? PARAM_SSL_FALSE : PARAM_SSL_TRUE );
176
+ case CA_CERTIFICATE_PATH -> SSL_ROOT_CERT + EQUALS + entry .getValue ();
177
+ case CLIENT_KEY_STORE_URL -> SSL_KEY + EQUALS + Path .of (new URI (entry .getValue ()));
178
+ case CLIENT_KEY_STORE_PASS -> SSL_PASSWORD + EQUALS + entry .getValue ();
199
179
default -> "" ;
200
180
};
201
181
return result ;
@@ -344,7 +324,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
344
324
final Map <String , TableInfo <CommonField <JDBCType >>> tableNameToTable ,
345
325
final StateManager stateManager ,
346
326
final Instant emittedAt ) {
347
- LOGGER .info ("*** getIncrementalIterators" );
348
327
final JsonNode sourceConfig = database .getSourceConfig ();
349
328
if (PostgresUtils .isCdc (sourceConfig ) && shouldUseCDC (catalog )) {
350
329
final Duration firstRecordWaitTime = PostgresUtils .getFirstRecordWaitTime (sourceConfig );
@@ -511,12 +490,12 @@ protected String toSslJdbcParam(final SslMode sslMode) {
511
490
512
491
protected static String toSslJdbcParamInternal (final SslMode sslMode ) {
513
492
final var result = switch (sslMode ) {
514
- case DISABLED -> "disable" ;
515
- case ALLOWED -> "allow" ;
516
- case PREFERRED -> "prefer" ;
517
- case REQUIRED -> "require" ;
518
- case VERIFY_CA -> "verify-ca" ;
519
- case VERIFY_IDENTITY -> "verify-full" ;
493
+ case DISABLED -> org . postgresql . jdbc . SslMode . DISABLE . value ;
494
+ case ALLOWED -> org . postgresql . jdbc . SslMode . ALLOW . value ;
495
+ case PREFERRED -> org . postgresql . jdbc . SslMode . PREFER . value ;
496
+ case REQUIRED -> org . postgresql . jdbc . SslMode . REQUIRE . value ;
497
+ case VERIFY_CA -> org . postgresql . jdbc . SslMode . VERIFY_CA . value ;
498
+ case VERIFY_IDENTITY -> org . postgresql . jdbc . SslMode . VERIFY_FULL . value ;
520
499
default -> throw new IllegalArgumentException ("unexpected ssl mode" );
521
500
};
522
501
LOGGER .info ("{} toSslJdbcParam {}" , sslMode .name (), result );
0 commit comments