Skip to content

Commit daafb61

Browse files
authored
Add generic test to test the per stream state behavior (#15267)
* Add generic test to test the per stream state behavior * Add missing dependency * Add license * Add missing newline
1 parent f4b4863 commit daafb61

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

airbyte-integrations/bases/standard-destination-test/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ dependencies {
1111
implementation(enforcedPlatform('org.junit:junit-bom:5.8.2'))
1212
implementation 'org.junit.jupiter:junit-jupiter-api'
1313
implementation 'org.junit.jupiter:junit-jupiter-params'
14+
implementation 'org.mockito:mockito-core:4.6.1'
1415
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.standardtest.destination;
6+
7+
import io.airbyte.commons.json.Jsons;
8+
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
9+
import io.airbyte.protocol.models.AirbyteMessage;
10+
import io.airbyte.protocol.models.AirbyteMessage.Type;
11+
import io.airbyte.protocol.models.AirbyteStateMessage;
12+
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
13+
import io.airbyte.protocol.models.AirbyteStreamState;
14+
import io.airbyte.protocol.models.StreamDescriptor;
15+
import java.util.function.Consumer;
16+
import org.junit.jupiter.api.Test;
17+
import org.mockito.InOrder;
18+
import org.mockito.Mockito;
19+
20+
public abstract class PerStreamStateMessageTest {
21+
22+
protected abstract Consumer<AirbyteMessage> getMockedConsumer();
23+
24+
protected abstract FailureTrackingAirbyteMessageConsumer getMessageConsumer();
25+
26+
@Test
27+
void ensureAllStateMessageAreEmitted() throws Exception {
28+
final AirbyteMessage airbyteMessage1 = AirbyteMessageCreator.createStreamStateMessage("name_one", "state_one");
29+
final AirbyteMessage airbyteMessage2 = AirbyteMessageCreator.createStreamStateMessage("name_two", "state_two");
30+
final AirbyteMessage airbyteMessage3 = AirbyteMessageCreator.createStreamStateMessage("name_three", "state_three");
31+
final FailureTrackingAirbyteMessageConsumer messageConsumer = getMessageConsumer();
32+
33+
messageConsumer.accept(airbyteMessage1);
34+
messageConsumer.accept(airbyteMessage2);
35+
messageConsumer.accept(airbyteMessage3);
36+
37+
final Consumer<AirbyteMessage> mConsumer = getMockedConsumer();
38+
final InOrder inOrder = Mockito.inOrder(mConsumer);
39+
40+
inOrder.verify(mConsumer).accept(airbyteMessage1);
41+
inOrder.verify(mConsumer).accept(airbyteMessage2);
42+
inOrder.verify(mConsumer).accept(airbyteMessage3);
43+
}
44+
45+
class AirbyteMessageCreator {
46+
47+
public static AirbyteMessage createStreamStateMessage(final String name, final String value) {
48+
return new AirbyteMessage()
49+
.withType(Type.STATE)
50+
.withState(
51+
new AirbyteStateMessage()
52+
.withType(AirbyteStateType.STREAM)
53+
.withStream(
54+
new AirbyteStreamState()
55+
.withStreamDescriptor(
56+
new StreamDescriptor()
57+
.withName(name))
58+
.withStreamState(Jsons.jsonNode(value))));
59+
}
60+
61+
}
62+
63+
}

0 commit comments

Comments
 (0)