Skip to content

Bmoric/test bq standard #15270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ dependencies {
implementation(enforcedPlatform('org.junit:junit-bom:5.8.2'))
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.junit.jupiter:junit-jupiter-params'
implementation 'org.mockito:mockito-core:4.6.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

public abstract class PerStreamStateMessageTest {

protected abstract Consumer<AirbyteMessage> getMockedConsumer();

protected abstract FailureTrackingAirbyteMessageConsumer getMessageConsumer();

@Test
void ensureAllStateMessageAreEmitted() throws Exception {
final AirbyteMessage airbyteMessage1 = AirbyteMessageCreator.createStreamStateMessage("name_one", "state_one");
final AirbyteMessage airbyteMessage2 = AirbyteMessageCreator.createStreamStateMessage("name_two", "state_two");
final AirbyteMessage airbyteMessage3 = AirbyteMessageCreator.createStreamStateMessage("name_three", "state_three");
final FailureTrackingAirbyteMessageConsumer messageConsumer = getMessageConsumer();

messageConsumer.accept(airbyteMessage1);
messageConsumer.accept(airbyteMessage2);
messageConsumer.accept(airbyteMessage3);

final Consumer<AirbyteMessage> mConsumer = getMockedConsumer();
final InOrder inOrder = Mockito.inOrder(mConsumer);

inOrder.verify(mConsumer).accept(airbyteMessage1);
inOrder.verify(mConsumer).accept(airbyteMessage2);
inOrder.verify(mConsumer).accept(airbyteMessage3);
}

class AirbyteMessageCreator {

public static AirbyteMessage createStreamStateMessage(final String name, final String value) {
return new AirbyteMessage()
.withType(Type.STATE)
.withState(
new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor()
.withName(name))
.withStreamState(Jsons.jsonNode(value))));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies {
implementation project(':airbyte-integrations:connectors:destination-gcs')
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.airbyte.integrations.destination.bigquery;

import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Map;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class BigQueryRecordConsumerTest extends PerStreamStateMessageTest {

@Mock
private Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap;
@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

@InjectMocks
private BigQueryRecordConsumer bigQueryRecordConsumer;

@Override protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return bigQueryRecordConsumer;
}
}