Skip to content

Destination Postgres : Enable DAT and fix the data fetch. #12543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
id 'java-library'
}
dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import java.util.Arrays;
import org.jooq.Record;

public abstract class JdbcDestinationAcceptanceTest extends DestinationAcceptanceTest {

protected final ObjectMapper mapper = new ObjectMapper();

protected JsonNode getJsonFromRecord(Record record) {
ObjectNode node = mapper.createObjectNode();

Arrays.stream(record.fields()).forEach(field -> {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "jsonb", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
}
});
return node;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class PostgresDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private PostgreSQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
Expand Down Expand Up @@ -62,7 +61,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -81,41 +80,42 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new PostgresTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
final String tableName = namingResolver.getIdentifier(streamName);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// //tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(),
db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
db.getJdbcUrl()).query(ctx -> {
ctx.execute("set time zone 'UTC';");
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres;

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class PostgresTestDataComparator extends AdvancedTestDataComparator {

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private static final String POSTGRES_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
private static final String POSTGRES_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private LocalDate parseLocalDate(String dateTimeValue) {
if (dateTimeValue != null) {
var format = (dateTimeValue.matches(".+Z") ? POSTGRES_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT);
return LocalDate.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

@Override
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
var destinationDate = parseLocalDate(actualValue);
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
return expectedDate.equals(destinationDate);
}

@Override
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(POSTGRES_DATETIME_WITH_TZ_FORMAT)), ZoneOffset.UTC);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.util.ArrayList;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -29,7 +28,7 @@
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshPostgresDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class SshPostgresDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private static final String schemaName = RandomStringUtils.randomAlphabetic(8).toLowerCase();
Expand Down Expand Up @@ -63,7 +62,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -82,32 +81,33 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new PostgresTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
final String tableName = namingResolver.getIdentifier(streamName);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// //tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, namespace);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private static Database getDatabaseFromConfig(final JsonNode config) {
return Databases.createPostgresDatabase(
config.get("username").asText(),
Expand All @@ -123,13 +123,13 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
.query(ctx -> {
ctx.execute("set time zone 'UTC';");
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Integration test testing {@link RedshiftCopyS3Destination}. The default Redshift integration test
* credentials contain S3 credentials - this automatically causes COPY to be selected.
*/
public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class RedshiftCopyDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopyDestinationAcceptanceTest.class);

Expand Down Expand Up @@ -121,29 +119,6 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
return retrieveRecordsFromTable(tableName, namespace);
}

private JsonNode getJsonFromRecord(Record record) {
ObjectNode node = mapper.createObjectNode();

Arrays.stream(record.fields()).forEach(field -> {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
}
});
return node;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
return getDatabase().query(
ctx -> ctx
Expand Down