🎉 GCS destination: use serialized buffer; compress csv & jsonl #11686
Merged · 24 commits · Apr 4, 2022
Changes from 14 commits
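In brief: the GCS destination switches to the serialized on-disk record buffers already used by the S3 destination, gaining compression for the CSV and JSONL output formats; the GcsS3Helper utility is deleted in favor of a getS3Client() accessor on GcsDestinationConfig; and S3DestinationConfig call sites (including the Databricks connector's) migrate from a positional constructor to a create(...) builder.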
========================================
@@ -41,7 +41,6 @@
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
-import io.airbyte.integrations.destination.gcs.GcsS3Helper;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
@@ -164,7 +163,7 @@ void setup(final TestInfo info) throws IOException {

final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config));
-this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
+this.s3Client = gcsDestinationConfig.getS3Client();

tornDown = false;
Runtime.getRuntime()
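Throughout the PR, GcsS3Helper.getGcsS3Client(gcsDestinationConfig) becomes gcsDestinationConfig.getS3Client(). A minimal sketch of what the new accessor plausibly contains, assuming it simply absorbs the deleted helper's job of pointing the AWS SDK at GCS's S3-compatible XML endpoint with HMAC credentials (the field and constructor shapes here are illustrative, not the verbatim class):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Illustrative sketch only: the real GcsDestinationConfig carries more fields
// (bucket name, path, format config) that are omitted here.
public class GcsDestinationConfig {

  // GCS's S3-compatible ("interoperability") XML API endpoint.
  private static final String GCS_ENDPOINT = "https://storage.googleapis.com";

  private final String hmacKeyAccessId;
  private final String hmacKeySecret;
  private final String bucketRegion;

  public GcsDestinationConfig(final String hmacKeyAccessId, final String hmacKeySecret, final String bucketRegion) {
    this.hmacKeyAccessId = hmacKeyAccessId;
    this.hmacKeySecret = hmacKeySecret;
    this.bucketRegion = bucketRegion;
  }

  // Replaces GcsS3Helper.getGcsS3Client(config): GCS HMAC keys act as ordinary
  // access-key credentials for the AWS SDK.
  public AmazonS3 getS3Client() {
    final BasicAWSCredentials credentials = new BasicAWSCredentials(hmacKeyAccessId, hmacKeySecret);
    return AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(new EndpointConfiguration(GCS_ENDPOINT, bucketRegion))
        .withCredentials(new AWSStaticCredentialsProvider(credentials))
        .build();
  }
}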
========================================
@@ -17,7 +17,6 @@
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
-import io.airbyte.integrations.destination.gcs.GcsS3Helper;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.List;
@@ -46,14 +45,14 @@ public abstract class AbstractGscBigQueryUploader<T extends DestinationFileWriter>
}

@Override
-public void postProcessAction(boolean hasFailed) throws Exception {
+public void postProcessAction(final boolean hasFailed) throws Exception {
if (!isKeepFilesInGcs) {
deleteGcsFiles();
}
}

@Override
-protected void uploadData(Consumer<AirbyteMessage> outputRecordCollector, AirbyteMessage lastStateMessage) throws Exception {
+protected void uploadData(final Consumer<AirbyteMessage> outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception {
LOGGER.info("Uploading data to the tmp table {}.", tmpTable.getTable());
uploadDataFromFileToTmpTable();
super.uploadData(outputRecordCollector, lastStateMessage);
@@ -68,7 +67,7 @@ protected void uploadDataFromFileToTmpTable() throws Exception {
LOGGER.info(String.format("Started copying data from %s GCS " + getFileTypeName() + " file to %s tmp BigQuery table with schema: \n %s",
fileLocation, tmpTable, recordFormatter.getBigQuerySchema()));

-LoadJobConfiguration configuration = getLoadConfiguration();
+final LoadJobConfiguration configuration = getLoadConfiguration();

// For more information on Job see:
// https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
@@ -96,7 +95,7 @@ private String getFileTypeName() {
private void deleteGcsFiles() {
LOGGER.info("Deleting file {}", writer.getFileLocation());
final GcsDestinationConfig gcsDestinationConfig = this.gcsDestinationConfig;
-final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
+final AmazonS3 s3Client = gcsDestinationConfig.getS3Client();

final String gcsBucketName = gcsDestinationConfig.getBucketName();
final String gcs_bucket_path = gcsDestinationConfig.getBucketPath();
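For readers unfamiliar with the BigQuery Job API referenced above, here is a self-contained sketch of the load pattern that uploadDataFromFileToTmpTable() drives; the dataset, table, and gs:// URI are placeholders:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

public class LoadFromGcsExample {

  public static void main(final String[] args) throws InterruptedException {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    final TableId tmpTable = TableId.of("my_dataset", "my_tmp_table"); // placeholder names
    final String gcsUri = "gs://my-bucket/path/to/file.csv";           // placeholder URI

    // Equivalent of getLoadConfiguration() above: point a load job at the staged file.
    final LoadJobConfiguration configuration = LoadJobConfiguration
        .builder(tmpTable, gcsUri)
        .setFormatOptions(FormatOptions.csv())
        .build();

    // Submit the job and block until BigQuery finishes copying the data.
    final Job completed = bigquery.create(JobInfo.of(configuration)).waitFor();
    if (completed == null || completed.getStatus().getError() != null) {
      throw new RuntimeException("Load job failed: "
          + (completed == null ? "job no longer exists" : completed.getStatus().getError()));
    }
  }
}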
========================================
@@ -22,7 +22,6 @@
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
-import io.airbyte.integrations.destination.gcs.GcsS3Helper;
import io.airbyte.integrations.destination.gcs.avro.GcsAvroWriter;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
@@ -120,7 +119,7 @@ private static GcsAvroWriter initGcsWriter(
throws IOException {
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

-final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
+final AmazonS3 s3Client = gcsDestinationConfig.getS3Client();
return new GcsAvroWriter(
gcsDestinationConfig,
s3Client,
========================================
@@ -16,7 +16,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
-import io.airbyte.integrations.destination.gcs.GcsS3Helper;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
@@ -108,7 +107,7 @@ void setup(final TestInfo info) throws IOException {

final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config));
-this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
+this.s3Client = gcsDestinationConfig.getS3Client();

tornDown = false;
addShutdownHook();
========================================
@@ -59,24 +59,18 @@ public static DatabricksDestinationConfig get(final JsonNode config) {
}

public static S3DestinationConfig getDataSource(final JsonNode dataSource) {
-return new S3DestinationConfig(
-"",
+return S3DestinationConfig.create(
dataSource.get("s3_bucket_name").asText(),
dataSource.get("s3_bucket_path").asText(),
-dataSource.get("s3_bucket_region").asText(),
-dataSource.get("s3_access_key_id").asText(),
-dataSource.get("s3_secret_access_key").asText(),
-getDefaultParquetConfig());
+dataSource.get("s3_bucket_region").asText())
+.withFormatConfig(new S3ParquetFormatConfig(new ObjectMapper().createObjectNode()))
+.get();
}

public String getDatabricksServerHostname() {
return databricksServerHostname;
}

-private static S3ParquetFormatConfig getDefaultParquetConfig() {
-return new S3ParquetFormatConfig(new ObjectMapper().createObjectNode());
-}

public String getDatabricksHttpPath() {
return databricksHttpPath;
}
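The positional S3DestinationConfig constructor gives way to a create(...) factory with chained setters. Below is a skeleton of the builder shape these call sites imply; only create(...), the with*(...) setters shown in this diff, and get() are attested, so the field set and everything else is an assumption:

// Skeleton inferred from the call sites in this diff; not the verbatim Airbyte class.
public class S3DestinationConfig {

  private final String bucketName;
  private final String bucketPath;
  private final String bucketRegion;
  private final Object formatConfig; // S3FormatConfig in the real code; Object keeps the sketch self-contained

  private S3DestinationConfig(final String bucketName,
                              final String bucketPath,
                              final String bucketRegion,
                              final Object formatConfig) {
    this.bucketName = bucketName;
    this.bucketPath = bucketPath;
    this.bucketRegion = bucketRegion;
    this.formatConfig = formatConfig;
  }

  // Shape used by DatabricksDestinationConfig.getDataSource and the unit test.
  public static Builder create(final String bucketName, final String bucketPath, final String bucketRegion) {
    return new Builder(bucketName, bucketPath, bucketRegion);
  }

  // Copy-style overload used by getStagingS3DestinationConfig: start from an
  // existing config, then override individual fields.
  public static Builder create(final S3DestinationConfig config) {
    return new Builder(config.bucketName, config.bucketPath, config.bucketRegion)
        .withFormatConfig(config.formatConfig);
  }

  public String getBucketPath() {
    return bucketPath;
  }

  public static class Builder {

    private final String bucketName;
    private String bucketPath;
    private String bucketRegion;
    private Object formatConfig;

    private Builder(final String bucketName, final String bucketPath, final String bucketRegion) {
      this.bucketName = bucketName;
      this.bucketPath = bucketPath;
      this.bucketRegion = bucketRegion;
    }

    public Builder withBucketPath(final String bucketPath) {
      this.bucketPath = bucketPath;
      return this;
    }

    public Builder withFormatConfig(final Object formatConfig) {
      this.formatConfig = formatConfig;
      return this;
    }

    public S3DestinationConfig get() {
      return new S3DestinationConfig(bucketName, bucketPath, bucketRegion, formatConfig);
    }
  }
}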
========================================
@@ -206,15 +206,10 @@ public String getCurrentFile() {
* creates an {@link S3DestinationConfig} whose bucket path is <bucket-path>/<staging-folder>.
*/
static S3DestinationConfig getStagingS3DestinationConfig(final S3DestinationConfig config, final String stagingFolder) {
-return new S3DestinationConfig(
-config.getEndpoint(),
-config.getBucketName(),
-String.join("/", config.getBucketPath(), stagingFolder),
-config.getBucketRegion(),
-config.getAccessKeyId(),
-config.getSecretAccessKey(),
-// use default parquet format config
-new S3ParquetFormatConfig(MAPPER.createObjectNode()));
+return S3DestinationConfig.create(config)
+.withBucketPath(String.join("/", config.getBucketPath(), stagingFolder))
+.withFormatConfig(new S3ParquetFormatConfig(MAPPER.createObjectNode()))
+.get();
}

}
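Note that create(config) presumably copies every field of the source config, so the chained calls override only the staging bucket path and the default (empty-options) parquet format config; DatabricksStreamCopierTest below pins the resulting bucket path to exactly <bucket-path>/<staging-folder>.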
========================================
@@ -15,7 +15,7 @@ class DatabricksStreamCopierTest {
@Test
public void testGetStagingS3DestinationConfig() {
final String bucketPath = UUID.randomUUID().toString();
-final S3DestinationConfig config = new S3DestinationConfig("", "", bucketPath, "", "", "", null);
+final S3DestinationConfig config = S3DestinationConfig.create("", bucketPath, "").get();
final String stagingFolder = UUID.randomUUID().toString();
final S3DestinationConfig stagingConfig = DatabricksStreamCopier.getStagingS3DestinationConfig(config, stagingFolder);
assertEquals(String.format("%s/%s", bucketPath, stagingFolder), stagingConfig.getBucketPath());

This file was deleted.

========================================
@@ -10,9 +10,12 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
-import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
-import io.airbyte.integrations.destination.gcs.writer.ProductionWriterFactory;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
+import io.airbyte.integrations.destination.record_buffer.FileBuffer;
+import io.airbyte.integrations.destination.s3.S3ConsumerFactory;
import io.airbyte.integrations.destination.s3.S3Destination;
+import io.airbyte.integrations.destination.s3.SerializedBufferFactory;
+import io.airbyte.integrations.destination.s3.util.S3NameTransformer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -27,6 +30,12 @@ public class GcsDestination extends BaseConnector implements Destination {
public static final String EXPECTED_ROLES = "storage.multipartUploads.abort, storage.multipartUploads.create, "
+ "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list";

+private final NamingConventionTransformer nameTransformer;
+
+public GcsDestination() {
+this.nameTransformer = new S3NameTransformer();
+}

public static void main(final String[] args) throws Exception {
new IntegrationRunner(new GcsDestination()).run(args);
}
@@ -35,7 +44,7 @@ public static void main(final String[] args) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final GcsDestinationConfig destinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
-final AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(destinationConfig);
+final AmazonS3 s3Client = destinationConfig.getS3Client();

// Test single upload (for small files) permissions
S3Destination.testSingleUpload(s3Client, destinationConfig.getBucketName());
@@ -59,9 +68,14 @@ public AirbyteConnectionStatus check(final JsonNode config) {
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
-final GcsWriterFactory formatterFactory = new ProductionWriterFactory();
-return new GcsConsumer(GcsDestinationConfig.getGcsDestinationConfig(config), configuredCatalog,
-formatterFactory, outputRecordCollector);
+final GcsDestinationConfig gcsConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
+return new S3ConsumerFactory().create(
+outputRecordCollector,
+new GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig),
+nameTransformer,
+SerializedBufferFactory.getCreateFunction(gcsConfig, FileBuffer::new),
+gcsConfig,
+configuredCatalog);
}

}
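getConsumer() now routes through the shared S3ConsumerFactory, and SerializedBufferFactory.getCreateFunction(gcsConfig, FileBuffer::new) picks a buffer implementation per configured format; this is where the PR's CSV/JSONL compression takes effect. As a stand-alone illustration of the technique (a hypothetical class, not Airbyte's actual buffer API), records can be gzip-compressed while being staged to a local temp file, so the object later uploaded to GCS is already compressed:

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of a gzip-compressing JSONL buffer; the real
// FileBuffer / serialized-buffer types in Airbyte differ in detail.
public class GzipJsonlFileBuffer implements AutoCloseable {

  private final Path file;
  private final OutputStream out;

  public GzipJsonlFileBuffer() throws IOException {
    this.file = Files.createTempFile("records", ".jsonl.gz");
    // Compress while writing, so the staged file is already gzipped at upload time.
    this.out = new GZIPOutputStream(new BufferedOutputStream(Files.newOutputStream(file)));
  }

  public void accept(final String jsonRecord) throws IOException {
    out.write(jsonRecord.getBytes(StandardCharsets.UTF_8));
    out.write('\n');
  }

  // Closing flushes the gzip trailer; the file is then ready to upload to GCS.
  @Override
  public void close() throws IOException {
    out.close();
  }

  public Path getFile() {
    return file;
  }
}

A compressed CSV buffer is the same idea, with a CSV printer writing into the same gzip stream.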