Skip to content

Commit e2862df

Browse files
author
Marius Posta
committed
Revert "Revert "source-postgres: connect with adaptiveFetch=true" (#38365)"
This reverts commit 0db33b8.
1 parent a281691 commit e2862df

File tree

5 files changed

+309
-261
lines changed

5 files changed

+309
-261
lines changed

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.7
12+
dockerImageTag: 3.4.6
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+26-14
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@
3939
import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.reclassifyCategorisedCtidStreams;
4040
import static java.util.stream.Collectors.toList;
4141
import static java.util.stream.Collectors.toSet;
42+
import static org.postgresql.PGProperty.ADAPTIVE_FETCH;
43+
import static org.postgresql.PGProperty.CURRENT_SCHEMA;
44+
import static org.postgresql.PGProperty.DEFAULT_ROW_FETCH_SIZE;
45+
import static org.postgresql.PGProperty.MAX_RESULT_BUFFER;
46+
import static org.postgresql.PGProperty.PREPARE_THRESHOLD;
4247

4348
import com.fasterxml.jackson.databind.JsonNode;
4449
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,7 +57,6 @@
5257
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
5358
import io.airbyte.cdk.db.jdbc.JdbcUtils;
5459
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
55-
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
5660
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
5761
import io.airbyte.cdk.integrations.base.IntegrationRunner;
5862
import io.airbyte.cdk.integrations.base.Source;
@@ -122,6 +126,7 @@
122126
import java.util.stream.Stream;
123127
import javax.sql.DataSource;
124128
import org.apache.commons.lang3.StringUtils;
129+
import org.postgresql.PGProperty;
125130
import org.slf4j.Logger;
126131
import org.slf4j.LoggerFactory;
127132

@@ -146,6 +151,14 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
146151
public static final String SSL_MODE_DISABLE = "disable";
147152
public static final String SSL_MODE_REQUIRE = "require";
148153

154+
public static final Map<PGProperty, String> JDBC_CONNECTION_PARAMS = ImmutableMap.of(
155+
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
156+
// https://github.com/airbytehq/airbyte/issues/24796
157+
PREPARE_THRESHOLD, "0",
158+
DEFAULT_ROW_FETCH_SIZE, "1",
159+
ADAPTIVE_FETCH, "true",
160+
MAX_RESULT_BUFFER, "10percent");
161+
149162
private List<String> schemas;
150163

151164
private Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc;
@@ -157,7 +170,7 @@ public static Source sshWrappedSource(PostgresSource source) {
157170
}
158171

159172
PostgresSource() {
160-
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations());
173+
super(DRIVER_CLASS, PostgresStreamingQueryConfig::new, new PostgresSourceOperations());
161174
this.stateEmissionFrequency = INTERMEDIATE_STATE_EMISSION_FREQUENCY;
162175
}
163176

@@ -176,9 +189,9 @@ public ConnectorSpecification spec() throws Exception {
176189
@Override
177190
public JsonNode toDatabaseConfig(final JsonNode config) {
178191
final List<String> additionalParameters = new ArrayList<>();
179-
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
180-
// https://github.com/airbytehq/airbyte/issues/24796
181-
additionalParameters.add("prepareThreshold=0");
192+
for (var e : JDBC_CONNECTION_PARAMS.entrySet()) {
193+
additionalParameters.add(e.getKey().getName() + EQUALS + e.getValue());
194+
}
182195

183196
final String encodedDatabaseName = URLEncoder.encode(config.get(JdbcUtils.DATABASE_KEY).asText(), StandardCharsets.UTF_8);
184197

@@ -188,7 +201,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
188201
encodedDatabaseName));
189202

190203
if (config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty()) {
191-
jdbcUrl.append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()).append(AMPERSAND);
204+
additionalParameters.add(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
192205
}
193206

194207
final Map<String, String> sslParameters = parseSSLConfig(config);
@@ -206,12 +219,10 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
206219
}
207220

208221
if (schemas != null && !schemas.isEmpty()) {
209-
additionalParameters.add("currentSchema=" + String.join(",", schemas));
222+
additionalParameters.add(CURRENT_SCHEMA.getName() + EQUALS + String.join(",", schemas));
210223
}
211-
212-
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
213-
214-
jdbcUrl.append(toJDBCQueryParams(sslParameters));
224+
additionalParameters.addAll(toJDBCQueryParams(sslParameters));
225+
jdbcUrl.append(String.join(AMPERSAND, additionalParameters));
215226
LOGGER.debug("jdbc url: {}", jdbcUrl);
216227
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
217228
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
@@ -225,8 +236,9 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
225236
return Jsons.jsonNode(configBuilder.build());
226237
}
227238

228-
public String toJDBCQueryParams(final Map<String, String> sslParams) {
229-
return Objects.isNull(sslParams) ? ""
239+
public List<String> toJDBCQueryParams(final Map<String, String> sslParams) {
240+
return Objects.isNull(sslParams)
241+
? List.of()
230242
: sslParams.entrySet()
231243
.stream()
232244
.map((entry) -> {
@@ -243,7 +255,7 @@ public String toJDBCQueryParams(final Map<String, String> sslParams) {
243255
}
244256
})
245257
.filter(s -> Objects.nonNull(s) && !s.isEmpty())
246-
.collect(Collectors.joining(JdbcUtils.AMPERSAND));
258+
.toList();
247259
}
248260

249261
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.postgres;
6+
7+
import io.airbyte.cdk.db.jdbc.streaming.JdbcStreamingQueryConfig;
8+
import java.sql.Connection;
9+
import java.sql.ResultSet;
10+
import java.sql.SQLException;
11+
import java.sql.Statement;
12+
import org.jetbrains.annotations.NotNull;
13+
14+
public class PostgresStreamingQueryConfig implements JdbcStreamingQueryConfig {
15+
16+
@Override
17+
public void initialize(final Connection connection, final @NotNull Statement preparedStatement) throws SQLException {
18+
connection.setAutoCommit(false);
19+
// Nothing else to do, adaptive streaming is enabled via JDBC connection parameters.
20+
}
21+
22+
@Override
23+
public void accept(ResultSet resultSet, Object o) {}
24+
25+
}

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -815,10 +815,21 @@ private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database
815815
@Test
816816
void testJdbcUrlWithEscapedDatabaseName() {
817817
final JsonNode jdbcConfig = source().toDatabaseConfig(buildConfigEscapingNeeded());
818-
assertEquals(EXPECTED_JDBC_ESCAPED_URL, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
818+
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS,
819+
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
819820
}
820821

821-
private static final String EXPECTED_JDBC_ESCAPED_URL = "jdbc:postgresql://localhost:1111/db%2Ffoo?prepareThreshold=0&";
822+
@Test
823+
void testJdbcUrlWithSchemas() {
824+
final JsonNode sourceConfig = buildConfigEscapingNeeded();
825+
((ObjectNode) sourceConfig).set("schemas", Jsons.arrayNode().add("bar").add("baz"));
826+
final JsonNode jdbcConfig = source().toDatabaseConfig(sourceConfig);
827+
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS + "&currentSchema=bar,baz",
828+
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
829+
}
830+
831+
private static final String EXPECTED_DEFAULT_PARAMS =
832+
"prepareThreshold=0&defaultRowFetchSize=1&adaptiveFetch=true&maxResultBuffer=10percent";
822833

823834
private JsonNode buildConfigEscapingNeeded() {
824835
return Jsons.jsonNode(ImmutableMap.of(

0 commit comments

Comments
 (0)