
Commit 1ce7b90

wip

1 parent d7bbaad commit 1ce7b90

3 files changed: +97, -13 lines

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

Lines changed: 34 additions & 9 deletions
@@ -16,7 +16,11 @@
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
+import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
+import io.airbyte.integrations.destination.s3.csv.S3CsvWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -58,45 +62,66 @@ public abstract class S3StreamCopier implements StreamCopier {
   protected final JdbcDatabase db;
   private final ExtendedNameTransformer nameTransformer;
   private final SqlOperations sqlOperations;
+  private final ConfiguredAirbyteStream configuredAirbyteStream;
+  private final Timestamp uploadTime;
   protected final Set<String> s3StagingFiles = new HashSet<>();
   private final Map<String, StreamTransferManager> multipartUploadManagers = new HashMap<>();
   private final Map<String, MultiPartOutputStream> outputStreams = new HashMap<>();
   private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
   protected final String stagingFolder;
   private final StagingFilenameGenerator filenameGenerator;
+  private final Map<String, S3CsvWriter> stagingWriters = new HashMap<>();
 
   public S3StreamCopier(final String stagingFolder,
-                        final DestinationSyncMode destSyncMode,
                         final String schema,
-                        final String streamName,
                         final AmazonS3 client,
                         final JdbcDatabase db,
                         final S3DestinationConfig s3Config,
                         final ExtendedNameTransformer nameTransformer,
-                        final SqlOperations sqlOperations) {
-    this.destSyncMode = destSyncMode;
+                        final SqlOperations sqlOperations,
+                        final ConfiguredAirbyteStream configuredAirbyteStream,
+                        final Timestamp uploadTime) {
+    this.destSyncMode = configuredAirbyteStream.getDestinationSyncMode();
     this.schemaName = schema;
-    this.streamName = streamName;
+    this.streamName = configuredAirbyteStream.getStream().getName();
     this.stagingFolder = stagingFolder;
     this.db = db;
     this.nameTransformer = nameTransformer;
     this.sqlOperations = sqlOperations;
-    this.tmpTableName = nameTransformer.getTmpTableName(streamName);
+    this.configuredAirbyteStream = configuredAirbyteStream;
+    this.uploadTime = uploadTime;
+    this.tmpTableName = nameTransformer.getTmpTableName(this.streamName);
     this.s3Client = client;
     this.s3Config = s3Config;
-    this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
+    this.filenameGenerator = new StagingFilenameGenerator(this.streamName, MAX_PARTS_PER_FILE);
   }
 
   private String prepareS3StagingFile() {
     return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
   }
 
+  /*
+   * Old behavior: create s3://bucket/randomUuid/(namespace|schemaName)/generatedFilename
+   * S3CsvWriter: create s3://bucket/bucketPath(/namespace)?/streamName/time.csv
+   */
   @Override
   public String prepareStagingFile() {
     final var name = prepareS3StagingFile();
-    if (!s3StagingFiles.contains(name)) {
-      s3StagingFiles.add(name);
+    if (!stagingWriters.containsKey(name)) {
       LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
+
+      try {
+        final S3CsvWriter writer = new S3CsvWriter(
+            s3Config.cloneWithFormatConfig(new S3CsvFormatConfig(Flattening.ROOT_LEVEL, (long) s3Config.getPartSize())),
+            s3Client,
+            configuredAirbyteStream,
+            uploadTime
+        );
+        stagingWriters.put(name, writer);
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+
       // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
       // have support for streaming multipart uploads;
       // The alternative is first writing the entire output to disk before loading into S3. This is not
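
A note on the layout comment in the hunk above: for a hypothetical bucket my-bucket with bucket path staging, namespace public, and stream users (all names invented for illustration), the two schemes would produce keys shaped roughly like:

old behavior:  s3://my-bucket/<randomUuid>/public/<generatedFilename>
S3CsvWriter:   s3://my-bucket/staging/public/users/<uploadTime>.csv

The exact filename patterns come from StagingFilenameGenerator and S3CsvWriter respectively; only the shape of the paths is sketched here.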
airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+package io.airbyte.integrations.destination.jdbc.copy.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
+
+public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
+
+  /**
+   * Used by the copy consumer.
+   */
+  @Override
+  public StreamCopier create(final String configuredSchema,
+                             final S3DestinationConfig s3Config,
+                             final String stagingFolder,
+                             final ConfiguredAirbyteStream configuredStream,
+                             final ExtendedNameTransformer nameTransformer,
+                             final JdbcDatabase db,
+                             final SqlOperations sqlOperations) {
+    try {
+      final AirbyteStream stream = configuredStream.getStream();
+      final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
+      final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
+      final AmazonS3 s3Client = s3Config.getS3Client();
+
+      return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * For specific copier suppliers to implement.
+   */
+  public abstract StreamCopier create(String stagingFolder,
+                                      DestinationSyncMode syncMode,
+                                      String schema,
+                                      String streamName,
+                                      AmazonS3 s3Client,
+                                      JdbcDatabase db,
+                                      S3DestinationConfig s3Config,
+                                      ExtendedNameTransformer nameTransformer,
+                                      SqlOperations sqlOperations)
+      throws Exception;
+}
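
For context, a destination-specific connector only has to supply the second, abstract create. A minimal sketch, assuming a hypothetical FooStreamCopier whose constructor matches the abstract signature above (the Foo names are invented for illustration; actual subclasses live in the per-destination connector modules):

package io.airbyte.integrations.destination.foo;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.DestinationSyncMode;

public class FooStreamCopierFactory extends S3StreamCopierFactory {

  // The base class has already resolved the schema, sync mode, and S3 client
  // from the configured stream; this method only chooses the concrete copier.
  @Override
  public StreamCopier create(final String stagingFolder,
                             final DestinationSyncMode syncMode,
                             final String schema,
                             final String streamName,
                             final AmazonS3 s3Client,
                             final JdbcDatabase db,
                             final S3DestinationConfig s3Config,
                             final ExtendedNameTransformer nameTransformer,
                             final SqlOperations sqlOperations) {
    return new FooStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
  }
}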

airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java

Lines changed: 11 additions & 4 deletions
@@ -19,7 +19,11 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.UUID;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,9 +48,7 @@ public void setup() {
 
     copier = new S3StreamCopier(
         "fake-staging-folder",
-        DestinationSyncMode.OVERWRITE,
         "fake-schema",
-        "fake-stream",
         s3Client,
         db,
         new S3DestinationConfig(
@@ -59,7 +61,11 @@ public void setup() {
             null
         ),
         new ExtendedNameTransformer(),
-        sqlOperations
+        sqlOperations,
+        new ConfiguredAirbyteStream()
+            .withDestinationSyncMode(DestinationSyncMode.APPEND)
+            .withStream(new AirbyteStream().withName("fake-stream")),
+        Timestamp.from(Instant.now())
     ) {
       @Override
       public void copyS3CsvFileIntoTable(
@@ -108,7 +114,8 @@ public void closesS3Upload_when_stagingUploaderClosedFailingly() throws Exception
     final RuntimeException exception = assertThrows(RuntimeException.class, () -> copier.closeStagingUploader(true));
 
     // the wrapping chain is RuntimeException -> ExecutionException -> RuntimeException -> InterruptedException
-    assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(), "Original exception: " + ExceptionUtils.readStackTrace(exception));
+    assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(),
+        "Original exception: " + ExceptionUtils.readStackTrace(exception));
   }
 
   @Test
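
One side note on the new constructor arguments exercised here: the test pins the stream via ConfiguredAirbyteStream and passes the wall-clock upload time. If an assertion ever needed a stable staging path, a fixed timestamp could be injected instead; a minimal sketch (the instant chosen is arbitrary):

    // Hypothetical: pin the upload time so any key derived from it is stable across runs.
    final Timestamp fixedUploadTime = Timestamp.from(Instant.parse("2021-07-01T00:00:00Z"));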
