diff --git a/airbyte-integrations/bases/standard-destination-test/build.gradle b/airbyte-integrations/bases/standard-destination-test/build.gradle index 79c7a2c8aa623..bc9aee6076feb 100644 --- a/airbyte-integrations/bases/standard-destination-test/build.gradle +++ b/airbyte-integrations/bases/standard-destination-test/build.gradle @@ -11,4 +11,5 @@ dependencies { implementation(enforcedPlatform('org.junit:junit-bom:5.8.2')) implementation 'org.junit.jupiter:junit-jupiter-api' implementation 'org.junit.jupiter:junit-jupiter-params' + implementation 'org.mockito:mockito-core:4.6.1' } diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java new file mode 100644 index 0000000000000..120f9f1a8729e --- /dev/null +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.standardtest.destination; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +public abstract class PerStreamStateMessageTest { + + protected abstract Consumer getMockedConsumer(); + + protected abstract FailureTrackingAirbyteMessageConsumer getMessageConsumer(); + + @Test + void ensureAllStateMessageAreEmitted() throws Exception { + final AirbyteMessage airbyteMessage1 = AirbyteMessageCreator.createStreamStateMessage("name_one", "state_one"); + final AirbyteMessage airbyteMessage2 = AirbyteMessageCreator.createStreamStateMessage("name_two", "state_two"); + final AirbyteMessage airbyteMessage3 = AirbyteMessageCreator.createStreamStateMessage("name_three", "state_three"); + final FailureTrackingAirbyteMessageConsumer messageConsumer = getMessageConsumer(); + + messageConsumer.accept(airbyteMessage1); + messageConsumer.accept(airbyteMessage2); + messageConsumer.accept(airbyteMessage3); + + final Consumer mConsumer = getMockedConsumer(); + final InOrder inOrder = Mockito.inOrder(mConsumer); + + inOrder.verify(mConsumer).accept(airbyteMessage1); + inOrder.verify(mConsumer).accept(airbyteMessage2); + inOrder.verify(mConsumer).accept(airbyteMessage3); + } + + class AirbyteMessageCreator { + + public static AirbyteMessage createStreamStateMessage(final String name, final String value) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor() + .withName(name)) + .withStreamState(Jsons.jsonNode(value)))); + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index 66bea4b266bbf..b2c8bd082e2bc 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -25,6 +25,8 @@ dependencies { implementation project(':airbyte-integrations:connectors:destination-gcs') implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'} + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery') diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java new file mode 100644 index 0000000000000..8e87852a506b9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java @@ -0,0 +1,33 @@ +package io.airbyte.integrations.destination.bigquery; + +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import java.util.Map; +import java.util.function.Consumer; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class BigQueryRecordConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Map> uploaderMap; + @Mock + private Consumer outputRecordCollector; + + @InjectMocks + private BigQueryRecordConsumer bigQueryRecordConsumer; + + @Override protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return bigQueryRecordConsumer; + } +}