
Commit df614d4

fix-postgres-source: get rid of short lived objects (#21634)
* fix-postgres-source: get rid of short lived objects
* cache column info as well
* get rid of constructor
* upgrade version
* auto-bump connector version
* update definition

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent b87647d commit df614d4

10 files changed: +67 additions, −46 deletions

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 2 additions & 2 deletions

@@ -45,7 +45,7 @@
 - name: AlloyDB for PostgreSQL
   sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
   dockerRepository: airbyte/source-alloydb
-  dockerImageTag: 1.0.36
+  dockerImageTag: 1.0.43
   documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
   icon: alloydb.svg
   sourceType: database

@@ -1362,7 +1362,7 @@
 - name: Postgres
   sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
   dockerRepository: airbyte/source-postgres
-  dockerImageTag: 1.0.42
+  dockerImageTag: 1.0.43
   documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
   icon: postgresql.svg
   sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 2 additions & 2 deletions

@@ -370,7 +370,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-alloydb:1.0.36"
+- dockerImage: "airbyte/source-alloydb:1.0.43"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
    connectionSpecification:

@@ -11606,7 +11606,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-postgres:1.0.42"
+- dockerImage: "airbyte/source-postgres:1.0.43"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
    connectionSpecification:

airbyte-integrations/connectors/source-alloydb-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.36
+LABEL io.airbyte.version=1.0.43
 LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt

airbyte-integrations/connectors/source-alloydb/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.36
+LABEL io.airbyte.version=1.0.43
 LABEL io.airbyte.name=airbyte/source-alloydb

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.42
+LABEL io.airbyte.version=1.0.43
 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.0.42
+LABEL io.airbyte.version=1.0.43
 LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

Lines changed: 50 additions & 31 deletions

@@ -40,7 +40,10 @@
 import java.time.OffsetDateTime;
 import java.time.OffsetTime;
 import java.time.format.DateTimeParseException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.postgresql.geometric.PGbox;
 import org.postgresql.geometric.PGcircle;
 import org.postgresql.geometric.PGline;

@@ -60,6 +63,12 @@ public class PostgresSourceOperations extends AbstractJdbcCompatibleSourceOperat
   private static final String TIMESTAMPTZ = "timestamptz";
   private static final String TIMETZ = "timetz";
   private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
+  private static final Map<Integer, PostgresType> POSTGRES_TYPE_DICT = new HashMap<>();
+  private final Map<String, Map<String, ColumnInfo>> streamColumnInfo = new HashMap<>();
+
+  static {
+    Arrays.stream(PostgresType.class.getEnumConstants()).forEach(c -> POSTGRES_TYPE_DICT.put(c.type, c));
+  }

   @Override
   public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {

@@ -69,27 +78,6 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
     final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

     for (int i = 1; i <= columnCount; i++) {
-      final String columnType = metadata.getColumnTypeName(i);
-      // attempt to access the column. this allows us to know if it is null before we do type-specific
-      // parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
-      // checking for null values with jdbc.
-
-      if (columnType.equalsIgnoreCase("money")) {
-        // when a column is of type MONEY, getObject will throw exception
-        // this is a bug that will not be fixed:
-        // https://github.com/pgjdbc/pgjdbc/issues/425
-        // https://github.com/pgjdbc/pgjdbc/issues/1835
-        queryContext.getString(i);
-      } else if (columnType.equalsIgnoreCase("bit")) {
-        // getObject will fail as it tries to parse the value as boolean
-        queryContext.getString(i);
-      } else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) {
-        // getObject will fail when the value is 'infinity'
-        queryContext.getDouble(i);
-      } else {
-        queryContext.getObject(i);
-      }
-
       // convert to java types that will convert into reasonable json.
       copyToJsonField(queryContext, i, jsonNode);
     }

@@ -166,14 +154,14 @@ protected void setDate(final PreparedStatement preparedStatement, final int para
   public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
     final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
     final String columnName = metadata.getColumnName(colIndex);
-    final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase();
-    final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));
-    if (resultSet.getString(colIndex) == null) {
+    final ColumnInfo columnInfo = getColumnInfo(colIndex, metadata, columnName);
+    final String value = resultSet.getString(colIndex);
+    if (value == null) {
       json.putNull(columnName);
     } else {
-      switch (columnTypeName) {
+      switch (columnInfo.columnTypeName) {
         case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex);
-        case "bytea" -> putString(json, columnName, resultSet, colIndex);
+        case "bytea" -> json.put(columnName, value);
         case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
         case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
         case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);

@@ -199,22 +187,22 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
         case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
         case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
         default -> {
-          switch (columnType) {
-            case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
+          switch (columnInfo.columnType) {
+            case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
             case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex);
             case INTEGER -> putInteger(json, columnName, resultSet, colIndex);
             case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
             case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
             case REAL -> putFloat(json, columnName, resultSet, colIndex);
             case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
             // BIT is a bit string in Postgres, e.g. '0100'
-            case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
+            case BIT, CHAR, VARCHAR, LONGVARCHAR -> json.put(columnName, value);
             case DATE -> putDate(json, columnName, resultSet, colIndex);
             case TIME -> putTime(json, columnName, resultSet, colIndex);
             case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
             case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
             case ARRAY -> putArray(json, columnName, resultSet, colIndex);
-            default -> putDefault(json, columnName, resultSet, colIndex);
+            default -> json.put(columnName, value);
           }
         }
       }

@@ -412,7 +400,7 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
       case "bytea" -> PostgresType.VARCHAR;
       case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
       case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
-      default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
+      default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
      };
    } catch (final IllegalArgumentException ex) {
      LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",

@@ -582,4 +570,35 @@ public boolean isCursorType(final PostgresType type) {
     return PostgresUtils.ALLOWED_CURSOR_TYPES.contains(type);
   }

+  private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData metadata, final String columnName) throws SQLException {
+    final String tableName = metadata.getBaseTableName(colIndex);
+    final String schemaName = metadata.getBaseSchemaName(colIndex);
+    final String key = schemaName + tableName;
+    if (!streamColumnInfo.containsKey(key)) {
+      streamColumnInfo.clear();
+      streamColumnInfo.put(key, new HashMap<>(metadata.getColumnCount()));
+    }
+
+    final Map<String, ColumnInfo> stringColumnInfoMap = streamColumnInfo.get(key);
+    if (stringColumnInfoMap.containsKey(columnName)) {
+      return stringColumnInfoMap.get(columnName);
+    } else {
+      final PostgresType columnType = safeGetJdbcType(metadata.getColumnType(colIndex), POSTGRES_TYPE_DICT);
+      final ColumnInfo columnInfo = new ColumnInfo(metadata.getColumnTypeName(colIndex).toLowerCase(), columnType);
+      stringColumnInfoMap.put(columnName, columnInfo);
+      return columnInfo;
+    }
+  }
+
+  private static class ColumnInfo {
+    public String columnTypeName;
+    public PostgresType columnType;
+
+    public ColumnInfo(final String columnTypeName, final PostgresType columnType) {
+      this.columnTypeName = columnTypeName;
+      this.columnType = columnType;
+    }
+
+  }
+
 }
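The gist of the change above is to resolve column metadata once per stream instead of once per row. Below is a minimal, standalone sketch of that caching pattern, assuming plain JDBC java.sql.ResultSetMetaData calls; the class and method names are illustrative and are not the connector's API (the real implementation is the ColumnInfo/getColumnInfo code in the diff):

// Standalone sketch of a per-stream column cache (illustrative names, not the connector's classes).
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

final class ColumnInfoCache {

  // Immutable holder resolved once per column instead of once per row.
  static final class ColumnInfo {
    final String typeName;
    final int jdbcType;

    ColumnInfo(final String typeName, final int jdbcType) {
      this.typeName = typeName;
      this.jdbcType = jdbcType;
    }
  }

  // Keyed by schema + table; cleared when a new stream starts so only the current stream stays cached.
  private final Map<String, Map<String, ColumnInfo>> streamColumnInfo = new HashMap<>();

  ColumnInfo get(final ResultSetMetaData metadata, final int colIndex) throws SQLException {
    final String key = metadata.getSchemaName(colIndex) + metadata.getTableName(colIndex);
    if (!streamColumnInfo.containsKey(key)) {
      streamColumnInfo.clear();
      streamColumnInfo.put(key, new HashMap<>(metadata.getColumnCount()));
    }
    final Map<String, ColumnInfo> columns = streamColumnInfo.get(key);
    final String columnName = metadata.getColumnName(colIndex);
    ColumnInfo info = columns.get(columnName);
    if (info == null) {
      // Only the first row of a stream reaches this branch; later rows hit the cache.
      info = new ColumnInfo(metadata.getColumnTypeName(colIndex).toLowerCase(), metadata.getColumnType(colIndex));
      columns.put(columnName, info);
    }
    return info;
  }
}

With a cache like this in place, per-row work in rowToJson-style loops reduces to a HashMap lookup per column rather than repeated getColumnTypeName/getColumnType calls and fresh type resolution.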

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresType.java

Lines changed: 7 additions & 7 deletions

@@ -6,6 +6,7 @@

 import java.sql.SQLType;
 import java.sql.Types;
+import java.util.Map;

 public enum PostgresType implements SQLType {

@@ -74,7 +75,7 @@ public enum PostgresType implements SQLType {
   /**
    * The Integer value for the JDBCType. It maps to a value in {@code Types.java}
    */
-  private Integer type;
+  protected Integer type;

   /**
    * Constructor to specify the data type value from {@code Types) for

@@ -121,18 +122,17 @@ public Integer getVendorTypeNumber() {
   * {@code Types} value
   * @see Types
   */
-  public static PostgresType valueOf(int type) {
-    for (PostgresType sqlType : PostgresType.class.getEnumConstants()) {
-      if (type == sqlType.type)
-        return sqlType;
+  public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
+    if (postgresTypeMap.containsKey(type)) {
+      return postgresTypeMap.get(type);
     }
     throw new IllegalArgumentException("Type:" + type + " is not a valid "
         + "Types.java value.");
   }

-  public static PostgresType safeGetJdbcType(final int columnTypeInt) {
+  public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
     try {
-      return PostgresType.valueOf(columnTypeInt);
+      return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
     } catch (final Exception e) {
       return PostgresType.VARCHAR;
     }
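This PostgresType change swaps a linear scan over getEnumConstants() on every lookup for a map built once up front (the POSTGRES_TYPE_DICT static block in the previous file). A standalone sketch of the same pattern on a toy enum; the names and constants below are illustrative, not the connector's:

// Build the int -> enum map once at class load instead of scanning getEnumConstants() per call.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

enum SqlTypeCode {
  VARCHAR(12), INTEGER(4), BOOLEAN(16);   // values match java.sql.Types constants

  final int code;

  SqlTypeCode(final int code) {
    this.code = code;
  }

  // Populated once per class load; lookups are O(1) and allocation-free.
  private static final Map<Integer, SqlTypeCode> BY_CODE = new HashMap<>();

  static {
    Arrays.stream(values()).forEach(t -> BY_CODE.put(t.code, t));
  }

  static SqlTypeCode fromCode(final int code) {
    final SqlTypeCode match = BY_CODE.get(code);
    if (match == null) {
      throw new IllegalArgumentException("Type:" + code + " is not a valid Types.java value.");
    }
    return match;
  }

  // Mirrors safeGetJdbcType: fall back to VARCHAR for unknown codes.
  static SqlTypeCode safeFromCode(final int code) {
    try {
      return fromCode(code);
    } catch (final IllegalArgumentException e) {
      return VARCHAR;
    }
  }
}

For example, safeFromCode(12) maps to VARCHAR (java.sql.Types.VARCHAR is 12), while an unknown code falls back to VARCHAR instead of throwing, mirroring the connector's safeGetJdbcType behavior.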

docs/integrations/sources/alloydb.md

Lines changed: 1 addition & 0 deletions

@@ -327,6 +327,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

 | Version | Date       | Pull Request                                              | Subject                                            |
 |:--------|:-----------|:----------------------------------------------------------|:---------------------------------------------------|
+| 1.0.43  | 2022-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634)  | Improve Standard sync performance by caching objects.|
 | 1.0.36  | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825)  | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column.|
 | 1.0.35  | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346)  | Consolidate date/time values mapping for JDBC sources |
 | 1.0.34  | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378)  | Improve descriptions |

docs/integrations/sources/postgres.md

Lines changed: 1 addition & 0 deletions

@@ -411,6 +411,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

 | Version | Date       | Pull Request                                              | Subject                                                                                                                     |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------|
+| 1.0.43  | 2022-02-06 | [21634](https://github.com/airbytehq/airbyte/pull/21634)  | Improve Standard sync performance by caching objects.                                                                       |
 | 1.0.42  | 2022-01-23 | [21523](https://github.com/airbytehq/airbyte/pull/21523)  | Check for null in cursor values before replacing.                                                                           |
 | 1.0.41  | 2022-01-25 | [20939](https://github.com/airbytehq/airbyte/pull/20939)  | Adjust batch selection memory limits databases.                                                                             |
 | 1.0.40  | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825)  | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column. |
