Skip to content

Commit 3861f0f

Browse files
Update Kafka destination to use outputRecordCollector to properly store state (#15287)
* Added tracking state to destination kafka
* Bump version
* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 8716138 commit 3861f0f

File tree

6 files changed

+37
-14
lines changed

6 files changed

+37
-14
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
- name: Kafka
134134
destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
135135
dockerRepository: airbyte/destination-kafka
136-
dockerImageTag: 0.1.9
136+
dockerImageTag: 0.1.10
137137
documentationUrl: https://docs.airbyte.io/integrations/destinations/kafka
138138
icon: kafka.svg
139139
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2005,7 +2005,7 @@
20052005
supportsDBT: false
20062006
supported_destination_sync_modes:
20072007
- "append"
2008-
- dockerImage: "airbyte/destination-kafka:0.1.9"
2008+
- dockerImage: "airbyte/destination-kafka:0.1.10"
20092009
spec:
20102010
documentationUrl: "https://docs.airbyte.io/integrations/destinations/kafka"
20112011
connectionSpecification:

airbyte-integrations/connectors/destination-kafka/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-kafka
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.9
19+
LABEL io.airbyte.version=0.1.10
2020
LABEL io.airbyte.name=airbyte/destination-kafka

airbyte-integrations/connectors/destination-kafka/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ dependencies {
1717
implementation 'org.apache.kafka:kafka-clients:2.8.0'
1818
implementation 'org.apache.kafka:connect-json:2.8.0'
1919

20+
testImplementation project(':airbyte-integrations:bases:standard-destination-test')
21+
2022
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
2123
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-kafka')
2224
integrationTestJavaImplementation libs.connectors.testcontainers.kafka

airbyte-integrations/connectors/destination-kafka/src/main/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumer.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class KafkaRecordConsumer extends FailureTrackingAirbyteMessageConsumer {
3737
private final Consumer<AirbyteMessage> outputRecordCollector;
3838
private final NamingConventionTransformer nameTransformer;
3939

40-
private AirbyteMessage lastStateMessage = null;
41-
4240
public KafkaRecordConsumer(final KafkaDestinationConfig kafkaDestinationConfig,
4341
final ConfiguredAirbyteCatalog catalog,
4442
final Consumer<AirbyteMessage> outputRecordCollector,
@@ -60,7 +58,7 @@ protected void startTracked() {
6058
@Override
6159
protected void acceptTracked(final AirbyteMessage airbyteMessage) {
6260
if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) {
63-
lastStateMessage = airbyteMessage;
61+
outputRecordCollector.accept(airbyteMessage);
6462
} else if (airbyteMessage.getType() == AirbyteMessage.Type.RECORD) {
6563
final AirbyteRecordMessage recordMessage = airbyteMessage.getRecord();
6664

@@ -98,15 +96,13 @@ private void sendRecord(final ProducerRecord<String, JsonNode> record) {
9896
});
9997
if (sync) {
10098
producer.flush();
101-
outputRecordCollector.accept(lastStateMessage);
10299
}
103100
}
104101

105102
@Override
106103
protected void close(final boolean hasFailed) {
107104
producer.flush();
108105
producer.close();
109-
outputRecordCollector.accept(lastStateMessage);
110106
}
111107

112108
}

airbyte-integrations/connectors/destination-kafka/src/test/java/io/airbyte/integrations/destination/kafka/KafkaRecordConsumerTest.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
88
import static org.junit.jupiter.api.Assertions.assertThrows;
9-
import static org.mockito.Mockito.mock;
109

1110
import com.fasterxml.jackson.databind.JsonNode;
1211
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -15,7 +14,9 @@
1514
import io.airbyte.commons.jackson.MoreMappers;
1615
import io.airbyte.commons.json.Jsons;
1716
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
17+
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
1818
import io.airbyte.integrations.destination.StandardNameTransformer;
19+
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
1920
import io.airbyte.protocol.models.AirbyteMessage;
2021
import io.airbyte.protocol.models.AirbyteRecordMessage;
2122
import io.airbyte.protocol.models.AirbyteStateMessage;
@@ -30,16 +31,21 @@
3031
import java.util.stream.Collectors;
3132
import java.util.stream.IntStream;
3233
import java.util.stream.Stream;
34+
import org.junit.jupiter.api.BeforeEach;
3335
import org.junit.jupiter.api.DisplayName;
3436
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.extension.ExtendWith;
3538
import org.junit.jupiter.api.extension.ExtensionContext;
3639
import org.junit.jupiter.params.ParameterizedTest;
3740
import org.junit.jupiter.params.provider.Arguments;
3841
import org.junit.jupiter.params.provider.ArgumentsProvider;
3942
import org.junit.jupiter.params.provider.ArgumentsSource;
43+
import org.mockito.Mock;
44+
import org.mockito.junit.jupiter.MockitoExtension;
4045

4146
@DisplayName("KafkaRecordConsumer")
42-
public class KafkaRecordConsumerTest {
47+
@ExtendWith(MockitoExtension.class)
48+
public class KafkaRecordConsumerTest extends PerStreamStateMessageTest {
4349

4450
private static final ObjectMapper mapper = MoreMappers.initMapper();
4551
private static final String TOPIC_NAME = "test.topic";
@@ -53,16 +59,27 @@ public class KafkaRecordConsumerTest {
5359
Field.of("id", JsonSchemaType.NUMBER),
5460
Field.of("name", JsonSchemaType.STRING))));
5561

62+
@Mock
63+
private Consumer<AirbyteMessage> outputRecordCollector;
64+
65+
private KafkaRecordConsumer consumer;
66+
5667
private static final StandardNameTransformer NAMING_RESOLVER = new StandardNameTransformer();
5768

69+
@BeforeEach
70+
public void init() {
71+
final KafkaDestinationConfig config = KafkaDestinationConfig.getKafkaDestinationConfig(getConfig(TOPIC_NAME));
72+
consumer = new KafkaRecordConsumer(config, CATALOG, outputRecordCollector, NAMING_RESOLVER);
73+
}
74+
5875
@ParameterizedTest
5976
@ArgumentsSource(TopicMapArgumentsProvider.class)
6077
@SuppressWarnings("unchecked")
6178
public void testBuildTopicMap(final String topicPattern, final String expectedTopic) {
6279
final KafkaDestinationConfig config = KafkaDestinationConfig.getKafkaDestinationConfig(getConfig(topicPattern));
63-
final KafkaRecordConsumer recordConsumer = new KafkaRecordConsumer(config, CATALOG, mock(Consumer.class), NAMING_RESOLVER);
80+
consumer = new KafkaRecordConsumer(config, CATALOG, outputRecordCollector, NAMING_RESOLVER);
6481

65-
final Map<AirbyteStreamNameNamespacePair, String> topicMap = recordConsumer.buildTopicMap();
82+
final Map<AirbyteStreamNameNamespacePair, String> topicMap = consumer.buildTopicMap();
6683
assertEquals(1, topicMap.size());
6784

6885
final AirbyteStreamNameNamespacePair streamNameNamespacePair = new AirbyteStreamNameNamespacePair(STREAM_NAME, SCHEMA_NAME);
@@ -72,8 +89,6 @@ public void testBuildTopicMap(final String topicPattern, final String expectedTo
7289
@Test
7390
@SuppressWarnings("unchecked")
7491
void testCannotConnectToBrokers() throws Exception {
75-
final KafkaDestinationConfig config = KafkaDestinationConfig.getKafkaDestinationConfig(getConfig(TOPIC_NAME));
76-
final KafkaRecordConsumer consumer = new KafkaRecordConsumer(config, CATALOG, mock(Consumer.class), NAMING_RESOLVER);
7792
final List<AirbyteMessage> expectedRecords = getNRecords(10);
7893

7994
consumer.start();
@@ -133,6 +148,16 @@ private List<AirbyteMessage> getNRecords(final int n) {
133148

134149
}
135150

151+
@Override
152+
protected Consumer<AirbyteMessage> getMockedConsumer() {
153+
return outputRecordCollector;
154+
}
155+
156+
@Override
157+
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
158+
return consumer;
159+
}
160+
136161
public static class TopicMapArgumentsProvider implements ArgumentsProvider {
137162

138163
@Override

0 commit comments

Comments (0)