Skip to content

17644 Update Destination data type test to use the new data types #19281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.NumberDataTypeTestArgumentProvider;
import io.airbyte.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.junit.jupiter:junit-jupiter-params'
implementation 'org.mockito:mockito-core:4.6.1'

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.standardtest.destination.argproviders.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider;
import io.airbyte.integrations.standardtest.destination.comparator.BasicTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteCatalog;
Expand Down Expand Up @@ -429,13 +431,13 @@ public void testSecondSync() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final JsonNode config = getConfig();
Expand All @@ -446,7 +448,7 @@ public void testSecondSync() throws Exception {
// So let's create a dummy data that will be checked after all sync. It should remain the same
final AirbyteCatalog dummyCatalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
dummyCatalog.getStreams().get(0).setName(DUMMY_CATALOG_NAME);
final ConfiguredAirbyteCatalog configuredDummyCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
Expand Down Expand Up @@ -495,7 +497,7 @@ public void testSecondSync() throws Exception {
public void testLineBreakCharacters() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);
Expand Down Expand Up @@ -560,7 +562,7 @@ public void testIncrementalSync() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);
Expand All @@ -570,7 +572,7 @@ public void testIncrementalSync() throws Exception {
});

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final JsonNode config = getConfig();
Expand Down Expand Up @@ -651,7 +653,7 @@ public void testIncrementalDedupeSync() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);
Expand All @@ -665,7 +667,7 @@ public void testIncrementalDedupeSync() throws Exception {
});

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final JsonNode config = getConfig();
Expand Down Expand Up @@ -754,12 +756,12 @@ void testSyncVeryBigRecords() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
// Add a big message that barely fits into the limits of the destination
Expand Down Expand Up @@ -946,15 +948,15 @@ void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception {
// TODO(davin): make these tests part of the catalog file.
final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final String namespace = "sourcenamespace";
catalog.getStreams().forEach(stream -> stream.setNamespace(namespace));
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);

