Skip to content

Commit 0db33b8

Browse files
authored
Revert "source-postgres: connect with adaptiveFetch=true" (#38365)
1 parent 471c8b3 commit 0db33b8

File tree

5 files changed

+261
-309
lines changed

5 files changed

+261
-309
lines changed

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.6
12+
dockerImageTag: 3.4.7
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java

+14-26
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,6 @@
3939
import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.reclassifyCategorisedCtidStreams;
4040
import static java.util.stream.Collectors.toList;
4141
import static java.util.stream.Collectors.toSet;
42-
import static org.postgresql.PGProperty.ADAPTIVE_FETCH;
43-
import static org.postgresql.PGProperty.CURRENT_SCHEMA;
44-
import static org.postgresql.PGProperty.DEFAULT_ROW_FETCH_SIZE;
45-
import static org.postgresql.PGProperty.MAX_RESULT_BUFFER;
46-
import static org.postgresql.PGProperty.PREPARE_THRESHOLD;
4742

4843
import com.fasterxml.jackson.databind.JsonNode;
4944
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -57,6 +52,7 @@
5752
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
5853
import io.airbyte.cdk.db.jdbc.JdbcUtils;
5954
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
55+
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
6056
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
6157
import io.airbyte.cdk.integrations.base.IntegrationRunner;
6258
import io.airbyte.cdk.integrations.base.Source;
@@ -126,7 +122,6 @@
126122
import java.util.stream.Stream;
127123
import javax.sql.DataSource;
128124
import org.apache.commons.lang3.StringUtils;
129-
import org.postgresql.PGProperty;
130125
import org.slf4j.Logger;
131126
import org.slf4j.LoggerFactory;
132127

@@ -151,14 +146,6 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
151146
public static final String SSL_MODE_DISABLE = "disable";
152147
public static final String SSL_MODE_REQUIRE = "require";
153148

154-
public static final Map<PGProperty, String> JDBC_CONNECTION_PARAMS = ImmutableMap.of(
155-
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
156-
// https://github.com/airbytehq/airbyte/issues/24796
157-
PREPARE_THRESHOLD, "0",
158-
DEFAULT_ROW_FETCH_SIZE, "1",
159-
ADAPTIVE_FETCH, "true",
160-
MAX_RESULT_BUFFER, "10percent");
161-
162149
private List<String> schemas;
163150

164151
private Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc;
@@ -170,7 +157,7 @@ public static Source sshWrappedSource(PostgresSource source) {
170157
}
171158

172159
PostgresSource() {
173-
super(DRIVER_CLASS, PostgresStreamingQueryConfig::new, new PostgresSourceOperations());
160+
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new PostgresSourceOperations());
174161
this.stateEmissionFrequency = INTERMEDIATE_STATE_EMISSION_FREQUENCY;
175162
}
176163

@@ -189,9 +176,9 @@ public ConnectorSpecification spec() throws Exception {
189176
@Override
190177
public JsonNode toDatabaseConfig(final JsonNode config) {
191178
final List<String> additionalParameters = new ArrayList<>();
192-
for (var e : JDBC_CONNECTION_PARAMS.entrySet()) {
193-
additionalParameters.add(e.getKey().getName() + EQUALS + e.getValue());
194-
}
179+
// Initialize parameters with prepareThreshold=0 to mitigate pgbouncer errors
180+
// https://github.com/airbytehq/airbyte/issues/24796
181+
additionalParameters.add("prepareThreshold=0");
195182

196183
final String encodedDatabaseName = URLEncoder.encode(config.get(JdbcUtils.DATABASE_KEY).asText(), StandardCharsets.UTF_8);
197184

@@ -201,7 +188,7 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
201188
encodedDatabaseName));
202189

203190
if (config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty()) {
204-
additionalParameters.add(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
191+
jdbcUrl.append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()).append(AMPERSAND);
205192
}
206193

207194
final Map<String, String> sslParameters = parseSSLConfig(config);
@@ -219,10 +206,12 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
219206
}
220207

221208
if (schemas != null && !schemas.isEmpty()) {
222-
additionalParameters.add(CURRENT_SCHEMA.getName() + EQUALS + String.join(",", schemas));
209+
additionalParameters.add("currentSchema=" + String.join(",", schemas));
223210
}
224-
additionalParameters.addAll(toJDBCQueryParams(sslParameters));
225-
jdbcUrl.append(String.join(AMPERSAND, additionalParameters));
211+
212+
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
213+
214+
jdbcUrl.append(toJDBCQueryParams(sslParameters));
226215
LOGGER.debug("jdbc url: {}", jdbcUrl);
227216
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
228217
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
@@ -236,9 +225,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
236225
return Jsons.jsonNode(configBuilder.build());
237226
}
238227

239-
public List<String> toJDBCQueryParams(final Map<String, String> sslParams) {
240-
return Objects.isNull(sslParams)
241-
? List.of()
228+
public String toJDBCQueryParams(final Map<String, String> sslParams) {
229+
return Objects.isNull(sslParams) ? ""
242230
: sslParams.entrySet()
243231
.stream()
244232
.map((entry) -> {
@@ -255,7 +243,7 @@ public List<String> toJDBCQueryParams(final Map<String, String> sslParams) {
255243
}
256244
})
257245
.filter(s -> Objects.nonNull(s) && !s.isEmpty())
258-
.toList();
246+
.collect(Collectors.joining(JdbcUtils.AMPERSAND));
259247
}
260248

261249
@Override

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresStreamingQueryConfig.java

-25
This file was deleted.

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -815,21 +815,10 @@ private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database
815815
@Test
816816
void testJdbcUrlWithEscapedDatabaseName() {
817817
final JsonNode jdbcConfig = source().toDatabaseConfig(buildConfigEscapingNeeded());
818-
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS,
819-
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
818+
assertEquals(EXPECTED_JDBC_ESCAPED_URL, jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
820819
}
821820

822-
@Test
823-
void testJdbcUrlWithSchemas() {
824-
final JsonNode sourceConfig = buildConfigEscapingNeeded();
825-
((ObjectNode) sourceConfig).set("schemas", Jsons.arrayNode().add("bar").add("baz"));
826-
final JsonNode jdbcConfig = source().toDatabaseConfig(sourceConfig);
827-
assertEquals("jdbc:postgresql://localhost:1111/db%2Ffoo?" + EXPECTED_DEFAULT_PARAMS + "&currentSchema=bar,baz",
828-
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText());
829-
}
830-
831-
private static final String EXPECTED_DEFAULT_PARAMS =
832-
"prepareThreshold=0&defaultRowFetchSize=1&adaptiveFetch=true&maxResultBuffer=10percent";
821+
private static final String EXPECTED_JDBC_ESCAPED_URL = "jdbc:postgresql://localhost:1111/db%2Ffoo?prepareThreshold=0&";
833822

834823
private JsonNode buildConfigEscapingNeeded() {
835824
return Jsons.jsonNode(ImmutableMap.of(

0 commit comments

Comments
 (0)