Skip to content

Commit ed1d26a

Browse files
authored
S3, GCS Destinations : Fix interface duplication (#9577)
* Interface reorganization * add missing implementation * update related classes * address review remarks
1 parent 7806dbc commit ed1d26a

File tree

24 files changed

+134
-86
lines changed

24 files changed

+134
-86
lines changed

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.airbyte.integrations.base.JavaBaseConstants;
2020
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
2121
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
22-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
22+
import io.airbyte.integrations.destination.s3.writer.DestinationWriter;
2323
import io.airbyte.protocol.models.AirbyteMessage;
2424
import java.io.IOException;
2525
import java.util.function.Consumer;
@@ -28,7 +28,7 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
public abstract class AbstractBigQueryUploader<T extends CommonWriter> {
31+
public abstract class AbstractBigQueryUploader<T extends DestinationWriter> {
3232

3333
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryUploader.class);
3434

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
1919
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
2020
import io.airbyte.integrations.destination.gcs.GcsS3Helper;
21-
import io.airbyte.integrations.destination.gcs.writer.GscWriter;
21+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
2222
import io.airbyte.protocol.models.AirbyteMessage;
2323
import java.util.List;
2424
import java.util.function.Consumer;
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28-
public abstract class AbstractGscBigQueryUploader<T extends GscWriter> extends AbstractBigQueryUploader<GscWriter> {
28+
public abstract class AbstractGscBigQueryUploader<T extends DestinationFileWriter> extends AbstractBigQueryUploader<DestinationFileWriter> {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGscBigQueryUploader.class);
3131

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
import com.google.cloud.bigquery.TableDataWriteChannel;
99
import com.google.common.base.Charsets;
1010
import io.airbyte.commons.json.Jsons;
11-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
11+
import io.airbyte.integrations.destination.s3.writer.DestinationWriter;
12+
import io.airbyte.protocol.models.AirbyteRecordMessage;
1213
import java.io.IOException;
1314
import java.nio.ByteBuffer;
15+
import java.util.UUID;
1416
import org.slf4j.Logger;
1517
import org.slf4j.LoggerFactory;
1618

17-
public class BigQueryTableWriter implements CommonWriter {
19+
public class BigQueryTableWriter implements DestinationWriter {
1820

1921
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class);
2022

@@ -27,13 +29,18 @@ public BigQueryTableWriter(TableDataWriteChannel writeChannel) {
2729
@Override
2830
public void initialize() throws IOException {}
2931

32+
@Override
33+
public void write(UUID id, AirbyteRecordMessage recordMessage) {
34+
throw new RuntimeException("This write method is not used!");
35+
}
36+
3037
@Override
3138
public void write(JsonNode formattedData) throws IOException {
3239
writeChannel.write(ByteBuffer.wrap((Jsons.serialize(formattedData) + "\n").getBytes(Charsets.UTF_8)));
3340
}
3441

3542
@Override
36-
public void close(boolean hasFailed) throws Exception {
43+
public void close(boolean hasFailed) throws IOException {
3744
this.writeChannel.close();
3845
}
3946

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
1010
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
1111
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
12-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
12+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1313
import io.airbyte.protocol.models.AirbyteMessage;
1414
import io.airbyte.protocol.models.AirbyteMessage.Type;
1515
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -28,7 +28,7 @@ public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer {
2828
private final ConfiguredAirbyteCatalog configuredCatalog;
2929
private final GcsWriterFactory writerFactory;
3030
private final Consumer<AirbyteMessage> outputRecordCollector;
31-
private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;
31+
private final Map<AirbyteStreamNameNamespacePair, DestinationFileWriter> streamNameAndNamespaceToWriters;
3232

3333
private AirbyteMessage lastStateMessage = null;
3434

@@ -50,7 +50,7 @@ protected void startTracked() throws Exception {
5050
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
5151

5252
for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
53-
final S3Writer writer = writerFactory
53+
final DestinationFileWriter writer = writerFactory
5454
.create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp);
5555
writer.initialize();
5656

@@ -87,7 +87,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti
8787

8888
@Override
8989
protected void close(final boolean hasFailed) throws Exception {
90-
for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) {
90+
for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
9191
handler.close(hasFailed);
9292
}
9393
// Gcs stream uploader is all or nothing if a failure happens in the destination.

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
1212
import io.airbyte.integrations.destination.gcs.util.GcsUtils;
1313
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
14-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
15-
import io.airbyte.integrations.destination.gcs.writer.GscWriter;
1614
import io.airbyte.integrations.destination.s3.S3Format;
1715
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
1816
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
1917
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
2018
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
21-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
19+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
2220
import io.airbyte.protocol.models.AirbyteRecordMessage;
2321
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2422
import java.io.IOException;
@@ -33,7 +31,7 @@
3331
import org.slf4j.LoggerFactory;
3432
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
3533

36-
public class GcsAvroWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
34+
public class GcsAvroWriter extends BaseGcsWriter implements DestinationFileWriter {
3735

3836
protected static final Logger LOGGER = LoggerFactory.getLogger(GcsAvroWriter.class);
3937

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@
1010
import com.fasterxml.jackson.databind.JsonNode;
1111
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
1212
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
13-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
14-
import io.airbyte.integrations.destination.gcs.writer.GscWriter;
1513
import io.airbyte.integrations.destination.s3.S3Format;
1614
import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator;
1715
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
1816
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
19-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
17+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
2018
import io.airbyte.protocol.models.AirbyteRecordMessage;
2119
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2220
import java.io.IOException;
@@ -30,7 +28,7 @@
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

33-
public class GcsCsvWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
31+
public class GcsCsvWriter extends BaseGcsWriter implements DestinationFileWriter {
3432

3533
private static final Logger LOGGER = LoggerFactory.getLogger(GcsCsvWriter.class);
3634

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
import io.airbyte.integrations.base.JavaBaseConstants;
1616
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
1717
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
18-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
19-
import io.airbyte.integrations.destination.gcs.writer.GscWriter;
2018
import io.airbyte.integrations.destination.s3.S3Format;
2119
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
22-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
20+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
2321
import io.airbyte.protocol.models.AirbyteRecordMessage;
2422
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2523
import java.io.IOException;
@@ -30,7 +28,7 @@
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

33-
public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
31+
public class GcsJsonlWriter extends BaseGcsWriter implements DestinationFileWriter {
3432

3533
protected static final Logger LOGGER = LoggerFactory.getLogger(GcsJsonlWriter.class);
3634

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
1111
import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
1212
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
13-
import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
14-
import io.airbyte.integrations.destination.gcs.writer.GscWriter;
1513
import io.airbyte.integrations.destination.s3.S3Format;
1614
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
1715
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
18-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
16+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1917
import io.airbyte.protocol.models.AirbyteRecordMessage;
2018
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2119
import java.io.IOException;
@@ -35,7 +33,7 @@
3533
import org.slf4j.LoggerFactory;
3634
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
3735

38-
public class GcsParquetWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
36+
public class GcsParquetWriter extends BaseGcsWriter implements DestinationFileWriter {
3937

4038
private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class);
4139
private static final ObjectMapper MAPPER = new ObjectMapper();

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
1313
import io.airbyte.integrations.destination.s3.S3Format;
1414
import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper;
15-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
15+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1616
import io.airbyte.protocol.models.AirbyteStream;
1717
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
1818
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -33,7 +33,7 @@
3333
* <li>Create the bucket and prepare the bucket path.</li>
3434
* </ul>
3535
*/
36-
public abstract class BaseGcsWriter implements S3Writer, CommonWriter {
36+
public abstract class BaseGcsWriter implements DestinationFileWriter {
3737

3838
private static final Logger LOGGER = LoggerFactory.getLogger(BaseGcsWriter.class);
3939

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/CommonWriter.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import com.amazonaws.services.s3.AmazonS3;
88
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
9-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
9+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1010
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
1111
import java.sql.Timestamp;
1212

@@ -15,7 +15,7 @@
1515
*/
1616
public interface GcsWriterFactory {
1717

18-
S3Writer create(GcsDestinationConfig config,
18+
DestinationFileWriter create(GcsDestinationConfig config,
1919
AmazonS3 s3Client,
2020
ConfiguredAirbyteStream configuredStream,
2121
Timestamp uploadTimestamp)

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import io.airbyte.integrations.destination.s3.S3Format;
1414
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
1515
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
16-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
16+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1717
import io.airbyte.protocol.models.AirbyteStream;
1818
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
1919
import java.sql.Timestamp;
@@ -26,7 +26,7 @@ public class ProductionWriterFactory implements GcsWriterFactory {
2626
protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class);
2727

2828
@Override
29-
public S3Writer create(final GcsDestinationConfig config,
29+
public DestinationFileWriter create(final GcsDestinationConfig config,
3030
final AmazonS3 s3Client,
3131
final ConfiguredAirbyteStream configuredStream,
3232
final Timestamp uploadTimestamp)

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
1515
import io.airbyte.integrations.destination.s3.csv.S3CsvWriter;
1616
import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
17-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
17+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1818
import io.airbyte.protocol.models.AirbyteRecordMessage;
1919
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
2020
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -47,7 +47,7 @@ public abstract class S3StreamCopier implements StreamCopier {
4747
private final ConfiguredAirbyteStream configuredAirbyteStream;
4848
private final Timestamp uploadTime;
4949
protected final String stagingFolder;
50-
protected final Map<String, S3Writer> stagingWritersByFile = new HashMap<>();
50+
protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
5151
private final boolean purgeStagingData;
5252

5353
// The number of batches of records that will be inserted into each file.
@@ -129,7 +129,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage, final
129129

130130
@Override
131131
public void closeStagingUploader(final boolean hasFailed) throws Exception {
132-
for (final S3Writer writer : stagingWritersByFile.values()) {
132+
for (final DestinationFileWriter writer : stagingWritersByFile.values()) {
133133
writer.close(hasFailed);
134134
}
135135
}
@@ -149,7 +149,7 @@ public void createTemporaryTable() throws Exception {
149149
@Override
150150
public void copyStagingFileToTemporaryTable() throws Exception {
151151
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName);
152-
for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
152+
for (final Map.Entry<String, DestinationFileWriter> entry : stagingWritersByFile.entrySet()) {
153153
final String objectKey = entry.getValue().getOutputPath();
154154
copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), objectKey), schemaName, tmpTableName, s3Config);
155155
}
@@ -181,7 +181,7 @@ public String generateMergeStatement(final String destTableName) {
181181
@Override
182182
public void removeFileAndDropTmpTable() throws Exception {
183183
if (purgeStagingData) {
184-
for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
184+
for (final Map.Entry<String, DestinationFileWriter> entry : stagingWritersByFile.entrySet()) {
185185
final String suffix = entry.getKey();
186186
final String objectKey = entry.getValue().getOutputPath();
187187

@@ -208,7 +208,7 @@ public String getTmpTableName() {
208208
}
209209

210210
@VisibleForTesting
211-
public Map<String, S3Writer> getStagingWritersByFile() {
211+
public Map<String, DestinationFileWriter> getStagingWritersByFile() {
212212
return stagingWritersByFile;
213213
}
214214

airbyte-integrations/connectors/destination-s3/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ As a community contributor, you will need access to AWS to run the integration t
2323
- Modify `spec.json` to specify the configuration of this new format.
2424
- Update `S3FormatConfigs` to be able to construct a config for this new format.
2525
- Create a new package under `io.airbyte.integrations.destination.s3`.
26-
- Implement a new `S3Writer`. The implementation can extend `BaseS3Writer`.
26+
- Implement a new `DestinationFileWriter`. The implementation can extend `BaseS3Writer`.
2727
- Write an acceptance test for the new output format. The test can extend `S3DestinationAcceptanceTest`.

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.airbyte.commons.json.Jsons;
99
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
1010
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
11-
import io.airbyte.integrations.destination.s3.writer.S3Writer;
11+
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
1212
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
1313
import io.airbyte.protocol.models.AirbyteMessage;
1414
import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -28,7 +28,7 @@ public class S3Consumer extends FailureTrackingAirbyteMessageConsumer {
2828
private final ConfiguredAirbyteCatalog configuredCatalog;
2929
private final S3WriterFactory writerFactory;
3030
private final Consumer<AirbyteMessage> outputRecordCollector;
31-
private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;
31+
private final Map<AirbyteStreamNameNamespacePair, DestinationFileWriter> streamNameAndNamespaceToWriters;
3232

3333
private AirbyteMessage lastStateMessage = null;
3434

@@ -49,7 +49,7 @@ protected void startTracked() throws Exception {
4949
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
5050

5151
for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
52-
final S3Writer writer = writerFactory
52+
final DestinationFileWriter writer = writerFactory
5353
.create(s3DestinationConfig, s3Client, configuredStream, uploadTimestamp);
5454
writer.initialize();
5555

@@ -85,7 +85,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti
8585

8686
@Override
8787
protected void close(final boolean hasFailed) throws Exception {
88-
for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) {
88+
for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
8989
handler.close(hasFailed);
9090
}
9191
// S3 stream uploader is all or nothing if a failure happens in the destination.

0 commit comments

Comments
 (0)