final List<AirbyteMessage> messages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(
Expand All @@ -978,7 +980,7 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {
// TODO(davin): make these tests part of the catalog file.
final var catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final var namespace1 = "sourcenamespace";
catalog.getStreams().forEach(stream -> stream.setNamespace(namespace1));
Expand All @@ -995,14 +997,12 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {
catalog.getStreams().addAll(diffNamespaceStreams);

final var configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final var ns1Messages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
final var messageFile = DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion());
final var ns1Messages = MoreResources.readResource(messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final var ns1MessagesAtNamespace1 = getRecordMessagesWithNewNamespace(ns1Messages, namespace1);
final var ns2Messages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
final var ns2Messages = MoreResources.readResource(messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final var ns2MessagesAtNamespace2 = getRecordMessagesWithNewNamespace(ns2Messages, namespace2);
Expand All @@ -1016,23 +1016,6 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {
retrieveRawRecordsAndAssertSameMessages(catalog, allMessages, defaultSchema);
}

public static class NamespaceTestCaseProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context)
throws Exception {
final JsonNode testCases =
Jsons.deserialize(MoreResources.readResource("namespace_test_cases.json"));
return MoreIterators.toList(testCases.elements()).stream()
.filter(testCase -> testCase.get("enabled").asBoolean())
.map(testCase -> Arguments.of(
testCase.get("id").asText(),
testCase.get("namespace").asText(),
testCase.get("normalized").asText()));
}

}

@ParameterizedTest
@ArgumentsSource(NamespaceTestCaseProvider.class)
public void testNamespaces(final String testCaseId,
Expand All @@ -1049,14 +1032,14 @@ public void testNamespaces(final String testCaseId,
}

final AirbyteCatalog catalog = Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
catalog.getStreams().forEach(stream -> stream.setNamespace(namespace));
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);

final List<AirbyteMessage> messages = MoreResources.readResource(
DataArgumentsProvider.NAMESPACE_CONFIG.messageFile).lines()
DataArgumentsProvider.NAMESPACE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(
Expand Down Expand Up @@ -1105,13 +1088,13 @@ public void testSyncNotFailsWithNewFields() throws Exception {

final AirbyteCatalog catalog =
Jsons.deserialize(
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile),
MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getCatalogFileVersion(getProtocolVersion())),
AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(
catalog);

final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
DataArgumentsProvider.EXCHANGE_RATE_CONFIG.getMessageFileVersion(getProtocolVersion())).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final JsonNode config = getConfig();
Expand Down Expand Up @@ -1563,6 +1546,18 @@ protected boolean supportObjectDataTypeTest() {
return false;
}

/**
* The method should be overridden if destination connector support newer protocol version otherwise
* {@link io.airbyte.integrations.standardtest.destination.ProtocolVersion#V0} is used
* <p>
* NOTE: Method should be public in a sake of java reflection
*
* @return
*/
public ProtocolVersion getProtocolVersion() {
return ProtocolVersion.V0;
}

private boolean checkTestCompatibility(
final DataTypeTestArgumentProvider.TestCompatibility testCompatibility) {
return testCompatibility.isTestCompatible(supportBasicDataTypeTest(),
Expand Down Expand Up @@ -1639,4 +1634,23 @@ private static List<AirbyteMessage> getRecordMessagesWithNewNamespace(
return airbyteMessages;
}

public static class NamespaceTestCaseProvider implements ArgumentsProvider {

public static final String NAMESPACE_TEST_CASES_JSON = "namespace_test_cases.json";

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context)
throws Exception {
final JsonNode testCases =
Jsons.deserialize(MoreResources.readResource(NAMESPACE_TEST_CASES_JSON));
return MoreIterators.toList(testCases.elements()).stream()
.filter(testCase -> testCase.get("enabled").asBoolean())
.map(testCase -> Arguments.of(
testCase.get("id").asText(),
testCase.get("namespace").asText(),
testCase.get("normalized").asText()));
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

public enum ProtocolVersion {

V0("v0"),
V1("v1");

private final String prefix;

ProtocolVersion(String prefix) {
this.prefix = prefix;
}

public String getPrefix() {
return prefix;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;
package io.airbyte.integrations.standardtest.destination.argproviders;

import static io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil.getProtocolVersion;
import static io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil.prefixFileNameByVersion;

import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -24,10 +28,11 @@ public class DataArgumentsProvider implements ArgumentsProvider {
new CatalogMessageTestConfigPair("namespace_catalog.json", "namespace_messages.txt");

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
ProtocolVersion protocolVersion = getProtocolVersion(context);
return Stream.of(
Arguments.of(EXCHANGE_RATE_CONFIG.messageFile, EXCHANGE_RATE_CONFIG.catalogFile),
Arguments.of(EDGE_CASE_CONFIG.messageFile, EDGE_CASE_CONFIG.catalogFile)
Arguments.of(EXCHANGE_RATE_CONFIG.getMessageFileVersion(protocolVersion), EXCHANGE_RATE_CONFIG.getCatalogFileVersion(protocolVersion)),
Arguments.of(EDGE_CASE_CONFIG.getMessageFileVersion(protocolVersion), EDGE_CASE_CONFIG.getCatalogFileVersion(protocolVersion))
// todo - need to use the new protocol to capture this.
// Arguments.of("stripe_messages.txt", "stripe_schema.json")
);
Expand All @@ -44,6 +49,14 @@ public CatalogMessageTestConfigPair(final String catalogFile, final String messa
this.messageFile = messageFile;
}

public String getCatalogFileVersion(ProtocolVersion protocolVersion) {
return prefixFileNameByVersion(catalogFile, protocolVersion);
}

public String getMessageFileVersion(ProtocolVersion protocolVersion) {
return prefixFileNameByVersion(messageFile, protocolVersion);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;
package io.airbyte.integrations.standardtest.destination.argproviders;

import static io.airbyte.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil.getProtocolVersion;

import io.airbyte.integrations.standardtest.destination.ProtocolVersion;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -27,9 +30,11 @@ public class DataTypeTestArgumentProvider implements ArgumentsProvider {
public static final CatalogMessageTestConfigWithCompatibility OBJECT_WITH_ARRAY_TEST =
new CatalogMessageTestConfigWithCompatibility("data_type_array_object_test_catalog.json", "data_type_array_object_test_messages.txt",
new TestCompatibility(true, true, true));
private ProtocolVersion protocolVersion;

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
protocolVersion = getProtocolVersion(context);
return Stream.of(
getArguments(BASIC_TEST),
getArguments(ARRAY_TEST),
Expand All @@ -38,7 +43,8 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) th
}

private Arguments getArguments(CatalogMessageTestConfigWithCompatibility testConfig) {
return Arguments.of(testConfig.messageFile, testConfig.catalogFile, testConfig.testCompatibility);
return Arguments.of(testConfig.getMessageFileVersion(protocolVersion), testConfig.getCatalogFileVersion(protocolVersion),
testConfig.testCompatibility);
}

public record TestCompatibility(boolean requireBasicCompatibility,
Expand Down
Loading