From 112873e7adcddebbbe569f8a9ecc1926ed43f4fa Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 14:20:51 -0700 Subject: [PATCH 1/5] Add generic test to test the per stream state behavior --- .../PerStreamStateMessageTest.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java new file mode 100644 index 0000000000000..9f2c0cb450b52 --- /dev/null +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java @@ -0,0 +1,59 @@ +package io.airbyte.integrations.standardtest.destination; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +public abstract class PerStreamStateMessageTest { + + protected abstract Consumer getMockedConsumer(); + + protected abstract FailureTrackingAirbyteMessageConsumer getMessageConsumer(); + + @Test + void ensureAllStateMessageAreEmitted() throws Exception { + final AirbyteMessage airbyteMessage1 = AirbyteMessageCreator.createStreamStateMessage("name_one", "state_one"); + final AirbyteMessage airbyteMessage2 = AirbyteMessageCreator.createStreamStateMessage("name_two", "state_two"); + final AirbyteMessage airbyteMessage3 = AirbyteMessageCreator.createStreamStateMessage("name_three", "state_three"); + final FailureTrackingAirbyteMessageConsumer messageConsumer = getMessageConsumer(); + + messageConsumer.accept(airbyteMessage1); + messageConsumer.accept(airbyteMessage2); + messageConsumer.accept(airbyteMessage3); + + final Consumer mConsumer = getMockedConsumer(); + final InOrder inOrder = Mockito.inOrder(mConsumer); + + inOrder.verify(mConsumer).accept(airbyteMessage1); + inOrder.verify(mConsumer).accept(airbyteMessage2); + inOrder.verify(mConsumer).accept(airbyteMessage3); + } + + class AirbyteMessageCreator { + + public static AirbyteMessage createStreamStateMessage(final String name, final String value) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor() + .withName(name)) + .withStreamState(Jsons.jsonNode(value)))); + } + + } + +} From f6ba1a7a71a46b505a0522513f1d059e0495b272 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 14:39:56 -0700 Subject: [PATCH 2/5] Add missing dependency --- .../bases/standard-destination-test/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/bases/standard-destination-test/build.gradle b/airbyte-integrations/bases/standard-destination-test/build.gradle index 79c7a2c8aa623..bc9aee6076feb 100644 --- a/airbyte-integrations/bases/standard-destination-test/build.gradle +++ b/airbyte-integrations/bases/standard-destination-test/build.gradle @@ -11,4 +11,5 @@ dependencies { implementation(enforcedPlatform('org.junit:junit-bom:5.8.2')) implementation 'org.junit.jupiter:junit-jupiter-api' implementation 'org.junit.jupiter:junit-jupiter-params' + implementation 'org.mockito:mockito-core:4.6.1' } From f7fad660ae17da9ebab0a6dcf1a9534b4a48487d Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 14:50:41 -0700 Subject: [PATCH 3/5] Add test for the big query record consumer --- .../destination-bigquery/build.gradle | 2 ++ .../bigquery/BigQueryRecordConsumerTest.java | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 66bea4b266bbf..b2c8bd082e2bc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -25,6 +25,8 @@ dependencies { implementation project(':airbyte-integrations:connectors:destination-gcs') implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'} + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery') diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java new file mode 100644 index 0000000000000..8e87852a506b9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java @@ -0,0 +1,33 @@ +package io.airbyte.integrations.destination.bigquery; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import java.util.Map; +import java.util.function.Consumer; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BigQueryRecordConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Map> uploaderMap; + @Mock + private Consumer outputRecordCollector; + + @InjectMocks + private BigQueryRecordConsumer bigQueryRecordConsumer; + + @Override protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return bigQueryRecordConsumer; + } +} From cad6d2ce3b7a396e2f671b9923f43f1287313490 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 15:21:24 -0700 Subject: [PATCH 4/5] Add license --- .../standardtest/destination/PerStreamStateMessageTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java index 9f2c0cb450b52..708247867c0b9 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java @@ -1,3 +1,6 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ package io.airbyte.integrations.standardtest.destination; import io.airbyte.commons.json.Jsons; From 600532daac7e825d5cff3b6a02aa3cf4f34139fc Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 15:44:17 -0700 Subject: [PATCH 5/5] Add missing newline --- .../standardtest/destination/PerStreamStateMessageTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java index 708247867c0b9..120f9f1a8729e 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java @@ -1,6 +1,7 @@ /* * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ + package io.airbyte.integrations.standardtest.destination; import io.airbyte.commons.json.Jsons;