Skip to content

Commit 39db316

Browse files
Update Cassandra destination to use outputRecordCollector to properly store state (#15294)
* Update Cassandra destination to use outputRecordCollector to properly store state * Bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 117c346 commit 39db316

File tree

8 files changed

+70
-21
lines changed

8 files changed

+70
-21
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
- name: Cassandra
5454
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5
5555
dockerRepository: airbyte/destination-cassandra
56-
dockerImageTag: 0.1.2
56+
dockerImageTag: 0.1.3
5757
documentationUrl: https://docs.airbyte.io/integrations/destinations/cassandra
5858
icon: cassandra.svg
5959
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@
693693
supported_destination_sync_modes:
694694
- "overwrite"
695695
- "append"
696-
- dockerImage: "airbyte/destination-cassandra:0.1.2"
696+
- dockerImage: "airbyte/destination-cassandra:0.1.3"
697697
spec:
698698
documentationUrl: "https://docs.airbyte.io/integrations/destinations/cassandra"
699699
connectionSpecification:

airbyte-integrations/connectors/destination-cassandra/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-cassandra
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.2
19+
LABEL io.airbyte.version=0.1.3
2020
LABEL io.airbyte.name=airbyte/destination-cassandra

airbyte-integrations/connectors/destination-cassandra/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626
// https://mvnrepository.com/artifact/org.assertj/assertj-core
2727
testImplementation "org.assertj:assertj-core:${assertVersion}"
2828
testImplementation libs.connectors.testcontainers.cassandra
29+
testImplementation project(':airbyte-integrations:bases:standard-destination-test')
2930

3031

3132
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')

airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraDestination.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception {
2626
}
2727

2828
@Override
29-
public AirbyteConnectionStatus check(JsonNode config) {
29+
public AirbyteConnectionStatus check(final JsonNode config) {
3030
var cassandraConfig = new CassandraConfig(config);
3131
// add random uuid to avoid conflicts with existing tables.
3232
String tableName = "table_" + UUID.randomUUID().toString().replace("-", "");
@@ -55,10 +55,12 @@ public AirbyteConnectionStatus check(JsonNode config) {
5555
}
5656

5757
@Override
58-
public AirbyteMessageConsumer getConsumer(JsonNode config,
59-
ConfiguredAirbyteCatalog configuredCatalog,
60-
Consumer<AirbyteMessage> outputRecordCollector) {
61-
return new CassandraMessageConsumer(new CassandraConfig(config), configuredCatalog, outputRecordCollector);
58+
public AirbyteMessageConsumer getConsumer(final JsonNode config,
59+
final ConfiguredAirbyteCatalog configuredCatalog,
60+
final Consumer<AirbyteMessage> outputRecordCollector) {
61+
final CassandraConfig cassandraConfig = new CassandraConfig(config);
62+
final CassandraCqlProvider cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
63+
return new CassandraMessageConsumer(cassandraConfig, configuredCatalog, cassandraCqlProvider, outputRecordCollector);
6264
}
6365

6466
}

airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,13 @@ class CassandraMessageConsumer extends FailureTrackingAirbyteMessageConsumer {
2727

2828
private final CassandraCqlProvider cassandraCqlProvider;
2929

30-
private AirbyteMessage lastMessage = null;
31-
32-
public CassandraMessageConsumer(CassandraConfig cassandraConfig,
33-
ConfiguredAirbyteCatalog configuredCatalog,
34-
Consumer<AirbyteMessage> outputRecordCollector) {
30+
public CassandraMessageConsumer(final CassandraConfig cassandraConfig,
31+
final ConfiguredAirbyteCatalog configuredCatalog,
32+
final CassandraCqlProvider provider,
33+
final Consumer<AirbyteMessage> outputRecordCollector) {
3534
this.cassandraConfig = cassandraConfig;
3635
this.outputRecordCollector = outputRecordCollector;
37-
this.cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
36+
this.cassandraCqlProvider = provider;
3837
var nameTransformer = new CassandraNameTransformer(cassandraConfig);
3938
this.cassandraStreams = configuredCatalog.getStreams().stream()
4039
.collect(Collectors.toUnmodifiableMap(
@@ -55,7 +54,7 @@ protected void startTracked() {
5554
}
5655

5756
@Override
58-
protected void acceptTracked(AirbyteMessage message) {
57+
protected void acceptTracked(final AirbyteMessage message) {
5958
if (message.getType() == AirbyteMessage.Type.RECORD) {
6059
var messageRecord = message.getRecord();
6160
var streamConfig =
@@ -66,14 +65,14 @@ protected void acceptTracked(AirbyteMessage message) {
6665
var data = Jsons.serialize(messageRecord.getData());
6766
cassandraCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data);
6867
} else if (message.getType() == AirbyteMessage.Type.STATE) {
69-
this.lastMessage = message;
68+
outputRecordCollector.accept(message);
7069
} else {
7170
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
7271
}
7372
}
7473

7574
@Override
76-
protected void close(boolean hasFailed) {
75+
protected void close(final boolean hasFailed) {
7776
if (!hasFailed) {
7877
cassandraStreams.forEach((k, v) -> {
7978
try {
@@ -88,17 +87,16 @@ protected void close(boolean hasFailed) {
8887
}
8988
default -> throw new UnsupportedOperationException();
9089
}
91-
} catch (Exception e) {
90+
} catch (final Exception e) {
9291
LOGGER.error("Error while copying data to table {}: : ", v.getTableName(), e);
9392
}
9493
});
95-
outputRecordCollector.accept(lastMessage);
9694
}
9795

9896
cassandraStreams.forEach((k, v) -> {
9997
try {
10098
cassandraCqlProvider.dropTableIfExists(v.getKeyspace(), v.getTempTableName());
101-
} catch (Exception e) {
99+
} catch (final Exception e) {
102100
LOGGER.error("Error while deleting temp table {} with reason: ", v.getTempTableName(), e);
103101
}
104102
});

airbyte-integrations/connectors/destination-cassandra/src/test-integration/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ void setup() {
5353

5454
var catalog = TestDataFactory.createConfiguredAirbyteCatalog(cStream1, cStream2);
5555

56-
cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, message -> {});
5756
cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
57+
cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, cassandraCqlProvider, message -> {});
5858
nameTransformer = new CassandraNameTransformer(cassandraConfig);
5959
}
6060

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.cassandra;
6+
7+
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
8+
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
9+
import io.airbyte.protocol.models.AirbyteMessage;
10+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
11+
import java.util.function.Consumer;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.extension.ExtendWith;
14+
import org.mockito.InjectMocks;
15+
import org.mockito.Mock;
16+
import org.mockito.junit.jupiter.MockitoExtension;
17+
18+
@ExtendWith(MockitoExtension.class)
19+
public class CassandraRecordConsumerTest extends PerStreamStateMessageTest {
20+
21+
@Mock
22+
private Consumer<AirbyteMessage> outputRecordCollector;
23+
24+
@InjectMocks
25+
private CassandraMessageConsumer consumer;
26+
@Mock
27+
private CassandraConfig config;
28+
@Mock
29+
private ConfiguredAirbyteCatalog catalog;
30+
@Mock
31+
private CassandraCqlProvider provider;
32+
33+
@BeforeEach
34+
public void init() {
35+
consumer = new CassandraMessageConsumer(config, catalog, provider, outputRecordCollector);
36+
}
37+
38+
@Override
39+
protected Consumer<AirbyteMessage> getMockedConsumer() {
40+
return outputRecordCollector;
41+
}
42+
43+
@Override
44+
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
45+
return consumer;
46+
}
47+
48+
}

0 commit comments

Comments
 (0)