Skip to content

Commit 211d331

Browse files
authored
15302: Destination Azure Blob Storage: Handle per-stream state (#15318)
* 15302: Azure blob destination consumer fixed * 15302: Unit test added * 15302: Unit test added * 15318: test fix
1 parent b5dc550 commit 211d331

File tree

3 files changed

+57
-14
lines changed

3 files changed

+57
-14
lines changed

airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ dependencies {
1919
implementation 'com.azure:azure-storage-blob:12.12.0'
2020
implementation 'org.apache.commons:commons-csv:1.4'
2121

22+
testImplementation project(':airbyte-integrations:bases:standard-destination-test')
23+
2224
testImplementation 'org.apache.commons:commons-lang3:3.11'
2325

2426
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')

airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu
4444
private final Consumer<AirbyteMessage> outputRecordCollector;
4545
private final Map<AirbyteStreamNameNamespacePair, AzureBlobStorageWriter> streamNameAndNamespaceToWriters;
4646

47-
private AirbyteMessage lastStateMessage = null;
48-
4947
public AzureBlobStorageConsumer(
50-
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
51-
final ConfiguredAirbyteCatalog configuredCatalog,
52-
final AzureBlobStorageWriterFactory writerFactory,
53-
final Consumer<AirbyteMessage> outputRecordCollector) {
48+
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
49+
final ConfiguredAirbyteCatalog configuredCatalog,
50+
final AzureBlobStorageWriterFactory writerFactory,
51+
final Consumer<AirbyteMessage> outputRecordCollector) {
5452
this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig;
5553
this.configuredCatalog = configuredCatalog;
5654
this.writerFactory = writerFactory;
@@ -93,8 +91,8 @@ protected void startTracked() throws Exception {
9391
}
9492

9593
private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder,
96-
final AppendBlobClient appendBlobClient,
97-
final ConfiguredAirbyteStream configuredStream) {
94+
final AppendBlobClient appendBlobClient,
95+
final ConfiguredAirbyteStream configuredStream) {
9896
// create container if absent (aka SQl Schema)
9997
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
10098
if (!containerClient.exists()) {
@@ -103,7 +101,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
103101
if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) {
104102
LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically"
105103
+ " created or all data would be overridden (if any) for stream:" + configuredStream
106-
.getStream().getName());
104+
.getStream().getName());
107105
var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false)
108106
.collect(Collectors.toList());
109107
blobItemList.forEach(blob -> {
@@ -121,7 +119,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
121119
@Override
122120
protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception {
123121
if (airbyteMessage.getType() == Type.STATE) {
124-
this.lastStateMessage = airbyteMessage;
122+
outputRecordCollector.accept(airbyteMessage);
125123
return;
126124
} else if (airbyteMessage.getType() != Type.RECORD) {
127125
return;
@@ -154,10 +152,6 @@ protected void close(final boolean hasFailed) throws Exception {
154152
for (final AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) {
155153
handler.close(hasFailed);
156154
}
157-
158-
if (!hasFailed) {
159-
outputRecordCollector.accept(lastStateMessage);
160-
}
161155
}
162156

163157
private static String getOutputFilename(final Timestamp timestamp) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.airbyte.integrations.destination.azure_blob_storage;
2+
3+
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
4+
import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory;
5+
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
6+
import io.airbyte.protocol.models.AirbyteMessage;
7+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
8+
import java.util.function.Consumer;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.DisplayName;
11+
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.mockito.InjectMocks;
13+
import org.mockito.Mock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
15+
16+
@DisplayName("AzureBlobRecordConsumer")
17+
@ExtendWith(MockitoExtension.class)
18+
public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest {
19+
@Mock
20+
private Consumer<AirbyteMessage> outputRecordCollector;
21+
22+
private AzureBlobStorageConsumer consumer;
23+
24+
@Mock
25+
private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig;
26+
27+
@Mock
28+
private ConfiguredAirbyteCatalog configuredCatalog;
29+
30+
@Mock
31+
private AzureBlobStorageWriterFactory writerFactory;
32+
33+
@BeforeEach
34+
public void init() {
35+
consumer = new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredCatalog, writerFactory, outputRecordCollector);
36+
}
37+
38+
@Override
39+
protected Consumer<AirbyteMessage> getMockedConsumer() {
40+
return outputRecordCollector;
41+
}
42+
43+
@Override
44+
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
45+
return consumer;
46+
}
47+
}

0 commit comments

Comments
 (0)